Loading src/loader.c +94 −82 Original line number Diff line number Diff line Loading @@ -168,6 +168,25 @@ static void free_loader(DB_LOADER *loader) static const char *loader_temp_prefix = "tokuld"; // #2536 static const char *loader_temp_suffix = "XXXXXX"; static int brt_loader_close_and_redirect(DB_LOADER *loader) { int r; // use the bulk loader // in case you've been looking - here is where the real work is done! r = toku_brt_loader_close(loader->i->brt_loader, loader->i->error_callback, loader->i->error_extra, loader->i->poll_func, loader->i->poll_extra); if ( r==0 ) { for (int i=0; i<loader->i->N; i++) { toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect. r = toku_dictionary_redirect(loader->i->inames_in_env[i], loader->i->dbs[i]->i->brt, db_txn_struct_i(loader->i->txn)->tokutxn); toku_ydb_unlock(); if ( r!=0 ) break; } } return r; } // loader_flags currently has three possible values: // 0 use brt loader Loading @@ -190,7 +209,7 @@ int toku_loader_create_loader(DB_ENV *env, *blp = NULL; // set later when created DB_LOADER *loader; DB_LOADER *loader = NULL; XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) Loading Loading @@ -225,8 +244,7 @@ int toku_loader_create_loader(DB_ENV *env, // lock tables and check empty for(int i=0;i<N;i++) { if (!(loader_flags&DB_PRELOCKED_WRITE)) { BOOL using_puts = (loader->i->loader_flags & LOADER_USE_PUTS) != 0; r = toku_db_pre_acquire_table_lock(dbs[i], txn, !using_puts); r = toku_db_pre_acquire_table_lock(dbs[i], txn); if (r!=0) break; } r = !toku_brt_is_empty_fast(dbs[i]->i->brt); Loading @@ -244,17 +262,6 @@ int toku_loader_create_loader(DB_ENV *env, } // time to open the big kahuna if ( FALSE && (loader->i->loader_flags & LOADER_USE_PUTS) ) { XCALLOC_N(loader->i->N, loader->i->ekeys); XCALLOC_N(loader->i->N, loader->i->evals); for (int i=0; i<N; i++) { loader->i->ekeys[i].flags = DB_DBT_REALLOC; loader->i->evals[i].flags = DB_DBT_REALLOC; } loader->i->brt_loader = NULL; rval = 0; } else { char **XMALLOC_N(N, new_inames_in_env); BRT *XMALLOC_N(N, brts); for (int i=0; i<N; i++) { Loading Loading @@ -290,8 +297,27 @@ int toku_loader_create_loader(DB_ENV *env, } loader->i->inames_in_env = new_inames_in_env; toku_free(brts); if (loader->i->loader_flags & LOADER_USE_PUTS) { XCALLOC_N(loader->i->N, loader->i->ekeys); XCALLOC_N(loader->i->N, loader->i->evals); toku_ydb_unlock(); // the following function grabs the ydb lock, so we // first unlock before calling it rval = brt_loader_close_and_redirect(loader); toku_ydb_lock(); assert_zero(rval); for (int i=0; i<N; i++) { loader->i->ekeys[i].flags = DB_DBT_REALLOC; loader->i->evals[i].flags = DB_DBT_REALLOC; toku_brt_suppress_recovery_logs(dbs[i]->i->brt, db_txn_struct_i(txn)->tokutxn); } loader->i->brt_loader = NULL; // close the brtloader and skip to the redirection rval = 0; } rval = 0; } *blp = loader; create_exit: Loading Loading @@ -343,7 +369,7 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) goto cleanup; } if ( FALSE && (loader->i->loader_flags & LOADER_USE_PUTS) ) { if (loader->i->loader_flags & LOADER_USE_PUTS) { r = loader->i->env->put_multiple(loader->i->env, loader->i->src_db, // src_db loader->i->txn, Loading Loading @@ -394,7 +420,7 @@ int toku_loader_close(DB_LOADER *loader) if ( loader->i->error_callback != NULL ) { loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); } if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS ) ) { if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { r = toku_brt_loader_abort(loader->i->brt_loader, TRUE); } else { Loading @@ -402,22 +428,8 @@ int toku_loader_close(DB_LOADER *loader) } } else { // no error outstanding if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS ) ) { // use the bulk loader // in case you've been looking - here is where the real work is done! r = toku_brt_loader_close(loader->i->brt_loader, loader->i->error_callback, loader->i->error_extra, loader->i->poll_func, loader->i->poll_extra); if ( r==0 ) { for (int i=0; i<loader->i->N; i++) { toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect. r = toku_dictionary_redirect(loader->i->inames_in_env[i], loader->i->dbs[i]->i->brt, db_txn_struct_i(loader->i->txn)->tokutxn); toku_ydb_unlock(); if ( r!=0 ) break; } } if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { brt_loader_close_and_redirect(loader); } } toku_ydb_lock(); Loading @@ -441,7 +453,7 @@ int toku_loader_abort(DB_LOADER *loader) } } if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS) ) { if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) { r = toku_brt_loader_abort(loader->i->brt_loader, TRUE); } toku_ydb_lock(); Loading src/ydb.c +2 −2 Original line number Diff line number Diff line Loading @@ -2776,7 +2776,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna // further down the stack. DB* zombie = env_get_zombie_db_with_dname(env, dname); if (zombie) r = toku_db_pre_acquire_table_lock(zombie, child, TRUE); r = toku_db_pre_acquire_table_lock(zombie, child); if (r!=0 && r!=DB_LOCK_NOTGRANTED) toku_ydb_do_error(env, r, "Cannot remove dictionary.\n"); } Loading Loading @@ -2883,7 +2883,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam if (r==0) { zombie = env_get_zombie_db_with_dname(env, dname); if (zombie) r = toku_db_pre_acquire_table_lock(zombie, child, TRUE); r = toku_db_pre_acquire_table_lock(zombie, child); if (r!=0 && r!=DB_LOCK_NOTGRANTED) toku_ydb_do_error(env, r, "Cannot rename dictionary.\n"); } Loading src/ydb_db.c +3 −58 Original line number Diff line number Diff line Loading @@ -647,66 +647,11 @@ toku_db_key_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* key, u // needed by loader.c int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL UU(just_lock)) { toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) { HANDLE_PANICKED_DB(db); if (!db->i->lt || !txn) return 0; int r; r = get_range_lock(db, txn, toku_lt_neg_infinity, toku_lt_infinity, LOCK_REQUEST_WRITE); // commented out code for log suppression and recovery suppression. // TODO: figure out right thing to do with this code. #if 0 if (r==0 && !just_lock && !toku_brt_is_recovery_logging_suppressed(db->i->brt) && toku_brt_is_empty_fast(db->i->brt) ) { //Try to suppress both rollback and recovery logs DB_LOADER *loader; DB *dbs[1] = {db}; uint32_t db_flags[1] = {DB_NOOVERWRITE}; uint32_t dbt_flags[1] = {0}; uint32_t loader_flags = DB_PRELOCKED_WRITE; //Don't recursively prelock DB_ENV *env = db->dbenv; DB_TXN *child = NULL; { // begin child int rt = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(rt==0); } toku_ydb_unlock(); //Cannot hold ydb lock when creating loader int r_loader = env->create_loader(env, child, &loader, NULL, 1, dbs, db_flags, dbt_flags, loader_flags); if (r_loader==0) { r_loader = loader->set_error_callback(loader, NULL, NULL); assert(r_loader==0); r_loader = loader->set_poll_function(loader, NULL, NULL); assert(r_loader==0); // close the loader r_loader = loader->close(loader); if (r_loader==0) { toku_brt_suppress_recovery_logs(db->i->brt, db_txn_struct_i(child)->tokutxn); } } else if (r_loader != DB_LOCK_NOTGRANTED) { //Lock not granted is not an error. //It just means we cannot use the loader optimization. assert(r==0); r = r_loader; } if (r_loader == 0) { // commit r = locked_txn_commit(child, 0); assert(r==0); STATUS_VALUE(YDB_LAYER_LOGSUPPRESS)++; // accountability } else { // abort r = locked_txn_abort(child); assert(r==0); STATUS_VALUE(YDB_LAYER_LOGSUPPRESS_FAIL)++; // accountability } toku_ydb_lock(); //Reaquire ydb lock. } #endif return r; } Loading Loading @@ -766,7 +711,7 @@ toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) { if (r != 0) { return r; } r = toku_db_pre_acquire_table_lock(db, txn, TRUE); r = toku_db_pre_acquire_table_lock(db, txn); if (r != 0) { return r; } Loading Loading @@ -1003,7 +948,7 @@ toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float static int db_pre_acquire_table_lock(DB *db, DB_TXN *txn) { return toku_db_pre_acquire_table_lock(db, txn, TRUE); return toku_db_pre_acquire_table_lock(db, txn); } int toku_close_db_internal (DB * db, bool oplsn_valid, LSN oplsn) { Loading src/ydb_db.h +1 −1 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ toku_db_get_compare_fun(DB* db) { int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn); int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode); int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock); int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn); int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags); int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags); int toku_db_close(DB * db, u_int32_t flags, bool oplsn_valid, LSN oplsn); Loading src/ydb_write.c +1 −1 Original line number Diff line number Diff line Loading @@ -281,7 +281,7 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn, BOOL do_locking = (db->i->lt && !(lock_flags & DB_PRELOCKED_WRITE)); if (do_locking) { r = toku_db_pre_acquire_table_lock(db, txn, TRUE); r = toku_db_pre_acquire_table_lock(db, txn); if (r != 0) { goto cleanup; } } Loading Loading
src/loader.c +94 −82 Original line number Diff line number Diff line Loading @@ -168,6 +168,25 @@ static void free_loader(DB_LOADER *loader) static const char *loader_temp_prefix = "tokuld"; // #2536 static const char *loader_temp_suffix = "XXXXXX"; static int brt_loader_close_and_redirect(DB_LOADER *loader) { int r; // use the bulk loader // in case you've been looking - here is where the real work is done! r = toku_brt_loader_close(loader->i->brt_loader, loader->i->error_callback, loader->i->error_extra, loader->i->poll_func, loader->i->poll_extra); if ( r==0 ) { for (int i=0; i<loader->i->N; i++) { toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect. r = toku_dictionary_redirect(loader->i->inames_in_env[i], loader->i->dbs[i]->i->brt, db_txn_struct_i(loader->i->txn)->tokutxn); toku_ydb_unlock(); if ( r!=0 ) break; } } return r; } // loader_flags currently has three possible values: // 0 use brt loader Loading @@ -190,7 +209,7 @@ int toku_loader_create_loader(DB_ENV *env, *blp = NULL; // set later when created DB_LOADER *loader; DB_LOADER *loader = NULL; XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func) XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL) Loading Loading @@ -225,8 +244,7 @@ int toku_loader_create_loader(DB_ENV *env, // lock tables and check empty for(int i=0;i<N;i++) { if (!(loader_flags&DB_PRELOCKED_WRITE)) { BOOL using_puts = (loader->i->loader_flags & LOADER_USE_PUTS) != 0; r = toku_db_pre_acquire_table_lock(dbs[i], txn, !using_puts); r = toku_db_pre_acquire_table_lock(dbs[i], txn); if (r!=0) break; } r = !toku_brt_is_empty_fast(dbs[i]->i->brt); Loading @@ -244,17 +262,6 @@ int toku_loader_create_loader(DB_ENV *env, } // time to open the big kahuna if ( FALSE && (loader->i->loader_flags & LOADER_USE_PUTS) ) { XCALLOC_N(loader->i->N, loader->i->ekeys); XCALLOC_N(loader->i->N, loader->i->evals); for (int i=0; i<N; i++) { loader->i->ekeys[i].flags = DB_DBT_REALLOC; loader->i->evals[i].flags = DB_DBT_REALLOC; } loader->i->brt_loader = NULL; rval = 0; } else { char **XMALLOC_N(N, new_inames_in_env); BRT *XMALLOC_N(N, brts); for (int i=0; i<N; i++) { Loading Loading @@ -290,8 +297,27 @@ int toku_loader_create_loader(DB_ENV *env, } loader->i->inames_in_env = new_inames_in_env; toku_free(brts); if (loader->i->loader_flags & LOADER_USE_PUTS) { XCALLOC_N(loader->i->N, loader->i->ekeys); XCALLOC_N(loader->i->N, loader->i->evals); toku_ydb_unlock(); // the following function grabs the ydb lock, so we // first unlock before calling it rval = brt_loader_close_and_redirect(loader); toku_ydb_lock(); assert_zero(rval); for (int i=0; i<N; i++) { loader->i->ekeys[i].flags = DB_DBT_REALLOC; loader->i->evals[i].flags = DB_DBT_REALLOC; toku_brt_suppress_recovery_logs(dbs[i]->i->brt, db_txn_struct_i(txn)->tokutxn); } loader->i->brt_loader = NULL; // close the brtloader and skip to the redirection rval = 0; } rval = 0; } *blp = loader; create_exit: Loading Loading @@ -343,7 +369,7 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) goto cleanup; } if ( FALSE && (loader->i->loader_flags & LOADER_USE_PUTS) ) { if (loader->i->loader_flags & LOADER_USE_PUTS) { r = loader->i->env->put_multiple(loader->i->env, loader->i->src_db, // src_db loader->i->txn, Loading Loading @@ -394,7 +420,7 @@ int toku_loader_close(DB_LOADER *loader) if ( loader->i->error_callback != NULL ) { loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra); } if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS ) ) { if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { r = toku_brt_loader_abort(loader->i->brt_loader, TRUE); } else { Loading @@ -402,22 +428,8 @@ int toku_loader_close(DB_LOADER *loader) } } else { // no error outstanding if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS ) ) { // use the bulk loader // in case you've been looking - here is where the real work is done! r = toku_brt_loader_close(loader->i->brt_loader, loader->i->error_callback, loader->i->error_extra, loader->i->poll_func, loader->i->poll_extra); if ( r==0 ) { for (int i=0; i<loader->i->N; i++) { toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect. r = toku_dictionary_redirect(loader->i->inames_in_env[i], loader->i->dbs[i]->i->brt, db_txn_struct_i(loader->i->txn)->tokutxn); toku_ydb_unlock(); if ( r!=0 ) break; } } if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) { brt_loader_close_and_redirect(loader); } } toku_ydb_lock(); Loading @@ -441,7 +453,7 @@ int toku_loader_abort(DB_LOADER *loader) } } if (TRUE || !(loader->i->loader_flags & LOADER_USE_PUTS) ) { if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) { r = toku_brt_loader_abort(loader->i->brt_loader, TRUE); } toku_ydb_lock(); Loading
src/ydb.c +2 −2 Original line number Diff line number Diff line Loading @@ -2776,7 +2776,7 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna // further down the stack. DB* zombie = env_get_zombie_db_with_dname(env, dname); if (zombie) r = toku_db_pre_acquire_table_lock(zombie, child, TRUE); r = toku_db_pre_acquire_table_lock(zombie, child); if (r!=0 && r!=DB_LOCK_NOTGRANTED) toku_ydb_do_error(env, r, "Cannot remove dictionary.\n"); } Loading Loading @@ -2883,7 +2883,7 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam if (r==0) { zombie = env_get_zombie_db_with_dname(env, dname); if (zombie) r = toku_db_pre_acquire_table_lock(zombie, child, TRUE); r = toku_db_pre_acquire_table_lock(zombie, child); if (r!=0 && r!=DB_LOCK_NOTGRANTED) toku_ydb_do_error(env, r, "Cannot rename dictionary.\n"); } Loading
src/ydb_db.c +3 −58 Original line number Diff line number Diff line Loading @@ -647,66 +647,11 @@ toku_db_key_range64(DB* db, DB_TXN* txn __attribute__((__unused__)), DBT* key, u // needed by loader.c int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL UU(just_lock)) { toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) { HANDLE_PANICKED_DB(db); if (!db->i->lt || !txn) return 0; int r; r = get_range_lock(db, txn, toku_lt_neg_infinity, toku_lt_infinity, LOCK_REQUEST_WRITE); // commented out code for log suppression and recovery suppression. // TODO: figure out right thing to do with this code. #if 0 if (r==0 && !just_lock && !toku_brt_is_recovery_logging_suppressed(db->i->brt) && toku_brt_is_empty_fast(db->i->brt) ) { //Try to suppress both rollback and recovery logs DB_LOADER *loader; DB *dbs[1] = {db}; uint32_t db_flags[1] = {DB_NOOVERWRITE}; uint32_t dbt_flags[1] = {0}; uint32_t loader_flags = DB_PRELOCKED_WRITE; //Don't recursively prelock DB_ENV *env = db->dbenv; DB_TXN *child = NULL; { // begin child int rt = toku_txn_begin(env, txn, &child, DB_TXN_NOSYNC, 1, true); assert(rt==0); } toku_ydb_unlock(); //Cannot hold ydb lock when creating loader int r_loader = env->create_loader(env, child, &loader, NULL, 1, dbs, db_flags, dbt_flags, loader_flags); if (r_loader==0) { r_loader = loader->set_error_callback(loader, NULL, NULL); assert(r_loader==0); r_loader = loader->set_poll_function(loader, NULL, NULL); assert(r_loader==0); // close the loader r_loader = loader->close(loader); if (r_loader==0) { toku_brt_suppress_recovery_logs(db->i->brt, db_txn_struct_i(child)->tokutxn); } } else if (r_loader != DB_LOCK_NOTGRANTED) { //Lock not granted is not an error. //It just means we cannot use the loader optimization. assert(r==0); r = r_loader; } if (r_loader == 0) { // commit r = locked_txn_commit(child, 0); assert(r==0); STATUS_VALUE(YDB_LAYER_LOGSUPPRESS)++; // accountability } else { // abort r = locked_txn_abort(child); assert(r==0); STATUS_VALUE(YDB_LAYER_LOGSUPPRESS_FAIL)++; // accountability } toku_ydb_lock(); //Reaquire ydb lock. } #endif return r; } Loading Loading @@ -766,7 +711,7 @@ toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) { if (r != 0) { return r; } r = toku_db_pre_acquire_table_lock(db, txn, TRUE); r = toku_db_pre_acquire_table_lock(db, txn); if (r != 0) { return r; } Loading Loading @@ -1003,7 +948,7 @@ toku_db_verify_with_progress(DB *db, int (*progress_callback)(void *extra, float static int db_pre_acquire_table_lock(DB *db, DB_TXN *txn) { return toku_db_pre_acquire_table_lock(db, txn, TRUE); return toku_db_pre_acquire_table_lock(db, txn); } int toku_close_db_internal (DB * db, bool oplsn_valid, LSN oplsn) { Loading
src/ydb_db.h +1 −1 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ toku_db_get_compare_fun(DB* db) { int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn); int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode); int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock); int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn); int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags); int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags); int toku_db_close(DB * db, u_int32_t flags, bool oplsn_valid, LSN oplsn); Loading
src/ydb_write.c +1 −1 Original line number Diff line number Diff line Loading @@ -281,7 +281,7 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn, BOOL do_locking = (db->i->lt && !(lock_flags & DB_PRELOCKED_WRITE)); if (do_locking) { r = toku_db_pre_acquire_table_lock(db, txn, TRUE); r = toku_db_pre_acquire_table_lock(db, txn); if (r != 0) { goto cleanup; } } Loading