Commit 93498009 authored by unknown's avatar unknown
Browse files

bug#12220 - ndb - node recovery with charsets

  LQH computes incorrect hash values during NR (as it doesn't concider charsets)

  Solution: make LQH compute correct hash :-)
           1) move xfrm_key into SimulatedBlock so that there's _one_ impl.
           2) make TC, ACC, LQH use same impl.


ndb/include/kernel/AttributeDescriptor.hpp:
  Make SimulatedBlock use AttributeDescriptor (to xfrm)
ndb/src/kernel/blocks/dbacc/Dbacc.hpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/blocks/dbacc/DbaccInit.cpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/blocks/dbacc/DbaccMain.cpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/blocks/dbdict/Dbdict.cpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/blocks/dblqh/Dblqh.hpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/blocks/dbtc/Dbtc.hpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/blocks/dbtc/DbtcMain.cpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/vm/SimulatedBlock.cpp:
  Move xfrm handling into SimulatedBlock
ndb/src/kernel/vm/SimulatedBlock.hpp:
  Move xfrm handling into SimulatedBlock
parent ad56f835
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ class AttributeDescriptor {
  friend class Dbacc;
  friend class Dbtup;
  friend class Dbtux;
  friend class SimulatedBlock;

private:
  static void setType(Uint32 &, Uint32 type);
+0 −8
Original line number Diff line number Diff line
@@ -851,13 +851,6 @@ struct Tabrec {
  Uint32 fragptrholder[MAX_FRAG_PER_NODE];
  Uint32 tabUserPtr;
  BlockReference tabUserRef;

  Uint8 noOfKeyAttr;
  Uint8 hasCharAttr;
  struct KeyAttr {
    Uint32 attributeDescriptor;
    CHARSET_INFO* charsetInfo;
  } keyAttr[MAX_ATTRIBUTES_IN_INDEX];
};
  typedef Ptr<Tabrec> TabrecPtr;

@@ -903,7 +896,6 @@ private:
  void execACCKEYREQ(Signal* signal);
  void execACCSEIZEREQ(Signal* signal);
  void execACCFRAGREQ(Signal* signal);
  void execTC_SCHVERREQ(Signal* signal);
  void execACC_SRREQ(Signal* signal);
  void execNEXT_SCANREQ(Signal* signal);
  void execACC_ABORTREQ(Signal* signal);
+0 −1
Original line number Diff line number Diff line
@@ -179,7 +179,6 @@ Dbacc::Dbacc(const class Configuration & conf):
  addRecSignal(GSN_ACCKEYREQ, &Dbacc::execACCKEYREQ);
  addRecSignal(GSN_ACCSEIZEREQ, &Dbacc::execACCSEIZEREQ);
  addRecSignal(GSN_ACCFRAGREQ, &Dbacc::execACCFRAGREQ);
  addRecSignal(GSN_TC_SCHVERREQ, &Dbacc::execTC_SCHVERREQ);
  addRecSignal(GSN_ACC_SRREQ, &Dbacc::execACC_SRREQ);
  addRecSignal(GSN_NEXT_SCANREQ, &Dbacc::execNEXT_SCANREQ);
  addRecSignal(GSN_ACC_ABORTREQ, &Dbacc::execACC_ABORTREQ);
+14 −115
Original line number Diff line number Diff line
@@ -28,7 +28,8 @@
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/DropTab.hpp>
#include <signaldata/DumpStateOrd.hpp>
#include <SectionReader.hpp>
#include <KeyDescriptor.hpp>


// TO_DO_RONM is a label for comments on what needs to be improved in future versions
// when more time is given.
@@ -1037,12 +1038,6 @@ void Dbacc::initialiseTableRec(Signal* signal)
      tabptr.p->fragholder[i] = RNIL;
      tabptr.p->fragptrholder[i] = RNIL;
    }//for
    tabptr.p->noOfKeyAttr = 0;
    tabptr.p->hasCharAttr = 0;
    for (Uint32 k = 0; k < MAX_ATTRIBUTES_IN_INDEX; k++) {
      tabptr.p->keyAttr[k].attributeDescriptor = 0;
      tabptr.p->keyAttr[k].charsetInfo = 0;
    }
  }//for
}//Dbacc::initialiseTableRec()

@@ -1172,8 +1167,8 @@ void Dbacc::execACCFRAGREQ(Signal* signal)
  Uint32 userPtr = req->userPtr;
  BlockReference retRef = req->userRef;
  rootfragrecptr.p->rootState = ACTIVEROOT;
  AccFragConf * const conf = (AccFragConf*)&signal->theData[0];

  AccFragConf * const conf = (AccFragConf*)&signal->theData[0];
  conf->userPtr = userPtr;
  conf->rootFragPtr = rootfragrecptr.i;
  conf->fragId[0] = rootfragrecptr.p->fragmentid[0];
@@ -1197,65 +1192,6 @@ void Dbacc::addFragRefuse(Signal* signal, Uint32 errorCode)
  return;
}//Dbacc::addFragRefuseEarly()

void
Dbacc::execTC_SCHVERREQ(Signal* signal)
{
  jamEntry();
  if (! assembleFragments(signal)) {
    jam();
    return;
  }
  tabptr.i = signal->theData[0];
  ptrCheckGuard(tabptr, ctablesize, tabrec);
  Uint32 noOfKeyAttr = signal->theData[6];
  ndbrequire(noOfKeyAttr <= MAX_ATTRIBUTES_IN_INDEX);
  Uint32 hasCharAttr = 0;

  SegmentedSectionPtr s0Ptr;
  signal->getSection(s0Ptr, 0);
  SectionReader r0(s0Ptr, getSectionSegmentPool());
  Uint32 i = 0;
  while (i < noOfKeyAttr) {
    jam();
    Uint32 attributeDescriptor = ~0;
    Uint32 csNumber = ~0;
    if (! r0.getWord(&attributeDescriptor) ||
        ! r0.getWord(&csNumber)) {
      jam();
      break;
    }
    CHARSET_INFO* cs = 0;
    if (csNumber != 0) {
      cs = all_charsets[csNumber];
      ndbrequire(cs != 0);
      hasCharAttr = 1;
    }
    tabptr.p->keyAttr[i].attributeDescriptor = attributeDescriptor;
    tabptr.p->keyAttr[i].charsetInfo = cs;
    i++;
  }
  ndbrequire(i == noOfKeyAttr);
  releaseSections(signal);

  tabptr.p->noOfKeyAttr = noOfKeyAttr;
  tabptr.p->hasCharAttr = hasCharAttr;

  // copy char attr flag to each fragment
  for (Uint32 i1 = 0; i1 < MAX_FRAG_PER_NODE; i1++) {
    jam();
    if (tabptr.p->fragptrholder[i1] != RNIL) {
      rootfragrecptr.i = tabptr.p->fragptrholder[i1];
      ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
      for (Uint32 i2 = 0; i2 < 2; i2++) {
        fragrecptr.i = rootfragrecptr.p->fragmentptr[i2];
        ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
        fragrecptr.p->hasCharAttr = hasCharAttr;
      }
    }
  }

  // no reply to DICT
}

void
Dbacc::execDROP_TAB_REQ(Signal* signal){
@@ -1841,55 +1777,14 @@ void Dbacc::execACCKEYREQ(Signal* signal)
void
Dbacc::xfrmKeyData(Signal* signal)
{
  tabptr.i = fragrecptr.p->myTableId;
  ptrCheckGuard(tabptr, ctablesize, tabrec);

  Uint32 dst[1024 * MAX_XFRM_MULTIPLY];
  Uint32 dstSize = (sizeof(dst) >> 2);
  Uint32 table = fragrecptr.p->myTableId;
  Uint32 dst[MAX_KEY_SIZE_IN_WORDS * MAX_XFRM_MULTIPLY];
  Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
  Uint32* src = &signal->theData[7];
  const Uint32 noOfKeyAttr = tabptr.p->noOfKeyAttr;
  Uint32 dstPos = 0;
  Uint32 srcPos = 0;
  Uint32 i = 0;

  while (i < noOfKeyAttr) {
    const Tabrec::KeyAttr& keyAttr = tabptr.p->keyAttr[i];

    Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(keyAttr.attributeDescriptor);
    Uint32 srcWords = (srcBytes + 3) / 4;
    Uint32 dstWords = ~0;
    uchar* dstPtr = (uchar*)&dst[dstPos];
    const uchar* srcPtr = (const uchar*)&src[srcPos];
    CHARSET_INFO* cs = keyAttr.charsetInfo;

    if (cs == 0) {
      jam();
      memcpy(dstPtr, srcPtr, srcWords << 2);
      dstWords = srcWords;
    } else {
      jam();
      Uint32 typeId = AttributeDescriptor::getType(keyAttr.attributeDescriptor);
      Uint32 lb, len;
      bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
      ndbrequire(ok);
      Uint32 xmul = cs->strxfrm_multiply;
      if (xmul == 0)
        xmul = 1;
      // see comment in DbtcMain.cpp
      Uint32 dstLen = xmul * (srcBytes - lb);
      ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
      int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
      ndbrequire(n != -1);
      while ((n & 3) != 0)
        dstPtr[n++] = 0;
      dstWords = (n >> 2);
    }
    dstPos += dstWords;
    srcPos += srcWords;
    i++;
  }
  memcpy(src, dst, dstPos << 2);
  operationRecPtr.p->xfrmtupkeylen = dstPos;
  Uint32 len = xfrm_key(table, src, dst, sizeof(dst) >> 2, keyPartLen);
  ndbrequire(len); // 0 means error
  memcpy(src, dst, len << 2);
  operationRecPtr.p->xfrmtupkeylen = len;
}

void Dbacc::accIsLockedLab(Signal* signal) 
@@ -8024,6 +7919,10 @@ void Dbacc::initFragAdd(Signal* signal,
  Uint32 Tmp2 = regFragPtr.p->maxloadfactor - regFragPtr.p->minloadfactor;
  Tmp2 = Tmp1 * Tmp2;
  regFragPtr.p->slackCheck = Tmp2;

  Uint32 hasCharAttr = g_key_descriptor_pool.getPtr(req->tableId)->hasCharAttr;
  regFragPtr.p->hasCharAttr = hasCharAttr;

}//Dbacc::initFragAdd()

void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)
+45 −23
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@
#include <SectionReader.hpp>
#include <SimpleProperties.hpp>
#include <AttributeHeader.hpp>
#include <KeyDescriptor.hpp>
#include <signaldata/DictSchemaInfo.hpp>
#include <signaldata/DictTabInfo.hpp>
#include <signaldata/DropTabFile.hpp>
@@ -1750,6 +1751,7 @@ void Dbdict::execREAD_CONFIG_REQ(Signal* signal)
  c_schemaPageRecordArray.setSize(2 * NDB_SF_MAX_PAGES);
  c_tableRecordPool.setSize(tablerecSize);
  c_tableRecordHash.setSize(tablerecSize);
  g_key_descriptor_pool.setSize(tablerecSize);
  c_triggerRecordPool.setSize(c_maxNoOfTriggers);
  c_triggerRecordHash.setSize(c_maxNoOfTriggers);
  c_opRecordPool.setSize(256);   // XXX need config params
@@ -4450,6 +4452,44 @@ Dbdict::execADD_FRAGREQ(Signal* signal) {
    sendSignal(DBLQH_REF, GSN_LQHFRAGREQ, signal, 
	       LqhFragReq::SignalLength, JBB);
  }

  /**
   * Create KeyDescriptor
   */
  KeyDescriptor* desc= g_key_descriptor_pool.getPtr(tabPtr.i);
  new (desc) KeyDescriptor();

  Uint32 key = 0;
  Uint32 tAttr = tabPtr.p->firstAttribute;
  while (tAttr != RNIL) 
  {
    jam();
    AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr);
    if (aRec->tupleKey) 
    {
      desc->noOfKeyAttr ++;
      desc->keyAttr[key].attributeDescriptor = aRec->attributeDescriptor;
      
      Uint32 csNumber = (aRec->extPrecision >> 16);
      if(csNumber)
      {
	desc->keyAttr[key].charsetInfo = all_charsets[csNumber];
	ndbrequire(all_charsets[csNumber]);
	desc->hasCharAttr = 1;
      }
      else
      {
	desc->keyAttr[key].charsetInfo = 0;	  
      }
      if(AttributeDescriptor::getDKey(aRec->attributeDescriptor))
      {
	desc->noOfDistrKeys ++;
      }
      key++;
    }
    tAttr = aRec->nextAttrInTable;
  }
  ndbrequire(key == tabPtr.p->noOfPrimkey);
}

void
@@ -4645,27 +4685,7 @@ Dbdict::execTAB_COMMITCONF(Signal* signal){
    signal->theData[5] = createTabPtr.p->key;
    signal->theData[6] = (Uint32)tabPtr.p->noOfPrimkey;
    
    Uint32 buf[2 * MAX_ATTRIBUTES_IN_INDEX];
    Uint32 sz = 0;
    Uint32 tAttr = tabPtr.p->firstAttribute;
    while (tAttr != RNIL) {
      jam();
      AttributeRecord* aRec = c_attributeRecordPool.getPtr(tAttr);
      if (aRec->tupleKey) {
        buf[sz++] = aRec->attributeDescriptor;
        buf[sz++] = (aRec->extPrecision >> 16); // charset number
      }
      tAttr = aRec->nextAttrInTable;
    }
    ndbrequire((int)sz == 2 * tabPtr.p->noOfPrimkey);

    LinearSectionPtr lsPtr[3];
    lsPtr[0].p = buf;
    lsPtr[0].sz = sz;
    // note: ACC does not reply
    if (tabPtr.p->isTable() || tabPtr.p->isHashIndex())
      sendSignal(DBACC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1);
    sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB, lsPtr, 1);
    sendSignal(DBTC_REF, GSN_TC_SCHVERREQ, signal, 7, JBB);
    return;
  }
  
@@ -12342,3 +12362,5 @@ Dbdict::getMetaAttribute(MetaData::Attribute& attr, const MetaData::Table& table
  new (&attr) MetaData::Attribute(*attrPtr.p);
  return 0;
}

CArray<KeyDescriptor> g_key_descriptor_pool;
Loading