Commit 94057e73 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel
Browse files

[t:4635] fixing the loader to work with compression types, and fixing some tests

git-svn-id: file:///svn/toku/tokudb@41510 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3ed34102
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -476,7 +476,7 @@ void toku_verify_or_set_counts(BRTNODE);

void 
toku_brt_header_init(struct brt_header *h,
                     BLOCKNUM root_blocknum_on_disk, LSN checkpoint_lsn, TXNID root_xid_that_created, uint32_t target_nodesize, uint32_t target_basementnodesize);
                     BLOCKNUM root_blocknum_on_disk, LSN checkpoint_lsn, TXNID root_xid_that_created, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method compression_method);

int toku_serialize_brt_header_size (struct brt_header *h);
int toku_serialize_brt_header_to (int fd, struct brt_header *h);
+2 −2
Original line number Diff line number Diff line
@@ -6855,7 +6855,7 @@ toku_brt_header_note_hot_complete(BRT brt, BOOL success, MSN msn_at_start_of_hot

void
toku_brt_header_init(struct brt_header *h, 
                     BLOCKNUM root_blocknum_on_disk, LSN checkpoint_lsn, TXNID root_xid_that_created, uint32_t target_nodesize, uint32_t target_basementnodesize) {
                     BLOCKNUM root_blocknum_on_disk, LSN checkpoint_lsn, TXNID root_xid_that_created, uint32_t target_nodesize, uint32_t target_basementnodesize, enum toku_compression_method compression_method) {
    memset(h, 0, sizeof *h);
    h->layout_version   = BRT_LAYOUT_VERSION;
    h->layout_version_original = BRT_LAYOUT_VERSION;
@@ -6872,7 +6872,7 @@ toku_brt_header_init(struct brt_header *h,
    h->root_blocknum    = root_blocknum_on_disk;
    h->flags            = 0;
    h->root_xid_that_created = root_xid_that_created;
    h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
    h->compression_method = compression_method;
}

#include <valgrind/helgrind.h>
+3 −1
Original line number Diff line number Diff line
@@ -189,6 +189,7 @@ struct fractal_thread_args {
    int                      which_db;
    uint32_t                 target_nodesize;
    uint32_t                 target_basementnodesize;
    enum toku_compression_method target_compression_method;
};

void toku_brt_loader_set_n_rows(BRTLOADER bl, u_int64_t n_rows);
@@ -222,7 +223,8 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
				       uint64_t                 total_disksize_estimate,
                                       int                      which_db,
                                       uint32_t                 target_nodesize,
                                       uint32_t                 target_basementnodesize);
                                       uint32_t                 target_basementnodesize,
                                       enum toku_compression_method target_compression_method);

int brt_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);

+14 −8
Original line number Diff line number Diff line
@@ -2270,7 +2270,8 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
					 uint64_t total_disksize_estimate,
                                         int which_db,
                                         uint32_t target_nodesize,
                                         uint32_t target_basementnodesize)
                                         uint32_t target_basementnodesize,
                                         enum toku_compression_method target_compression_method)
// Effect: Consume a sequence of rowsets work from a queue, creating a fractal tree.  Closes fd.
{
    // set the number of fractal tree writer threads so that we can partition memory in the merger
@@ -2298,7 +2299,7 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
        root_xid_that_created = bl->root_xids_that_created[which_db];

    struct brt_header h; 
    toku_brt_header_init(&h, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize);
    toku_brt_header_init(&h, (BLOCKNUM){0}, bl->load_lsn, root_xid_that_created, target_nodesize, target_basementnodesize, target_compression_method);

    struct dbout out;
    dbout_init(&out, &h);
@@ -2539,15 +2540,16 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
				       uint64_t                 total_disksize_estimate,
                                       int                      which_db,
                                       uint32_t                 target_nodesize,
                                       uint32_t                 target_basementnodesize)
                                       uint32_t                 target_basementnodesize,
                                       enum toku_compression_method target_compression_method)
// This is probably only for testing.
{
    target_nodesize = target_nodesize == 0 ? default_loader_nodesize : target_nodesize;
    target_basementnodesize = target_basementnodesize == 0 ? default_loader_basementnodesize : target_basementnodesize;
#if defined(__cilkplusplus)
    return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize);
    return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method);
#else
    return           toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize);
    return           toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method);
#endif
}

@@ -2556,9 +2558,9 @@ static void* fractal_thread (void *ftav) {
    BL_TRACE(blt_start_fractal_thread);
    struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus)
    int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize);
    int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize, fta->target_compression_method);
#else
    int r =           toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize);
    int r =           toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize, fta->target_compression_method);
#endif
    fta->errno_result = r;
    return NULL;
@@ -2596,10 +2598,13 @@ static int loader_do_i (BRTLOADER bl,
        }

        uint32_t target_nodesize, target_basementnodesize;
        enum toku_compression_method target_compression_method;
        r = dest_db->get_pagesize(dest_db, &target_nodesize);
        invariant_zero(r);
        r = dest_db->get_readpagesize(dest_db, &target_basementnodesize);
        invariant_zero(r);
        r = dest_db->get_compression_method(dest_db, &target_compression_method);
        invariant_zero(r);

	// This structure must stay live until the join below.
        struct fractal_thread_args fta = { bl,
@@ -2612,6 +2617,7 @@ static int loader_do_i (BRTLOADER bl,
                                           which_db,
                                           target_nodesize,
                                           target_basementnodesize,
                                           target_compression_method,
        };

	r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
@@ -2998,7 +3004,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu

    BRTNODE XMALLOC(node);
    toku_initialize_empty_brtnode(node, make_blocknum(blocknum_of_new_node), height, n_children, 
				  BRT_LAYOUT_VERSION, target_nodesize, 0, NULL);
				  BRT_LAYOUT_VERSION, target_nodesize, 0, out->h);
    for (int i=0; i<n_children-1; i++)
        node->childkeys[i] = NULL;
    unsigned int totalchildkeylens = 0;
+11 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    BRTNODE_DISK_DATA ndd = NULL;
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_ON_DISK);
    assert(BP_STATE(dn,1) == PT_ON_DISK);
@@ -57,6 +58,7 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    fill_bfe_for_prefetch(&bfe, brt_h, cursor);
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_AVAIL);
    assert(BP_STATE(dn,1) == PT_AVAIL);
@@ -79,6 +81,7 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    fill_bfe_for_prefetch(&bfe, brt_h, cursor);
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_ON_DISK);
    assert(BP_STATE(dn,1) == PT_AVAIL);
@@ -101,6 +104,7 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    fill_bfe_for_prefetch(&bfe, brt_h, cursor);
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_ON_DISK);
    assert(BP_STATE(dn,1) == PT_AVAIL);
@@ -122,6 +126,7 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    fill_bfe_for_prefetch(&bfe, brt_h, cursor);
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_ON_DISK);
    assert(BP_STATE(dn,1) == PT_ON_DISK);
@@ -143,6 +148,7 @@ test_prefetch_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    fill_bfe_for_prefetch(&bfe, brt_h, cursor);
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_AVAIL);
    assert(BP_STATE(dn,1) == PT_ON_DISK);
@@ -201,6 +207,7 @@ test_subset_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    bfe.disable_prefetching = TRUE;
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_ON_DISK);
    assert(BP_STATE(dn,1) == PT_ON_DISK);
@@ -226,6 +233,7 @@ test_subset_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    bfe.disable_prefetching = FALSE;
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_ON_DISK);
    assert(BP_STATE(dn,1) == PT_AVAIL);
@@ -250,6 +258,7 @@ test_subset_read(int fd, BRT UU(brt), struct brt_header *brt_h) {
    bfe.child_to_read = 0;
    r = toku_deserialize_brtnode_from(fd, make_blocknum(20), 0/*pass zero for hash*/, &dn, &ndd, &bfe);
    assert(r==0);
    dn->h = brt_h;
    assert(dn->n_children == 3);
    assert(BP_STATE(dn,0) == PT_AVAIL);
    assert(BP_STATE(dn,1) == PT_AVAIL);
@@ -334,9 +343,11 @@ test_prefetching(void) {
    struct brt *XMALLOC(brt);
    struct brt_header *XCALLOC(brt_h);
    brt->h = brt_h;
    sn.h = brt_h;
    brt_h->type = BRTHEADER_CURRENT;
    brt_h->panic = 0; brt_h->panic_string = 0;
    brt_h->basementnodesize = 128*1024;
    brt_h->compression_method = TOKU_DEFAULT_COMPRESSION_METHOD;
    toku_brtheader_init_treelock(brt_h);
    toku_blocktable_create_new(&brt_h->blocktable);
    //Want to use block #20
Loading