Commit 3c09c1c3 authored by unknown's avatar unknown
Browse files

ndb - bug#25711

  fix cpu peak in big clusters during unpack of config


ndb/src/common/util/ConfigValues.cpp:
  use bin-search instead of hash (as keys collide too much)
parent fed5cb6a
Loading
Loading
Loading
Loading
+120 −84
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ static const char Magic[] = { 'N', 'D', 'B', 'C', 'O', 'N', 'F', 'V' };

//#define DEBUG_CV
#ifdef DEBUG_CV
#define DEBUG
#define DEBUG if(getenv("CV_DEBUG"))
#else
#define DEBUG if(0)
#endif
@@ -202,62 +202,60 @@ ConfigValues::Iterator::set(Uint32 key, const char * value){
static
bool
findKey(const Uint32 * values, Uint32 sz, Uint32 key, Uint32 * _pos){
  Uint32 pos = hash(key, sz);
  Uint32 count = 0;
  while((values[pos] & KP_MASK) != key && count < sz){
    pos = nextHash(key, sz, pos, ++count);
  }
  Uint32 lo = 0;
  Uint32 hi = sz;
  Uint32 pos = (hi + lo) >> 1;

  if((values[pos] & KP_MASK)== key){
    *_pos = pos;
    return true;
  }
  return false;
}
  DEBUG printf("findKey(H'%.8x %d)", key, sz);

static
Uint32
hash(Uint32 key, Uint32 size){
  Uint32 tmp = (key >> 16) ^ (key & 0xFFFF);
  return (((tmp << 16) | tmp) % size) << 1;
  if (sz == 0)
  {
    DEBUG ndbout_c(" -> false, 0");
    * _pos = 0;
    return false;
  }

static
Uint32
nextHash(Uint32 key, Uint32 size, Uint32 pos, Uint32 count){
  Uint32 p = (pos >> 1);
  if((key % size) != 0)
    p += key;
  else
    p += 1;
  return (p % size) << 1;
  Uint32 val = 0;
  Uint32 oldpos = pos + 1;
  while (pos != oldpos) 
  {
    DEBUG printf(" [ %d %d %d ] ", lo, pos, hi);
    assert(pos < hi);
    assert(pos >= lo);
    val = values[2*pos] & KP_MASK;
    if (key > val)
    {
      lo = pos;
    }

static
Uint32
directory(Uint32 sz){
  const Uint32 _input = sz;
  if((sz & 1) == 0)
    sz ++;
  
  bool prime = false;
  while(!prime){
    prime = true;
    for(Uint32 n = 3; n*n <= sz; n += 2){
      if((sz % n) == 0){
	prime = false;
	sz += 2;
	break;
    else if (key < val)
    {
      hi = pos;
    }
    else 
    {
      * _pos = 2*pos;
      DEBUG ndbout_c(" -> true, %d", pos);
      return true;
    }
    oldpos = pos;
    pos = (hi + lo) >> 1;
  } 
  DEBUG printf("directory %d -> %d\n", _input, sz);
  return sz;

  DEBUG printf(" pos: %d (key %.8x val: %.8x values[pos]: %x) key>val: %d ",
	       pos, key, val, values[2*pos] & KP_MASK,
	       key > val);

  pos += (key > val) ? 1 : 0;
  
  * _pos = 2*pos;
  DEBUG ndbout_c(" -> false, %d", pos);
  return false;
}


ConfigValuesFactory::ConfigValuesFactory(Uint32 keys, Uint32 data){
  m_sectionCounter = (1 << KP_SECTION_SHIFT);
  m_freeKeys = directory(keys);
  m_freeKeys = keys;
  m_freeData = (data + 7) & ~7;
  m_currentSection = 0;
  m_cfg = create(m_freeKeys, m_freeData);
@@ -316,11 +314,14 @@ ConfigValuesFactory::expand(Uint32 fk, Uint32 fs){
    return ;
  }

  DEBUG printf("[ fk fd ] : [ %d %d ]", m_freeKeys, m_freeData);
  
  m_freeKeys = (m_freeKeys >= fk ? m_cfg->m_size : fk + m_cfg->m_size);
  m_freeData = (m_freeData >= fs ? m_cfg->m_dataSize : fs + m_cfg->m_dataSize);
  m_freeKeys = directory(m_freeKeys);
  m_freeData = (m_freeData + 7) & ~7;
  
  DEBUG ndbout_c(" [ %d %d ]", m_freeKeys, m_freeData);

  ConfigValues * m_tmp = m_cfg;
  m_cfg = create(m_freeKeys, m_freeData);
  put(* m_tmp);
@@ -336,7 +337,6 @@ ConfigValuesFactory::shrink(){

  m_freeKeys = m_cfg->m_size - m_freeKeys;
  m_freeData = m_cfg->m_dataSize - m_freeData;
  m_freeKeys = directory(m_freeKeys);
  m_freeData = (m_freeData + 7) & ~7;

  ConfigValues * m_tmp = m_cfg;
@@ -415,52 +415,58 @@ ConfigValuesFactory::put(const ConfigValues::Entry & entry){
  }
  
  const Uint32 tmp = entry.m_key | m_currentSection;
  const Uint32 sz = m_cfg->m_size;
  Uint32 pos = hash(tmp, sz);
  Uint32 count = 0;
  Uint32 val = m_cfg->m_values[pos];
  const Uint32 sz = m_cfg->m_size - m_freeKeys;

  while((val & KP_MASK) != tmp && val != CFV_KEY_FREE && count < sz){
    pos = nextHash(tmp, sz, pos, ++count);
    val = m_cfg->m_values[pos];
  }

  if((val & KP_MASK) == tmp){
  Uint32 pos;
  if (findKey(m_cfg->m_values, sz, tmp, &pos))
  {
    DEBUG ndbout_c("key %x already found at pos: %d", tmp, pos);
    return false;
  }

  if(count >= sz){
    pos = hash(tmp, sz);    
    count = 0;
    Uint32 val = m_cfg->m_values[pos];
   
    printf("key: %d, (key %% size): %d\n", entry.m_key, (entry.m_key % sz));
    printf("pos: %d", pos);
    while((val & KP_MASK) != tmp && val != CFV_KEY_FREE && count < sz){
      pos = nextHash(tmp, sz, pos, ++count);
      val = m_cfg->m_values[pos];
      printf(" %d", pos);
  DEBUG {
    printf("H'before ");
    Uint32 prev = 0;
    for (Uint32 i = 0; i<sz; i++)
    {
      Uint32 val = m_cfg->m_values[2*i] & KP_MASK;
      ndbout_c("%.8x", val);
      assert(val >= prev);
      prev = val;
    }
  }
    printf("\n");
  
    abort();
    printf("Full\n");
    return false;
  if (pos != 2*sz)
  {
    DEBUG ndbout_c("pos: %d sz: %d", pos, sz);
    memmove(m_cfg->m_values + pos + 2, m_cfg->m_values + pos, 
	    4 * (2*sz - pos));
  }

  assert(pos < (sz << 1));

  Uint32 key = tmp;
  key |= (entry.m_type << KP_TYPE_SHIFT);
  m_cfg->m_values[pos] = key;

  DEBUG {
    printf("H'after ");
    Uint32 prev = 0;
    for (Uint32 i = 0; i<=sz; i++)
    {
      Uint32 val = m_cfg->m_values[2*i] & KP_MASK;
      ndbout_c("%.8x", val);
      assert(val >= prev);
      prev = val;
    }
  }
  
  switch(entry.m_type){
  case ConfigValues::IntType:
  case ConfigValues::SectionType:
    m_cfg->m_values[pos+1] = entry.m_int;    
    m_freeKeys--;
    DEBUG printf("Putting at: %d(%d) (loop = %d) key: %d value: %d\n", 
		   pos, sz, count, 
		   pos, sz, 0, 
		   (key >> KP_KEYVAL_SHIFT) & KP_KEYVAL_MASK,
		   entry.m_int);
    return true;
@@ -472,7 +478,7 @@ ConfigValuesFactory::put(const ConfigValues::Entry & entry){
    m_freeKeys--;
    m_freeData -= sizeof(char *);
    DEBUG printf("Putting at: %d(%d) (loop = %d) key: %d value(%d): %s\n", 
		   pos, sz, count, 
		   pos, sz, 0, 
		   (key >> KP_KEYVAL_SHIFT) & KP_KEYVAL_MASK,
		   index,
		   entry.m_string);
@@ -485,7 +491,7 @@ ConfigValuesFactory::put(const ConfigValues::Entry & entry){
    m_freeKeys--;
    m_freeData -= 8;
    DEBUG printf("Putting at: %d(%d) (loop = %d) key: %d value64(%d): %lld\n", 
		   pos, sz, count, 
		   pos, sz, 0, 
		   (key >> KP_KEYVAL_SHIFT) & KP_KEYVAL_MASK,
		   index,
		   entry.m_int64);
@@ -648,6 +654,8 @@ ConfigValuesFactory::unpack(const void * _src, Uint32 len){
  }

  const char * src = (const char *)_src;
  const char * end = src + len - 4;
  src += sizeof(Magic);
  
  {
    Uint32 len32 = (len >> 2);
@@ -663,9 +671,37 @@ ConfigValuesFactory::unpack(const void * _src, Uint32 len){
    }
  }

  const char * end = src + len - 4;
  src += sizeof(Magic);
  const char * save = src;

  {
    Uint32 keys = 0;
    Uint32 data = 0;
    while(end - src > 4){
      Uint32 tmp = ntohl(* (const Uint32 *)src); src += 4;
      keys++;
      switch(::getTypeOf(tmp)){
      case ConfigValues::IntType:
      case ConfigValues::SectionType:
	src += 4;
	break;
      case ConfigValues::Int64Type:
	src += 8;
	data += 8;
	break;
      case ConfigValues::StringType:{
	Uint32 s_len = ntohl(* (const Uint32 *)src);
	src += 4 + mod4(s_len);
	data += sizeof(char*);
	break;
      }
      default:
	break;
      }
    }
    expand(keys, data);
  }
  
  src = save;
  ConfigValues::Entry entry;
  while(end - src > 4){
    Uint32 tmp = ntohl(* (const Uint32 *)src); src += 4;