Commit 51ee3581 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel
Browse files

[t:4552], finish fix

git-svn-id: file:///svn/toku/tokudb@40180 c7de825b-a66e-492c-adef-691d508d4ae1
parent d71bfabb
Loading
Loading
Loading
Loading
+6 −3
Original line number Diff line number Diff line
@@ -128,9 +128,11 @@ toku_pin_brtnode(
    const PIVOT_BOUNDS bounds,
    BRTNODE_FETCH_EXTRA bfe,
    BOOL apply_ancestor_messages, // this BOOL is probably temporary, for #3972, once we know how range query estimates work, will revisit this
    BRTNODE *node_p)
    BRTNODE *node_p,
    BOOL* msgs_applied)
{
    void *node_v;
    *msgs_applied = FALSE;
    int r = toku_cachetable_get_and_pin_nonblocking(
            brt->cf,
            blocknum,
@@ -146,7 +148,7 @@ toku_pin_brtnode(
    if (r==0) {
        BRTNODE node = node_v;
        if (apply_ancestor_messages) {
            maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
            maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, msgs_applied);
        }
        *node_p = node;
        // printf("%*sPin %ld\n", 8-node->height, "", blocknum.b);
@@ -183,7 +185,8 @@ toku_pin_brtnode_holding_lock(
        );
    assert(r==0);
    BRTNODE node = node_v;
    if (apply_ancestor_messages) maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds);
    BOOL msgs_applied;
    if (apply_ancestor_messages) maybe_apply_ancestors_messages_to_node(brt, node, ancestors, bounds, &msgs_applied);
    *node_p = node;
}

+2 −1
Original line number Diff line number Diff line
@@ -72,7 +72,8 @@ toku_pin_brtnode(
    const PIVOT_BOUNDS pbounds,
    BRTNODE_FETCH_EXTRA bfe,
    BOOL apply_ancestor_messages, // this BOOL is probably temporary, for #3972, once we know how range query estimates work, will revisit this
    BRTNODE *node_p
    BRTNODE *node_p,
    BOOL* msgs_applied
    ) __attribute__((__warn_unused_result__));

/**
+1 −1
Original line number Diff line number Diff line
@@ -706,7 +706,7 @@ struct pivot_bounds {
};

// FIXME needs toku prefix
void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds);
void maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, BOOL* msgs_applied);

int
toku_brt_search_which_child(
+39 −15
Original line number Diff line number Diff line
@@ -4777,7 +4777,8 @@ bnc_apply_messages_to_basement_node(
    BASEMENTNODE bn,   // where to apply messages
    BRTNODE ancestor,  // the ancestor node where we can find messages to apply
    int childnum,      // which child buffer of ancestor contains messages we want
    struct pivot_bounds const * const bounds  // contains pivot key bounds of this basement node
    struct pivot_bounds const * const bounds,  // contains pivot key bounds of this basement node
    BOOL* msgs_applied
    )
{
    int r;
@@ -4830,6 +4831,7 @@ bnc_apply_messages_to_basement_node(

        // Apply the messages in MSN order.
        for (int i = 0; i < buffer_size; ++i) {
            *msgs_applied = TRUE;
            const struct fifo_entry *entry = toku_fifo_get_entry(bnc->buffer, offsets[i]);
            do_brt_leaf_put_cmd(t, leafnode, bn, ancestor, childnum, entry);
        }
@@ -4838,12 +4840,13 @@ bnc_apply_messages_to_basement_node(
    } else if (stale_lbi == stale_ube) {
        // No stale messages to apply, we just apply fresh messages.
        struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum };

        if (fresh_ube - fresh_lbi > 0) *msgs_applied = TRUE;
        r = toku_omt_iterate_on_range(bnc->fresh_message_tree, fresh_lbi, fresh_ube, iterate_do_brt_leaf_put_cmd, &iter_extra);
        assert_zero(r);
    } else if (fresh_lbi == fresh_ube) {
        // No fresh messages to apply, we just apply stale messages.

        if (stale_ube - stale_lbi > 0) *msgs_applied = TRUE;
        struct iterate_do_brt_leaf_put_cmd_extra iter_extra = { .t = t, .leafnode = leafnode, .bn = bn, .ancestor = ancestor, .childnum = childnum };

        r = toku_omt_iterate_on_range(bnc->stale_message_tree, stale_lbi, stale_ube, iterate_do_brt_leaf_put_cmd, &iter_extra);
@@ -4867,6 +4870,7 @@ bnc_apply_messages_to_basement_node(
        // Iterate over both lists, applying the smaller (in (key, msn)
        // order) message at each step
        while (stale_i < stale_ube && fresh_i < fresh_ube) {
            *msgs_applied = TRUE;
            const long stale_offset = (long) stale_v;
            const long fresh_offset = (long) fresh_v;
            int c = toku_fifo_entry_key_msn_cmp(&extra, &stale_offset, &fresh_offset);
@@ -4944,7 +4948,7 @@ bnc_apply_messages_to_basement_node(
}

void
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds)
maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors, struct pivot_bounds const * const bounds, BOOL* msgs_applied)
// Effect:
//   Bring a leaf node up-to-date according to all the messages in the ancestors.
//   If the leaf node is already up-to-date then do nothing.
@@ -4975,7 +4979,8 @@ maybe_apply_ancestors_messages_to_node (BRT t, BRTNODE node, ANCESTORS ancestors
                    curr_bn,
                    curr_ancestors->node,
                    curr_ancestors->childnum,
                    &curr_bounds
                    &curr_bounds,
                    msgs_applied
                    );
                // We don't want to check this ancestor node again if the
                // next time we query it, the msn hasn't changed.
@@ -5186,6 +5191,7 @@ brt_node_maybe_prefetch(BRT brt, BRTNODE node, int childnum, BRT_CURSOR brtcurso
struct unlock_brtnode_extra {
    BRT brt;
    BRTNODE node;
    BOOL msgs_applied;
};
// When this is called, the cachetable lock is held
static void
@@ -5194,7 +5200,13 @@ unlock_brtnode_fun (void *v) {
    BRT brt = x->brt;
    BRTNODE node = x->node;
    // CT lock is held
    int r = toku_cachetable_unpin_ct_prelocked_no_flush(brt->cf, node->thisnodename, node->fullhash, (enum cachetable_dirty) node->dirty, make_brtnode_pair_attr(node));
    int r = toku_cachetable_unpin_ct_prelocked_no_flush(
        brt->cf, 
        node->thisnodename, 
        node->fullhash, 
        (enum cachetable_dirty) node->dirty, 
        x->msgs_applied ? make_brtnode_pair_attr(node) : make_invalid_pair_attr()
        );
    assert(r==0);
}

@@ -5221,24 +5233,24 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
        brtcursor->right_is_pos_infty,
        brtcursor->disable_prefetching
        );
    BOOL msgs_applied = FALSE;
    {
        int rr = toku_pin_brtnode(brt, childblocknum, fullhash,
                                  unlockers,
                                  &next_ancestors, bounds,
                                  &bfe,
                                  TRUE,
                                  &childnode);
                                  &childnode,
                                  &msgs_applied);
        if (rr==TOKUDB_TRY_AGAIN) return rr;
        assert(rr==0);
    }

    struct unlock_brtnode_extra unlock_extra   = {brt,childnode};
    struct unlock_brtnode_extra unlock_extra   = {brt,childnode,msgs_applied};
    struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};

    int r = brt_search_node(brt, childnode, search, bfe.child_to_read, getf, getf_v, doprefetch, brtcursor, &next_unlockers, &next_ancestors, bounds, can_bulk_fetch);
    if (r!=TOKUDB_TRY_AGAIN) {
        // Even if r is reactive, we want to handle the maybe reactive child.

#if TOKU_DO_PREFETCH
        // maybe prefetch the next child
        if (r == 0 && node->height == 1) {
@@ -5247,7 +5259,12 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
#endif

        assert(next_unlockers.locked);
        toku_unpin_brtnode_read_only(brt, childnode); // unpin the childnode before handling the reactive child (because that may make the childnode disappear.)
        if (msgs_applied) {
            toku_unpin_brtnode(brt, childnode);
        }
        else {
            toku_unpin_brtnode_read_only(brt, childnode);
        }
    } else {
        // try again.

@@ -5258,9 +5275,14 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
        //  some piece of a node that it needed was not in memory. In this case,
        //  the node was not unpinned, so we unpin it here
        if (next_unlockers.locked) {
            if (msgs_applied) {
                toku_unpin_brtnode(brt, childnode);
            }
            else {
                toku_unpin_brtnode_read_only(brt, childnode);
            }
        }
    }

    return r;
}
@@ -5557,7 +5579,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
    //uint tree_height = node->height + 1;  // How high is the tree?  This is the height of the root node plus one (leaf is at height 0).


    struct unlock_brtnode_extra unlock_extra   = {brt,node};
    struct unlock_brtnode_extra unlock_extra   = {brt,node,FALSE};
    struct unlockers		unlockers      = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};

    {
@@ -6061,11 +6083,13 @@ toku_brt_keyrange_internal (BRT brt, BRTNODE node,
	BLOCKNUM childblocknum = BP_BLOCKNUM(node, child_number);
	u_int32_t fullhash = compute_child_fullhash(brt->cf, node, child_number);
	BRTNODE childnode;
	r = toku_pin_brtnode(brt, childblocknum, fullhash, unlockers, &next_ancestors, bounds, bfe, FALSE, &childnode);
        BOOL msgs_applied = FALSE;
	r = toku_pin_brtnode(brt, childblocknum, fullhash, unlockers, &next_ancestors, bounds, bfe, FALSE, &childnode, &msgs_applied);
        assert(!msgs_applied);
	if (r != TOKUDB_TRY_AGAIN) {
	    assert(r == 0);

	    struct unlock_brtnode_extra unlock_extra   = {brt,childnode};
	    struct unlock_brtnode_extra unlock_extra   = {brt,childnode,FALSE};
	    struct unlockers next_unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, unlockers};
	    const struct pivot_bounds next_bounds = next_pivot_keys(node, child_number, bounds);

@@ -6119,7 +6143,7 @@ toku_brt_keyrange (BRT brt, DBT *key, u_int64_t *less_p, u_int64_t *equal_p, u_i
            toku_brtheader_release_treelock(brt->h);
        }

	struct unlock_brtnode_extra unlock_extra = {brt,node};
	struct unlock_brtnode_extra unlock_extra = {brt,node,FALSE};
	struct unlockers unlockers = {TRUE, unlock_brtnode_fun, (void*)&unlock_extra, (UNLOCKERS)NULL};

	{
+6 −3
Original line number Diff line number Diff line
@@ -620,7 +620,8 @@ flush_to_leaf(BRT t, bool make_leaf_up_to_date, bool use_flush) {
        parentnode->max_msn_applied_to_node_on_disk = max_parent_msn;
        struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
        const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
        maybe_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds);
        BOOL* msgs_applied;
        maybe_apply_ancestors_messages_to_node(t, child, &ancestors, &infinite_bounds, &msgs_applied);

        FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
                     {
@@ -840,7 +841,8 @@ flush_to_leaf_with_keyrange(BRT t, bool make_leaf_up_to_date) {
    parentnode->max_msn_applied_to_node_on_disk = max_parent_msn;
    struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
    const struct pivot_bounds bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = kv_pair_malloc(childkeys[7].data, childkeys[7].size, NULL, 0) };
    maybe_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds);
    BOOL msgs_applied;
    maybe_apply_ancestors_messages_to_node(t, child, &ancestors, &bounds, &msgs_applied);

    FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
                 {
@@ -1024,7 +1026,8 @@ compare_apply_and_flush(BRT t, bool make_leaf_up_to_date) {
    parentnode->max_msn_applied_to_node_on_disk = max_parent_msn;
    struct ancestors ancestors = { .node = parentnode, .childnum = 0, .next = NULL };
    const struct pivot_bounds infinite_bounds = { .lower_bound_exclusive = NULL, .upper_bound_inclusive = NULL };
    maybe_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds);
    BOOL msgs_applied;
    maybe_apply_ancestors_messages_to_node(t, child2, &ancestors, &infinite_bounds, &msgs_applied);

    FIFO_ITERATE(parent_bnc->buffer, key, keylen, val, vallen, type, msn, xids, is_fresh,
                 {