Commit dc44e1d3 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel
Browse files

[t:4527],[t:4528], merge fixes to main

git-svn-id: file:///svn/toku/tokudb@40084 c7de825b-a66e-492c-adef-691d508d4ae1
parent bf8cabd8
Loading
Loading
Loading
Loading
+12 −8
Original line number Diff line number Diff line
@@ -835,8 +835,9 @@ int toku_brtnode_pe_callback (void *brtnode_pv, PAIR_ATTR UU(old_attr), PAIR_ATT
}

static inline void
brt_status_update_partial_fetch(u_int8_t state)
brt_status_update_partial_fetch(u_int8_t UU(state))
{
#if 0
    if (state == PT_AVAIL) {
        STATUS_VALUE(BRT_PARTIAL_FETCH_HIT)++;
    }
@@ -849,6 +850,7 @@ brt_status_update_partial_fetch(u_int8_t state)
    else {
        invariant(FALSE);
    }
#endif
}

// Callback that states if a partial fetch of the node is necessary
@@ -930,12 +932,13 @@ BOOL toku_brtnode_pf_req_callback(void* brtnode_pv, void* read_extraargs) {

static void
brt_status_update_partial_fetch_reason(
    struct brtnode_fetch_extra *bfe,
    int i,
    int state,
    BOOL is_leaf
    struct brtnode_fetch_extra* UU(bfe),
    int UU(i),
    int UU(state),
    BOOL UU(is_leaf)
    )
{
#if 0
    invariant(state == PT_COMPRESSED || state == PT_ON_DISK);
    if (is_leaf) {
        if (bfe->type == brtnode_fetch_prefetch) {
@@ -991,6 +994,7 @@ brt_status_update_partial_fetch_reason(
            }
        }
    }
#endif
}

// callback for partially reading a node
@@ -5283,7 +5287,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
{
    int r;
    uint trycount = 0;     // How many tries did it take to get the result?
    uint root_tries = 0;   // How many times did we fetch the root node from disk?
    //uint root_tries = 0;   // How many times did we fetch the root node from disk?
    uint tree_height;      // How high is the tree?  This is the height of the root node plus one (leaf is at height 0).

try_again:
@@ -5395,7 +5399,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
        int r2 = getf(0,NULL, 0,NULL, getf_v, false);
        if (r2!=0) r = r2;
    }

#if 0
    {   // accounting (to detect and measure thrashing)
        uint retrycount = trycount - 1;         // how many retries were needed?
        STATUS_VALUE(BRT_TOTAL_SEARCHES)++;
@@ -5414,7 +5418,7 @@ toku_brt_search (BRT brt, brt_search_t *search, BRT_GET_CALLBACK_FUNCTION getf,
                STATUS_VALUE(BRT_SEARCH_TRIES_GT_HEIGHTPLUS3)++;
        }
    }

#endif
    return r;
}

+62 −48
Original line number Diff line number Diff line
@@ -46,8 +46,6 @@ static void cachetable_partial_reader(WORKITEM);
// These should be in the cachetable object, but we make them file-wide so that gdb can get them easily.
// They were left here after engine status cleanup (#2949, rather than moved into the status struct)
// so they are still easily available to the debugger and to save lots of typing.
static u_int64_t cachetable_lock_taken = 0;
static u_int64_t cachetable_lock_released = 0;
static u_int64_t cachetable_hit;
static u_int64_t cachetable_miss;
static u_int64_t cachetable_misstime;     // time spent waiting for disk read
@@ -79,8 +77,6 @@ status_init(void) {
    // Note, this function initializes the keyname, type, and legend fields.
    // Value fields are initialized to zero by compiler.

    STATUS_INIT(CT_LOCK_TAKEN,             UINT64, "lock taken");
    STATUS_INIT(CT_LOCK_RELEASED,          UINT64, "lock released");
    STATUS_INIT(CT_HIT,                    UINT64, "hit");
    STATUS_INIT(CT_MISS,                   UINT64, "miss");
    STATUS_INIT(CT_MISSTIME,               UINT64, "miss time");
@@ -179,7 +175,7 @@ static PAIR_ATTR const zero_attr = {
    .cache_pressure_size = 0
};

static void maybe_flush_some (CACHETABLE ct, long size);
static void maybe_flush_some (CACHETABLE ct, long size, BOOL ct_locked);

static inline void
ctpair_add_ref(PAIR p) {
@@ -261,8 +257,6 @@ toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
    if (!ct_status.initialized)
	status_init();

    STATUS_VALUE(CT_LOCK_TAKEN)             = cachetable_lock_taken;
    STATUS_VALUE(CT_LOCK_RELEASED)          = cachetable_lock_released;
    STATUS_VALUE(CT_HIT)                    = cachetable_hit;
    STATUS_VALUE(CT_MISS)                   = cachetable_miss;
    STATUS_VALUE(CT_MISSTIME)               = cachetable_misstime;
@@ -315,12 +309,10 @@ static inline void cachefiles_unlock(CACHETABLE ct) {
// Lock the cachetable
static inline void cachetable_lock(CACHETABLE ct __attribute__((unused))) {
    int r = toku_pthread_mutex_lock(ct->mutex); resource_assert_zero(r);;
    cachetable_lock_taken++;
}

// Unlock the cachetable
static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
    cachetable_lock_released++;
    int r = toku_pthread_mutex_unlock(ct->mutex); resource_assert_zero(r);
}

@@ -540,7 +532,7 @@ u_int64_t toku_cachetable_reserve_memory(CACHETABLE ct, double fraction) {
    cachetable_wait_write(ct);
    uint64_t reserved_memory = fraction*(ct->size_limit-ct->size_reserved);
    ct->size_reserved += reserved_memory;
    maybe_flush_some(ct, reserved_memory);    
    maybe_flush_some(ct, reserved_memory, TRUE);    
    ct->size_current += reserved_memory;
    cachetable_unlock(ct);
    return reserved_memory;
@@ -1621,17 +1613,20 @@ static void cachetable_partial_eviction(WORKITEM wi) {
}


static void maybe_flush_some (CACHETABLE ct, long size) {
static void maybe_flush_some (CACHETABLE ct, long size, BOOL ct_locked) {

    //
    // These variables will help us detect if everything in the clock is currently being accessed.
    // We must detect this case otherwise we will end up in an infinite loop below.
    //
    if (size + ct->size_current <= ct->size_limit + ct->size_evicting) return;

    CACHEKEY curr_cachekey;
    curr_cachekey.b = INT64_MAX; // create initial value so compiler does not complain
    FILENUM curr_filenum;
    curr_filenum.fileid = UINT32_MAX; // create initial value so compiler does not complain
    BOOL set_val = FALSE;
    if (!ct_locked) cachetable_lock(ct);
    
    while ((ct->clock_head) && (size + ct->size_current > ct->size_limit + ct->size_evicting)) {
        PAIR curr_in_clock = ct->clock_head;
@@ -1728,12 +1723,13 @@ static void maybe_flush_some (CACHETABLE ct, long size) {
        cachetable_rehash(ct, ct->table_size/2);
    }
exit:
    if (!ct_locked) cachetable_unlock(ct);
    return;
}

void toku_cachetable_maybe_flush_some(CACHETABLE ct) {
    cachetable_lock(ct);
    maybe_flush_some(ct, 0);
    maybe_flush_some(ct, 0, TRUE);
    cachetable_unlock(ct);
}

@@ -1781,6 +1777,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
    return p;
}

/*
enum { hash_histogram_max = 100 };
static unsigned long long hash_histogram[hash_histogram_max];
void toku_cachetable_print_hash_histogram (void) {
@@ -1797,6 +1794,7 @@ note_hash_count (int count) {
    if (count>=hash_histogram_max) count=hash_histogram_max-1;
    hash_histogram[count]++;
}
*/

// has ct locked on entry
// This function MUST NOT release and reacquire the cachetable lock
@@ -1847,7 +1845,7 @@ static int cachetable_put_internal(
        );
    assert(p);
    nb_mutex_write_lock(&p->nb_mutex, ct->mutex);
    note_hash_count(count);
    //note_hash_count(count);
    return 0;
}

@@ -1866,7 +1864,7 @@ static int cachetable_get_pair (CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
            break;
        }
    }
    note_hash_count(count);
    //note_hash_count(count);
    return r;
}

@@ -1984,7 +1982,7 @@ int toku_cachetable_put_with_dep_pairs(
    // is used to ensure that a checkpoint is not begun during 
    // cachetable_put_internal
    // 
    maybe_flush_some(ct, attr.size);
    maybe_flush_some(ct, attr.size, TRUE);
    int rval;
    {
	BEGIN_CRITICAL_REGION;   // checkpoint may not begin inside critical region, detect and crash if one begins
@@ -2026,7 +2024,7 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v
    CACHETABLE ct = cachefile->cachetable;
    cachetable_lock(ct);
    cachetable_wait_write(ct);
    maybe_flush_some(ct, attr.size);
    maybe_flush_some(ct, attr.size, TRUE);
    int r = cachetable_put_internal(
        cachefile,
        key,
@@ -2175,6 +2173,9 @@ int toku_cachetable_get_and_pin (
        );
}

// Returns TRUE unless this pair both has a checkpoint pending AND is dirty.
// Callers (get_and_pin paths) use a TRUE result to take a shortcut that skips
// the slow checkpoint-write path — a pending checkpoint only forces extra work
// when the pair is dirty. NOTE(review): PAIR/CACHETABLE_DIRTY semantics are
// defined elsewhere in the cachetable; confirmed only by usage visible here.
static BOOL resolve_checkpointing_fast(PAIR p) {
    return !(p->checkpoint_pending && (p->dirty == CACHETABLE_DIRTY));
}

int toku_cachetable_get_and_pin_with_dep_pairs (
    CACHEFILE cachefile, 
@@ -2204,6 +2205,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
        count++;
        if (p->key.b==key.b && p->cachefile==cachefile) {
            //note_hash_count(count);
            // still have the cachetable lock
            //
            // at this point, we know the node is at least partially in memory,
@@ -2218,7 +2220,22 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
                cachetable_wait_writing++;
            }
            nb_mutex_write_lock(&p->nb_mutex, ct->mutex);
            pair_touch(p);
            // used for shortcutting a path to getting the user the data
            // helps scalability for in-memory workloads
            BOOL fast_checkpointing = (resolve_checkpointing_fast(p) && num_dependent_pairs == 0);
            if (p->checkpoint_pending && fast_checkpointing) write_locked_pair_for_checkpoint(ct, p);
            cachetable_unlock(ct);
            BOOL partial_fetch_required = pf_req_callback(p->value,read_extraargs);
            // shortcutting a path to getting the user the data
            // helps scalability for in-memory workloads
            if (!partial_fetch_required && fast_checkpointing) {
                *value = p->value;
                if (sizep) *sizep = p->attr.size;
                maybe_flush_some(ct, 0, FALSE);
                return 0;
            }
            cachetable_lock(ct);
            //
            // Just because the PAIR exists does not necessarily mean that all the data the caller requires
            // is in memory. A partial fetch may be required, which is evaluated above
@@ -2233,15 +2250,11 @@ int toku_cachetable_get_and_pin_with_dep_pairs (

                do_partial_fetch(ct, cachefile, p, pf_callback, read_extraargs, TRUE);
            }

            pair_touch(p);
            cachetable_hit++;
            note_hash_count(count);
            //cachetable_hit++;
            WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
            goto got_value;
	}
    }
    note_hash_count(count);
    // Note.  hashit(t,key) may have changed as a result of flushing.  But fullhash won't have changed.
    // The pair was not found, we must retrieve it from disk 
    {
@@ -2316,7 +2329,7 @@ int toku_cachetable_get_and_pin_with_dep_pairs (
	END_CRITICAL_REGION;    // checkpoint after this point would no longer cause a threadsafety bug
    }

    maybe_flush_some(ct, 0);
    maybe_flush_some(ct, 0, TRUE);
    cachetable_unlock(ct);
    WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
    return 0;
@@ -2357,7 +2370,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
            break;
	}
    }
    note_hash_count(count);
    //note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
@@ -2388,7 +2401,7 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
            break;
	}
    }
    note_hash_count(count);
    //note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
@@ -2426,14 +2439,14 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
	    WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
	    {
                if (flush) {
                    maybe_flush_some(ct, 0);
                    maybe_flush_some(ct, 0, TRUE);
                }
	    }
            r = 0; // we found one
            break;
	}
    }
    note_hash_count(count);
    //note_hash_count(count);
    if (!have_ct_lock) cachetable_unlock(ct);
    return r;
}
@@ -2462,7 +2475,7 @@ int toku_cachetable_get_and_pin_nonblocking (
    CACHEKEY key, 
    u_int32_t fullhash, 
    void**value, 
    long *sizep,
    long* UU(sizep),
    CACHETABLE_WRITE_CALLBACK write_callback,
    CACHETABLE_FETCH_CALLBACK fetch_callback, 
    CACHETABLE_PARTIAL_FETCH_REQUIRED_CALLBACK pf_req_callback,
@@ -2489,7 +2502,7 @@ int toku_cachetable_get_and_pin_nonblocking (
    for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
	count++;
	if (p->key.b==key.b && p->cachefile==cf) {
	    note_hash_count(count);
	    //note_hash_count(count);

            //
            // In Doofenshmirts, we keep the root to leaf path pinned
@@ -2503,9 +2516,14 @@ int toku_cachetable_get_and_pin_nonblocking (
            // Otherwise, if there is no write lock grabbed, we know there will 
            // be no stall, so we grab the lock and return to the user
            //
            if (!nb_mutex_writers(&p->nb_mutex) && !p->checkpoint_pending) {
                cachetable_hit++;
            if (!nb_mutex_writers(&p->nb_mutex) && resolve_checkpointing_fast(p)) {
                //cachetable_hit++;
                nb_mutex_write_lock(&p->nb_mutex, ct->mutex);
                if (p->checkpoint_pending) {
                    write_locked_pair_for_checkpoint(ct, p);
                }
                pair_touch(p);
                cachetable_unlock(ct);
                BOOL partial_fetch_required = pf_req_callback(p->value,read_extraargs);
                //
                // Just because the PAIR exists does not necessarily mean that all the data the caller requires
@@ -2514,6 +2532,7 @@ int toku_cachetable_get_and_pin_nonblocking (
                // and then call a callback to retrieve what we need
                //
                if (partial_fetch_required) {
                    cachetable_lock(ct);
                    p->state = CTPAIR_READING;
                    run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
                    // Now wait for the I/O to occur.    
@@ -2521,14 +2540,11 @@ int toku_cachetable_get_and_pin_nonblocking (
                    cachetable_unlock(ct);
                    return TOKUDB_TRY_AGAIN;
                }
                pair_touch(p);
                else {
                    *value = p->value;
                if (sizep) *sizep = p->attr.size;
                // for ticket #3755
                assert(!p->checkpoint_pending);
                cachetable_unlock(ct);
                    return 0;
                }
            }
            else {
                run_unlockers(unlockers); // The contract says the unlockers are run with the ct lock being held.
                // Now wait for the I/O to occur.
@@ -2720,7 +2736,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
         ptr_to_p = &p->hash_chain,                p = *ptr_to_p) {
        count++;
        if (p->key.b==oldkey.b && p->cachefile==cachefile) {
            note_hash_count(count);
            //note_hash_count(count);
            *ptr_to_p = p->hash_chain;
            p->key = newkey;
            u_int32_t new_fullhash = toku_cachetable_hash(cachefile, newkey);
@@ -2732,7 +2748,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
            return 0;
        }
    }
    note_hash_count(count);
    //note_hash_count(count);
    cachetable_unlock(ct);
    return -1;
}
@@ -3220,7 +3236,7 @@ int toku_cachetable_unpin_and_remove (
        }
    }
 done:
    note_hash_count(count);
    //note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
@@ -3722,7 +3738,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
    for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
	count++;
        if (p->key.b == key.b && p->cachefile == cf) {
	    note_hash_count(count);
	    //note_hash_count(count);
            if (value_ptr)
                *value_ptr = p->value;
            if (dirty_ptr)
@@ -3735,7 +3751,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
            break;
        }
    }
    note_hash_count(count);
    //note_hash_count(count);
    cachetable_unlock(ct);
    return r;
}
@@ -4003,8 +4019,6 @@ void __attribute__((__constructor__)) toku_cachetable_drd_ignore(void);
void
toku_cachetable_drd_ignore(void) {
    // incremented only while lock is held, but read by engine status asynchronously.
    DRD_IGNORE_VAR(STATUS_VALUE(CT_LOCK_TAKEN));
    DRD_IGNORE_VAR(STATUS_VALUE(CT_LOCK_RELEASED));
    DRD_IGNORE_VAR(STATUS_VALUE(CT_EVICTIONS));
}

+1 −3
Original line number Diff line number Diff line
@@ -470,9 +470,7 @@ void toku_cachetable_maybe_flush_some(CACHETABLE ct);
u_int64_t toku_cachefile_size_in_memory(CACHEFILE cf);

typedef enum {
    CT_LOCK_TAKEN = 0,
    CT_LOCK_RELEASED,
    CT_HIT,
    CT_HIT = 0,
    CT_MISS,
    CT_MISSTIME,               // how many usec spent waiting for disk read because of cache miss
    CT_WAITTIME,               // how many usec spent waiting for another thread to release cache line
+1 −1
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ run_test (void) {
    r = toku_cachetable_get_and_pin(f1, make_blocknum(1), 1, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, NULL);
    r = toku_cachetable_begin_checkpoint(ct, NULL);
    // mark nodes as pending a checkpoint, so that get_and_pin_nonblocking on block 1 will return TOKUDB_TRY_AGAIN
    r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_CLEAN, make_pair_attr(8)); assert(r==0);
    r = toku_cachetable_unpin(f1, make_blocknum(1), 1, CACHETABLE_DIRTY, make_pair_attr(8)); assert(r==0);

    r = toku_cachetable_get_and_pin(f1, make_blocknum(2), 2, &v2, &s2, def_write_callback(NULL), def_fetch, def_pf_req_callback, def_pf_callback, NULL);
    // now we try to pin 1, and it should get evicted out from under us
+0 −2
Original line number Diff line number Diff line
@@ -49,8 +49,6 @@ cachetable_debug_test (int n) {
    }
    toku_cachetable_verify(ct);

    if (verbose) toku_cachetable_print_hash_histogram();

    r = toku_cachefile_close(&f1, 0, FALSE, ZERO_LSN); assert(r == 0 && f1 == 0);
    r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
Loading