Commit 28a629b8 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel
Browse files

#4455 add unlocked txn destructor refs[t:4455]

git-svn-id: file:///svn/toku/tokudb@40407 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8d0739c2
Loading
Loading
Loading
Loading
+16 −22
Original line number Diff line number Diff line
@@ -249,16 +249,17 @@ void toku_rollback_txn_close (TOKUTXN txn) {
    assert(txn->spilled_rollback_tail.b == ROLLBACK_NONE.b);
    assert(txn->current_rollback.b == ROLLBACK_NONE.b);
    int r;
    r = toku_pthread_mutex_lock(&txn->logger->txn_list_lock); assert_zero(r);
    TOKULOGGER logger = txn->logger;
    r = toku_pthread_mutex_lock(&logger->txn_list_lock); assert_zero(r);
    {
        {
            //Remove txn from list (omt) of live transactions
            OMTVALUE txnagain;
            u_int32_t idx;
            r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx);
            r = toku_omt_find_zero(logger->live_txns, find_xid, txn, &txnagain, &idx);
            assert(r==0);
            assert(txn==txnagain);
            r = toku_omt_delete_at(txn->logger->live_txns, idx);
            r = toku_omt_delete_at(logger->live_txns, idx);
            assert(r==0);
        }

@@ -266,10 +267,10 @@ void toku_rollback_txn_close (TOKUTXN txn) {
            OMTVALUE txnagain;
            u_int32_t idx;
            //Remove txn from list of live root txns
            r = toku_omt_find_zero(txn->logger->live_root_txns, find_xid, txn, &txnagain, &idx);
            r = toku_omt_find_zero(logger->live_root_txns, find_xid, txn, &txnagain, &idx);
            assert(r==0);
            assert(txn==txnagain);
            r = toku_omt_delete_at(txn->logger->live_root_txns, idx);
            r = toku_omt_delete_at(logger->live_root_txns, idx);
            assert(r==0);
        }
        //
@@ -284,11 +285,11 @@ void toku_rollback_txn_close (TOKUTXN txn) {
                u_int32_t idx;
                OMTVALUE v;
                //Free memory used for snapshot_txnids
                r = toku_omt_find_zero(txn->logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
                r = toku_omt_find_zero(logger->snapshot_txnids, toku_find_xid_by_xid, (OMTVALUE) txn->txnid64, &v, &idx);
                invariant(r==0);
                TXNID xid = (TXNID) v;
                invariant(xid == txn->txnid64);
                r = toku_omt_delete_at(txn->logger->snapshot_txnids, idx);
                r = toku_omt_delete_at(logger->snapshot_txnids, idx);
                invariant(r==0);
            }
            live_list_reverse_note_txn_end(txn);
@@ -304,34 +305,28 @@ void toku_rollback_txn_close (TOKUTXN txn) {
            }
        }
    }
    r = toku_pthread_mutex_unlock(&txn->logger->txn_list_lock); assert_zero(r);

    assert(txn->logger->oldest_living_xid <= txn->txnid64);
    if (txn->txnid64 == txn->logger->oldest_living_xid) {
        TOKULOGGER logger = txn->logger;
    r = toku_pthread_mutex_unlock(&logger->txn_list_lock); assert_zero(r);

    assert(logger->oldest_living_xid <= txn->txnid64);
    if (txn->txnid64 == logger->oldest_living_xid) {
        OMTVALUE oldest_txnv;
        r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv);
        if (r==0) {
            TOKUTXN oldest_txn = oldest_txnv;
            assert(oldest_txn != txn); // We just removed it
            assert(oldest_txn->txnid64 > txn->logger->oldest_living_xid); //Must be newer than the previous oldest
            txn->logger->oldest_living_xid = oldest_txn->txnid64;
            txn->logger->oldest_living_starttime = oldest_txn->starttime;
            assert(oldest_txn->txnid64 > logger->oldest_living_xid); //Must be newer than the previous oldest
            logger->oldest_living_xid = oldest_txn->txnid64;
            logger->oldest_living_starttime = oldest_txn->starttime;
        }
        else {
            //No living transactions
            assert(r==EINVAL);
            txn->logger->oldest_living_xid = TXNID_NONE_LIVING;
            txn->logger->oldest_living_starttime = 0;
            logger->oldest_living_xid = TXNID_NONE_LIVING;
            logger->oldest_living_starttime = 0;
        }
    }

    note_txn_closing(txn);
    xids_destroy(&txn->xids);
    toku_txn_ignore_free(txn); // 2954
    toku_free(txn);
    return;
}

void* toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
@@ -816,7 +811,6 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
    toku_omt_iterate(txn->open_brts, remove_txn, txn);
    toku_omt_destroy(&txn->open_brts);
}

// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
+20 −5
Original line number Diff line number Diff line
@@ -488,19 +488,26 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn) {
    *do_fsync_lsn = ttxn->do_fsync_lsn;
}


void toku_txn_close_txn(TOKUTXN txn) {
    TOKULOGGER logger = txn->logger;
    toku_txn_rollback_txn(txn);
    toku_txn_destroy_txn(txn);
}

void toku_txn_rollback_txn(TOKUTXN txn) {
    toku_rollback_txn_close(txn); 
    txn = NULL; // txn is no longer valid
}

void toku_txn_destroy_txn(TOKUTXN txn) {
    if (garbage_collection_debug)
        verify_snapshot_system(logger);
        verify_snapshot_system(txn->logger);

    toku_omt_destroy(&txn->open_brts);
    xids_destroy(&txn->xids);
    toku_txn_ignore_free(txn); // 2954
    toku_free(txn);

    STATUS_VALUE(TXN_CLOSE)++;
    STATUS_VALUE(TXN_NUM_OPEN)--;
    return;
}

XIDS toku_txn_get_xids (TOKUTXN txn) {
@@ -777,4 +784,12 @@ toku_txn_get_state(TOKUTXN txn) {
    return txn->state;
}

#include <valgrind/drd.h>

void __attribute__((__constructor__)) toku_txn_drd_ignore(void);
void
toku_txn_drd_ignore(void) {
    DRD_IGNORE_VAR(txn_status);
}

#undef STATUS_VALUE
+8 −0
Original line number Diff line number Diff line
@@ -49,8 +49,16 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
int toku_txn_maybe_fsync_log(TOKULOGGER logger, LSN do_fsync_lsn, BOOL do_fsync, YIELDF yield, void *yieldv);

void toku_txn_get_fsync_info(TOKUTXN ttxn, BOOL* do_fsync, LSN* do_fsync_lsn);

// Rollback and destroy a txn
void toku_txn_close_txn(TOKUTXN txn);

// Remove the txn from any live txn lists
void toku_txn_rollback_txn(TOKUTXN txn);

// Free the memory of a txn
void toku_txn_destroy_txn(TOKUTXN txn);

XIDS toku_txn_get_xids (TOKUTXN);

// Returns TRUE if a is older than b
+59 −51
Original line number Diff line number Diff line
@@ -9,18 +9,6 @@
#include "log_header.h"
#include "ydb_txn.h"

// add a txn to the list of open txn's
static void 
env_add_open_txn(DB_ENV *env, DB_TXN *txn UU()) {
    (void) __sync_fetch_and_add(&env->i->open_txns, 1);
}

// remove a txn from the list of open txn's
static void 
env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn UU()) {
    (void) __sync_fetch_and_sub(&env->i->open_txns, 1);
}

static int 
toku_txn_release_locks(DB_TXN* txn) {
    assert(txn);
@@ -58,8 +46,18 @@ ydb_yield (voidfp f, void *fv, void *UU(v)) {
    toku_ydb_lock();
}

int 
toku_txn_commit(DB_TXN * txn, u_int32_t flags,
static void
toku_txn_destroy(DB_TXN *txn) {
    (void) __sync_fetch_and_sub(&txn->mgrp->i->open_txns, 1);
    toku_txn_destroy_txn(db_txn_struct_i(txn)->tokutxn);
#if !TOKUDB_NATIVE_H
    toku_free(db_txn_struct_i(txn));
#endif
    toku_free(txn);
}

static int 
toku_txn_commit_only(DB_TXN * txn, u_int32_t flags,
                     TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
                     bool release_multi_operation_client_lock) {
    if (!txn) return EINVAL;
@@ -80,7 +78,7 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
        assert(db_txn_struct_i(txn->parent)->child == txn);
        db_txn_struct_i(txn->parent)->child=NULL;
    }
    env_remove_open_txn(txn->mgrp, txn);

    //toku_ydb_notef("flags=%d\n", flags);
    if (flags & DB_TXN_SYNC) {
        toku_txn_force_fsync_on_commit(db_txn_struct_i(txn)->tokutxn);
@@ -90,18 +88,17 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
    flags &= ~DB_TXN_NOSYNC;

    int r;
    if (flags!=0)
    if (flags!=0) {
	// frees the tokutxn
	// Calls ydb_yield(NULL) occasionally
        //r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
        r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra,
			       release_multi_operation_client_lock);
    else
    } else {
	// frees the tokutxn
	// Calls ydb_yield(NULL) occasionally
        //r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL);
        r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL,
				poll, poll_extra, release_multi_operation_client_lock);
    }

    if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
	env_panic(txn->mgrp, r, "Error during commit.\n");
@@ -142,11 +139,9 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
    //  in the test_stress tests.
    //
    toku_txn_get_fsync_info(ttxn, &do_fsync, &do_fsync_lsn);
    toku_txn_close_txn(ttxn);
    toku_txn_rollback_txn(ttxn);
    toku_txn_maybe_fsync_log(logger, do_fsync_lsn, do_fsync, ydb_yield, NULL);

    // the toxutxn is freed, and we must free the rest. */

    //Promote list to parent (dbs that must close before abort)
    if (txn->parent) {
        //Combine lists.
@@ -162,15 +157,19 @@ toku_txn_commit(DB_TXN * txn, u_int32_t flags,
        }
    }

    // The txn is no good after the commit even if the commit fails, so free it up.
#if !TOKUDB_NATIVE_H
    toku_free(db_txn_struct_i(txn));
#endif
    toku_free(txn);    txn = NULL;
    if (flags!=0) return EINVAL;
    return r;
}

int 
toku_txn_commit(DB_TXN * txn, u_int32_t flags,
                TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
                bool release_multi_operation_client_lock) {
    int r = toku_txn_commit_only(txn, flags, poll, poll_extra, release_multi_operation_client_lock);
    toku_txn_destroy(txn);
    return r;
}

static u_int32_t 
toku_txn_id(DB_TXN * txn) {
    HANDLE_PANICKED_ENV(txn->mgrp);
@@ -179,8 +178,8 @@ toku_txn_id(DB_TXN * txn) {
    return -1;
}

int 
toku_txn_abort(DB_TXN * txn,
static int 
toku_txn_abort_only(DB_TXN * txn,
                    TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
                    bool release_multi_operation_client_lock) {
    HANDLE_PANICKED_ENV(txn->mgrp);
@@ -200,12 +199,10 @@ toku_txn_abort(DB_TXN * txn,
        assert(db_txn_struct_i(txn->parent)->child == txn);
        db_txn_struct_i(txn->parent)->child=NULL;
    }
    env_remove_open_txn(txn->mgrp, txn);

    //All dbs that must close before abort, must now be closed
    assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort));

    //int r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
    int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra, release_multi_operation_client_lock);
    if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
	env_panic(txn->mgrp, r, "Error during abort.\n");
@@ -213,13 +210,16 @@ toku_txn_abort(DB_TXN * txn,
    HANDLE_PANICKED_ENV(txn->mgrp);
    assert_zero(r);
    r = toku_txn_release_locks(txn);
    //toku_logger_txn_close(db_txn_struct_i(txn)->tokutxn);
    toku_txn_close_txn(db_txn_struct_i(txn)->tokutxn);
    toku_txn_rollback_txn(db_txn_struct_i(txn)->tokutxn);
    return r;
}

#if !TOKUDB_NATIVE_H
    toku_free(db_txn_struct_i(txn));
#endif
    toku_free(txn);
int 
toku_txn_abort(DB_TXN * txn,
               TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra,
               bool release_multi_operation_client_lock) {
    int r = toku_txn_abort_only(txn, poll, poll_extra, release_multi_operation_client_lock);
    toku_txn_destroy(txn);
    return r;
}   
 
@@ -233,7 +233,10 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) {

static u_int32_t 
locked_txn_id(DB_TXN *txn) {
    toku_ydb_lock(); u_int32_t r = toku_txn_id(txn); toku_ydb_unlock(); return r;
    toku_ydb_lock(); 
    u_int32_t r = toku_txn_id(txn); 
    toku_ydb_unlock(); 
    return r;
}

static int 
@@ -244,7 +247,10 @@ toku_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {

static int 
locked_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
    toku_ydb_lock(); int r = toku_txn_stat(txn, txn_stat); toku_ydb_unlock(); return r;
    toku_ydb_lock(); 
    int r = toku_txn_stat(txn, txn_stat); 
    toku_ydb_unlock(); 
    return r;
}

static int
@@ -270,8 +276,9 @@ locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
    }
    toku_multi_operation_client_lock(); //Cannot checkpoint during a commit.
    toku_ydb_lock();
    r = toku_txn_commit(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
    r = toku_txn_commit_only(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
    toku_ydb_unlock();
    toku_txn_destroy(txn);
    return r;
}

@@ -280,22 +287,21 @@ locked_txn_abort_with_progress(DB_TXN *txn,
                               TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
    toku_multi_operation_client_lock(); //Cannot checkpoint during an abort.
    toku_ydb_lock();
    int r = toku_txn_abort(txn, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lokc
    int r = toku_txn_abort_only(txn, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
    toku_ydb_unlock();
    toku_txn_destroy(txn);
    return r;
}

int 
locked_txn_commit(DB_TXN *txn, u_int32_t flags) {
    int r;
    r = locked_txn_commit_with_progress(txn, flags, NULL, NULL);
    int r = locked_txn_commit_with_progress(txn, flags, NULL, NULL);
    return r;
}

int 
locked_txn_abort(DB_TXN *txn) {
    int r;
    r = locked_txn_abort_with_progress(txn, NULL, NULL);
    int r = locked_txn_abort_with_progress(txn, NULL, NULL);
    return r;
}

@@ -449,7 +455,9 @@ toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t fla
        assert(!db_txn_struct_i(result->parent)->child);
        db_txn_struct_i(result->parent)->child = result;
    }
    env_add_open_txn(env, result);

    (void) __sync_fetch_and_add(&env->i->open_txns, 1);

    *txn = result;
    return 0;
}