Commit 875036d1 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel
Browse files

[t:4502] fix a weird case rich thought of

git-svn-id: file:///svn/toku/tokudb@40101 c7de825b-a66e-492c-adef-691d508d4ae1
parent 67828ac0
Loading
Loading
Loading
Loading
+11 −3
Original line number Diff line number Diff line
@@ -912,6 +912,8 @@ brt_leaf_apply_cmd_once (
    const BRT_MSG cmd,
    u_int32_t idx,
    LEAFENTRY le,
    OMT snapshot_xids,
    OMT live_list_reverse,
    uint64_t *workdonep
    );

@@ -925,7 +927,9 @@ brt_leaf_put_cmd (
    BASEMENTNODE bn, 
    BRT_MSG cmd, 
    bool* made_change,
    uint64_t *workdone
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    );

void toku_apply_cmd_to_leaf(
@@ -935,7 +939,9 @@ void toku_apply_cmd_to_leaf(
    BRTNODE node, 
    BRT_MSG cmd, 
    bool *made_change, 
    uint64_t *workdone
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    );

// FIXME needs toku prefix
@@ -945,7 +951,9 @@ void brtnode_put_cmd (
    DESCRIPTOR desc,
    BRTNODE node, 
    BRT_MSG cmd, 
    bool is_fresh
    bool is_fresh,
    OMT snapshot_txnids,
    OMT live_list_reverse
    );

void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created);
+3 −1
Original line number Diff line number Diff line
@@ -143,7 +143,9 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
        &brt->h->descriptor,
        node,
        &cmd,
        true
        true,
	NULL,
	NULL
        );    

    toku_verify_or_set_counts(node);
+43 −19
Original line number Diff line number Diff line
@@ -1373,6 +1373,8 @@ brt_leaf_apply_cmd_once (
    const BRT_MSG cmd,
    u_int32_t idx,
    LEAFENTRY le,
    OMT snapshot_xids,
    OMT live_list_reverse,
    uint64_t *workdone
    )
// Effect: Apply cmd to leafentry (msn is ignored)
@@ -1395,7 +1397,7 @@ brt_leaf_apply_cmd_once (
    // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is 
    // no longer in use.  We'll have to release the old mempool later.
    {
	int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, bn->buffer, &bn->buffer_mempool, &maybe_free, &numbytes_delta);
	int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, bn->buffer, &bn->buffer_mempool, &maybe_free, snapshot_xids, live_list_reverse, &numbytes_delta);
	invariant(r==0);
    }

@@ -1466,6 +1468,8 @@ struct setval_extra_s {
    const DBT *key;
    u_int32_t idx;
    LEAFENTRY le;
    OMT snapshot_txnids;
    OMT live_list_reverse;
    bool made_change;
    uint64_t * workdone;  // set by brt_leaf_apply_cmd_once()
};
@@ -1498,6 +1502,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
        }
        brt_leaf_apply_cmd_once(svextra->leafnode, svextra->bn, &msg,
                                svextra->idx, svextra->le,
                                svextra->snapshot_txnids, svextra->live_list_reverse,
                                svextra->workdone);
        svextra->setval_r = 0;
    }
@@ -1509,7 +1514,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
// would be to put a dummy msn in the messages created by setval_fun(), but preserving
// the original msn seems cleaner and it preserves accountability at a lower layer.
static int do_update(brt_update_func update_fun, DESCRIPTOR desc, BRTNODE leafnode, BASEMENTNODE bn, BRT_MSG cmd, int idx,
                     LEAFENTRY le,  bool* made_change,
                     LEAFENTRY le, OMT snapshot_txnids, OMT live_list_reverse, bool* made_change,
                     uint64_t * workdone) {
    LEAFENTRY le_for_update;
    DBT key;
@@ -1552,7 +1557,7 @@ static int do_update(brt_update_func update_fun, DESCRIPTOR desc, BRTNODE leafno
    }

    struct setval_extra_s setval_extra = {setval_tag, FALSE, 0, leafnode, bn, cmd->msn, cmd->xids,
                                          keyp, idx, le_for_update, 0, workdone};
                                          keyp, idx, le_for_update, snapshot_txnids, live_list_reverse, 0, workdone};
    // call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
    FAKE_DB(db, tmp_desc, desc);
    int r = update_fun(
@@ -1581,7 +1586,9 @@ brt_leaf_put_cmd (
    BASEMENTNODE bn, 
    BRT_MSG cmd, 
    bool* made_change,
    uint64_t *workdone
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    )
// Effect: 
//   Put a cmd into a leaf.
@@ -1623,7 +1630,7 @@ brt_leaf_put_cmd (
	    assert(r==0);
	    storeddata=storeddatav;
	}
	brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, workdone);
	brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);

	// if the insertion point is within a window of the right edge of
	// the leaf then it is sequential
@@ -1655,7 +1662,7 @@ brt_leaf_put_cmd (
	while (1) {
	    u_int32_t num_leafentries_before = toku_omt_size(bn->buffer);

	    brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, workdone);
	    brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
	    *made_change = 1;

	    { 
@@ -1705,7 +1712,7 @@ brt_leaf_put_cmd (
	    storeddata=storeddatav;
	    int deleted = 0;
	    if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, workdone);
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
		u_int32_t new_omt_size = toku_omt_size(bn->buffer);
		if (new_omt_size != omt_size) {
		    assert(new_omt_size+1 == omt_size);
@@ -1732,7 +1739,7 @@ brt_leaf_put_cmd (
	    storeddata=storeddatav;
	    int deleted = 0;
	    if (le_has_xids(storeddata, cmd->xids)) {
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, workdone);
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
		u_int32_t new_omt_size = toku_omt_size(bn->buffer);
		if (new_omt_size != omt_size) {
		    assert(new_omt_size+1 == omt_size);
@@ -1754,10 +1761,10 @@ brt_leaf_put_cmd (
	r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
			       &storeddatav, &idx);
	if (r==DB_NOTFOUND) {
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, NULL, made_change, workdone);
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, NULL, snapshot_txnids, live_list_reverse, made_change, workdone);
	} else if (r==0) {
	    storeddata=storeddatav;
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, made_change, workdone);
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, made_change, workdone);
	} // otherwise, a worse error, just return it
	break;
    }
@@ -1769,7 +1776,7 @@ brt_leaf_put_cmd (
	    r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
	    assert(r==0);
	    storeddata=storeddatav;
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, made_change, workdone);
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, made_change, workdone);
	    // TODO(leif): This early return means get_leaf_reactivity()
	    // and VERIFY_NODE() never get called.  Is this a problem?
	    assert(r==0);
@@ -2242,7 +2249,9 @@ toku_bnc_flush_to_child(
                desc,
                child, 
                &brtcmd, 
                is_fresh
                is_fresh,
                NULL,  // pass NULL omts (snapshot_txnids and live_list_reverse)
                NULL   // we're going to handle GC ourselves next
                );
        }));

@@ -2297,7 +2306,9 @@ brtnode_put_cmd (
    DESCRIPTOR desc,
    BRTNODE node, 
    BRT_MSG cmd, 
    bool is_fresh
    bool is_fresh,
    OMT snapshot_txnids,
    OMT live_list_reverse
    )
// Effect: Push CMD into the subtree rooted at NODE.
//   If NODE is a leaf, then
@@ -2322,7 +2333,9 @@ brtnode_put_cmd (
            node, 
            cmd, 
            &made_change, 
            &workdone
            &workdone,
            snapshot_txnids,
            live_list_reverse
            );
    } else {
        brt_nonleaf_put_cmd(compare_fun, desc, node, cmd, is_fresh);
@@ -2343,7 +2356,9 @@ void toku_apply_cmd_to_leaf(
    BRTNODE node, 
    BRT_MSG cmd, 
    bool *made_change, 
    uint64_t *workdone
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    ) 
{
    VERIFY_NODE(t, node);
@@ -2385,7 +2400,9 @@ void toku_apply_cmd_to_leaf(
                             BLB(node, childnum),
                             cmd,
                             made_change,
                             workdone);
                             workdone,
                             snapshot_txnids,
                             live_list_reverse);
        } else {
            STATUS_VALUE(BRT_MSN_DISCARDS)++;
        }
@@ -2402,7 +2419,9 @@ void toku_apply_cmd_to_leaf(
                                 BLB(node, childnum),
                                 cmd,
                                 &bn_made_change,
                                 workdone);
                                 workdone,
                                 snapshot_txnids,
                                 live_list_reverse);
                if (bn_made_change) *made_change = 1;
            } else {
                STATUS_VALUE(BRT_MSN_DISCARDS)++;
@@ -2421,6 +2440,9 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
{
    BRTNODE node = *nodep;
    toku_assert_entire_node_in_memory(node);
    TOKULOGGER logger = toku_cachefile_logger(brt->cf);
    OMT snapshot_txnids = logger ? logger->snapshot_txnids : NULL;
    OMT live_list_reverse = logger ? logger->live_list_reverse : NULL;
    MSN cmd_msn = cmd->msn;
    invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
    brtnode_put_cmd(
@@ -2429,7 +2451,9 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
        &brt->h->descriptor,
        node,
        cmd,
        true
        true,
        snapshot_txnids,
        live_list_reverse
        );
    //
    // assumption is that brtnode_put_cmd will
@@ -4592,7 +4616,7 @@ do_brt_leaf_put_cmd(BRT t, BRTNODE leafnode, BASEMENTNODE bn, BRTNODE ancestor,
        DBT hv;
        BRT_MSG_S brtcmd = { type, msn, xids, .u.id = { &hk, toku_fill_dbt(&hv, val, vallen) } };
        bool made_change;
        brt_leaf_put_cmd(t->compare_fun, t->update_fun, &t->h->descriptor, leafnode, bn, &brtcmd, &made_change, &BP_WORKDONE(ancestor, childnum));
        brt_leaf_put_cmd(t->compare_fun, t->update_fun, &t->h->descriptor, leafnode, bn, &brtcmd, &made_change, &BP_WORKDONE(ancestor, childnum), NULL, NULL);  // pass NULL omts (snapshot_txnids and live_list_reverse) to prevent GC from running on message application for a query
    } else {
        STATUS_VALUE(BRT_MSN_DISCARDS)++;
    }
+1 −1
Original line number Diff line number Diff line
@@ -2788,7 +2788,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
    DBT theval = { .data = val, .size = vallen };
    BRT_MSG_S cmd = { BRT_INSERT, ZERO_MSN, lbuf->xids, .u.id = { &thekey, &theval } };
    uint64_t workdone=0;
    brt_leaf_apply_cmd_once(leafnode, BLB(leafnode,0), &cmd, idx, NULL, &workdone);
    brt_leaf_apply_cmd_once(leafnode, BLB(leafnode,0), &cmd, idx, NULL, NULL, NULL, &workdone);
}

static int write_literal(struct dbout *out, void*data,  size_t len) {
+5 −0
Original line number Diff line number Diff line
@@ -336,6 +336,8 @@ apply_msg_to_leafentry(BRT_MSG msg, // message to apply to leafentry
		       OMT omt, 
		       struct mempool *mp, 
		       void **maybe_free,
                       OMT snapshot_xids,
		       OMT live_list_reverse,
                       int64_t * numbytes_delta_p) {  // change in total size of key and val, not including any overhead
    ULE_S ule;
    int rval;
@@ -349,6 +351,9 @@ apply_msg_to_leafentry(BRT_MSG msg, // message to apply to leafentry
	oldnumbytes = ule_get_innermost_numbytes(&ule);
    }
    msg_modify_ule(&ule, msg);          // modify unpacked leafentry
    if (snapshot_xids && live_list_reverse) {
	garbage_collection(&ule, snapshot_xids, live_list_reverse);
    }
    rval = le_pack(&ule,                // create packed leafentry
		   new_leafentry_memorysize, 
		   new_leafentry_p,
Loading