Commit 91539210 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel
Browse files

[t:4693], write brtnodes out in parallel for loader, serially for other cases

git-svn-id: file:///svn/toku/tokudb@41699 c7de825b-a66e-492c-adef-691d508d4ae1
parent 136d3475
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -457,6 +457,7 @@ int toku_serialize_brtnode_to_memory (BRTNODE node,
                                      BRTNODE_DISK_DATA* ndd,
                                      unsigned int basementnodesize,
                                      BOOL do_rebalancing,
                                      BOOL in_parallel,
                              /*out*/ size_t *n_bytes_to_write,
                              /*out*/ char  **bytes_to_write);
int toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, BRTNODE_DISK_DATA* ndd, BOOL do_rebalancing, struct brt_header *h, int n_workitems, int n_threads, BOOL for_checkpoint);
+81 −6
Original line number Diff line number Diff line
@@ -721,13 +721,63 @@ toku_create_compressed_partition_from_available(
}



static void
serialize_and_compress(BRTNODE node, int npartitions, struct sub_block sb[]) {
serialize_and_compress_serially(BRTNODE node, int npartitions, struct sub_block sb[]) {
    for (int i = 0; i < npartitions; i++) {
        serialize_and_compress_partition(node, i, &sb[i]);
    }
}

struct serialize_compress_work {
    struct work base;
    BRTNODE node;
    int i;
    struct sub_block *sb;
};

static void *
serialize_and_compress_worker(void *arg) {
    struct workset *ws = (struct workset *) arg;
    while (1) {
        struct serialize_compress_work *w = (struct serialize_compress_work *) workset_get(ws);
        if (w == NULL)
            break;
        int i = w->i;
        serialize_and_compress_partition(w->node, i, &w->sb[i]);
    }
    workset_release_ref(ws);
    return arg;
}

static void
serialize_and_compress_in_parallel(BRTNODE node, int npartitions, struct sub_block sb[]) {
    if (npartitions == 1) {
        serialize_and_compress_partition(node, 0, &sb[0]);
    } else {
        int T = num_cores;
        if (T > npartitions)
            T = npartitions;
        if (T > 0)
            T = T - 1;
        struct workset ws;
        workset_init(&ws);
        struct serialize_compress_work work[npartitions];
        workset_lock(&ws);
        for (int i = 0; i < npartitions; i++) {
            work[i] = (struct serialize_compress_work) { .node = node, .i = i, .sb = sb };
            workset_put_locked(&ws, &work[i].base);
        }
        workset_unlock(&ws);
        toku_thread_pool_run(brt_pool, 0, &T, serialize_and_compress_worker, &ws);
        workset_add_ref(&ws, T);
        serialize_and_compress_worker(&ws);
        workset_join(&ws);
        workset_destroy(&ws);
    }
}


// Writes out each child to a separate malloc'd buffer, then compresses
// all of them, and writes the uncompressed header, to bytes_to_write,
// which is malloc'd.
@@ -737,6 +787,7 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
                                  BRTNODE_DISK_DATA* ndd,
                                  unsigned int basementnodesize,
                                  BOOL do_rebalancing,
                                  BOOL in_parallel, // for loader is TRUE, for toku_brtnode_flush_callback, is false
                          /*out*/ size_t *n_bytes_to_write,
                          /*out*/ char  **bytes_to_write)
{
@@ -761,8 +812,12 @@ toku_serialize_brtnode_to_memory (BRTNODE node,
    //
    // First, let's serialize and compress the individual sub blocks
    //
    serialize_and_compress(node, npartitions, sb);

    if (in_parallel) {
        serialize_and_compress_in_parallel(node, npartitions, sb);
    }
    else {
        serialize_and_compress_serially(node, npartitions, sb);
    }
    //
    // Now lets create a sub-block that has the common node information,
    // This does NOT include the header
@@ -837,8 +892,28 @@ toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRTNODE_DISK
    size_t n_to_write;
    char *compressed_buf = NULL;
    {
	int r = toku_serialize_brtnode_to_memory(node, ndd, h->basementnodesize, do_rebalancing,
                                                 &n_to_write, &compressed_buf);
        // because toku_serialize_brtnode_to is only called for 
        // in toku_brtnode_flush_callback, we pass FALSE
        // for in_parallel. The reasoning is that when we write
        // nodes to disk via toku_brtnode_flush_callback, we 
        // assume that it is being done on a non-critical
        // background thread (probably for checkpointing), and therefore 
        // should not hog CPU,
        //
        // Should the above facts change, we may want to revisit
        // passing FALSE for in_parallel here
        //
        // alternatively, we could have made in_parallel a parameter
        // for toku_serialize_brtnode_to, but instead we did this.
        int r = toku_serialize_brtnode_to_memory(
            node, 
            ndd, 
            h->basementnodesize, 
            do_rebalancing,
            FALSE, // in_parallel
            &n_to_write, 
            &compressed_buf
            );
        if (r!=0) return r;
    }

+2 −2
Original line number Diff line number Diff line
@@ -2813,7 +2813,7 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
    size_t serialized_leaf_size = 0;
    char *serialized_leaf = NULL;
    BRTNODE_DISK_DATA ndd = NULL;
    result = toku_serialize_brtnode_to_memory(lbuf->node, &ndd, target_basementnodesize, TRUE, &serialized_leaf_size, &serialized_leaf);
    result = toku_serialize_brtnode_to_memory(lbuf->node, &ndd, target_basementnodesize, TRUE, TRUE, &serialized_leaf_size, &serialized_leaf);

    // write it out
    if (result == 0) {
@@ -3029,7 +3029,7 @@ static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknu
        size_t n_bytes;
        char *bytes;
        int r;
        r = toku_serialize_brtnode_to_memory(node, &ndd, target_basementnodesize, TRUE, &n_bytes, &bytes);
        r = toku_serialize_brtnode_to_memory(node, &ndd, target_basementnodesize, TRUE, TRUE, &n_bytes, &bytes);
        if (r) {
            result = r;
        } else {