Commit 6e0cdd89 authored by unknown's avatar unknown
Browse files

Merge clam.ndb.mysql.com:/space/pekka/ndb/version/my50-bug19285

into  clam.ndb.mysql.com:/space/pekka/ndb/version/my51-bug19285


storage/ndb/include/ndbapi/NdbBlob.hpp:
  manual merge
storage/ndb/include/ndbapi/NdbScanOperation.hpp:
  SCCS merged
storage/ndb/src/ndbapi/NdbBlob.cpp:
  SCCS merged
storage/ndb/src/ndbapi/NdbBlobImpl.hpp:
  manual merge
storage/ndb/src/ndbapi/ndberror.c:
  use local - add 4275 afterwards
storage/ndb/test/ndbapi/testBlobs.cpp:
  automerge
parents 0d8d39c8 e9bfc415
Loading
Loading
Loading
Loading
+39 −13
Original line number Diff line number Diff line
@@ -79,19 +79,41 @@ class NdbEventOperationImpl;
 * Non-void NdbBlob methods return -1 on error and 0 on success.  Output
 * parameters are used when necessary.
 *
 * Operation types:
 * - insertTuple must use setValue if blob column is non-nullable
 * - readTuple with exclusive lock can also update existing value
 * - updateTuple can overwrite with setValue or update existing value
 * - writeTuple always overwrites and must use setValue if non-nullable
 * Usage notes for different operation types:
 *
 * - insertTuple must use setValue if blob attribute is non-nullable
 *
 * - readTuple or scan readTuples with lock mode LM_CommittedRead is
 *   automatically upgraded to lock mode LM_Read if any blob attributes
 *   are accessed (to guarantee consistent view)
 *
 * - readTuple (with any lock mode) can only read blob value
 *
 * - updateTuple can either overwrite existing value with setValue or
 *   update it in active phase
 *
 * - writeTuple always overwrites blob value and must use setValue if
 *   blob attribute is non-nullable
 *
 * - deleteTuple creates implicit non-accessible blob handles
 * - scan with exclusive lock can also update existing value
 * - scan "lock takeover" update op must do its own getBlobHandle
 *
 * - scan readTuples (any lock mode) can use its blob handles only
 *   to read blob value
 *
 * - scan readTuples with lock mode LM_Exclusive can update row and blob
 *   value using updateCurrentTuple, where the operation returned must
 *   create its own blob handles explicitly
 *
 * - scan readTuples with lock mode LM_Exclusive can delete row (and
 *   therefore blob values) using deleteCurrentTuple, which creates
 *   implicit non-accessible blob handles
 *
 * - the operation returned by lockCurrentTuple cannot update blob value
 *
 * Bugs / limitations:
 * - lock mode upgrade should be handled automatically
 * - lock mode vs allowed operation is not checked
 *
 * - too many pending blob ops can blow up i/o buffers
 *
 * - table and its blob part tables are not created atomically
 */
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
@@ -214,6 +236,9 @@ public:
  /**
   * Return error object.  The error may be blob specific or may be
   * copied from a failed implicit operation.
   *
   * The error code is copied back to the operation unless the operation
   * already has a non-zero error code.
   */
  const NdbError& getNdbError() const;
  /**
@@ -326,6 +351,7 @@ private:
  bool isWriteOp();
  bool isDeleteOp();
  bool isScanOp();
  bool isReadOnlyOp();
  bool isTakeOverOp();
  // computations
  Uint32 getPartNumber(Uint64 pos);
@@ -367,10 +393,10 @@ private:
  int atNextResult();
  int atNextEvent();
  // errors
  void setErrorCode(int anErrorCode, bool invalidFlag = true);
  void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
  void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true);
  void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true);
  void setErrorCode(int anErrorCode, bool invalidFlag = false);
  void setErrorCode(NdbOperation* anOp, bool invalidFlag = false);
  void setErrorCode(NdbTransaction* aCon, bool invalidFlag = false);
  void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = false);
#ifdef VM_TRACE
  int getOperationType() const;
  friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
+1 −1
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ public:
   * readTuples.
   */
  enum ScanFlag {
    SF_TupScan = (1 << 16),     // scan TUP - only LM_CommittedRead
    SF_TupScan = (1 << 16),     // scan TUP
    SF_OrderBy = (1 << 24),     // index scan in order
    SF_Descending = (2 << 24),  // index scan in descending order
    SF_ReadRangeNo = (4 << 24), // enable @ref get_range_no
+44 −12
Original line number Diff line number Diff line
@@ -393,6 +393,16 @@ NdbBlob::isScanOp()
    theNdbOp->theOperationType == NdbOperation::OpenRangeScanRequest;
}

inline bool
NdbBlob::isReadOnlyOp()
{
  return ! (
    theNdbOp->theOperationType == NdbOperation::InsertRequest ||
    theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
    theNdbOp->theOperationType == NdbOperation::WriteRequest
  );
}

inline bool
NdbBlob::isTakeOverOp()
{
@@ -638,12 +648,12 @@ NdbBlob::getValue(void* data, Uint32 bytes)
{
  DBUG_ENTER("NdbBlob::getValue");
  DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
  if (theGetFlag || theState != Prepared) {
    setErrorCode(NdbBlobImpl::ErrState);
  if (! isReadOp() && ! isScanOp()) {
    setErrorCode(NdbBlobImpl::ErrCompat);
    DBUG_RETURN(-1);
  }
  if (! isReadOp() && ! isScanOp()) {
    setErrorCode(NdbBlobImpl::ErrUsage);
  if (theGetFlag || theState != Prepared) {
    setErrorCode(NdbBlobImpl::ErrState);
    DBUG_RETURN(-1);
  }
  if (data == NULL && bytes != 0) {
@@ -661,12 +671,12 @@ NdbBlob::setValue(const void* data, Uint32 bytes)
{
  DBUG_ENTER("NdbBlob::setValue");
  DBUG_PRINT("info", ("data=%p bytes=%u", data, bytes));
  if (theSetFlag || theState != Prepared) {
    setErrorCode(NdbBlobImpl::ErrState);
  if (isReadOnlyOp()) {
    setErrorCode(NdbBlobImpl::ErrCompat);
    DBUG_RETURN(-1);
  }
  if (! isInsertOp() && ! isUpdateOp() && ! isWriteOp()) {
    setErrorCode(NdbBlobImpl::ErrUsage);
  if (theSetFlag || theState != Prepared) {
    setErrorCode(NdbBlobImpl::ErrState);
    DBUG_RETURN(-1);
  }
  if (data == NULL && bytes != 0) {
@@ -762,6 +772,10 @@ int
NdbBlob::setNull()
{
  DBUG_ENTER("NdbBlob::setNull");
  if (isReadOnlyOp()) {
    setErrorCode(NdbBlobImpl::ErrCompat);
    DBUG_RETURN(-1);
  }
  if (theNullFlag == -1) {
    if (theState == Prepared) {
      DBUG_RETURN(setValue(0, 0));
@@ -800,6 +814,10 @@ NdbBlob::truncate(Uint64 length)
{
  DBUG_ENTER("NdbBlob::truncate");
  DBUG_PRINT("info", ("length=%llu", length));
  if (isReadOnlyOp()) {
    setErrorCode(NdbBlobImpl::ErrCompat);
    DBUG_RETURN(-1);
  }
  if (theNullFlag == -1) {
    setErrorCode(NdbBlobImpl::ErrState);
    DBUG_RETURN(-1);
@@ -857,12 +875,14 @@ NdbBlob::setPos(Uint64 pos)
int
NdbBlob::readData(void* data, Uint32& bytes)
{
  DBUG_ENTER("NdbBlob::readData");
  if (theState != Active) {
    setErrorCode(NdbBlobImpl::ErrState);
    return -1;
    DBUG_RETURN(-1);
  }
  char* buf = static_cast<char*>(data);
  return readDataPrivate(buf, bytes);
  int ret = readDataPrivate(buf, bytes);
  DBUG_RETURN(ret);
}

int
@@ -951,12 +971,18 @@ NdbBlob::readDataPrivate(char* buf, Uint32& bytes)
int
NdbBlob::writeData(const void* data, Uint32 bytes)
{
  DBUG_ENTER("NdbBlob::writeData");
  if (isReadOnlyOp()) {
    setErrorCode(NdbBlobImpl::ErrCompat);
    DBUG_RETURN(-1);
  }
  if (theState != Active) {
    setErrorCode(NdbBlobImpl::ErrState);
    return -1;
    DBUG_RETURN(-1);
  }
  const char* buf = static_cast<const char*>(data);
  return writeDataPrivate(buf, bytes);
  int ret = writeDataPrivate(buf, bytes);
  DBUG_RETURN(0);
}

int
@@ -1355,6 +1381,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
        DBUG_RETURN(-1);
    }
    if (isReadOp()) {
      // upgrade lock mode
      if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
        theNdbOp->theLockMode = NdbOperation::LM_Read;
      // add read of head+inline in this op
      if (getHeadInlineValue(theNdbOp) == -1)
        DBUG_RETURN(-1);
@@ -1373,6 +1402,9 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
    supportedOp = true;
  }
  if (isScanOp()) {
    // upgrade lock mode
    if (theNdbOp->theLockMode == NdbOperation::LM_CommittedRead)
      theNdbOp->theLockMode = NdbOperation::LM_Read;
    // add read of head+inline in this op
    if (getHeadInlineValue(theNdbOp) == -1)
      DBUG_RETURN(-1);
+4 −2
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ public:
  STATIC_CONST( ErrTable = 4263 );
  // "Invalid usage of blob attribute" 
  STATIC_CONST( ErrUsage = 4264 );
  // "Method is not valid in current blob state"
  // "The blob method is not valid in current blob state"
  STATIC_CONST( ErrState = 4265 );
  // "Invalid blob seek position"
  STATIC_CONST( ErrSeek = 4266 );
@@ -33,9 +33,11 @@ public:
  // "Error in blob head update forced rollback of transaction"
  STATIC_CONST( ErrAbort = 4268 );
  // "Unknown blob error"
  STATIC_CONST( ErrUnknown = 4269 );
  STATIC_CONST( ErrUnknown = 4270 );
  // "Corrupted main table PK in blob operation"
  STATIC_CONST( ErrCorruptPK = 4274 );
  // "The blob method is incompatible with operation type or lock mode"
  STATIC_CONST( ErrCompat = 4275 );
};

#endif
+89 −40
Original line number Diff line number Diff line
@@ -297,13 +297,15 @@ createTable()
struct Bval {
  char* m_val;
  unsigned m_len;
  char* m_buf;
  char* m_buf; // read/write buffer
  unsigned m_buflen;
  int m_error_code; // for testing expected error code
  Bval() :
    m_val(0),
    m_len(0),
    m_buf(0),   // read/write buffer
    m_buflen(0)
    m_buf(0),
    m_buflen(0),
    m_error_code(0)
    {}
  ~Bval() { delete [] m_val; delete [] m_buf; }
  void alloc(unsigned buflen) {
@@ -459,19 +461,23 @@ getBlobLength(NdbBlob* h, unsigned& len)
// setValue / getValue

static int
setBlobValue(NdbBlob* h, const Bval& v)
setBlobValue(NdbBlob* h, const Bval& v, int error_code = 0)
{
  bool null = (v.m_val == 0);
  bool isNull;
  unsigned len;
  DBG("setValue " <<  h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
  if (null) {
    CHK(h->setNull() == 0);
    CHK(h->setNull() == 0 || h->getNdbError().code == error_code);
    if (error_code)
      return 0;
    isNull = false;
    CHK(h->getNull(isNull) == 0 && isNull == true);
    CHK(getBlobLength(h, len) == 0 && len == 0);
  } else {
    CHK(h->setValue(v.m_val, v.m_len) == 0);
    CHK(h->setValue(v.m_val, v.m_len) == 0 || h->getNdbError().code == error_code);
    if (error_code)
      return 0;
    CHK(h->getNull(isNull) == 0 && isNull == false);
    CHK(getBlobLength(h, len) == 0 && len == v.m_len);
  }
@@ -479,11 +485,11 @@ setBlobValue(NdbBlob* h, const Bval& v)
}

static int
setBlobValue(const Tup& tup)
setBlobValue(const Tup& tup, int error_code = 0)
{
  CHK(setBlobValue(g_bh1, tup.m_blob1) == 0);
  CHK(setBlobValue(g_bh1, tup.m_blob1, error_code) == 0);
  if (! g_opt.m_oneblob)
    CHK(setBlobValue(g_bh2, tup.m_blob2) == 0);
    CHK(setBlobValue(g_bh2, tup.m_blob2, error_code) == 0);
  return 0;
}

@@ -543,13 +549,18 @@ writeBlobData(NdbBlob* h, const Bval& v)
  bool isNull;
  unsigned len;
  DBG("write " <<  h->getColumn()->getName() << " len=" << v.m_len << " null=" << null);
  int error_code = v.m_error_code;
  if (null) {
    CHK(h->setNull() == 0);
    CHK(h->setNull() == 0 || h->getNdbError().code == error_code);
    if (error_code)
      return 0;
    isNull = false;
    CHK(h->getNull(isNull) == 0 && isNull == true);
    CHK(getBlobLength(h, len) == 0 && len == 0);
  } else {
    CHK(h->truncate(v.m_len) == 0);
    CHK(h->truncate(v.m_len) == 0 || h->getNdbError().code == error_code);
    if (error_code)
      return 0;
    unsigned n = 0;
    do {
      unsigned m = g_opt.m_full ? v.m_len : urandom(v.m_len + 1);
@@ -568,11 +579,14 @@ writeBlobData(NdbBlob* h, const Bval& v)
}

static int
writeBlobData(const Tup& tup)
writeBlobData(Tup& tup, int error_code = 0)
{
  tup.m_blob1.m_error_code = error_code;
  CHK(writeBlobData(g_bh1, tup.m_blob1) == 0);
  if (! g_opt.m_oneblob)
  if (! g_opt.m_oneblob) {
    tup.m_blob2.m_error_code = error_code;
    CHK(writeBlobData(g_bh2, tup.m_blob2) == 0);
  }
  return 0;
}

@@ -635,19 +649,20 @@ blobWriteHook(NdbBlob* h, void* arg)
}

static int
setBlobWriteHook(NdbBlob* h, Bval& v)
setBlobWriteHook(NdbBlob* h, Bval& v, int error_code = 0)
{
  DBG("setBlobWriteHook");
  v.m_error_code = error_code;
  CHK(h->setActiveHook(blobWriteHook, &v) == 0);
  return 0;
}

static int
setBlobWriteHook(Tup& tup)
setBlobWriteHook(Tup& tup, int error_code = 0)
{
  CHK(setBlobWriteHook(g_bh1, tup.m_blob1) == 0);
  CHK(setBlobWriteHook(g_bh1, tup.m_blob1, error_code) == 0);
  if (! g_opt.m_oneblob)
    CHK(setBlobWriteHook(g_bh2, tup.m_blob2) == 0);
    CHK(setBlobWriteHook(g_bh2, tup.m_blob2, error_code) == 0);
  return 0;
}

@@ -869,7 +884,10 @@ readPk(int style)
    DBG("readPk pk1=" << hex << tup.m_pk1);
    CHK((g_con = g_ndb->startTransaction()) != 0);
    CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
    if (urandom(2) == 0)
      CHK(g_opr->readTuple() == 0);
    else
      CHK(g_opr->readTuple(NdbOperation::LM_CommittedRead) == 0);
    CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
    if (g_opt.m_pk2len != 0)
      CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
@@ -883,6 +901,8 @@ readPk(int style)
      CHK(readBlobData(tup) == 0);
    }
    CHK(g_con->execute(Commit) == 0);
    // verify lock mode upgrade
    CHK(g_opr->getLockMode() == NdbOperation::LM_Read);
    if (style == 0 || style == 1) {
      CHK(verifyBlobValue(tup) == 0);
    }
@@ -900,23 +920,40 @@ updatePk(int style)
  for (unsigned k = 0; k < g_opt.m_rows; k++) {
    Tup& tup = g_tups[k];
    DBG("updatePk pk1=" << hex << tup.m_pk1);
    while (1) {
      int mode = urandom(3);
      int error_code = mode == 0 ? 0 : 4275;
      CHK((g_con = g_ndb->startTransaction()) != 0);
      CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
      if (mode == 0) {
        DBG("using updateTuple");
        CHK(g_opr->updateTuple() == 0);
      } else if (mode == 1) {
        DBG("using readTuple exclusive");
        CHK(g_opr->readTuple(NdbOperation::LM_Exclusive) == 0);
      } else {
        DBG("using readTuple - will fail and retry");
        CHK(g_opr->readTuple() == 0);
      }
      CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
      if (g_opt.m_pk2len != 0)
        CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
      CHK(getBlobHandles(g_opr) == 0);
      if (style == 0) {
      CHK(setBlobValue(tup) == 0);
        CHK(setBlobValue(tup, error_code) == 0);
      } else if (style == 1) {
      CHK(setBlobWriteHook(tup) == 0);
        CHK(setBlobWriteHook(tup, error_code) == 0);
      } else {
        CHK(g_con->execute(NoCommit) == 0);
      CHK(writeBlobData(tup) == 0);
        CHK(writeBlobData(tup, error_code) == 0);
      }
      if (error_code == 0) {
        CHK(g_con->execute(Commit) == 0);
        g_ndb->closeTransaction(g_con);
        break;
      }
      g_ndb->closeTransaction(g_con);
    }
    g_opr = 0;
    g_con = 0;
    tup.m_exists = true;
@@ -1002,7 +1039,10 @@ readIdx(int style)
    DBG("readIdx pk1=" << hex << tup.m_pk1);
    CHK((g_con = g_ndb->startTransaction()) != 0);
    CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
    if (urandom(2) == 0)
      CHK(g_opx->readTuple() == 0);
    else
      CHK(g_opx->readTuple(NdbOperation::LM_CommittedRead) == 0);
    CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
    CHK(getBlobHandles(g_opx) == 0);
    if (style == 0) {
@@ -1014,6 +1054,8 @@ readIdx(int style)
      CHK(readBlobData(tup) == 0);
    }
    CHK(g_con->execute(Commit) == 0);
    // verify lock mode upgrade (already done by NdbIndexOperation)
    CHK(g_opx->getLockMode() == NdbOperation::LM_Read);
    if (style == 0 || style == 1) {
      CHK(verifyBlobValue(tup) == 0);
    }
@@ -1031,6 +1073,7 @@ updateIdx(int style)
  for (unsigned k = 0; k < g_opt.m_rows; k++) {
    Tup& tup = g_tups[k];
    DBG("updateIdx pk1=" << hex << tup.m_pk1);
    // skip 4275 testing
    CHK((g_con = g_ndb->startTransaction()) != 0);
    CHK((g_opx = g_con->getNdbIndexOperation(g_opt.m_x1name, g_opt.m_tname)) != 0);
    CHK(g_opx->updateTuple() == 0);
@@ -1128,7 +1171,10 @@ readScan(int style, bool idx)
  } else {
    CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
  }
  CHK(g_ops->readTuples(NdbScanOperation::LM_Read) == 0);
  if (urandom(2) == 0)
    CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
  else
    CHK(g_ops->readTuples(NdbOperation::LM_CommittedRead) == 0);
  CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
  if (g_opt.m_pk2len != 0)
    CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
@@ -1139,6 +1185,8 @@ readScan(int style, bool idx)
    CHK(setBlobReadHook(tup) == 0);
  }
  CHK(g_con->execute(NoCommit) == 0);
  // verify lock mode upgrade
  CHK(g_ops->getLockMode() == NdbOperation::LM_Read);
  unsigned rows = 0;
  while (1) {
    int ret;
@@ -1180,7 +1228,7 @@ updateScan(int style, bool idx)
  } else {
    CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
  }
  CHK(g_ops->readTuples(NdbScanOperation::LM_Exclusive) == 0);
  CHK(g_ops->readTuples(NdbOperation::LM_Exclusive) == 0);
  CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
  if (g_opt.m_pk2len != 0)
    CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
@@ -1199,6 +1247,7 @@ updateScan(int style, bool idx)
    // calculate new blob values
    calcBval(g_tups[k], false);
    tup.copyfrom(g_tups[k]);
    // cannot do 4275 testing, scan op error code controls execution
    CHK((g_opr = g_ops->updateCurrentTuple()) != 0);
    CHK(getBlobHandles(g_opr) == 0);
    if (style == 0) {
@@ -1232,7 +1281,7 @@ deleteScan(bool idx)
  } else {
    CHK((g_ops = g_con->getNdbIndexScanOperation(g_opt.m_x2name, g_opt.m_tname)) != 0);
  }
  CHK(g_ops->readTuples(NdbScanOperation::LM_Exclusive) == 0);
  CHK(g_ops->readTuples(NdbOperation::LM_Exclusive) == 0);
  CHK(g_ops->getValue("PK1", (char*)&tup.m_pk1) != 0);
  if (g_opt.m_pk2len != 0)
    CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
@@ -1651,7 +1700,7 @@ testperf()
    char b[20];
    CHK((g_con = g_ndb->startTransaction()) != 0);
    CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
    CHK(g_ops->readTuples(NdbScanOperation::LM_Read) == 0);
    CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
    CHK(g_ops->getValue(cA, (char*)&a) != 0);
    CHK(g_ops->getValue(cB, b) != 0);
    CHK(g_con->execute(NoCommit) == 0);
@@ -1680,7 +1729,7 @@ testperf()
    char c[20];
    CHK((g_con = g_ndb->startTransaction()) != 0);
    CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
    CHK(g_ops->readTuples(NdbScanOperation::LM_Read) == 0);
    CHK(g_ops->readTuples(NdbOperation::LM_Read) == 0);
    CHK(g_ops->getValue(cA, (char*)&a) != 0);
    CHK((g_bh1 = g_ops->getBlobHandle(cC)) != 0);
    CHK(g_con->execute(NoCommit) == 0);