Commit 5db59889 authored by unknown's avatar unknown
Browse files

ndb - wl#1497 LinearPool seize_index


storage/ndb/src/kernel/vm/LinearPool.hpp:
  wl#1497 LinearPool seize_index
storage/ndb/src/kernel/vm/testSuperPool.cpp:
  wl#1497 LinearPool seize_index
parent 081cc155
Loading
Loading
Loading
Loading
+251 −75
Original line number Diff line number Diff line
@@ -17,7 +17,8 @@
#ifndef LINEAR_POOL_HPP
#define LINEAR_POOL_HPP

#include <SuperPool.hpp>
#include <Bitmask.hpp>
#include "SuperPool.hpp"

/*
 * LinearPool - indexed record pool
@@ -38,22 +39,26 @@
 *
 * This works exactly like numbers in a given base.  Each map has base
 * size entries.  For implementation convenience the base must be power
 * of 2 and is given as its log2 value.
 * of 2 between 2^1 and 2^15.  It is given by its log2 value (1-15).
 *
 * A position in a map is also called a "digit".
 *
 * There is a doubly linked list of available maps (some free entries)
 * on each level.  There is a singly linked free list within each map.
 * on each level.  There is a doubly linked freelist within each map.
 * There is also a bitmask of used entries in each map.
 *
 * Level 0 free entry has space for one record.  Level N free entry
 * implies space for base^N records.  The implied levels are created and
 * removed on demand.  Completely empty maps are removed.
 * removed on demand.  Empty maps are usually removed.
 *
 * Default base is 256 (log2 = 8) which requires maximum 4 levels or
 * "digits" (similar to ip address).
 * digits (similar to ip address).
 *
 * TODO
 *
 * - move most of the inline code to LinearPool.cpp
 * - add methods to check / seize user-specified index
 * - optimize for common case
 * - add optimized 2-level implementation (?)
 */

#include "SuperPool.hpp"
@@ -71,6 +76,9 @@ class LinearPool {
  // Max possible levels (0 to max root level).
  STATIC_CONST( MaxLevels = (32 + LogBase - 1) / LogBase );

  // Number of words in map used bit mask.
  STATIC_CONST( BitmaskSize = (Base + 31) / 32 );

  // Map.
  struct Map {
    Uint32 m_level;
@@ -80,6 +88,7 @@ class LinearPool {
    Uint32 m_index;     // from root to here
    PtrI m_nextavail;
    PtrI m_prevavail;
    Uint32 m_bitmask[BitmaskSize];
    PtrI m_entry[Base];
  };

@@ -97,9 +106,15 @@ public:
  // Allocate record from the pool.  Reuses free index if possible.
  bool seize(Ptr<T>& ptr);

  // Allocate given index.  Like seize but returns -1 if in use.
  int seize_index(Ptr<T>& ptr, Uint32 index);

  // Return record to the pool.
  void release(Ptr<T>& ptr);

  // Return number of used records (may require 1 page scan).
  Uint32 count();

  // Verify (debugging).
  void verify();

@@ -114,6 +129,12 @@ private:
  // Add new non-root map.
  bool add_map(Ptr<Map>& map_ptr, Ptr<Map> parent_ptr, Uint32 digit);

  // Subroutine to initialize map free lists.
  void init_free(Ptr<Map> map_ptr);

  // Add entry at given free position.
  void add_entry(Ptr<Map> map_ptr, Uint32 digit, PtrI ptr_i);

  // Remove entry and map if it becomes empty.
  void remove_entry(Ptr<Map> map_ptr, Uint32 digit);

@@ -126,8 +147,11 @@ private:
  // Remove map from available list.
  void remove_avail(Ptr<Map> map_ptr);

  // Verify available lists
  void verify_avail();

  // Verify map (recursive).
  void verify(Ptr<Map> map_ptr, Uint32 level);
  void verify_map(Ptr<Map> map_ptr, Uint32 level, Uint32* count);

  RecordPool<T> m_records;
  RecordPool<Map> m_maps;
@@ -166,6 +190,7 @@ LinearPool<T, LogBase>::getPtr(Ptr<T>& ptr)
  // get record
  Ptr<T> rec_ptr;
  Uint32 digit = index & DigitMask;
  assert(BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
  rec_ptr.i = map_ptr.p->m_entry[digit];
  m_records.getPtr(rec_ptr);
  ptr.p = rec_ptr.p;
@@ -193,26 +218,20 @@ LinearPool<T, LogBase>::seize(Ptr<T>& ptr)
  }
  m_maps.getPtr(map_ptr);
  // walk down creating missing levels and using an entry on each
  Uint32 firstfree;
  Uint32 digit;
  Ptr<Map> new_ptr;
  new_ptr.i = RNIL;
  while (true) {
    assert(map_ptr.p->m_occup < Base);
    map_ptr.p->m_occup++;
    firstfree = map_ptr.p->m_firstfree;
    assert(firstfree < Base);
    map_ptr.p->m_firstfree = map_ptr.p->m_entry[firstfree];
    if (map_ptr.p->m_occup == Base) {
      assert(map_ptr.p->m_firstfree == Base);
      // remove from available list
      remove_avail(map_ptr);
    }
    digit = map_ptr.p->m_firstfree;
    if (n == 0)
      break;
    Ptr<Map> child_ptr;
    if (! add_map(child_ptr, map_ptr, firstfree)) {
      remove_entry(map_ptr, firstfree);
    if (! add_map(child_ptr, map_ptr, digit)) {
      if (new_ptr.i != RNIL)
        remove_map(new_ptr);
      return false;
    }
    map_ptr.p->m_entry[firstfree] = child_ptr.i;
    new_ptr = child_ptr;
    map_ptr = child_ptr;
    n--;
  }
@@ -220,11 +239,71 @@ LinearPool<T, LogBase>::seize(Ptr<T>& ptr)
  assert(map_ptr.p->m_level == 0);
  Ptr<T> rec_ptr;
  if (! m_records.seize(rec_ptr)) {
    remove_entry(map_ptr, firstfree);
    if (new_ptr.i != RNIL)
      remove_map(new_ptr);
    return false;
  }
  add_entry(map_ptr, digit, rec_ptr.i);
  ptr.i = digit + (map_ptr.p->m_index << LogBase);
  ptr.p = rec_ptr.p;
  return true;
}

template <class T, Uint32 LogBase>
inline int
LinearPool<T, LogBase>::seize_index(Ptr<T>& ptr, Uint32 index)
{
  // extract all digits at least up to current root level
  Uint32 digits[MaxLevels];
  Uint32 n = 0;
  Uint32 tmp = index;
  do {
    digits[n] = tmp & DigitMask;
    tmp >>= LogBase;
  } while (++n < m_levels || tmp != 0);
  // add any new root levels
  while (n > m_levels) {
    if (! add_root())
      return false;
  }
  map_ptr.p->m_entry[firstfree] = rec_ptr.i;
  ptr.i = firstfree + (map_ptr.p->m_index << LogBase);
  // start from root
  Ptr<Map> map_ptr;
  map_ptr.i = m_root;
  m_maps.getPtr(map_ptr);
  // walk down creating or re-using existing levels
  Uint32 digit;
  bool used;
  Ptr<Map> new_ptr;
  new_ptr.i = RNIL;
  while (true) {
    digit = digits[--n];
    used = BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit);
    if (n == 0)
      break;
    if (used) {
      map_ptr.i = map_ptr.p->m_entry[digit];
      m_maps.getPtr(map_ptr);
    } else {
      Ptr<Map> child_ptr;
      if (! add_map(child_ptr, map_ptr, digit)) {
        if (new_ptr.i != RNIL)
          remove_map(new_ptr);
      }
      new_ptr = child_ptr;
      map_ptr = child_ptr;
    }
  }
  // now at level 0
  assert(map_ptr.p->m_level == 0);
  Ptr<T> rec_ptr;
  if (used || ! m_records.seize(rec_ptr)) {
    if (new_ptr.i != RNIL)
      remove_map(new_ptr);
    return used ? -1 : false;
  }
  add_entry(map_ptr, digit, rec_ptr.i);
  assert(index == digit + (map_ptr.p->m_index << LogBase));
  ptr.i = index;
  ptr.p = rec_ptr.p;
  return true;
}
@@ -249,17 +328,32 @@ LinearPool<T, LogBase>::release(Ptr<T>& ptr)
  ptr.p = 0;
}

template <class T, Uint32 LogBase>
inline Uint32
LinearPool<T, LogBase>::count()
{
  SuperPool& sp = m_records.m_superPool;
  Uint32 count1 = sp.getRecUseCount(m_records.m_recInfo);
  return count1;
}

template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::verify()
{
  if (m_root == RNIL)
  verify_avail();
  if (m_root == RNIL) {
    assert(m_levels == 0);
    return;
  }
  assert(m_levels != 0);
  Ptr<Map> map_ptr;
  map_ptr.i = m_root;
  m_maps.getPtr(map_ptr);
  verify(map_ptr, m_levels - 1);
  Uint32 count1 = count();
  Uint32 count2 = 0;
  verify_map(map_ptr, m_levels - 1, &count2);
  assert(count1 == count2);
}

// private methods
@@ -303,26 +397,18 @@ LinearPool<T, LogBase>::add_root()
  assert(n < MaxLevels);
  // set up
  map_ptr.p->m_level = n;
  if (n == 0) {
    map_ptr.p->m_occup = 0;
    map_ptr.p->m_firstfree = 0;
  } else {
  map_ptr.p->m_parent = RNIL;
  map_ptr.p->m_index = 0;
  init_free(map_ptr);
  // on level > 0 digit 0 points to old root
    map_ptr.p->m_occup = 1;
    map_ptr.p->m_firstfree = 1;
  if (n > 0) {
    Ptr<Map> old_ptr;
    old_ptr.i = m_root;
    m_maps.getPtr(old_ptr);
    assert(old_ptr.p->m_parent == RNIL);
    old_ptr.p->m_parent = map_ptr.i;
    map_ptr.p->m_entry[0] = old_ptr.i;
    add_entry(map_ptr, 0, old_ptr.i);
  }
  // set up free list with Base as terminator
  for (Uint32 j = map_ptr.p->m_firstfree; j < Base; j++)
    map_ptr.p->m_entry[j] = j + 1;
  map_ptr.p->m_parent = RNIL;
  map_ptr.p->m_index = 0;
  add_avail(map_ptr);
  // set new root
  m_root = map_ptr.i;
  return true;
@@ -337,25 +423,81 @@ LinearPool<T, LogBase>::add_map(Ptr<Map>& map_ptr, Ptr<Map> parent_ptr, Uint32 d
  assert(parent_ptr.p->m_level != 0);
  // set up
  map_ptr.p->m_level = parent_ptr.p->m_level - 1;
  map_ptr.p->m_occup = 0;
  map_ptr.p->m_firstfree = 0;
  // set up free list with Base as terminator
  for (Uint32 j = map_ptr.p->m_firstfree; j < Base; j++)
    map_ptr.p->m_entry[j] = j + 1;
  map_ptr.p->m_parent = parent_ptr.i;
  map_ptr.p->m_index = digit + (parent_ptr.p->m_index << LogBase);
  add_avail(map_ptr);
  init_free(map_ptr);
  add_entry(parent_ptr, digit, map_ptr.i);
  return true;
}

template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::init_free(Ptr<Map> map_ptr)
{
  map_ptr.p->m_occup = 0;
  map_ptr.p->m_firstfree = 0;
  // freelist
  Uint32 j;
  Uint16 back = ZNIL;
  for (j = 0; j < Base - 1; j++) {
    map_ptr.p->m_entry[j] = back | ((j + 1) << 16);
    back = j;
  }
  map_ptr.p->m_entry[j] = back | (ZNIL << 16);
  // bitmask
  BitmaskImpl::clear(BitmaskSize, map_ptr.p->m_bitmask);
  // add to available
  add_avail(map_ptr);
}

template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::add_entry(Ptr<Map> map_ptr, Uint32 digit, PtrI ptr_i)
{
  assert(map_ptr.p->m_occup < Base && digit < Base);
  assert(! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
  // unlink from freelist
  Uint32 val = map_ptr.p->m_entry[digit];
  Uint16 back = val & ZNIL;
  Uint16 forw = val >> 16;
  if (back != ZNIL) {
    assert(back < Base);
    map_ptr.p->m_entry[back] &= ZNIL;
    map_ptr.p->m_entry[back] |= (forw << 16);
  }
  if (forw != ZNIL) {
    assert(forw < Base);
    map_ptr.p->m_entry[forw] &= (ZNIL << 16);
    map_ptr.p->m_entry[forw] |= back;
  }
  if (back == ZNIL) {
    map_ptr.p->m_firstfree = forw;
  }
  // set new value
  map_ptr.p->m_entry[digit] = ptr_i;
  map_ptr.p->m_occup++;
  BitmaskImpl::set(BitmaskSize, map_ptr.p->m_bitmask, digit);
  if (map_ptr.p->m_occup == Base)
    remove_avail(map_ptr);
}

template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::remove_entry(Ptr<Map> map_ptr, Uint32 digit)
{
  assert(map_ptr.p->m_occup != 0 && digit < Base);
  map_ptr.p->m_occup--;
  map_ptr.p->m_entry[digit] = map_ptr.p->m_firstfree;
  assert(BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, digit));
  // add to freelist
  Uint32 firstfree = map_ptr.p->m_firstfree;
  map_ptr.p->m_entry[digit] = ZNIL | (firstfree << 16);
  if (firstfree != ZNIL) {
    assert(firstfree < Base);
    map_ptr.p->m_entry[firstfree] &= (ZNIL << 16);
    map_ptr.p->m_entry[firstfree] |= digit;
  }
  map_ptr.p->m_firstfree = digit;
  map_ptr.p->m_occup--;
  BitmaskImpl::clear(BitmaskSize, map_ptr.p->m_bitmask, digit);
  if (map_ptr.p->m_occup + 1 == Base)
    add_avail(map_ptr);
  else if (map_ptr.p->m_occup == 0)
@@ -375,7 +517,7 @@ LinearPool<T, LogBase>::remove_map(Ptr<Map> map_ptr)
  m_maps.release(map_ptr);
  if (m_root == map_ptr_i) {
    assert(parent_ptr.i == RNIL);
    Uint32 used = m_maps.m_superPool.getRecUseCount(m_maps.m_recInfo);
    Uint32 used = count();
    assert(used == 0);
    m_root = RNIL;
    m_levels = 0;
@@ -431,35 +573,71 @@ LinearPool<T, LogBase>::remove_avail(Ptr<Map> map_ptr)

template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::verify(Ptr<Map> map_ptr, Uint32 level)
LinearPool<T, LogBase>::verify_avail()
{
  assert(level < MaxLevels);
  assert(map_ptr.p->m_level == level);
  Uint32 j = 0;
  while (j < Base) {
    bool free = false;
    Uint32 j2 = map_ptr.p->m_firstfree;
    while (j2 != Base) {
      if (j2 == j) {
        free = true;
        break;
  // check available lists
  for (Uint32 n = 0; n < MaxLevels; n++) {
    Ptr<Map> map_ptr;
    map_ptr.i = m_avail[n];
    Uint32 back = RNIL;
    while (map_ptr.i != RNIL) {
      m_maps.getPtr(map_ptr);
      assert(map_ptr.p->m_occup < Base);
      assert(back == map_ptr.p->m_prevavail);
      back = map_ptr.i;
      map_ptr.i = map_ptr.p->m_nextavail;
    }
  }
      assert(j2 < Base);
      j2 = map_ptr.p->m_entry[j2];
}
    if (! free) {

template <class T, Uint32 LogBase>
inline void
LinearPool<T, LogBase>::verify_map(Ptr<Map> map_ptr, Uint32 level, Uint32* count)
{
  assert(level < MaxLevels);
  assert(map_ptr.p->m_level == level);
  // check freelist
  {
    Uint32 nused = BitmaskImpl::count(BitmaskSize, map_ptr.p->m_bitmask);
    assert(nused <= Base);
    assert(map_ptr.p->m_occup == nused);
    Uint32 nfree = 0;
    Uint32 j = map_ptr.p->m_firstfree;
    Uint16 back = ZNIL;
    while (j != ZNIL) {
      assert(j < Base);
      assert(! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, j));
      Uint32 val = map_ptr.p->m_entry[j];
      assert(back == (val & ZNIL));
      back = j;
      j = (val >> 16);
      nfree++;
    }
    assert(nused + nfree == Base);
  }
  // check entries
  {
    for (Uint32 j = 0; j < Base; j++) {
      bool free = ! BitmaskImpl::get(BitmaskSize, map_ptr.p->m_bitmask, j);
      if (free)
        continue;
      if (level != 0) {
        Ptr<Map> child_ptr;
        child_ptr.i = map_ptr.p->m_entry[j];
        m_maps.getPtr(child_ptr);
        assert(child_ptr.p->m_parent == map_ptr.i);
        assert(child_ptr.p->m_index == j + (map_ptr.p->m_index << LogBase));
        verify(child_ptr, level - 1);
        verify_map(child_ptr, level - 1, count);
      } else {
        Ptr<T> rec_ptr;
        rec_ptr.i = map_ptr.p->m_entry[j];
        m_records.getPtr(rec_ptr);
        (*count)++;
      }
    }
  }
  // check membership on available list
  {
    Ptr<Map> avail_ptr;
    avail_ptr.i = m_avail[map_ptr.p->m_level];
    bool found = false;
@@ -473,8 +651,6 @@ LinearPool<T, LogBase>::verify(Ptr<Map> map_ptr, Uint32 level)
    }
    assert(found == (map_ptr.p->m_occup < Base));
  }
    j++;
  }
}

#endif
+49 −4
Original line number Diff line number Diff line
#if 0
make -f Makefile -f - testSuperPool <<'_eof_'
testSuperPool: testSuperPool.cpp libkernel.a
testSuperPool: testSuperPool.cpp libkernel.a LinearPool.hpp
	$(CXXCOMPILE) -o $@ $@.cpp libkernel.a -L../../common/util/.libs -lgeneral
_eof_
exit $?
@@ -57,6 +57,7 @@ random_coprime(Uint32 n)
{
  Uint32 prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
  Uint32 count = sizeof(prime) / sizeof(prime[0]);
  assert(n != 0);
  while (1) {
    Uint32 i = urandom(count);
    if (n % prime[i] != 0)
@@ -208,7 +209,7 @@ lp_test(GroupPool& gp)
  ndbout << "linear pool test" << endl;
  Ptr<T> ptr;
  Uint32 loop;
  for (loop = 0; loop < 3 * loopcount; loop++) {
  for (loop = 0; loop < loopcount; loop++) {
    int count = 0;
    while (1) {
      bool ret = lp.seize(ptr);
@@ -223,6 +224,7 @@ lp_test(GroupPool& gp)
      assert(ptr.p == ptr2.p);
      count++;
    }
    assert(count != 0);
    ndbout << "seized " << count << endl;
    switch (loop % 3) {
    case 0:
@@ -264,6 +266,47 @@ lp_test(GroupPool& gp)
      }
      break;
    }
    { Uint32 cnt = lp.count(); assert(cnt == 0); }
    // seize_index test
    char *used = new char [10 * count];
    memset(used, false, sizeof(used));
    Uint32 i, ns = 0, nr = 0;
    for (i = 0; i < count; i++) {
      Uint32 index = urandom(10 * count);
      if (used[index]) {
        ptr.i = index;
        lp.release(ptr);
        lp.verify();
        nr++;
      } else {
        int i = lp.seize_index(ptr, index);
        assert(i >= 0);
        lp.verify();
        if (i == 0) // no space
          continue;
        assert(ptr.i == index);
        Ptr<T> ptr2;
        ptr2.i = ptr.i;
        ptr2.p = 0;
        lp.getPtr(ptr2);
        assert(ptr.p == ptr2.p);
        ns++;
      }
      used[index] = ! used[index];
    }
    ndbout << "random sparse seize " << ns << " release " << nr << endl;
    nr = 0;
    for (i = 0; i < 10 * count; i++) {
      if (used[i]) {
        ptr.i = i;
        lp.release(ptr);
        lp.verify();
        used[i] = false;
        nr++;
      }
    }
    ndbout << "released " << nr << endl;
    { Uint32 cnt = lp.count(); assert(cnt == 0); }
  }
}

@@ -291,8 +334,10 @@ template static void sp_test<T5>(GroupPool& sp);
template static void lp_test<T3>(GroupPool& sp);

int
main()
main(int argc, char** argv)
{
  if (argc > 1 && strncmp(argv[1], "-l", 2) == 0)
    loopcount = atoi(argv[1] + 2);
  HeapPool sp(pageSize, pageBits);
  sp.setInitPages(7);
  sp.setMaxPages(7);
@@ -304,7 +349,7 @@ main()
  ndbout << "rand " << s << endl;
  int count;
  count = 0;
  while (++count <= 0) {
  while (++count <= 0) { // change to 1 to find new bug
    sp_test<T1>(gp);
    sp_test<T2>(gp);
    sp_test<T3>(gp);