Loading src/ydb-internal.h +1 −1 Original line number Diff line number Diff line Loading @@ -71,7 +71,7 @@ struct __toku_db_env_internal { CACHETABLE cachetable; TOKULOGGER logger; toku_ltm* ltm; struct toku_list open_txns; int open_txns; // Number of open transactions DB *directory; // Maps dnames to inames DB *persistent_environment; // Stores environment settings, can be used for upgrade OMT open_dbs; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location) Loading src/ydb.c +41 −35 Original line number Diff line number Diff line Loading @@ -282,19 +282,19 @@ env_opened(DB_ENV *env) { static void env_init_open_txn(DB_ENV *env) { toku_list_init(&env->i->open_txns); env->i->open_txns = 0; } // add a txn to the list of open txn's static void env_add_open_txn(DB_ENV *env, DB_TXN *txn) { toku_list_push(&env->i->open_txns, (struct toku_list *) (void *) &txn->open_txns); env_add_open_txn(DB_ENV *env, DB_TXN *txn UU()) { (void) __sync_fetch_and_add(&env->i->open_txns, 1); } // remove a txn from the list of open txn's static void env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn) { toku_list_remove((struct toku_list *) (void *) &txn->open_txns); env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn UU()) { (void) __sync_fetch_and_sub(&env->i->open_txns, 1); } static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*, Loading Loading @@ -534,7 +534,8 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags); static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode); static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags); static int toku_db_close(DB * db, u_int32_t flags); static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal); static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); static int toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock); static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock); static int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode); Loading Loading @@ -1054,7 +1055,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { DB_TXN *txn=NULL; if (using_txns) { r = toku_txn_begin(env, 0, &txn, 0, 1); r = toku_txn_begin_internal(env, 0, &txn, 0, 1, true); assert(r==0); } Loading Loading @@ -1138,7 +1139,7 @@ toku_env_close(DB_ENV * env, u_int32_t flags) { // if panicked, or if any open transactions, or any open dbs, then do nothing. if (toku_env_is_panicked(env)) goto panic_and_quit_early; if (!toku_list_empty(&env->i->open_txns)) { if (env->i->open_txns != 0) { err_msg = "Cannot close environment due to open transactions\n"; r = toku_ydb_do_error(env, EINVAL, "%s", err_msg); goto panic_and_quit_early; Loading Loading @@ -2418,9 +2419,6 @@ env_crash(DB_ENV * UU(db_env), const char* msg, const char * fun, const char* fi return -1; // placate compiler } static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); static int toku_db_lt_panic(DB* db, int r); static toku_dbt_cmp toku_db_get_compare_fun(DB* db); Loading @@ -2436,6 +2434,8 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { MALLOC(result); if (result == 0) { r = ENOMEM; goto cleanup; } memset(result, 0, sizeof *result); // locked methods result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_locked_env_err; #define SENV(name) result->name = locked_env_ ## name SENV(dbremove); Loading @@ -2453,22 +2453,9 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { SENV(cleaner_get_period); SENV(cleaner_set_iterations); SENV(cleaner_get_iterations); result->checkpointing_postpone = env_checkpointing_postpone; result->checkpointing_resume = env_checkpointing_resume; result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation; result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation; result->get_engine_status_num_rows = env_get_engine_status_num_rows; result->get_engine_status = env_get_engine_status; result->get_engine_status_text = env_get_engine_status_text; result->crash = env_crash; // handlerton's call to fractal tree layer on failed assert result->get_iname = env_get_iname; SENV(open); SENV(close); result->txn_checkpoint = toku_env_txn_checkpoint; SENV(log_flush); result->set_errcall = toku_env_set_errcall; result->set_errfile = toku_env_set_errfile; result->set_errpfx = toku_env_set_errpfx; //SENV(set_noticecall); SENV(set_flags); SENV(set_data_dir); Loading @@ -2492,7 +2479,6 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { #endif SENV(log_archive); SENV(txn_stat); result->txn_begin = locked_txn_begin; SENV(set_redzone); SENV(create_indexer); SENV(create_loader); Loading @@ -2500,6 +2486,22 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { SENV(set_lock_timeout); #undef SENV // unlocked methods result->txn_checkpoint = toku_env_txn_checkpoint; result->checkpointing_postpone = env_checkpointing_postpone; result->checkpointing_resume = env_checkpointing_resume; result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation; result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation; result->get_engine_status_num_rows = env_get_engine_status_num_rows; result->get_engine_status = env_get_engine_status; result->get_engine_status_text = env_get_engine_status_text; result->crash = env_crash; // handlerton's call to fractal tree layer on failed assert result->get_iname = env_get_iname; result->set_errcall = toku_env_set_errcall; result->set_errfile = toku_env_set_errfile; result->set_errpfx = toku_env_set_errpfx; result->txn_begin = toku_txn_begin; MALLOC(result->i); if (result->i == 0) { r = ENOMEM; goto cleanup; } memset(result->i, 0, sizeof *result->i); Loading Loading @@ -2757,9 +2759,12 @@ toku_txn_abort(DB_TXN * txn, return r; } // Create a new transaction. // Called without holding the ydb lock. static int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { toku_ydb_lock(); int r = toku_txn_begin(env, stxn, txn, flags, 0); toku_ydb_unlock(); return r; toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { int r = toku_txn_begin_internal(env, stxn, txn, flags, 0, false); return r; } static u_int32_t Loading Loading @@ -2831,7 +2836,7 @@ locked_txn_abort(DB_TXN *txn) { } static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal) { toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock) { HANDLE_PANICKED_ENV(env); HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists. if (!toku_logger_is_open(env->i->logger)) return toku_ydb_do_error(env, EINVAL, "Environment does not have logging enabled\n"); Loading Loading @@ -2925,7 +2930,6 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int i #undef STXN result->txn_stat = locked_txn_stat; result->parent = stxn; #if !TOKUDB_NATIVE_H MALLOC(db_txn_struct_i(result)); Loading Loading @@ -2965,12 +2969,14 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int i break; } } if (!holds_ydb_lock) toku_ydb_lock(); r = toku_txn_begin_txn(result, stxn ? db_txn_struct_i(stxn)->tokutxn : 0, &db_txn_struct_i(result)->tokutxn, env->i->logger, snapshot_type ); if (!holds_ydb_lock) toku_ydb_unlock(); if (r != 0) return r; Loading Loading @@ -4683,7 +4689,7 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP DB_TXN *child = NULL; // begin child (unless transactionless) if (using_txns) { r = toku_txn_begin(db->dbenv, txn, &child, DB_TXN_NOSYNC, 1); r = toku_txn_begin_internal(db->dbenv, txn, &child, DB_TXN_NOSYNC, 1, true); assert(r==0); } Loading Loading @@ -5372,7 +5378,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna DB_TXN *child = NULL; // begin child (unless transactionless) if (using_txns) { r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(r==0); } Loading Loading @@ -5486,7 +5492,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam DB_TXN *child = NULL; // begin child (unless transactionless) if (using_txns) { r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(r==0); } Loading Loading @@ -5728,7 +5734,7 @@ toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) { { // begin child int rt = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); int rt = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(rt==0); } Loading Loading @@ -5781,7 +5787,7 @@ toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, BOOL force_auto_c } BOOL nosync = (BOOL)(!force_auto_commit && !(env->i->open_flags & DB_AUTO_COMMIT)); u_int32_t txn_flags = DB_TXN_NOWAIT | (nosync ? DB_TXN_NOSYNC : 0); int r = toku_txn_begin(env, NULL, txn, txn_flags, 1); int r = toku_txn_begin_internal(env, NULL, txn, txn_flags, 1, true); if (r!=0) return r; *changed = TRUE; return 0; Loading Loading @@ -6580,7 +6586,7 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname // begin child (unless transactionless) if (using_txns) { rval = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); rval = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(rval == 0); xid = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn); } Loading Loading
src/ydb-internal.h +1 −1 Original line number Diff line number Diff line Loading @@ -71,7 +71,7 @@ struct __toku_db_env_internal { CACHETABLE cachetable; TOKULOGGER logger; toku_ltm* ltm; struct toku_list open_txns; int open_txns; // Number of open transactions DB *directory; // Maps dnames to inames DB *persistent_environment; // Stores environment settings, can be used for upgrade OMT open_dbs; // Stores open db handles, sorted first by dname and then by numerical value of pointer to the db (arbitrarily assigned memory location) Loading
src/ydb.c +41 −35 Original line number Diff line number Diff line Loading @@ -282,19 +282,19 @@ env_opened(DB_ENV *env) { static void env_init_open_txn(DB_ENV *env) { toku_list_init(&env->i->open_txns); env->i->open_txns = 0; } // add a txn to the list of open txn's static void env_add_open_txn(DB_ENV *env, DB_TXN *txn) { toku_list_push(&env->i->open_txns, (struct toku_list *) (void *) &txn->open_txns); env_add_open_txn(DB_ENV *env, DB_TXN *txn UU()) { (void) __sync_fetch_and_add(&env->i->open_txns, 1); } // remove a txn from the list of open txn's static void env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn) { toku_list_remove((struct toku_list *) (void *) &txn->open_txns); env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn UU()) { (void) __sync_fetch_and_sub(&env->i->open_txns, 1); } static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*, Loading Loading @@ -534,7 +534,8 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags); static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode); static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags); static int toku_db_close(DB * db, u_int32_t flags); static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal); static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); static int toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock); static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*, bool release_multi_operation_client_lock); static int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode); Loading Loading @@ -1054,7 +1055,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { DB_TXN *txn=NULL; if (using_txns) { r = toku_txn_begin(env, 0, &txn, 0, 1); r = toku_txn_begin_internal(env, 0, &txn, 0, 1, true); assert(r==0); } Loading Loading @@ -1138,7 +1139,7 @@ toku_env_close(DB_ENV * env, u_int32_t flags) { // if panicked, or if any open transactions, or any open dbs, then do nothing. if (toku_env_is_panicked(env)) goto panic_and_quit_early; if (!toku_list_empty(&env->i->open_txns)) { if (env->i->open_txns != 0) { err_msg = "Cannot close environment due to open transactions\n"; r = toku_ydb_do_error(env, EINVAL, "%s", err_msg); goto panic_and_quit_early; Loading Loading @@ -2418,9 +2419,6 @@ env_crash(DB_ENV * UU(db_env), const char* msg, const char * fun, const char* fi return -1; // placate compiler } static int locked_txn_begin(DB_ENV * env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags); static int toku_db_lt_panic(DB* db, int r); static toku_dbt_cmp toku_db_get_compare_fun(DB* db); Loading @@ -2436,6 +2434,8 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { MALLOC(result); if (result == 0) { r = ENOMEM; goto cleanup; } memset(result, 0, sizeof *result); // locked methods result->err = (void (*)(const DB_ENV * env, int error, const char *fmt, ...)) toku_locked_env_err; #define SENV(name) result->name = locked_env_ ## name SENV(dbremove); Loading @@ -2453,22 +2453,9 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { SENV(cleaner_get_period); SENV(cleaner_set_iterations); SENV(cleaner_get_iterations); result->checkpointing_postpone = env_checkpointing_postpone; result->checkpointing_resume = env_checkpointing_resume; result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation; result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation; result->get_engine_status_num_rows = env_get_engine_status_num_rows; result->get_engine_status = env_get_engine_status; result->get_engine_status_text = env_get_engine_status_text; result->crash = env_crash; // handlerton's call to fractal tree layer on failed assert result->get_iname = env_get_iname; SENV(open); SENV(close); result->txn_checkpoint = toku_env_txn_checkpoint; SENV(log_flush); result->set_errcall = toku_env_set_errcall; result->set_errfile = toku_env_set_errfile; result->set_errpfx = toku_env_set_errpfx; //SENV(set_noticecall); SENV(set_flags); SENV(set_data_dir); Loading @@ -2492,7 +2479,6 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { #endif SENV(log_archive); SENV(txn_stat); result->txn_begin = locked_txn_begin; SENV(set_redzone); SENV(create_indexer); SENV(create_loader); Loading @@ -2500,6 +2486,22 @@ toku_env_create(DB_ENV ** envp, u_int32_t flags) { SENV(set_lock_timeout); #undef SENV // unlocked methods result->txn_checkpoint = toku_env_txn_checkpoint; result->checkpointing_postpone = env_checkpointing_postpone; result->checkpointing_resume = env_checkpointing_resume; result->checkpointing_begin_atomic_operation = env_checkpointing_begin_atomic_operation; result->checkpointing_end_atomic_operation = env_checkpointing_end_atomic_operation; result->get_engine_status_num_rows = env_get_engine_status_num_rows; result->get_engine_status = env_get_engine_status; result->get_engine_status_text = env_get_engine_status_text; result->crash = env_crash; // handlerton's call to fractal tree layer on failed assert result->get_iname = env_get_iname; result->set_errcall = toku_env_set_errcall; result->set_errfile = toku_env_set_errfile; result->set_errpfx = toku_env_set_errpfx; result->txn_begin = toku_txn_begin; MALLOC(result->i); if (result->i == 0) { r = ENOMEM; goto cleanup; } memset(result->i, 0, sizeof *result->i); Loading Loading @@ -2757,9 +2759,12 @@ toku_txn_abort(DB_TXN * txn, return r; } // Create a new transaction. // Called without holding the ydb lock. static int locked_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { toku_ydb_lock(); int r = toku_txn_begin(env, stxn, txn, flags, 0); toku_ydb_unlock(); return r; toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags) { int r = toku_txn_begin_internal(env, stxn, txn, flags, 0, false); return r; } static u_int32_t Loading Loading @@ -2831,7 +2836,7 @@ locked_txn_abort(DB_TXN *txn) { } static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal) { toku_txn_begin_internal(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, bool internal, bool holds_ydb_lock) { HANDLE_PANICKED_ENV(env); HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists. if (!toku_logger_is_open(env->i->logger)) return toku_ydb_do_error(env, EINVAL, "Environment does not have logging enabled\n"); Loading Loading @@ -2925,7 +2930,6 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int i #undef STXN result->txn_stat = locked_txn_stat; result->parent = stxn; #if !TOKUDB_NATIVE_H MALLOC(db_txn_struct_i(result)); Loading Loading @@ -2965,12 +2969,14 @@ toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int i break; } } if (!holds_ydb_lock) toku_ydb_lock(); r = toku_txn_begin_txn(result, stxn ? db_txn_struct_i(stxn)->tokutxn : 0, &db_txn_struct_i(result)->tokutxn, env->i->logger, snapshot_type ); if (!holds_ydb_lock) toku_ydb_unlock(); if (r != 0) return r; Loading Loading @@ -4683,7 +4689,7 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP DB_TXN *child = NULL; // begin child (unless transactionless) if (using_txns) { r = toku_txn_begin(db->dbenv, txn, &child, DB_TXN_NOSYNC, 1); r = toku_txn_begin_internal(db->dbenv, txn, &child, DB_TXN_NOSYNC, 1, true); assert(r==0); } Loading Loading @@ -5372,7 +5378,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna DB_TXN *child = NULL; // begin child (unless transactionless) if (using_txns) { r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(r==0); } Loading Loading @@ -5486,7 +5492,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam DB_TXN *child = NULL; // begin child (unless transactionless) if (using_txns) { r = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); r = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(r==0); } Loading Loading @@ -5728,7 +5734,7 @@ toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) { { // begin child int rt = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); int rt = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(rt==0); } Loading Loading @@ -5781,7 +5787,7 @@ toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, BOOL force_auto_c } BOOL nosync = (BOOL)(!force_auto_commit && !(env->i->open_flags & DB_AUTO_COMMIT)); u_int32_t txn_flags = DB_TXN_NOWAIT | (nosync ? DB_TXN_NOSYNC : 0); int r = toku_txn_begin(env, NULL, txn, txn_flags, 1); int r = toku_txn_begin_internal(env, NULL, txn, txn_flags, 1, true); if (r!=0) return r; *changed = TRUE; return 0; Loading Loading @@ -6580,7 +6586,7 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname // begin child (unless transactionless) if (using_txns) { rval = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1); rval = toku_txn_begin_internal(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(rval == 0); xid = toku_txn_get_txnid(db_txn_struct_i(child)->tokutxn); } Loading