Commit ecbeb28f authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel
Browse files

[t:4551] implement binary search version of toku_brt_search_which_child

git-svn-id: file:///svn/toku/tokudb@40158 c7de825b-a66e-492c-adef-691d508d4ae1
parent 81d05703
Loading
Loading
Loading
Loading
+71 −0
Original line number Diff line number Diff line
@@ -4517,6 +4517,7 @@ static void search_save_bound (brt_search_t *search, DBT *pivot) {
    search->have_pivot_bound = TRUE;
}

static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_compare_func cmp, DBT *pivot) __attribute__((unused));
static BOOL search_pivot_is_bounded (brt_search_t *search, DESCRIPTOR desc, brt_compare_func cmp, DBT *pivot)
// Effect:  Return TRUE iff the pivot has already been searched (for fixing #3522.)
//  If searching from left to right, if we have already searched all the values less than pivot, we don't want to search again.
@@ -5244,6 +5245,14 @@ brt_search_child(BRT brt, BRTNODE node, int childnum, brt_search_t *search, BRT_
    return r;
}

static inline int
search_which_child_cmp_with_bound(DB *db, brt_compare_func cmp, BRTNODE node, int childnum, brt_search_t *search, DBT *dbt)
{
    struct kv_pair *pivot = node->childkeys[childnum];
    toku_fill_dbt(dbt, kv_pair_key(pivot), kv_pair_keylen(pivot));
    return cmp(db, dbt, &search->pivot_bound);
}

int
toku_brt_search_which_child(
    DESCRIPTOR desc,
@@ -5252,6 +5261,67 @@ toku_brt_search_which_child(
    brt_search_t *search
    )
{
#define DO_SEARCH_WHICH_CHILD_BINARY 1
#if DO_SEARCH_WHICH_CHILD_BINARY
    if (node->n_children <= 1) return 0;

    DBT pivotkey;
    toku_init_dbt(&pivotkey);
    int lo = 0;
    int hi = node->n_children - 1;
    int mi;
    while (lo < hi) {
        mi = (lo + hi) / 2;
        struct kv_pair *pivot = node->childkeys[mi];
        toku_fill_dbt(&pivotkey, kv_pair_key(pivot), kv_pair_keylen(pivot));
        // search->compare is really strange, and only works well with a
        // linear search, it makes binary search a pita.
        //
        // if you are searching left to right, it returns
        //   "0" for pivots that are < the target, and
        //   "1" for pivots that are >= the target
        // if you are searching right to left, it's the opposite.
        //
        // so if we're searching from the left and search->compare says
        // "1", we want to go left from here, if it says "0" we want to go
        // right.  searching from the right does the opposite.
        bool c = search->compare(search, &pivotkey);
        if (((search->direction == BRT_SEARCH_LEFT) && c) ||
            ((search->direction == BRT_SEARCH_RIGHT) && !c)) {
            hi = mi;
        } else {
            assert(((search->direction == BRT_SEARCH_LEFT) && !c) ||
                   ((search->direction == BRT_SEARCH_RIGHT) && c));
            lo = mi + 1;
        }
    }
    // ready to return something, if the pivot is bounded, we have to move
    // over a bit to get away from what we've already searched
    if (search->have_pivot_bound) {
        FAKE_DB(db, tmpdesc, desc);
        if (search->direction == BRT_SEARCH_LEFT) {
            while (lo < node->n_children - 1 &&
                   search_which_child_cmp_with_bound(&db, cmp, node, lo, search, &pivotkey) <= 0) {
                // searching left to right, if the comparison says the
                // current pivot (lo) is left of or equal to our bound,
                // don't search that child again
                lo++;
            }
        } else {
            while (lo > 0 &&
                   search_which_child_cmp_with_bound(&db, cmp, node, lo - 1, search, &pivotkey) >= 0) {
                // searching right to left, same argument as just above
                // (but we had to pass lo - 1 because the pivot between lo
                // and the thing just less than it is at that position in
                // the childkeys array)
                lo--;
            }
        }
    }
    return lo;
#endif
#define DO_SEARCH_WHICH_CHILD_LINEAR 0
#if DO_SEARCH_WHICH_CHILD_LINEAR
    int c;
    DBT pivotkey;
    toku_init_dbt(&pivotkey);
@@ -5273,6 +5343,7 @@ toku_brt_search_which_child(
    }
    /* check the first (left) or last (right) node if nothing has been found */
    return child[c];
#endif
}

static void