Commit fda61baf authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel
Browse files

[t:4762], merge to main

git-svn-id: file:///svn/toku/tokudb@42336 c7de825b-a66e-492c-adef-691d508d4ae1
parent 07c6b9a0
Loading
Loading
Loading
Loading
+7 −3
Original line number Diff line number Diff line
@@ -918,6 +918,7 @@ brt_leaf_apply_cmd_once (
    LEAFENTRY le,
    OMT snapshot_xids,
    OMT live_list_reverse,
    OMT live_root_txns,
    uint64_t *workdonep
    );

@@ -932,7 +933,8 @@ brt_leaf_put_cmd (
    BRT_MSG cmd, 
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    OMT live_list_reverse,
    OMT live_root_txns
    );

void toku_apply_cmd_to_leaf(
@@ -943,7 +945,8 @@ void toku_apply_cmd_to_leaf(
    BRT_MSG cmd, 
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    OMT live_list_reverse,
    OMT live_root_txns
    );

// FIXME needs toku prefix
@@ -955,7 +958,8 @@ void brtnode_put_cmd (
    BRT_MSG cmd, 
    bool is_fresh,
    OMT snapshot_txnids,
    OMT live_list_reverse
    OMT live_list_reverse,
    OMT live_root_txns
    );

void toku_reset_root_xid_that_created(BRT brt, TXNID new_root_xid_that_created);
+3 −2
Original line number Diff line number Diff line
@@ -147,6 +147,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
        &cmd,
        true,
        NULL,
        NULL,
        NULL
        );    

+61 −40
Original line number Diff line number Diff line
@@ -1563,6 +1563,7 @@ brt_leaf_apply_cmd_once (
    LEAFENTRY le,
    OMT snapshot_xids,
    OMT live_list_reverse,
    OMT live_root_txns,
    uint64_t *workdone
    )
// Effect: Apply cmd to leafentry (msn is ignored)
@@ -1585,7 +1586,7 @@ brt_leaf_apply_cmd_once (
    // That means le is guaranteed to not cause a sigsegv but it may point to a mempool that is 
    // no longer in use.  We'll have to release the old mempool later.
    {
	int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, bn->buffer, &bn->buffer_mempool, &maybe_free, snapshot_xids, live_list_reverse, &numbytes_delta);
	int r = apply_msg_to_leafentry(cmd, le, &newsize, &new_le, bn->buffer, &bn->buffer_mempool, &maybe_free, snapshot_xids, live_list_reverse, live_root_txns, &numbytes_delta);
	invariant(r==0);
    }

@@ -1658,6 +1659,7 @@ struct setval_extra_s {
    LEAFENTRY le;
    OMT snapshot_txnids;
    OMT live_list_reverse;
    OMT live_root_txns;
    uint64_t * workdone;  // set by brt_leaf_apply_cmd_once()
};

@@ -1689,7 +1691,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
        }
        brt_leaf_apply_cmd_once(svextra->leafnode, svextra->bn, &msg,
                                svextra->idx, svextra->le,
                                svextra->snapshot_txnids, svextra->live_list_reverse,
                                svextra->snapshot_txnids, svextra->live_list_reverse, svextra->live_root_txns,
                                svextra->workdone);
        svextra->setval_r = 0;
    }
@@ -1700,7 +1702,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
// would be to put a dummy msn in the messages created by setval_fun(), but preserving
// the original msn seems cleaner and it preserves accountability at a lower layer.
static int do_update(brt_update_func update_fun, DESCRIPTOR desc, BRTNODE leafnode, BASEMENTNODE bn, BRT_MSG cmd, int idx,
                     LEAFENTRY le, OMT snapshot_txnids, OMT live_list_reverse,
                     LEAFENTRY le, OMT snapshot_txnids, OMT live_list_reverse, OMT live_root_txns,
                     uint64_t * workdone) {
    LEAFENTRY le_for_update;
    DBT key;
@@ -1743,7 +1745,7 @@ static int do_update(brt_update_func update_fun, DESCRIPTOR desc, BRTNODE leafno
    }

    struct setval_extra_s setval_extra = {setval_tag, FALSE, 0, leafnode, bn, cmd->msn, cmd->xids,
                                          keyp, idx, le_for_update, snapshot_txnids, live_list_reverse, workdone};
                                          keyp, idx, le_for_update, snapshot_txnids, live_list_reverse, live_root_txns, workdone};
    // call handlerton's brt->update_fun(), which passes setval_extra to setval_fun()
    FAKE_DB(db, desc);
    int r = update_fun(
@@ -1769,7 +1771,8 @@ brt_leaf_put_cmd (
    BRT_MSG cmd, 
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    OMT live_list_reverse,
    OMT live_root_txns
    )
// Effect: 
//   Put a cmd into a leaf.
@@ -1809,7 +1812,7 @@ brt_leaf_put_cmd (
	    assert(r==0);
	    storeddata=storeddatav;
	}
	brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
	brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, live_root_txns, workdone);

	// if the insertion point is within a window of the right edge of
	// the leaf then it is sequential
@@ -1841,7 +1844,7 @@ brt_leaf_put_cmd (
	while (1) {
	    u_int32_t num_leafentries_before = toku_omt_size(bn->buffer);

	    brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
	    brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, live_root_txns, workdone);

	    { 
		// Now we must find the next leafentry. 
@@ -1887,7 +1890,7 @@ brt_leaf_put_cmd (
	    storeddata=storeddatav;
	    int deleted = 0;
	    if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, live_root_txns, workdone);
		u_int32_t new_omt_size = toku_omt_size(bn->buffer);
		if (new_omt_size != omt_size) {
		    assert(new_omt_size+1 == omt_size);
@@ -1913,7 +1916,7 @@ brt_leaf_put_cmd (
	    storeddata=storeddatav;
	    int deleted = 0;
	    if (le_has_xids(storeddata, cmd->xids)) {
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
		brt_leaf_apply_cmd_once(leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, live_root_txns, workdone);
		u_int32_t new_omt_size = toku_omt_size(bn->buffer);
		if (new_omt_size != omt_size) {
		    assert(new_omt_size+1 == omt_size);
@@ -1934,10 +1937,10 @@ brt_leaf_put_cmd (
	r = toku_omt_find_zero(bn->buffer, toku_cmd_leafval_heaviside, &be,
			       &storeddatav, &idx);
	if (r==DB_NOTFOUND) {
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, NULL, snapshot_txnids, live_list_reverse, workdone);
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, NULL, snapshot_txnids, live_list_reverse, live_root_txns, workdone);
	} else if (r==0) {
	    storeddata=storeddatav;
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, live_root_txns, workdone);
	} // otherwise, a worse error, just return it
	break;
    }
@@ -1949,7 +1952,7 @@ brt_leaf_put_cmd (
	    r = toku_omt_fetch(bn->buffer, idx, &storeddatav);
	    assert(r==0);
	    storeddata=storeddatav;
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, workdone);
	    r = do_update(update_fun, desc, leafnode, bn, cmd, idx, storeddata, snapshot_txnids, live_list_reverse, live_root_txns, workdone);
	    // TODO(leif): This early return means get_leaf_reactivity()
	    // and VERIFY_NODE() never get called.  Is this a problem?
	    assert(r==0);
@@ -2286,6 +2289,7 @@ brt_basement_node_gc_once(BASEMENTNODE bn,
                          LEAFENTRY leaf_entry,
                          OMT snapshot_xids,
                          OMT live_list_reverse,
                          OMT live_root_txns,
                          STAT64INFO_S * delta)
{
    assert(leaf_entry);
@@ -2313,7 +2317,8 @@ brt_basement_node_gc_once(BASEMENTNODE bn,
                              &bn->buffer_mempool,
                              &maybe_free,
                              snapshot_xids,
                              live_list_reverse);
                              live_list_reverse,
                              live_root_txns);

    // These will represent the number of bytes and rows changed as
    // part of the garbage collection.
@@ -2355,6 +2360,7 @@ static void
basement_node_gc_all_les(BASEMENTNODE bn,
                         OMT snapshot_xids,
                         OMT live_list_reverse,
                         OMT live_root_txns,
                         STAT64INFO_S * delta)
{
    int r = 0;
@@ -2366,7 +2372,7 @@ basement_node_gc_all_les(BASEMENTNODE bn,
        r = toku_omt_fetch(bn->buffer, index, &storedatav);
        assert(r == 0);
        leaf_entry = storedatav;
        brt_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, live_list_reverse, delta);
        brt_basement_node_gc_once(bn, index, leaf_entry, snapshot_xids, live_list_reverse, live_root_txns, delta);
        // Check if the leaf entry was deleted or not.
        if (num_leafentries_before == toku_omt_size(bn->buffer)) {
            ++index;
@@ -2378,7 +2384,8 @@ basement_node_gc_all_les(BASEMENTNODE bn,
static void
brt_leaf_gc_all_les(BRTNODE node,
                    OMT snapshot_xids,
                    OMT live_list_reverse)
                    OMT live_list_reverse,
                    OMT live_root_txns)
{
    toku_assert_entire_node_in_memory(node);
    assert(node->height == 0);
@@ -2389,7 +2396,7 @@ brt_leaf_gc_all_les(BRTNODE node,
        STAT64INFO_S delta;
        delta.numrows = 0;
        delta.numbytes = 0;
        basement_node_gc_all_les(bn, snapshot_xids, live_list_reverse, &delta);
        basement_node_gc_all_les(bn, snapshot_xids, live_list_reverse, live_root_txns, &delta);
        // Update the header stats, but only if the leaf node is
        // dirty.
        if (node->dirty) {
@@ -2424,32 +2431,39 @@ toku_bnc_flush_to_child(
                &brtcmd, 
                is_fresh,
                NULL,  // pass NULL omts (snapshot_txnids and live_list_reverse)
                NULL   // we're going to handle GC ourselves next
                NULL,   // we're going to handle GC ourselves next
                NULL
                );
        }));

    // Run garbage collection, if we are a leaf entry.
    TOKULOGGER logger = toku_cachefile_logger(cf);
    if (child->height == 0 && logger) {
        int r;
        OMT snapshot_txnids = NULL;
        OMT live_list_reverse = NULL;
        OMT live_root_txns = NULL;
        {
            toku_pthread_mutex_lock(&logger->txn_list_lock);
        int r = toku_omt_clone_noptr(&snapshot_txnids,
            r = toku_omt_clone_noptr(&snapshot_txnids,
                                     logger->snapshot_txnids);
            assert_zero(r);
            r = toku_omt_clone_pool(&live_list_reverse, 
                                    logger->live_list_reverse,
                                    sizeof(XID_PAIR_S));
            assert_zero(r);
            r = toku_omt_clone_noptr(&live_root_txns,
                                     logger->live_root_txns);
            assert_zero(r);
            // take advantage of surrounding mutex, update stats.
            size_t buffsize = bnc->n_bytes_in_buffer;
            STATUS_VALUE(BRT_MSG_BYTES_OUT) += buffsize;
            // may be misleading if there's a broadcast message in there
            STATUS_VALUE(BRT_MSG_BYTES_CURR) -= buffsize;
            toku_pthread_mutex_unlock(&logger->txn_list_lock);

        }
        // Perform the garbage collection.
        brt_leaf_gc_all_les(child, snapshot_txnids, live_list_reverse);
        brt_leaf_gc_all_les(child, snapshot_txnids, live_list_reverse, live_root_txns);

        // Free the OMT's we used for garbage collecting.
        toku_omt_destroy(&snapshot_txnids);
@@ -2485,7 +2499,8 @@ brtnode_put_cmd (
    BRT_MSG cmd, 
    bool is_fresh,
    OMT snapshot_txnids,
    OMT live_list_reverse
    OMT live_list_reverse,
    OMT live_root_txns
    )
// Effect: Push CMD into the subtree rooted at NODE.
//   If NODE is a leaf, then
@@ -2510,7 +2525,8 @@ brtnode_put_cmd (
            cmd, 
            &workdone,
            snapshot_txnids,
            live_list_reverse
            live_list_reverse,
            live_root_txns
            );
    } else {
        brt_nonleaf_put_cmd(compare_fun, desc, node, cmd, is_fresh);
@@ -2532,7 +2548,8 @@ void toku_apply_cmd_to_leaf(
    BRT_MSG cmd, 
    uint64_t *workdone,
    OMT snapshot_txnids,
    OMT live_list_reverse
    OMT live_list_reverse,
    OMT live_root_txns
    ) 
{
    VERIFY_NODE(t, node);
@@ -2575,7 +2592,8 @@ void toku_apply_cmd_to_leaf(
                             cmd,
                             workdone,
                             snapshot_txnids,
                             live_list_reverse);
                             live_list_reverse,
                             live_root_txns);
        } else {
            STATUS_VALUE(BRT_MSN_DISCARDS)++;
        }
@@ -2592,7 +2610,8 @@ void toku_apply_cmd_to_leaf(
                                 cmd,
                                 workdone,
                                 snapshot_txnids,
                                 live_list_reverse);
                                 live_list_reverse,
                                 live_root_txns);
            } else {
                STATUS_VALUE(BRT_MSN_DISCARDS)++;
            }
@@ -2613,6 +2632,7 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
    TOKULOGGER logger = toku_cachefile_logger(brt->cf);
    OMT snapshot_txnids = logger ? logger->snapshot_txnids : NULL;
    OMT live_list_reverse = logger ? logger->live_list_reverse : NULL;
    OMT live_root_txns = logger ? logger->live_root_txns : NULL;
    MSN cmd_msn = cmd->msn;
    invariant(cmd_msn.msn > node->max_msn_applied_to_node_on_disk.msn);
    brtnode_put_cmd(
@@ -2623,7 +2643,8 @@ static void push_something_at_root (BRT brt, BRTNODE *nodep, BRT_MSG cmd)
        cmd,
        true,
        snapshot_txnids,
        live_list_reverse
        live_list_reverse,
        live_root_txns
        );
    //
    // assumption is that brtnode_put_cmd will
@@ -4478,7 +4499,7 @@ does_txn_read_entry(TXNID id, TOKUTXN context) {
    if (id < oldest_live_in_snapshot || id == context->ancestor_txnid64) {
	rval = TOKUDB_ACCEPT;
    }
    else if (id > context->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(context, id)) {
    else if (id > context->snapshot_txnid64 || toku_is_txn_in_live_root_txn_list(context->live_root_txn_list, id)) {
	rval = 0;
    }
    else {
@@ -4791,7 +4812,7 @@ do_brt_leaf_put_cmd(BRT t, BRTNODE leafnode, BASEMENTNODE bn, BRTNODE ancestor,
        toku_fill_dbt(&hk, key, keylen);
        DBT hv;
        BRT_MSG_S brtcmd = { type, msn, xids, .u.id = { &hk, toku_fill_dbt(&hv, val, vallen) } };
        brt_leaf_put_cmd(t->compare_fun, t->update_fun, &t->h->cmp_descriptor, leafnode, bn, &brtcmd, &BP_WORKDONE(ancestor, childnum), NULL, NULL);  // pass NULL omts (snapshot_txnids and live_list_reverse) to prevent GC from running on message application for a query
        brt_leaf_put_cmd(t->compare_fun, t->update_fun, &t->h->cmp_descriptor, leafnode, bn, &brtcmd, &BP_WORKDONE(ancestor, childnum), NULL, NULL, NULL);  // pass NULL omts (snapshot_txnids and live_list_reverse) to prevent GC from running on message application for a query
    } else {
        STATUS_VALUE(BRT_MSN_DISCARDS)++;
    }
+1 −1
Original line number Diff line number Diff line
@@ -2740,7 +2740,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
    DBT theval = { .data = val, .size = vallen };
    BRT_MSG_S cmd = { BRT_INSERT, ZERO_MSN, lbuf->xids, .u.id = { &thekey, &theval } };
    uint64_t workdone=0;
    brt_leaf_apply_cmd_once(leafnode, BLB(leafnode,0), &cmd, idx, NULL, NULL, NULL, &workdone);
    brt_leaf_apply_cmd_once(leafnode, BLB(leafnode,0), &cmd, idx, NULL, NULL, NULL, NULL, &workdone);
}

static int write_literal(struct dbout *out, void*data,  size_t len) {
+11 −15
Original line number Diff line number Diff line
@@ -158,9 +158,9 @@ toku_find_xid_by_xid (OMTVALUE v, void *xidv) {
int
toku_find_pair_by_xid (OMTVALUE v, void *xidv) {
    XID_PAIR pair = v;
    TXNID *xidfind = xidv;
    if (pair->xid1<*xidfind) return -1;
    if (pair->xid1>*xidfind) return +1;
    TXNID xidfind = (TXNID)xidv;
    if (pair->xid1<xidfind) return -1;
    if (pair->xid1>xidfind) return +1;
    return 0;
}

@@ -176,17 +176,17 @@ static int
live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), void*txnv) {
    TOKUTXN txn = txnv;
    TXNID xid = txn->txnid64;          // xid of txn that is closing
    TXNID *live_xid = live_xidv;       // xid on closing txn's live list
    TXNID live_xid = (TXNID)live_xidv;       // xid on closing txn's live list
    OMTVALUE pairv;
    XID_PAIR pair;
    uint32_t idx;

    int r;
    OMT reverse = txn->logger->live_list_reverse;
    r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, live_xid, &pairv, &idx);
    r = toku_omt_find_zero(reverse, toku_find_pair_by_xid, (void *)live_xid, &pairv, &idx);
    invariant(r==0);
    pair = pairv;
    invariant(pair->xid1 == *live_xid); //sanity check
    invariant(pair->xid1 == live_xid); //sanity check
    if (pair->xid2 == xid) {
        //There is a record that needs to be either deleted or updated
        TXNID olderxid;
@@ -200,7 +200,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
            //There is an older txn
            olderxid = (TXNID) olderv;
            invariant(olderxid < xid);
            if (olderxid >= *live_xid) {
            if (olderxid >= live_xid) {
                //older txn is new enough, we need to update.
                pair->xid2 = olderxid;
                should_delete = FALSE;
@@ -296,12 +296,13 @@ void toku_rollback_txn_close (TOKUTXN txn) {
        }

        if (txn->parent==NULL) {
            OMTVALUE txnagain;
            OMTVALUE v;
            u_int32_t idx;
            //Remove txn from list of live root txns
            r = toku_omt_find_zero(logger->live_root_txns, find_xid, txn, &txnagain, &idx);
            r = toku_omt_find_zero(logger->live_root_txns, toku_find_xid_by_xid, (OMTVALUE)txn->txnid64, &v, &idx);
            assert(r==0);
            assert(txn==txnagain);
            TXNID xid = (TXNID) v;
            invariant(xid == txn->txnid64);
            r = toku_omt_delete_at(logger->live_root_txns, idx);
            assert(r==0);
        }
@@ -328,11 +329,6 @@ void toku_rollback_txn_close (TOKUTXN txn) {
            {
                //Free memory used for live root txns local list
                invariant(toku_omt_size(txn->live_root_txn_list) > 0);
                OMTVALUE v;
                //store a single array of txnids
                r = toku_omt_fetch(txn->live_root_txn_list, 0, &v);
                invariant(r==0);
                toku_free(v);
                toku_omt_destroy(&txn->live_root_txn_list);
            }
        }
Loading