Commit 76dd9aa2 authored by unknown's avatar unknown
Browse files

Merge orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-ndb

into  orca.ndb.mysql.com:/export/home/space/pekka/ndb/version/my50-bug20446

parents d049ec4a eb97edf4
Loading
Loading
Loading
Loading
+56 −146
Original line number Diff line number Diff line
@@ -121,41 +121,17 @@ private:
  // forward declarations
  struct DescEnt;

  /*
   * Pointer to array of Uint32.
   */
  struct Data {
  private:
    Uint32* m_data;
  public:
    Data();
    Data(Uint32* data);
    Data& operator=(Uint32* data);
    operator Uint32*() const;
    Data& operator+=(size_t n);
    AttributeHeader& ah() const;
  };
  friend class Data;
  // Pointer to array of Uint32 represents attribute data and bounds

  /*
   * Pointer to array of constant Uint32.
   */
  struct ConstData;
  friend struct ConstData;
  struct ConstData {
  private:
    const Uint32* m_data;
  public:
    ConstData();
    ConstData(const Uint32* data);
    ConstData& operator=(const Uint32* data);
    operator const Uint32*() const;
    ConstData& operator+=(size_t n);
    const AttributeHeader& ah() const;
    // non-const pointer can be cast to const pointer
    ConstData(Data data);
    ConstData& operator=(Data data);
  };
  typedef Uint32 *Data;
  inline AttributeHeader& ah(Data data) {
    return *reinterpret_cast<AttributeHeader*>(data);
  }

  typedef const Uint32* ConstData;
  inline const AttributeHeader& ah(ConstData data) {
    return *reinterpret_cast<const AttributeHeader*>(data);
  }

  // AttributeHeader size is assumed to be 1 word
  STATIC_CONST( AttributeHeaderSize = 1 );
@@ -212,6 +188,7 @@ private:
    unsigned m_fragBit : 1;     // which duplicated table fragment
    TreeEnt();
    // methods
    bool eqtuple(const TreeEnt ent) const;
    bool eq(const TreeEnt ent) const;
    int cmp(const TreeEnt ent) const;
  };
@@ -289,8 +266,7 @@ private:
  struct TreePos {
    TupLoc m_loc;               // physical node address
    Uint16 m_pos;               // position 0 to m_occup
    Uint8 m_match;              // at an existing entry
    Uint8 m_dir;                // see scanNext()
    Uint8 m_dir;                // see scanNext
    TreePos();
  };

@@ -381,12 +357,13 @@ private:
    enum {
      Undef = 0,
      First = 1,                // before first entry
      Current = 2,              // at current before locking
      Blocked = 3,              // at current waiting for ACC lock
      Locked = 4,               // at current and locked or no lock needed
      Next = 5,                 // looking for next extry
      Last = 6,                 // after last entry
      Aborting = 7,             // lock wait at scan close
      Current = 2,              // at some entry
      Found = 3,                // return current as next scan result
      Blocked = 4,              // found and waiting for ACC lock
      Locked = 5,               // found and locked or no lock needed
      Next = 6,                 // looking for next extry
      Last = 7,                 // after last entry
      Aborting = 8,             // lock wait at scan close
      Invalid = 9               // cannot return REF to LQH currently
    };
    Uint16 m_state;
@@ -563,6 +540,7 @@ private:
  void readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData);
  void readTablePk(const Frag& frag, TreeEnt ent, Data pkData, unsigned& pkSize);
  void copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2 = MaxAttrDataSize);
  void unpackBound(const ScanBound& bound, Data data);

  /*
   * DbtuxMeta.cpp
@@ -637,7 +615,9 @@ private:
  void execACCKEYREF(Signal* signal);
  void execACC_ABORTCONF(Signal* signal);
  void scanFirst(ScanOpPtr scanPtr);
  void scanFind(ScanOpPtr scanPtr);
  void scanNext(ScanOpPtr scanPtr, bool fromMaintReq);
  bool scanCheck(ScanOpPtr scanPtr, TreeEnt ent);
  bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
  void scanClose(Signal* signal, ScanOpPtr scanPtr);
  void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
@@ -647,8 +627,8 @@ private:
  /*
   * DbtuxSearch.cpp
   */
  void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
  void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
  bool searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
  bool searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
  void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, bool descending, TreePos& treePos);
  void searchToScanAscending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
  void searchToScanDescending(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
@@ -737,99 +717,6 @@ private:
  static unsigned max(unsigned x, unsigned y);
};

// Dbtux::Data

inline
Dbtux::Data::Data() :
  m_data(0)
{
}

inline
Dbtux::Data::Data(Uint32* data) :
  m_data(data)
{
}

inline Dbtux::Data&
Dbtux::Data::operator=(Uint32* data)
{
  m_data = data;
  return *this;
}

inline
Dbtux::Data::operator Uint32*() const
{
  return m_data;
}

inline Dbtux::Data&
Dbtux::Data::operator+=(size_t n)
{
  m_data += n;
  return *this;
}

inline AttributeHeader&
Dbtux::Data::ah() const
{
  return *reinterpret_cast<AttributeHeader*>(m_data);
}

// Dbtux::ConstData

inline
Dbtux::ConstData::ConstData() :
  m_data(0)
{
}

inline
Dbtux::ConstData::ConstData(const Uint32* data) :
  m_data(data)
{
}

inline Dbtux::ConstData&
Dbtux::ConstData::operator=(const Uint32* data)
{
  m_data = data;
  return *this;
}

inline
Dbtux::ConstData::operator const Uint32*() const
{
  return m_data;
}

inline Dbtux::ConstData&
Dbtux::ConstData::operator+=(size_t n)
{
  m_data += n;
  return *this;
}

inline const AttributeHeader&
Dbtux::ConstData::ah() const
{
  return *reinterpret_cast<const AttributeHeader*>(m_data);
}

inline
Dbtux::ConstData::ConstData(Data data) :
  m_data(static_cast<Uint32*>(data))
{
}

inline Dbtux::ConstData&
Dbtux::ConstData::operator=(Data data)
{
  m_data = static_cast<Uint32*>(data);
  return *this;
}

// Dbtux::TupLoc

inline
@@ -898,6 +785,14 @@ Dbtux::TreeEnt::TreeEnt() :
{
}

inline bool
Dbtux::TreeEnt::eqtuple(const TreeEnt ent) const
{
  return
    m_tupLoc == ent.m_tupLoc &&
    m_fragBit == ent.m_fragBit;
}

inline bool
Dbtux::TreeEnt::eq(const TreeEnt ent) const
{
@@ -910,6 +805,11 @@ Dbtux::TreeEnt::eq(const TreeEnt ent) const
inline int
Dbtux::TreeEnt::cmp(const TreeEnt ent) const
{
  // compare frag first to improve cacheing in 5.0
  if (m_fragBit < ent.m_fragBit)
    return -1;
  if (m_fragBit > ent.m_fragBit)
    return +1;
  if (m_tupLoc.getPageId() < ent.m_tupLoc.getPageId())
    return -1;
  if (m_tupLoc.getPageId() > ent.m_tupLoc.getPageId())
@@ -918,14 +818,25 @@ Dbtux::TreeEnt::cmp(const TreeEnt ent) const
    return -1;
  if (m_tupLoc.getPageOffset() > ent.m_tupLoc.getPageOffset())
    return +1;
  if (m_tupVersion < ent.m_tupVersion)
  /*
   * Guess if one tuple version has wrapped around.  This is well
   * defined ordering on existing versions since versions are assigned
   * consecutively and different versions exists only on uncommitted
   * tuple.  Assuming max 2**14 uncommitted ops on same tuple.
   */
  const unsigned version_wrap_limit = (1 << (ZTUP_VERSION_BITS - 1));
  if (m_tupVersion < ent.m_tupVersion) {
    if (ent.m_tupVersion - m_tupVersion < version_wrap_limit)
      return -1;
  if (m_tupVersion > ent.m_tupVersion)
    else
      return +1;
  if (m_fragBit < ent.m_fragBit)
    return -1;
  if (m_fragBit > ent.m_fragBit)
  }
  if (m_tupVersion > ent.m_tupVersion) {
    if (m_tupVersion - ent.m_tupVersion < version_wrap_limit)
      return +1;
    else
      return -1;
  }
  return 0;
}

@@ -992,7 +903,6 @@ inline
Dbtux::TreePos::TreePos() :
  m_loc(),
  m_pos(ZNIL),
  m_match(false),
  m_dir(255)
{
}
+19 −19
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
  // skip to right position in search key only
  for (unsigned i = 0; i < start; i++) {
    jam();
    searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
    searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
  }
  // number of words of entry data left
  unsigned len2 = maxlen;
@@ -46,16 +46,16 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
      break;
    }
    len2 -= AttributeHeaderSize;
    if (! searchKey.ah().isNULL()) {
      if (! entryData.ah().isNULL()) {
    if (! ah(searchKey).isNULL()) {
      if (! ah(entryData).isNULL()) {
        jam();
        // verify attribute id
        const DescAttr& descAttr = descEnt.m_descAttr[start];
        ndbrequire(searchKey.ah().getAttributeId() == descAttr.m_primaryAttrId);
        ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
        ndbrequire(ah(searchKey).getAttributeId() == descAttr.m_primaryAttrId);
        ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
        // sizes
        const unsigned size1 = searchKey.ah().getDataSize();
        const unsigned size2 = min(entryData.ah().getDataSize(), len2);
        const unsigned size1 = ah(searchKey).getDataSize();
        const unsigned size2 = min(ah(entryData).getDataSize(), len2);
        len2 -= size2;
        // compare
        NdbSqlUtil::Cmp* const cmp = c_sqlCmp[start];
@@ -74,15 +74,15 @@ Dbtux::cmpSearchKey(const Frag& frag, unsigned& start, ConstData searchKey, Cons
        break;
      }
    } else {
      if (! entryData.ah().isNULL()) {
      if (! ah(entryData).isNULL()) {
        jam();
        // NULL < not NULL
        ret = -1;
        break;
      }
    }
    searchKey += AttributeHeaderSize + searchKey.ah().getDataSize();
    entryData += AttributeHeaderSize + entryData.ah().getDataSize();
    searchKey += AttributeHeaderSize + ah(searchKey).getDataSize();
    entryData += AttributeHeaderSize + ah(entryData).getDataSize();
    start++;
  }
  return ret;
@@ -130,17 +130,17 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
    // get and skip bound type (it is used after the loop)
    type = boundInfo[0];
    boundInfo += 1;
    if (! boundInfo.ah().isNULL()) {
      if (! entryData.ah().isNULL()) {
    if (! ah(boundInfo).isNULL()) {
      if (! ah(entryData).isNULL()) {
        jam();
        // verify attribute id
        const Uint32 index = boundInfo.ah().getAttributeId();
        const Uint32 index = ah(boundInfo).getAttributeId();
        ndbrequire(index < frag.m_numAttrs);
        const DescAttr& descAttr = descEnt.m_descAttr[index];
        ndbrequire(entryData.ah().getAttributeId() == descAttr.m_primaryAttrId);
        ndbrequire(ah(entryData).getAttributeId() == descAttr.m_primaryAttrId);
        // sizes
        const unsigned size1 = boundInfo.ah().getDataSize();
        const unsigned size2 = min(entryData.ah().getDataSize(), len2);
        const unsigned size1 = ah(boundInfo).getDataSize();
        const unsigned size2 = min(ah(entryData).getDataSize(), len2);
        len2 -= size2;
        // compare
        NdbSqlUtil::Cmp* const cmp = c_sqlCmp[index];
@@ -159,14 +159,14 @@ Dbtux::cmpScanBound(const Frag& frag, unsigned idir, ConstData boundInfo, unsign
      }
    } else {
      jam();
      if (! entryData.ah().isNULL()) {
      if (! ah(entryData).isNULL()) {
        jam();
        // NULL < not NULL
        return -1;
      }
    }
    boundInfo += AttributeHeaderSize + boundInfo.ah().getDataSize();
    entryData += AttributeHeaderSize + entryData.ah().getDataSize();
    boundInfo += AttributeHeaderSize + ah(boundInfo).getDataSize();
    entryData += AttributeHeaderSize + ah(entryData).getDataSize();
    boundCount -= 1;
  }
  // all attributes were equal
+0 −1
Original line number Diff line number Diff line
@@ -311,7 +311,6 @@ operator<<(NdbOut& out, const Dbtux::TreePos& pos)
  out << "[TreePos " << hex << &pos;
  out << " [loc " << pos.m_loc << "]";
  out << " [pos " << dec << pos.m_pos << "]";
  out << " [match " << dec << pos.m_match << "]";
  out << " [dir " << dec << pos.m_dir << "]";
  out << "]";
  return out;
+17 −4
Original line number Diff line number Diff line
@@ -221,7 +221,7 @@ Dbtux::setKeyAttrs(const Frag& frag)
    const DescAttr& descAttr = descEnt.m_descAttr[i];
    Uint32 size = AttributeDescriptor::getSizeInWords(descAttr.m_attrDesc);
    // set attr id and fixed size
    keyAttrs.ah() = AttributeHeader(descAttr.m_primaryAttrId, size);
    ah(keyAttrs) = AttributeHeader(descAttr.m_primaryAttrId, size);
    keyAttrs += 1;
    // set comparison method pointer
    const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getTypeBinary(descAttr.m_typeId);
@@ -251,8 +251,8 @@ Dbtux::readKeyAttrs(const Frag& frag, TreeEnt ent, unsigned start, Data keyData)
    ConstData data = keyData;
    Uint32 totalSize = 0;
    for (Uint32 i = start; i < frag.m_numAttrs; i++) {
      Uint32 attrId = data.ah().getAttributeId();
      Uint32 dataSize = data.ah().getDataSize();
      Uint32 attrId = ah(data).getAttributeId();
      Uint32 dataSize = ah(data).getDataSize();
      debugOut << i << " attrId=" << attrId << " size=" << dataSize;
      data += 1;
      for (Uint32 j = 0; j < dataSize; j++) {
@@ -290,7 +290,7 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
  unsigned len2 = maxlen2;
  while (n != 0) {
    jam();
    const unsigned dataSize = data1.ah().getDataSize();
    const unsigned dataSize = ah(data1).getDataSize();
    // copy header
    if (len2 == 0)
      return;
@@ -314,4 +314,17 @@ Dbtux::copyAttrs(const Frag& frag, ConstData data1, Data data2, unsigned maxlen2
#endif
}

void
Dbtux::unpackBound(const ScanBound& bound, Data dest)
{
  ScanBoundIterator iter;
  bound.first(iter);
  const unsigned n = bound.getSize();
  unsigned j;
  for (j = 0; j < n; j++) {
    dest[j] = *iter.data;
    bound.next(iter);
  }
}

BLOCK_FUNCTIONS(Dbtux)
+7 −6
Original line number Diff line number Diff line
@@ -113,16 +113,17 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
  // do the operation
  req->errorCode = 0;
  TreePos treePos;
  bool ok;
  switch (opCode) {
  case TuxMaintReq::OpAdd:
    jam();
    searchToAdd(frag, c_searchKey, ent, treePos);
    ok = searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
    if (debugFlags & DebugMaint) {
      debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
      debugOut << treePos << (! ok ? " - error" : "") << endl;
    }
#endif
    if (treePos.m_match) {
    if (! ok) {
      jam();
      // there is no "Building" state so this will have to do
      if (indexPtr.p->m_state == Index::Online) {
@@ -152,13 +153,13 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
    break;
  case TuxMaintReq::OpRemove:
    jam();
    searchToRemove(frag, c_searchKey, ent, treePos);
    ok = searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE
    if (debugFlags & DebugMaint) {
      debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
      debugOut << treePos << (! ok ? " - error" : "") << endl;
    }
#endif
    if (! treePos.m_match) {
    if (! ok) {
      jam();
      // there is no "Building" state so this will have to do
      if (indexPtr.p->m_state == Index::Online) {
Loading