Commit a3c22ae7 authored by unknown's avatar unknown
Browse files

Merge mysql.com:/home/jonas/src/mysql-4.1-ndb

into mysql.com:/home/jonas/src/mysql-4.1

parents 8f787983 706825a6
Loading
Loading
Loading
Loading
+9 −4
Original line number Diff line number Diff line
@@ -1003,17 +1003,22 @@ public:

  /*
   * TUX index in TUP has single Uint32 array attribute which stores an
   * index node.  TUX uses following methods.
   * index node.  TUX reads and writes the node directly via pointer.
   */
  int tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node);
  void tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node);
  void tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node);

  /*
   * TUX reads primary table attributes for 1) index key 2) primary key
   * when returning keyinfo.  TUX uses following methods.
   * TUX reads primary table attributes for index keys.  Input is
   * attribute ids in AttributeHeader format.  Output is pointers to
   * attribute data within tuple or 0 for NULL value.
   */
  void tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData);

  /*
   * TUX reads primary key for md5 summing and when returning keyinfo.
   */
  void tuxReadAttrs();  // under construction
  void tuxReadKeys();   // under construction

private:
+63 −6
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@
void
Dbtup::tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32& tupAddr)
{
  ljamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i = fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
@@ -54,6 +55,7 @@ Dbtup::tuxGetTupAddr(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32&
int
Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pageOffset, Uint32*& node)
{
  ljamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i = fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
@@ -63,7 +65,7 @@ Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pag
  PagePtr pagePtr;
  terrorCode = 0;
  if (! allocTh(fragPtr.p, tablePtr.p, NORMAL_PAGE, signal, pageOffset, pagePtr)) {
    jam();
    ljam();
    ndbrequire(terrorCode != 0);
    return terrorCode;
  }
@@ -77,6 +79,7 @@ Dbtup::tuxAllocNode(Signal* signal, Uint32 fragPtrI, Uint32& pageId, Uint32& pag
void
Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32* node)
{
  ljamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i = fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
@@ -95,6 +98,7 @@ Dbtup::tuxFreeNode(Signal* signal, Uint32 fragPtrI, Uint32 pageId, Uint32 pageOf
void
Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& node)
{
  ljamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i = fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
@@ -109,9 +113,62 @@ Dbtup::tuxGetNode(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32*& no
  node = &pagePtr.p->pageWord[pageOffset] + attrDataOffset;
}

void    // under construction
Dbtup::tuxReadAttrs()
void
Dbtup::tuxReadAttrs(Uint32 fragPtrI, Uint32 pageId, Uint32 pageOffset, Uint32 tupVersion, Uint32 numAttrs, const Uint32* attrIds, const Uint32** attrData)
{
  ljamEntry();
  FragrecordPtr fragPtr;
  fragPtr.i = fragPtrI;
  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
  TablerecPtr tablePtr;
  tablePtr.i = fragPtr.p->fragTableId;
  ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
  PagePtr pagePtr;
  pagePtr.i = pageId;
  ptrCheckGuard(pagePtr, cnoOfPage, page);
  // search for tuple version if not original
  if (pagePtr.p->pageWord[pageOffset + 1] != tupVersion) {
    ljam();
    OperationrecPtr opPtr;
    opPtr.i = pagePtr.p->pageWord[pageOffset];
    Uint32 loopGuard = 0;
    while (true) {
      ptrCheckGuard(opPtr, cnoOfOprec, operationrec);
      if (opPtr.p->realPageIdC != RNIL) {
        pagePtr.i = opPtr.p->realPageIdC;
        pageOffset = opPtr.p->pageOffsetC;
        ptrCheckGuard(pagePtr, cnoOfPage, page);
        if (pagePtr.p->pageWord[pageOffset + 1] == tupVersion) {
          ljam();
          break;
        }
      }
      ljam();
      opPtr.i = opPtr.p->nextActiveOp;
      ndbrequire(++loopGuard < (1 << ZTUP_VERSION_BITS));
    }
  }
  const Uint32 tabDescriptor = tablePtr.p->tabDescriptor;
  const Uint32* tupleHeader = &pagePtr.p->pageWord[pageOffset];
  for (Uint32 i = 0; i < numAttrs; i++) {
    AttributeHeader ah(attrIds[i]);
    Uint32 attrId = ah.getAttributeId();
    Uint32 index = tabDescriptor + (attrId << ZAD_LOG_SIZE);
    Uint32 desc1 = tableDescriptor[index].tabDescr;
    Uint32 desc2 = tableDescriptor[index + 1].tabDescr;
    if (AttributeDescriptor::getNullable(desc1)) {
      Uint32 offset = AttributeOffset::getNullFlagOffset(desc2);
      ndbrequire(offset < tablePtr.p->tupNullWords);
      offset += tablePtr.p->tupNullIndex;
      ndbrequire(offset < tablePtr.p->tupheadsize);
      if (AttributeOffset::isNULL(tupleHeader[offset], desc2)) {
        ljam();
        attrData[i] = 0;
        continue;
      }
    }
    attrData[i] = tupleHeader + AttributeOffset::getOffset(desc2);
  }
}

void    // under construction
@@ -259,10 +316,10 @@ Dbtup::execTUP_QUERY_TH(Signal* signal)
    for this transaction and savepoint id. If its tuple version equals
    the requested then we have a visible tuple otherwise not.
    */
    jam();
    ljam();
    Uint32 read_tupVersion = pagePtr.p->pageWord[tempOp.pageOffset + 1];
    if (read_tupVersion == req_tupVersion) {
      jam();
      ljam();
      ret_result = 1;
    }
  }
@@ -580,7 +637,7 @@ Dbtup::buildIndex(Signal* signal, Uint32 buildPtrI)
      tuple as a copy tuple. The original tuple is stable and is thus
      preferrable to store in TUX.
      */
      jam();
      ljam();
      ptrCheckGuard(pageOperPtr, cnoOfOprec, operationrec);
      realPageId = pageOperPtr.p->realPageId;
      pageOffset = pageOperPtr.p->pageOffset;
+36 −50
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@
#include <new>
#include <ndb_limits.h>
#include <SimulatedBlock.hpp>
#include <AttributeDescriptor.hpp>
#include <AttributeHeader.hpp>
#include <ArrayPool.hpp>
#include <DataBuffer.hpp>
@@ -84,6 +85,10 @@
#define jam()           jamLine(90000 + __LINE__)
#define jamEntry()      jamEntryLine(90000 + __LINE__)
#endif
#ifndef jam
#define jam()           jamLine(__LINE__)
#define jamEntry()      jamEntryLine(__LINE__)
#endif

#undef max
#undef min
@@ -115,7 +120,7 @@ private:
  struct DescEnt;

  /*
   * Pointer to Uint32 data.  Interpretation is context dependent.
   * Pointer to array of Uint32.
   */
  struct Data {
  private:
@@ -131,7 +136,7 @@ private:
  friend class Data;

  /*
   * Pointer to constant Uint32 data.
   * Pointer to array of constant Uint32.
   */
  struct ConstData;
  friend struct ConstData;
@@ -153,6 +158,11 @@ private:
  // AttributeHeader size is assumed to be 1 word
  static const unsigned AttributeHeaderSize = 1;

  /*
   * Array of pointers to TUP table attributes.  Always read-on|y.
   */
  typedef const Uint32** TableData;

  /*
   * Logical tuple address, "local key".  Identifies table tuples.
   */
@@ -554,31 +564,6 @@ private:
    ReadPar();
  };

  /*
   * Tree search for entry.
   */
  struct SearchPar;
  friend struct SearchPar;
  struct SearchPar {
    ConstData m_data;           // input index key values
    TreeEnt m_ent;              // input tuple and version
    SearchPar();
  };

  /*
   * Attribute data comparison.
   */
  struct CmpPar;
  friend struct CmpPar;
  struct CmpPar {
    ConstData m_data1;          // full search key
    ConstData m_data2;          // full or prefix data
    unsigned m_len2;            // words in data2 buffer
    unsigned m_first;           // first attribute
    unsigned m_numEq;           // number of initial equal attributes
    CmpPar();
  };

  /*
   * Scan bound comparison.
   */
@@ -602,7 +587,10 @@ private:
  void execSTTOR(Signal* signal);
  void execREAD_CONFIG_REQ(Signal* signal);
  // utils
  void setKeyAttrs(const Frag& frag);
  void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData);
  void copyAttrs(Data dst, ConstData src, CopyPar& copyPar);
  void copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);

  /*
   * DbtuxMeta.cpp
@@ -645,7 +633,7 @@ private:
  /*
   * DbtuxTree.cpp
   */
  void treeSearch(Signal* signal, Frag& frag, SearchPar searchPar, TreePos& treePos);
  void treeSearch(Signal* signal, Frag& frag, TableData searchKey, TreeEnt searchEnt, TreePos& treePos);
  void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent);
  void treeRemove(Signal* signal, Frag& frag, TreePos treePos);
  void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i);
@@ -672,7 +660,8 @@ private:
  /*
   * DbtuxCmp.cpp
   */
  int cmpTreeAttrs(const Frag& frag, CmpPar& cmpPar);
  int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2 = MaxAttrDataSize);
  int cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2);
  int cmpScanBound(const Frag& frag, const BoundPar boundPar);

  /*
@@ -716,12 +705,25 @@ private:
  Uint32 c_internalStartPhase;
  Uint32 c_typeOfStart;

  // buffers
  Data c_keyBuffer;             // search key or scan bound
  /*
   * Array of index key attribute ids in AttributeHeader format.
   * Includes fixed attribute sizes.  This is global data set at
   * operation start and is not passed as a parameter.
   */
  Data c_keyAttrs;

  // buffer for search key data as pointers to TUP storage
  TableData c_searchKey;

  // buffer for current entry key data as pointers to TUP storage
  TableData c_entryKey;

  // buffer for scan bounds and keyinfo (primary key)
  Data c_dataBuffer;

  // inlined utils
  DescEnt& getDescEnt(Uint32 descPage, Uint32 descOff);
  Uint32 getTupAddr(const Frag& frag, const TreeEnt ent);
  Uint32 getTupAddr(const Frag& frag, TreeEnt ent);
  static unsigned min(unsigned x, unsigned y);
  static unsigned max(unsigned x, unsigned y);
};
@@ -1211,23 +1213,6 @@ Dbtux::ReadPar::ReadPar() :
{
}

inline
Dbtux::SearchPar::SearchPar() :
  m_data(0),
  m_ent()
{
}

inline
Dbtux::CmpPar::CmpPar() :
  m_data1(0),
  m_data2(0),
  m_len2(0),
  m_first(0),
  m_numEq(0)
{
}

inline
Dbtux::BoundPar::BoundPar() :
  m_data1(0),
@@ -1267,12 +1252,13 @@ Dbtux::getDescEnt(Uint32 descPage, Uint32 descOff)
}

inline Uint32
Dbtux::getTupAddr(const Frag& frag, const TreeEnt ent)
Dbtux::getTupAddr(const Frag& frag, TreeEnt ent)
{
  const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
  const TupLoc tupLoc = ent.m_tupLoc;
  Uint32 tupAddr = NullTupAddr;
  c_tup->tuxGetTupAddr(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupAddr);
  jamEntry();
  return tupAddr;
}

+71 −26
Original line number Diff line number Diff line
@@ -18,50 +18,42 @@
#include "Dbtux.hpp"

/*
 * Search key vs tree entry.
 * Search key vs node prefix.
 *
 * Compare search key and index attribute data.  The attribute data may
 * be partial in which case CmpUnknown may be returned.  Also counts how
 * many (additional) initial attributes were equal.
 * The comparison starts at given attribute position (in fact 0).  The
 * position is updated by number of equal initial attributes found.  The
 * prefix may be partial in which case CmpUnknown may be returned.
 */
int
Dbtux::cmpTreeAttrs(const Frag& frag, CmpPar& cmpPar)
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, ConstData data2, unsigned maxlen2)
{
  const unsigned numAttrs = frag.m_numAttrs;
  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
  ConstData data1 = cmpPar.m_data1;
  ConstData data2 = cmpPar.m_data2;
  // number of words of attribute data left
  unsigned len2 = cmpPar.m_len2;
  const unsigned numAttrs = frag.m_numAttrs;
  unsigned index = cmpPar.m_first;
  ndbrequire(index < numAttrs);
  // skip to right position in search key  XXX do it before the call
  for (unsigned i = 0; i < index; i++) {
    jam();
    data1 += AttributeHeaderSize + data1.ah().getDataSize();
  }
  unsigned numEq = 0;
  unsigned len2 = maxlen2;
  // skip to right position in search key
  data1 += start;
  int ret = 0;
  while (index < numAttrs) {
  while (start < numAttrs) {
    if (len2 < AttributeHeaderSize) {
      jam();
      ret = NdbSqlUtil::CmpUnknown;
      break;
    }
    len2 -= AttributeHeaderSize;
    if (! data1.ah().isNULL()) {
    if (*data1 != 0) {
      if (! data2.ah().isNULL()) {
        jam();
        // current attribute
        const DescAttr& descAttr = descEnt.m_descAttr[index];
        const DescAttr& descAttr = descEnt.m_descAttr[start];
        const unsigned typeId = descAttr.m_typeId;
        // full data size
        const unsigned size1 = data1.ah().getDataSize();
        const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
        ndbrequire(size1 != 0 && size1 == data2.ah().getDataSize());
        const unsigned size2 = min(size1, len2);
        len2 -= size2;
        // compare
        const Uint32* const p1 = &data1[AttributeHeaderSize];
        const Uint32* const p1 = *data1;
        const Uint32* const p2 = &data2[AttributeHeaderSize];
        ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size2);
        if (ret != 0) {
@@ -82,17 +74,70 @@ Dbtux::cmpTreeAttrs(const Frag& frag, CmpPar& cmpPar)
        break;
      }
    }
    data1 += AttributeHeaderSize + data1.ah().getDataSize();
    data1 += 1;
    data2 += AttributeHeaderSize + data2.ah().getDataSize();
    numEq++;
    index++;
    start++;
  }
  // XXX until data format errors are handled
  ndbrequire(ret != NdbSqlUtil::CmpError);
  return ret;
}

/*
 * Search key vs tree entry.
 *
 * Start position is updated as in previous routine.
 */
int
Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, TableData data1, TableData data2)
{
  const unsigned numAttrs = frag.m_numAttrs;
  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
  // skip to right position
  data1 += start;
  data2 += start;
  int ret = 0;
  while (start < numAttrs) {
    if (*data1 != 0) {
      if (*data2 != 0) {
        jam();
        // current attribute
        const DescAttr& descAttr = descEnt.m_descAttr[start];
        const unsigned typeId = descAttr.m_typeId;
        // full data size
        const unsigned size1 = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
        // compare
        const Uint32* const p1 = *data1;
        const Uint32* const p2 = *data2;
        ret = NdbSqlUtil::cmp(typeId, p1, p2, size1, size1);
        if (ret != 0) {
          jam();
          break;
        }
      } else {
        jam();
        // not NULL < NULL
        ret = -1;
        break;
      }
    } else {
      if (*data2 != 0) {
        jam();
        // NULL > not NULL
        ret = +1;
        break;
      }
    }
    data1 += 1;
    data2 += 1;
    start++;
  }
  // XXX until data format errors are handled
  ndbrequire(ret != NdbSqlUtil::CmpError);
  cmpPar.m_numEq += numEq;      // add to previous count
  return ret;
}


/*
 * Scan bound vs tree entry.
 *
+78 −2
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ Dbtux::Dbtux(const Configuration& conf) :
#endif
  c_internalStartPhase(0),
  c_typeOfStart(NodeState::ST_ILLEGAL_TYPE),
  c_keyBuffer(0)
  c_dataBuffer(0)
{
  BLOCK_CONSTRUCTOR(Dbtux);
  // verify size assumptions (also when release-compiled)
@@ -195,7 +195,10 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)
    new (indexPtr.p) Index();
  }
  // allocate buffers
  c_keyBuffer = (Uint32*)allocRecord("c_keyBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
  c_keyAttrs = (Uint32*)allocRecord("c_keyAttrs", sizeof(Uint32), MaxIndexAttributes);
  c_searchKey = (TableData)allocRecord("c_searchKey", sizeof(Uint32*), MaxIndexAttributes);
  c_entryKey = (TableData)allocRecord("c_entryKey", sizeof(Uint32*), MaxIndexAttributes);
  c_dataBuffer = (Uint32*)allocRecord("c_dataBuffer", sizeof(Uint64), (MaxAttrDataSize + 1) >> 1);
  // ack
  ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
  conf->senderRef = reference();
@@ -206,6 +209,37 @@ Dbtux::execREAD_CONFIG_REQ(Signal* signal)

// utils

void
Dbtux::setKeyAttrs(const Frag& frag)
{
  Data keyAttrs = c_keyAttrs;   // global
  const unsigned numAttrs = frag.m_numAttrs;
  const DescEnt& descEnt = getDescEnt(frag.m_descPage, frag.m_descOff);
  for (unsigned i = 0; i < numAttrs; i++) {
    const DescAttr& descAttr = descEnt.m_descAttr[i];
    Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
    // set attr id and fixed size
    keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
    keyAttrs += 1;
  }
}

void
Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, TableData keyData)
{
  ConstData keyAttrs = c_keyAttrs; // global
  const Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[ent.m_fragBit];
  const TupLoc tupLoc = ent.m_tupLoc;
  const Uint32 tupVersion = ent.m_tupVersion;
  ndbrequire(start < frag.m_numAttrs);
  const unsigned numAttrs = frag.m_numAttrs - start;
  // start applies to both keys and output data
  keyAttrs += start;
  keyData += start;
  c_tup->tuxReadAttrs(tableFragPtrI, tupLoc.m_pageId, tupLoc.m_pageOffset, tupVersion, numAttrs, keyAttrs, keyData);
  jamEntry();
}

void
Dbtux::copyAttrs(Data dst, ConstData src, CopyPar& copyPar)
{
@@ -240,4 +274,46 @@ Dbtux::copyAttrs(Data dst, ConstData src, CopyPar& copyPar)
  copyPar = c;
}

/*
 * Input is pointers to table attributes.  Output is array of attribute
 * data with headers.  Copies whatever fits.
 */
void
Dbtux::copyAttrs(const Frag& frag, TableData data1, Data data2, unsigned maxlen2)
{
  ConstData keyAttrs = c_keyAttrs; // global
  const unsigned numAttrs = frag.m_numAttrs;
  unsigned len2 = maxlen2;
  for (unsigned n = 0; n < numAttrs; n++) {
    jam();
    const unsigned attrId = keyAttrs.ah().getAttributeId();
    const unsigned dataSize = keyAttrs.ah().getDataSize();
    const Uint32* const p1 = *data1;
    if (p1 != 0) {
      if (len2 == 0)
        return;
      data2.ah() = AttributeHeader(attrId, dataSize);
      data2 += 1;
      len2 -= 1;
      unsigned n = dataSize;
      for (unsigned i = 0; i < dataSize; i++) {
        if (len2 == 0)
          return;
        *data2 = p1[i];
        data2 += 1;
        len2 -= 1;
      }
    } else {
      if (len2 == 0)
        return;
      data2.ah() = AttributeHeader(attrId, 0);
      data2.ah().setNULL();
      data2 += 1;
      len2 -= 1;
    }
    keyAttrs += 1;
    data1 += 1;
  }
}

BLOCK_FUNCTIONS(Dbtux);
Loading