Commit f74af1b7 authored by unknown's avatar unknown
Browse files

ndb: fix blob performance in long transactions


ndb/include/ndbapi/NdbConnection.hpp:
  fix blob performance in long transactions
ndb/src/ndbapi/NdbConnection.cpp:
  fix blob performance in long transactions
ndb/test/ndbapi/testBlobs.cpp:
  fix blob performance in long transactions
parent 2d0011fc
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -607,8 +607,8 @@ private:
  NdbOperation*	theLastExecOpInList;	    // Last executing operation in list.


  NdbOperation*	theCompletedFirstOp;	    // First operation in completed 
                                            // operation list.
  NdbOperation*	theCompletedFirstOp;	    // First & last operation in completed 
  NdbOperation*	theCompletedLastOp;         // operation list.

  Uint32	theNoOfOpSent;				// How many operations have been sent	    
  Uint32	theNoOfOpCompleted;			// How many operations have completed
+36 −0
Original line number Diff line number Diff line
@@ -55,6 +55,7 @@ NdbConnection::NdbConnection( Ndb* aNdb ) :
  theFirstExecOpInList(NULL),
  theLastExecOpInList(NULL),
  theCompletedFirstOp(NULL),
  theCompletedLastOp(NULL),
  theNoOfOpSent(0),
  theNoOfOpCompleted(0),
  theNoOfOpFetched(0),
@@ -124,6 +125,7 @@ NdbConnection::init()
  theLastExecOpInList	  = NULL;

  theCompletedFirstOp	  = NULL;
  theCompletedLastOp	  = NULL;

  theGlobalCheckpointId   = 0;
  theCommitStatus         = Started;
@@ -256,6 +258,8 @@ NdbConnection::handleExecuteCompletion()
  if (tLastExecOp != NULL) {
    tLastExecOp->next(theCompletedFirstOp);
    theCompletedFirstOp = tFirstExecOp;
    if (theCompletedLastOp == NULL)
      theCompletedLastOp = tLastExecOp;
    theFirstExecOpInList = NULL;
    theLastExecOpInList = NULL;
  }//if
@@ -292,6 +296,8 @@ NdbConnection::execute(ExecType aTypeOfExec,

  ExecType tExecType;
  NdbOperation* tPrepOp;
  NdbOperation* tCompletedFirstOp = NULL;
  NdbOperation* tCompletedLastOp = NULL;

  int ret = 0;
  do {
@@ -314,6 +320,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
      }
      tPrepOp = tPrepOp->next();
    }

    // save rest of prepared ops if batch
    NdbOperation* tRestOp= 0;
    NdbOperation* tLastOp= 0;
@@ -323,6 +330,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
      tLastOp = theLastOpInList;
      theLastOpInList = tPrepOp;
    }

    if (tExecType == Commit) {
      NdbOperation* tOp = theCompletedFirstOp;
      while (tOp != NULL) {
@@ -338,6 +346,19 @@ NdbConnection::execute(ExecType aTypeOfExec,
      }
    }

    // completed ops are in unspecified order
    if (theCompletedFirstOp != NULL) {
      if (tCompletedFirstOp == NULL) {
        tCompletedFirstOp = theCompletedFirstOp;
        tCompletedLastOp = theCompletedLastOp;
      } else {
        tCompletedLastOp->next(theCompletedFirstOp);
        tCompletedLastOp = theCompletedLastOp;
      }
      theCompletedFirstOp = NULL;
      theCompletedLastOp = NULL;
    }

    if (executeNoBlobs(tExecType, abortOption, forceSend) == -1)
        ret = -1;
#ifndef VM_TRACE
@@ -362,6 +383,7 @@ NdbConnection::execute(ExecType aTypeOfExec,
        tOp = tOp->next();
      }
    }

    // add saved prepared ops if batch
    if (tPrepOp != NULL && tRestOp != NULL) {
      if (theFirstOpInList == NULL)
@@ -373,6 +395,18 @@ NdbConnection::execute(ExecType aTypeOfExec,
    assert(theFirstOpInList == NULL || tExecType == NoCommit);
  } while (theFirstOpInList != NULL || tExecType != aTypeOfExec);

  if (tCompletedFirstOp != NULL) {
    tCompletedLastOp->next(theCompletedFirstOp);
    theCompletedFirstOp = tCompletedFirstOp;
    if (theCompletedLastOp == NULL)
      theCompletedLastOp = tCompletedLastOp;
  }
#if ndb_api_count_completed_ops_after_blob_execute
  { NdbOperation* tOp; unsigned n = 0;
    for (tOp = theCompletedFirstOp; tOp != NULL; tOp = tOp->next()) n++;
    ndbout << "completed ops: " << n << endl;
  }
#endif
  DBUG_RETURN(ret);
}

@@ -894,6 +928,7 @@ NdbConnection::releaseOperations()
  releaseOps(theFirstExecOpInList);

  theCompletedFirstOp = NULL;
  theCompletedLastOp = NULL;
  theFirstOpInList = NULL;
  theFirstExecOpInList = NULL;
  theLastOpInList = NULL;
@@ -909,6 +944,7 @@ NdbConnection::releaseCompletedOperations()
{
  releaseOps(theCompletedFirstOp);
  theCompletedFirstOp = NULL;
  theCompletedLastOp = NULL;
}//NdbConnection::releaseOperations()

/******************************************************************************
+302 −1
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@
#include <NdbMain.h>
#include <NdbOut.hpp>
#include <NdbTest.hpp>
#include <NdbTick.h>

struct Bcol {
  bool m_nullable;
@@ -59,6 +60,9 @@ struct Opt {
  bool m_oneblob;
  Bcol m_blob1;
  Bcol m_blob2;
  // perf
  const char* m_tnameperf;
  unsigned m_rowsperf;
  // bugs
  int m_bug;
  int (*m_bugtest)();
@@ -84,6 +88,9 @@ struct Opt {
    m_oneblob(false),
    m_blob1(false, 7, 1137, 10),
    m_blob2(true, 99, 55, 1),
    // perf
    m_tnameperf("TBLOB2"),
    m_rowsperf(10000),
    // bugs
    m_bug(0),
    m_bugtest(0) {
@@ -107,6 +114,7 @@ printusage()
    << "  -loop N     loop N times 0=forever [" << d.m_loop << "]" << endl
    << "  -parts N    max parts in blob value [" << d.m_parts << "]" << endl
    << "  -rows N     number of rows [" << d.m_rows << "]" << endl
    << "  -rowsperf N rows for performace test [" << d.m_rowsperf << "]" << endl
    << "  -seed N     random seed 0=loop number [" << d.m_seed << "]" << endl
    << "  -skip xxx   skip given tests (see list) [no tests]" << endl
    << "  -test xxx   only given tests (see list) [all tests]" << endl
@@ -118,6 +126,7 @@ printusage()
    << "  i           hash index ops" << endl
    << "  s           table scans" << endl
    << "  r           ordered index scans" << endl
    << "  p           performance test" << endl
    << "additional flags for test/skip" << endl
    << "  u           update existing blob value" << endl
    << "  n           normal insert and update" << endl
@@ -1381,6 +1390,292 @@ testmain()
  return 0;
}

// separate performance test

struct Tmr {    // stolen from testOIBasic
  Tmr() {
    clr();
  }
  void clr() {
    m_on = m_ms = m_cnt = m_time[0] = m_text[0] = 0;
  }
  void on() {
    assert(m_on == 0);
    m_on = NdbTick_CurrentMillisecond();
  }
  void off(unsigned cnt = 0) {
    NDB_TICKS off = NdbTick_CurrentMillisecond();
    assert(m_on != 0 && off >= m_on);
    m_ms += off - m_on;
    m_cnt += cnt;
    m_on = 0;
  }
  const char* time() {
    if (m_cnt == 0)
      sprintf(m_time, "%u ms", m_ms);
    else
      sprintf(m_time, "%u ms per %u ( %u ms per 1000 )", m_ms, m_cnt, (1000 * m_ms) / m_cnt);
    return m_time;
  }
  const char* pct (const Tmr& t1) {
    if (0 < t1.m_ms)
      sprintf(m_text, "%u pct", (100 * m_ms) / t1.m_ms);
    else
      sprintf(m_text, "[cannot measure]");
    return m_text;
  }
  const char* over(const Tmr& t1) {
    if (0 < t1.m_ms) {
      if (t1.m_ms <= m_ms)
        sprintf(m_text, "%u pct", (100 * (m_ms - t1.m_ms)) / t1.m_ms);
      else
        sprintf(m_text, "-%u pct", (100 * (t1.m_ms - m_ms)) / t1.m_ms);
    } else
      sprintf(m_text, "[cannot measure]");
    return m_text;
  }
  NDB_TICKS m_on;
  unsigned m_ms;
  unsigned m_cnt;
  char m_time[100];
  char m_text[100];
};

static int
testperf()
{
  if (! testcase('p'))
    return 0;
  DBG("=== perf test ===");
  g_ndb = new Ndb("TEST_DB");
  CHK(g_ndb->init() == 0);
  CHK(g_ndb->waitUntilReady() == 0);
  g_dic = g_ndb->getDictionary();
  NdbDictionary::Table tab(g_opt.m_tnameperf);
  if (g_dic->getTable(tab.getName()) != 0)
    CHK(g_dic->dropTable(tab) == 0);
  // col A - pk
  { NdbDictionary::Column col("A");
    col.setType(NdbDictionary::Column::Unsigned);
    col.setPrimaryKey(true);
    tab.addColumn(col);
  }
  // col B - char 20
  { NdbDictionary::Column col("B");
    col.setType(NdbDictionary::Column::Char);
    col.setLength(20);
    col.setNullable(true);
    tab.addColumn(col);
  }
  // col C - text
  { NdbDictionary::Column col("C");
    col.setType(NdbDictionary::Column::Text);
    col.setInlineSize(20);
    col.setPartSize(512);
    col.setStripeSize(1);
    col.setNullable(true);
    tab.addColumn(col);
  }
  // create
  CHK(g_dic->createTable(tab) == 0);
  Uint32 cA = 0, cB = 1, cC = 2;
  // timers
  Tmr t1;
  Tmr t2;
  // insert char (one trans)
  {
    DBG("--- insert char ---");
    t1.on();
    CHK((g_con = g_ndb->startTransaction()) != 0);
    for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
      CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
      CHK(g_opr->insertTuple() == 0);
      CHK(g_opr->equal(cA, (char*)&k) == 0);
      CHK(g_opr->setValue(cB, "b") == 0);
      CHK(g_con->execute(NoCommit) == 0);
    }
    t1.off(g_opt.m_rowsperf);
    CHK(g_con->execute(Rollback) == 0);
    DBG(t1.time());
    g_opr = 0;
    g_con = 0;
  }
  // insert text (one trans)
  {
    DBG("--- insert text ---");
    t2.on();
    CHK((g_con = g_ndb->startTransaction()) != 0);
    for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
      CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
      CHK(g_opr->insertTuple() == 0);
      CHK(g_opr->equal(cA, (char*)&k) == 0);
      CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
      CHK((g_bh1->setValue("c", 1) == 0));
      CHK(g_con->execute(NoCommit) == 0);
    }
    t2.off(g_opt.m_rowsperf);
    CHK(g_con->execute(Rollback) == 0);
    DBG(t2.time());
    g_bh1 = 0;
    g_opr = 0;
    g_con = 0;
  }
  // insert overhead
  DBG("insert overhead: " << t2.over(t1));
  t1.clr();
  t2.clr();
  // insert
  {
    DBG("--- insert for read test ---");
    unsigned n = 0;
    CHK((g_con = g_ndb->startTransaction()) != 0);
    for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
      CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
      CHK(g_opr->insertTuple() == 0);
      CHK(g_opr->equal(cA, (char*)&k) == 0);
      CHK(g_opr->setValue(cB, "b") == 0);
      CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
      CHK((g_bh1->setValue("c", 1) == 0));
      if (++n == g_opt.m_batch) {
        CHK(g_con->execute(Commit) == 0);
        g_ndb->closeTransaction(g_con);
        CHK((g_con = g_ndb->startTransaction()) != 0);
        n = 0;
      }
    }
    if (n != 0) {
      CHK(g_con->execute(Commit) == 0);
      n = 0;
    }
    g_bh1 = 0;
    g_opr = 0;
    g_con = 0;
  }
  // pk read char (one trans)
  {
    DBG("--- pk read char ---");
    CHK((g_con = g_ndb->startTransaction()) != 0);
    Uint32 a;
    char b[20];
    t1.on();
    for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
      CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
      CHK(g_opr->readTuple() == 0);
      CHK(g_opr->equal(cA, (char*)&k) == 0);
      CHK(g_opr->getValue(cA, (char*)&a) != 0);
      CHK(g_opr->getValue(cB, b) != 0);
      a = (Uint32)-1;
      b[0] = 0;
      CHK(g_con->execute(NoCommit) == 0);
      CHK(a == k && strcmp(b, "b") == 0);
    }
    CHK(g_con->execute(Commit) == 0);
    t1.off(g_opt.m_rowsperf);
    DBG(t1.time());
    g_opr = 0;
    g_con = 0;
  }
  // pk read text (one trans)
  {
    DBG("--- pk read text ---");
    CHK((g_con = g_ndb->startTransaction()) != 0);
    Uint32 a;
    char c[20];
    t2.on();
    for (Uint32 k = 0; k < g_opt.m_rowsperf; k++) {
      CHK((g_opr = g_con->getNdbOperation(tab.getName())) != 0);
      CHK(g_opr->readTuple() == 0);
      CHK(g_opr->equal(cA, (char*)&k) == 0);
      CHK(g_opr->getValue(cA, (char*)&a) != 0);
      CHK((g_bh1 = g_opr->getBlobHandle(cC)) != 0);
      a = (Uint32)-1;
      c[0] = 0;
      CHK(g_con->execute(NoCommit) == 0);
      Uint32 m = 20;
      CHK(g_bh1->readData(c, m) == 0);
      CHK(a == k && m == 1 && strcmp(c, "c") == 0);
    }
    CHK(g_con->execute(Commit) == 0);
    t2.off(g_opt.m_rowsperf);
    DBG(t2.time());
    g_opr = 0;
    g_con = 0;
  }
  // pk read overhead
  DBG("pk read overhead: " << t2.over(t1));
  t1.clr();
  t2.clr();
  // scan read char
  {
    DBG("--- scan read char ---");
    NdbResultSet* rs;
    Uint32 a;
    char b[20];
    CHK((g_con = g_ndb->startTransaction()) != 0);
    CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
    CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Read)) != 0);
    CHK(g_ops->getValue(cA, (char*)&a) != 0);
    CHK(g_ops->getValue(cB, b) != 0);
    CHK(g_con->execute(NoCommit) == 0);
    unsigned n = 0;
    t1.on();
    while (1) {
      a = (Uint32)-1;
      b[0] = 0;
      int ret;
      CHK((ret = rs->nextResult(true)) == 0 || ret == 1);
      if (ret == 1)
        break;
      CHK(a < g_opt.m_rowsperf && strcmp(b, "b") == 0);
      n++;
    }
    CHK(n == g_opt.m_rowsperf);
    t1.off(g_opt.m_rowsperf);
    DBG(t1.time());
    g_ops = 0;
    g_con = 0;
  }
  // scan read text
  {
    DBG("--- read text ---");
    NdbResultSet* rs;
    Uint32 a;
    char c[20];
    CHK((g_con = g_ndb->startTransaction()) != 0);
    CHK((g_ops = g_con->getNdbScanOperation(tab.getName())) != 0);
    CHK((rs = g_ops->readTuples(NdbScanOperation::LM_Read)) != 0);
    CHK(g_ops->getValue(cA, (char*)&a) != 0);
    CHK((g_bh1 = g_ops->getBlobHandle(cC)) != 0);
    CHK(g_con->execute(NoCommit) == 0);
    unsigned n = 0;
    t2.on();
    while (1) {
      a = (Uint32)-1;
      c[0] = 0;
      int ret;
      CHK((ret = rs->nextResult(true)) == 0 || ret == 1);
      if (ret == 1)
        break;
      Uint32 m = 20;
      CHK(g_bh1->readData(c, m) == 0);
      CHK(a < g_opt.m_rowsperf && m == 1 && strcmp(c, "c") == 0);
      n++;
    }
    CHK(n == g_opt.m_rowsperf);
    t2.off(g_opt.m_rowsperf);
    DBG(t2.time());
    g_bh1 = 0;
    g_ops = 0;
    g_con = 0;
  }
  // scan read overhead
  DBG("scan read overhead: " << t2.over(t1));
  t1.clr();
  t2.clr();
  delete g_ndb;
  return 0;
}

// bug tests

static int
@@ -1498,6 +1793,12 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
	continue;
      }
    }
    if (strcmp(arg, "-rowsperf") == 0) {
      if (++argv, --argc > 0) {
	g_opt.m_rowsperf = atoi(argv[0]);
	continue;
      }
    }
    if (strcmp(arg, "-seed") == 0) {
      if (++argv, --argc > 0) {
	g_opt.m_seed = atoi(argv[0]);
@@ -1558,7 +1859,7 @@ NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
    strcat(b, "r");
    g_opt.m_skip = strdup(b);
  }
  if (testmain() == -1) {
  if (testmain() == -1 || testperf() == -1) {
    ndbout << "line " << __LINE__ << " FAIL loop=" << g_loop << endl;
    return NDBT_ProgramExit(NDBT_FAILED);
  }