Commit a4317d44 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel
Browse files

#4685 get the hot indexer to work with txn's in the preparing state closes[t:4685]

git-svn-id: file:///svn/toku/tokudb@41614 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9172cb7a
Loading
Loading
Loading
Loading
+1 −8
Original line number Diff line number Diff line
@@ -127,14 +127,7 @@ int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum);
int  toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum);
int  toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum);

enum tokutxn_state {
    TOKUTXN_LIVE,         // initial txn state
    TOKUTXN_PREPARING,    // txn is preparing (or prepared)
    TOKUTXN_COMMITTING,   // txn in the process of committing
    TOKUTXN_ABORTING,     // txn in the process of aborting
    TOKUTXN_RETIRED,      // txn no longer exists
};
typedef enum tokutxn_state TOKUTXN_STATE;
#include "txn_state.h"

TOKUTXN_STATE toku_txn_get_state(TOKUTXN txn);

newbrt/txn_state.h

0 → 100644
+15 −0
Original line number Diff line number Diff line
#if !defined(TOKUTXN_STATE_H)
#define TOKUTXN_STATE_H

// this is a separate file so that the hotindexing tests can see the txn states

enum tokutxn_state {
    TOKUTXN_LIVE,         // initial txn state
    TOKUTXN_PREPARING,    // txn is preparing (or prepared)
    TOKUTXN_COMMITTING,   // txn in the process of committing
    TOKUTXN_ABORTING,     // txn in the process of aborting
    TOKUTXN_RETIRED,      // txn no longer exists
};
typedef enum tokutxn_state TOKUTXN_STATE;

#endif
+3 −1
Original line number Diff line number Diff line
@@ -10,6 +10,8 @@
#ifndef TOKU_INDEXER_INTERNAL_H
#define TOKU_INDEXER_INTERNAL_H

#include "txn_state.h"

// the indexer_commit_keys is an ordered set of keys described by a DBT in the keys array.
// the array is a resizeable array with max size "max_keys" and current size "current_keys".
// the ordered set is used by the hotindex undo function to collect the commit keys.
@@ -42,7 +44,7 @@ struct __toku_indexer_internal {

    // test functions
    int (*undo_do)(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule);
    int (*test_xid_state)(DB_INDEXER *indexer, TXNID xid);
    TOKUTXN_STATE (*test_xid_state)(DB_INDEXER *indexer, TXNID xid);
    int (*test_lock_key)(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key);
    int (*test_delete_provisional)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
    int (*test_delete_committed)(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS xids);
+55 −38
Original line number Diff line number Diff line
/* -*- mode: C; c-basic-offset: 4 -*- */
/*
 * Copyright (c) 2010-2011 Tokutek Inc.  All rights reserved.
 * Copyright (c) 2010-2012 Tokutek Inc.  All rights reserved.
 * The technology is licensed by the Massachusetts Institute of Technology, 
 * Rutgers State University of New Jersey, and the Research Foundation of 
 * State University of New York at Stony Brook under United States of America 
@@ -52,8 +52,7 @@ static void
indexer_commit_keys_add(struct indexer_commit_keys *keys, size_t length, void *ptr) {
    if (keys->current_keys >= keys->max_keys) {
        int new_max_keys = keys->max_keys == 0 ? 256 : keys->max_keys * 2;
        keys->keys = (DBT *) toku_realloc(keys->keys, new_max_keys * sizeof (DBT));
        resource_assert(keys->keys);
        keys->keys = (DBT *) toku_xrealloc(keys->keys, new_max_keys * sizeof (DBT));
        for (int i = keys->current_keys; i < new_max_keys; i++)
            toku_init_dbt_flags(&keys->keys[i], DB_DBT_REALLOC);
        keys->max_keys = new_max_keys;
@@ -125,7 +124,7 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {

        // placeholders in the committed stack are not allowed
        if (uxr_is_placeholder(uxr))
            invariant(0);
            assert(0);

        // undo
        if (xrindex > 0) {
@@ -143,7 +142,7 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
                        indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
                }
            } else
                invariant(0);
                assert(0);
        }
        if (result != 0)
            break;
@@ -161,7 +160,7 @@ indexer_undo_do_committed(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
                    indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
            }
        } else
            invariant(0);
            assert(0);

        // send commit messages if needed
        for (int i = 0; result == 0 && i < indexer_commit_keys_valid(&indexer->i->commit_keys); i++) 
@@ -206,13 +205,26 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
            outermost_xid = this_xid;
            outermost_xid_state = this_xid_state;
            result = indexer_set_xid(indexer, this_xid, &xids);    // always add the outermost xid to the XIDS list
        } else if (this_xid_state == TOKUTXN_LIVE)
        } else {
            switch (this_xid_state) {
            case TOKUTXN_LIVE:
                result = indexer_append_xid(indexer, this_xid, &xids); // append a live xid to the XIDS list
                break;
            case TOKUTXN_PREPARING:
                assert(0); // not allowed
            case TOKUTXN_COMMITTING:
            case TOKUTXN_ABORTING:
            case TOKUTXN_RETIRED:
                break; // nothing to do
            }
        }
        if (result != 0)
            break;

        if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed)
            invariant(this_xid_state == TOKUTXN_RETIRED);
        if (outermost_xid_state != TOKUTXN_LIVE && xrindex > num_committed) {
            // if the outermost is not live, then the inner state must be retired.  thats the way that the txn API works.
            assert(this_xid_state == TOKUTXN_RETIRED);
        }

        if (uxr_is_placeholder(uxr))
            continue;         // skip placeholders
@@ -229,20 +241,26 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
                result = indexer_generate_hot_key_val(indexer, hotdb, ule, prevuxr, &indexer->i->hotkey, NULL);
                if (result == 0) {
                    // send the delete message
                    if (outermost_xid_state == TOKUTXN_LIVE) {
                        invariant(this_xid_state != TOKUTXN_ABORTING);
                    switch (outermost_xid_state) {
                    case TOKUTXN_LIVE:
                    case TOKUTXN_PREPARING:
                        assert(this_xid_state != TOKUTXN_ABORTING);
                        result = indexer_brt_delete_provisional(indexer, hotdb, &indexer->i->hotkey, xids);
                        if (result == 0)
                            result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
                    } else {
                        invariant(outermost_xid_state == TOKUTXN_RETIRED || outermost_xid_state == TOKUTXN_COMMITTING);
                        break;
                    case TOKUTXN_COMMITTING:
                    case TOKUTXN_RETIRED:
                        result = indexer_brt_delete_committed(indexer, hotdb, &indexer->i->hotkey, xids);
                        if (result == 0)
                            indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
                        break;
                    case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
                        assert(0);
                    }
                }
            } else
                invariant(0);
                assert(0);
        }
        if (result != 0)
            break;
@@ -255,23 +273,27 @@ indexer_undo_do_provisional(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule) {
            result = indexer_generate_hot_key_val(indexer, hotdb, ule, uxr, &indexer->i->hotkey, &indexer->i->hotval);
            if (result == 0) {
                // send the insert message
                if (outermost_xid_state == TOKUTXN_LIVE) {
                    invariant(this_xid_state != TOKUTXN_ABORTING);
                switch (outermost_xid_state) {
                case TOKUTXN_LIVE:
                case TOKUTXN_PREPARING:
                    assert(this_xid_state != TOKUTXN_ABORTING);
                    result = indexer_brt_insert_provisional(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
                    if (result == 0) 
                        result = indexer_lock_key(indexer, hotdb, &indexer->i->hotkey, outermost_xid);
                } else {
                    invariant(outermost_xid_state == TOKUTXN_RETIRED || outermost_xid_state == TOKUTXN_COMMITTING);
                    break;
                case TOKUTXN_COMMITTING:
                case TOKUTXN_RETIRED:
                    result = indexer_brt_insert_committed(indexer, hotdb, &indexer->i->hotkey, &indexer->i->hotval, xids);
#if 0
                    // no need to do this because we do implicit commits on inserts
                    if (result == 0)
                    if (0 && result == 0)
                        indexer_commit_keys_add(&indexer->i->commit_keys, indexer->i->hotkey.size, indexer->i->hotkey.data);
#endif
                    break;
                case TOKUTXN_ABORTING: // can not happen since we stop processing the leaf entry if the outer most xr is aborting
                    assert(0);
                }
            }
        } else
            invariant(0);
            assert(0);

        if (result != 0)
            break;
@@ -360,24 +382,19 @@ indexer_generate_hot_key_val(DB_INDEXER *indexer, DB *hotdb, ULEHANDLE ule, UXRH
// then return TOKUTXN_RETIRED.
static TOKUTXN_STATE
indexer_xid_state(DB_INDEXER *indexer, TXNID xid) {
    TOKUTXN_STATE result = TOKUTXN_RETIRED;
    TOKUTXN_STATE result;
    // TEST
    if (indexer->i->test_xid_state) {
        int r = indexer->i->test_xid_state(indexer, xid);
        switch (r) {
        case 0: result = TOKUTXN_LIVE; break;
        case 1: result = TOKUTXN_COMMITTING; break;
        case 2: result = TOKUTXN_ABORTING; break;
        case 3: result = TOKUTXN_RETIRED; break;
        default: assert(0); break;
        }
        result = indexer->i->test_xid_state(indexer, xid);
    } else {
        DB_ENV *env = indexer->i->env;
        TOKUTXN txn = NULL;
        int r = toku_txnid2txn(env->i->logger, xid, &txn);
        invariant(r == 0);
        assert(r == 0);
        if (txn) 
            result = toku_txn_get_state(txn);
        else 
            result = TOKUTXN_RETIRED;
    }
    return result;
}
@@ -393,7 +410,7 @@ indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_
        DB_ENV *env = indexer->i->env;
        TOKUTXN txn = NULL;
        result = toku_txnid2txn(env->i->logger, outermost_live_xid, &txn);
        invariant(result == 0 && txn != NULL);
        assert(result == 0 && txn != NULL);
        result = toku_grab_write_lock(hotdb, key, txn);
    }
    return result;
@@ -404,7 +421,7 @@ indexer_lock_key(DB_INDEXER *indexer, DB *hotdb, DBT *key, TXNID outermost_live_
// return FALSE.
static BOOL
indexer_find_prev_xr(DB_INDEXER *UU(indexer), ULEHANDLE ule, uint64_t xrindex, uint64_t *prev_xrindex) {
    invariant(xrindex < ule_num_uxrs(ule));
    assert(xrindex < ule_num_uxrs(ule));
    BOOL  prev_found = FALSE;
    while (xrindex > 0) {
        xrindex -= 1;
@@ -427,7 +444,7 @@ indexer_get_innermost_live_txn(DB_INDEXER *indexer, XIDS xids) {
    TXNID xid = xids_get_xid(xids, (u_int8_t)(num_xids-1));
    TOKUTXN txn = NULL;
    int result = toku_txnid2txn(env->i->logger, xid, &txn);
    invariant(result == 0);
    assert(result == 0);
    return txn;
}

@@ -443,7 +460,7 @@ indexer_brt_delete_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, XIDS
        result = toku_ydb_check_avail_fs_space(indexer->i->env);
        if (result == 0) {
            TOKUTXN txn = indexer_get_innermost_live_txn(indexer, xids);
            invariant(txn != NULL);
            assert(txn != NULL);
            result = toku_brt_maybe_delete (hotdb->i->brt, hotkey, txn, FALSE, ZERO_LSN, TRUE);
        }
    }
@@ -477,7 +494,7 @@ indexer_brt_insert_provisional(DB_INDEXER *indexer, DB *hotdb, DBT *hotkey, DBT
        result = toku_ydb_check_avail_fs_space(indexer->i->env);
        if (result == 0) {
            TOKUTXN txn = indexer_get_innermost_live_txn(indexer, xids);
            invariant(txn != NULL);
            assert(txn != NULL);
            result = toku_brt_maybe_insert (hotdb->i->brt, hotkey, hotval, txn, FALSE, ZERO_LSN, TRUE, BRT_INSERT);
        }
    }
+17 −8
Original line number Diff line number Diff line
/* -*- mode: C; c-basic-offset: 4 -*- */
/*
 * Copyright (c) 2010-2012 Tokutek Inc.  All rights reserved.
 * The technology is licensed by the Massachusetts Institute of Technology, 
 * Rutgers State University of New Jersey, and the Research Foundation of 
 * State University of New York at Stony Brook under United States of America 
 * Serial No. 11/760379 and to the patents and/or patent applications resulting from it.
 */
#ident "$Id$"

// test the hotindexer undo do function
// read a description of the live transactions and a leafentry from a test file, run the undo do function,
// and print out the actions taken by the undo do function while processing the leafentry
@@ -15,10 +25,6 @@
#include "indexer-internal.h"
#include "xids-internal.h"

typedef enum {
    TOKUTXN_LIVE, TOKUTXN_COMMITTING, TOKUTXN_ABORTING, TOKUTXN_RETIRED,
} TOKUTXN_STATE; // see txn.h

struct txn {
    TXNID xid;
    TOKUTXN_STATE state;
@@ -53,7 +59,7 @@ live_add(struct live *live, TXNID xid, TOKUTXN_STATE state) {
}

static int
txn_state(struct live *live, TXNID xid) {
lookup_txn_state(struct live *live, TXNID xid) {
    int r = TOKUTXN_RETIRED;
    for (int i = 0; i < live->o; i++) {
        if (live->txns[i].xid == xid) {
@@ -196,10 +202,10 @@ put_callback(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_data, const DBT *
static DB_INDEXER *test_indexer = NULL;
static DB *test_hotdb = NULL;

static int
static TOKUTXN_STATE
test_xid_state(DB_INDEXER *indexer, TXNID xid) {
    invariant(indexer == test_indexer);
    int r = txn_state(&live_xids, xid);
    TOKUTXN_STATE r = lookup_txn_state(&live_xids, xid);
    return r;
}

@@ -207,7 +213,8 @@ static int
test_lock_key(DB_INDEXER *indexer, TXNID xid, DB *hotdb, DBT *key) {
    invariant(indexer == test_indexer);
    invariant(hotdb == test_hotdb);
    invariant(test_xid_state(indexer, xid) == TOKUTXN_LIVE);
    TOKUTXN_STATE txn_state = test_xid_state(indexer, xid);
    invariant(txn_state == TOKUTXN_LIVE || txn_state == TOKUTXN_PREPARING);
    printf("lock [%lu] ", xid);
    print_dbt(key);
    printf("\n");
@@ -342,6 +349,8 @@ read_test(char *testname, ULE ule) {
                TOKUTXN_STATE state = TOKUTXN_RETIRED;
                if (strcmp(fields[2], "live") == 0)
                    state = TOKUTXN_LIVE;
                else if (strcmp(fields[2], "preparing") == 0)
                    state = TOKUTXN_PREPARING;
                else if (strcmp(fields[2], "committing") == 0)
                    state = TOKUTXN_COMMITTING;
                else if (strcmp(fields[2], "aborting") == 0)
Loading