Commit d9ec343b authored by Rich Prohaska, committed by Yoni Fogel
Browse files

#4455 add txn create and start to the tokutxn API. this allows a txn to be...

#4455 Add txn create and start to the tokutxn API. This allows a txn to be created without holding any locks. refs[t:4455]

git-svn-id: file:///svn/toku/tokudb@40438 c7de825b-a66e-492c-adef-691d508d4ae1
parent 6aa2c88f
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -474,7 +474,7 @@ recover_transaction(TOKUTXN *txnp, TXNID xid, TXNID parentxid, TOKULOGGER logger
        assert(r == 0);
        assert(txn==NULL);
    }
    r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE);
    r = toku_txn_begin_with_xid(parent, &txn, logger, xid, TXN_SNAPSHOT_NONE, NULL);
    assert(r == 0);
    if (txnp) *txnp = txn;
    return 0;
+1 −1
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ test_main(int argc, const char *argv[]) {

    int r;

    r = toku_txn_ignore_init(txn);             CKERR(r);
    toku_txn_ignore_init(txn);
    
    FILENUM f1 = {1};
    FILENUM f2 = {2};
+106 −91
Original line number Diff line number Diff line
@@ -58,7 +58,6 @@ toku_txn_get_status(TOKULOGGER logger, TXN_STATUS s) {
    *s = txn_status;
}


int 
toku_txn_begin_txn (
    DB_TXN  *container_db_txn,
@@ -68,17 +67,23 @@ toku_txn_begin_txn (
    TXN_SNAPSHOT_TYPE snapshot_type
    ) 
{
    int r;
    r = toku_txn_begin_with_xid(parent_tokutxn, 
				tokutxn, logger, 
				0, 
				snapshot_type
				);
    if (r == 0) {
	// container_db_txn set here, not in helper function toku_txn_begin_with_xid()
	// because helper function is used by recovery, which does not have DB_TXN
	(*tokutxn)->container_db_txn = container_db_txn; // internal struct points to container
    int r = toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, TXNID_NONE, snapshot_type, container_db_txn);
    return r;
}

// Begin a txn in a single call: create it, then start it.
// All arguments are forwarded to toku_txn_create_txn(); the txn it stores in
// *tokutxn is then passed to toku_txn_start_txn().
// Returns 0 on success.  If creation fails, that error is returned and the txn
// is never started; otherwise the result of starting the txn is returned.
int 
toku_txn_begin_with_xid (
    TOKUTXN parent_tokutxn, 
    TOKUTXN *tokutxn, 
    TOKULOGGER logger, 
    TXNID xid, 
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn
    ) 
{
    int create_result = toku_txn_create_txn(tokutxn, parent_tokutxn, logger, xid, snapshot_type, container_db_txn);
    if (create_result != 0) {
        // creation failed, so there is nothing to start
        return create_result;
    }
    return toku_txn_start_txn(*tokutxn);
}

@@ -96,7 +101,6 @@ fill_xids (OMTVALUE xev, u_int32_t idx, void *varray) {
    return 0;
}


// Create list of root transactions that were live when this txn began.
static int
setup_live_root_txn_list(TOKUTXN txn) {
@@ -130,7 +134,6 @@ snapshot_txnids_note_txn(TOKUTXN txn) {
    return r;
}


// If live txn is not in reverse live list, then add it.
// If live txn is in reverse live list, update it by setting second xid in pair to new txn that is being started.
static int
@@ -163,7 +166,6 @@ live_list_reverse_note_txn_start_iter(OMTVALUE live_xidv, u_int32_t UU(index), v
    return r;
}


// Maintain the reverse live list.  The reverse live list is a list of xid pairs.  The first xid in the pair
// is a txn that was live when some txn began, and the second xid in the pair is the newest still-live xid to 
// have that first xid in its live list.  (The first xid may be closed, it only needed to be live when the 
@@ -180,12 +182,14 @@ live_list_reverse_note_txn_start(TOKUTXN txn) {
    return r;
}

int toku_txn_begin_with_xid (
    TOKUTXN parent_tokutxn, 
int 
toku_txn_create_txn (
    TOKUTXN *tokutxn, 
    TOKUTXN parent_tokutxn, 
    TOKULOGGER logger, 
    TXNID xid, 
    TXN_SNAPSHOT_TYPE snapshot_type
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn
    ) 
{
    if (logger->is_panicked) return EINVAL;
@@ -196,24 +200,11 @@ int toku_txn_begin_with_xid (
    TOKUTXN MALLOC(result);
    if (result == 0) 
        return errno;
    int r;
    LSN first_lsn;
    result->starttime = time(NULL);  // getting timestamp in seconds is a cheap call
    if (xid == 0) {
        r = toku_log_xbegin(logger, &first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
        if (r!=0) goto died;
    } else
        first_lsn.lsn = xid;
    int r;
    r = toku_omt_create(&result->open_brts);
    if (r!=0) goto died;
    result->txnid64 = first_lsn.lsn;
    XIDS parent_xids;
    if (parent_tokutxn==NULL)
        parent_xids = xids_get_root_xids();
    else
        parent_xids = parent_tokutxn->xids;
    if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
        goto died;

    result->logger = logger;
    result->parent = parent_tokutxn;
    result->num_rollentries = 0;
@@ -230,19 +221,69 @@ int toku_txn_begin_with_xid (
    result->pinned_inprogress_rollback_log = NULL;
    result->snapshot_type = snapshot_type;
    result->snapshot_txnid64 = TXNID_NONE;
    result->container_db_txn = container_db_txn;

    result->rollentry_raw_count = 0;
    result->force_fsync_on_commit = FALSE;
    result->recovered_from_checkpoint = FALSE;
    toku_list_init(&result->checkpoint_before_commit);
    result->state = TOKUTXN_LIVE;
    result->do_fsync = FALSE;

    toku_txn_ignore_init(result); // 2954

    result->txnid64 = xid;
    result->xids = NULL;

    *tokutxn = result;

    STATUS_VALUE(TXN_BEGIN)++;
    STATUS_VALUE(TXN_NUM_OPEN)++;
    if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN))
	STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN);

    if (garbage_collection_debug) {
        verify_snapshot_system(logger);
    }
    return 0;

died:
    // TODO memory leak
    toku_logger_panic(logger, r);
    return r; 
}

int
toku_txn_start_txn(TOKUTXN txn) {
    TOKULOGGER logger = txn->logger;
    TOKUTXN parent = txn->parent;
    int r;
    if (txn->txnid64 == TXNID_NONE) {
        LSN first_lsn;
        r = toku_log_xbegin(logger, &first_lsn, 0, parent ? parent->txnid64 : 0);
        if (r!=0) goto died;
        txn->txnid64 = first_lsn.lsn;
    } 
    XIDS parent_xids;
    if (parent == NULL)
        parent_xids = xids_get_root_xids();
    else
        parent_xids = parent->xids;
    if ((r = xids_create_child(parent_xids, &txn->xids, txn->txnid64)))
        goto died;

    if (toku_omt_size(logger->live_txns) == 0) {
        assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
        logger->oldest_living_xid = result->txnid64;
        logger->oldest_living_starttime = result->starttime;
        logger->oldest_living_xid = txn->txnid64;
        logger->oldest_living_starttime = txn->starttime;
    }
    assert(logger->oldest_living_xid <= result->txnid64);
    assert(logger->oldest_living_xid <= txn->txnid64);

    r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r);
    {
        //Add txn to list (omt) of live transactions
        //We know it is the newest one.
        r = toku_omt_insert_at(logger->live_txns, result, toku_omt_size(logger->live_txns));
        r = toku_omt_insert_at(logger->live_txns, txn, toku_omt_size(logger->live_txns));
        if (r!=0) goto died;

        //
@@ -261,34 +302,34 @@ int toku_txn_begin_with_xid (
        //

        // add ancestor information, and maintain global live root txn list
        if (parent_tokutxn==NULL) {
        if (parent == NULL) {
            //Add txn to list (omt) of live root txns
            r = toku_omt_insert_at(logger->live_root_txns, result, toku_omt_size(logger->live_root_txns)); //We know it is the newest one.
            r = toku_omt_insert_at(logger->live_root_txns, txn, toku_omt_size(logger->live_root_txns)); //We know it is the newest one.
            if (r!=0) goto died;
            result->ancestor_txnid64 = result->txnid64;
            txn->ancestor_txnid64 = txn->txnid64;
        }
        else {
            result->ancestor_txnid64 = result->parent->ancestor_txnid64;
            txn->ancestor_txnid64 = parent->ancestor_txnid64;
        }

        // setup information for snapshot reads
        if (snapshot_type != TXN_SNAPSHOT_NONE) {
        if (txn->snapshot_type != TXN_SNAPSHOT_NONE) {
            // in this case, either this is a root level transaction that needs its live list setup, or it
            // is a child transaction that specifically asked for its own snapshot
            if (parent_tokutxn==NULL || snapshot_type == TXN_SNAPSHOT_CHILD) {
                r = setup_live_root_txn_list(result);  
            if (parent == NULL || txn->snapshot_type == TXN_SNAPSHOT_CHILD) {
                r = setup_live_root_txn_list(txn);  
                assert_zero(r);
                result->snapshot_txnid64 = result->txnid64;
                r = snapshot_txnids_note_txn(result);
                txn->snapshot_txnid64 = txn->txnid64;
                r = snapshot_txnids_note_txn(txn);
                assert_zero(r);
                r = live_list_reverse_note_txn_start(result);
                r = live_list_reverse_note_txn_start(txn);
                assert_zero(r);
            }
            // in this case, it is a child transaction that specified its snapshot to be that 
            // of the root transaction
            else if (snapshot_type == TXN_SNAPSHOT_ROOT) {
                result->live_root_txn_list = result->parent->live_root_txn_list;
                result->snapshot_txnid64 = result->parent->snapshot_txnid64;
            else if (txn->snapshot_type == TXN_SNAPSHOT_ROOT) {
                txn->live_root_txn_list = parent->live_root_txn_list;
                txn->snapshot_txnid64 = parent->snapshot_txnid64;
            }
            else {
                assert(FALSE);
@@ -296,26 +337,6 @@ int toku_txn_begin_with_xid (
        }
    }
    r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r);

    result->rollentry_raw_count = 0;
    result->force_fsync_on_commit = FALSE;
    result->recovered_from_checkpoint = FALSE;
    toku_list_init(&result->checkpoint_before_commit);
    result->state = TOKUTXN_LIVE;
    result->do_fsync = FALSE;

    // 2954
    r = toku_txn_ignore_init(result);
    if (r != 0) goto died;

    *tokutxn = result;
    STATUS_VALUE(TXN_BEGIN)++;
    STATUS_VALUE(TXN_NUM_OPEN)++;
    if (STATUS_VALUE(TXN_NUM_OPEN) > STATUS_VALUE(TXN_MAX_OPEN))
	STATUS_VALUE(TXN_MAX_OPEN) = STATUS_VALUE(TXN_NUM_OPEN);
    if (garbage_collection_debug) {
        verify_snapshot_system(logger);
    }
    return 0;

died:
@@ -489,11 +510,11 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn) {
}

void toku_txn_close_txn(TOKUTXN txn) {
    toku_txn_rollback_txn(txn);
    toku_txn_complete_txn(txn);
    toku_txn_destroy_txn(txn);
}

void toku_txn_rollback_txn(TOKUTXN txn) {
void toku_txn_complete_txn(TOKUTXN txn) {
    toku_rollback_txn_close(txn); 
}

@@ -501,6 +522,7 @@ void toku_txn_destroy_txn(TOKUTXN txn) {
    if (garbage_collection_debug)
        verify_snapshot_system(txn->logger);

    if (txn->open_brts)
        toku_omt_destroy(&txn->open_brts);
    xids_destroy(&txn->xids);
    toku_txn_ignore_free(txn); // 2954
@@ -685,20 +707,16 @@ verify_snapshot_system(TOKULOGGER logger) {
//      ENOMEM if can't alloc memory
//      EINVAL if txn = NULL
//      -1 on other errors
int toku_txn_ignore_init(TOKUTXN txn)
{
    if ( !txn ) return EINVAL;
void toku_txn_ignore_init(TOKUTXN txn) {
    assert(txn);
    TXN_IGNORE txni = &(txn->ignore_errors);

    txni->fns_allocated = 0;
    txni->filenums.num = 0;
    txni->filenums.filenums = NULL;

    return 0;
}

void toku_txn_ignore_free(TOKUTXN txn)
{
void toku_txn_ignore_free(TOKUTXN txn) {
    assert(txn);
    TXN_IGNORE txni = &(txn->ignore_errors);
    toku_free(txni->filenums.filenums);
    txni->filenums.num = 0;
@@ -710,9 +728,8 @@ void toku_txn_ignore_free(TOKUTXN txn)
//      ENOMEM if can't alloc memory
//      EINVAL if txn = NULL
//      -1 on other errors
int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum) 
{
    if ( !txn ) return EINVAL;
int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum) {
    assert(txn);
    // check for dups
    if ( toku_txn_ignore_contains(txn, filenum) == 0 ) return 0;
    // alloc more space if needed
@@ -741,9 +758,8 @@ int toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum)
//      EINVAL if txn = NULL
//      -1 on other errors
// THIS FUNCTION IS NOT USED IN FUNCTIONAL CODE, BUT IS USEFUL FOR TESTING
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum)
{
    if ( !txn ) return EINVAL; 
int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum) {
    assert(txn);
    TXN_IGNORE txni = &(txn->ignore_errors);
    int found_fn = 0;
    if ( txni->filenums.num == 0 ) return ENOENT;
@@ -767,9 +783,8 @@ int toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum)
//      ENOENT if not found
//      EINVAL if txn = NULL
//      -1 on other errors
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum) 
{
    if ( !txn ) return EINVAL;
int toku_txn_ignore_contains(TOKUTXN txn, FILENUM filenum) {
    assert(txn);
    TXN_IGNORE txni = &(txn->ignore_errors);
    for(uint32_t i=0; i<txni->filenums.num; i++) {
        if ( txni->filenums.filenums[i].fileid == filenum.fileid ) {
+12 −6
Original line number Diff line number Diff line
@@ -26,9 +26,16 @@ int toku_txn_begin_with_xid (
    TOKUTXN *tokutxn, 
    TOKULOGGER logger, 
    TXNID xid, 
    TXN_SNAPSHOT_TYPE snapshot_type
    TXN_SNAPSHOT_TYPE snapshot_type,
    DB_TXN *container_db_txn
    );

// Allocate and initialize a txn.
// The txn records the given parent, logger, xid, snapshot type and container DB_TXN, but it is
// not given its xids here and is not added to the logger's live txn lists; that is done by
// toku_txn_start_txn().
// On success, stores the new txn in *txn_ptr and returns 0.
// Returns EINVAL if the logger is panicked, or errno if the txn cannot be allocated.
int toku_txn_create_txn(TOKUTXN *txn_ptr, TOKUTXN parent, TOKULOGGER logger, TXNID xid, TXN_SNAPSHOT_TYPE snapshot_type, DB_TXN *container_db_txn);

// Assign a txnid. Log the txn begin in the recovery log. Initialize the txn live lists.
// The txnid is assigned (and the begin logged) only if the txn was created with TXNID_NONE;
// otherwise the xid supplied at creation is kept.
// Returns 0 on success, nonzero on error.
int toku_txn_start_txn(TOKUTXN txn);

int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);

int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
@@ -50,11 +57,11 @@ int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync,

void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn);

// Rollback and destroy a txn
// Complete and destroy a txn
void toku_txn_close_txn(TOKUTXN txn);

// Remove the txn from any live txn lists
void toku_txn_rollback_txn(TOKUTXN txn);
// Remove a txn from any live txn lists
void toku_txn_complete_txn(TOKUTXN txn);

// Free the memory of a txn
void toku_txn_destroy_txn(TOKUTXN txn);
@@ -73,7 +80,6 @@ BOOL toku_txnid_newer(TXNID a, TXNID b);
// Force fsync on commit
void toku_txn_force_fsync_on_commit(TOKUTXN txn);


typedef enum {
    TXN_BEGIN,             // total number of transactions begun (does not include recovered txns)
    TXN_COMMIT,            // successful commits
@@ -108,7 +114,7 @@ typedef struct tokutxn_filenum_ignore_errors {
    FILENUMS filenums;
} TXN_IGNORE_S, *TXN_IGNORE;

int  toku_txn_ignore_init(TOKUTXN txn);
void toku_txn_ignore_init(TOKUTXN txn);
void toku_txn_ignore_free(TOKUTXN txn);
int  toku_txn_ignore_add(TOKUTXN txn, FILENUM filenum);
int  toku_txn_ignore_remove(TOKUTXN txn, FILENUM filenum);
+4 −4
Original line number Diff line number Diff line
@@ -939,7 +939,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {

    DB_TXN *txn=NULL;
    if (using_txns) {
        r = toku_txn_begin_internal(env, 0, &txn, 0, 1, true);
        r = toku_txn_begin(env, 0, &txn, 0, 1, true);
        assert_zero(r);
    }

@@ -2380,7 +2380,7 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) {
    result->set_errcall = toku_env_set_errcall;
    result->set_errfile = toku_env_set_errfile;
    result->set_errpfx = toku_env_set_errpfx;
    result->txn_begin = toku_txn_begin;
    result->txn_begin = locked_txn_begin;

    MALLOC(result->i);
    if (result->i == 0) { r = ENOMEM; goto cleanup; }
@@ -2653,7 +2653,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
    DB_TXN *child = NULL;
    // begin child (unless transactionless)
    if (using_txns) {
	r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true);
	r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
	assert_zero(r);
    }

@@ -2757,7 +2757,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
    DB_TXN *child = NULL;
    // begin child (unless transactionless)
    if (using_txns) {
	r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true);
	r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true);
	assert_zero(r);
    }

Loading