Commit 758860dd authored by unknown's avatar unknown
Browse files

ndb - wl#2972 (5.1) fix detached trigger opType


storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  set non-update pk pre data to UNDEFINED (previously unspecified)
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp:
  fix detached trigger opType
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp:
  fix detached trigger opType
storage/ndb/test/ndbapi/test_event_merge.cpp:
  exact verif post/pre data
parent a23e2eef
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -448,7 +448,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
  if(!regOperPtr.p->is_first_operation())
  {
    /**
     * Out of order commit 
     * Out of order commit   XXX check effect on triggers
     */
    fix_commit_order(regOperPtr);
  }
+10 −1
Original line number Diff line number Diff line
@@ -559,8 +559,9 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,

  /**
   * Set correct operation type and fix change mask
   * Note ALLOC is set in "orig" tuple
   */
  if(req_struct->m_tuple_ptr->m_header_bits & Tuple_header::ALLOC)
  if(save_ptr->m_header_bits & Tuple_header::ALLOC)
  {
    if(save == ZDELETE)
    {
@@ -571,6 +572,14 @@ Dbtup::fireDetachedTriggers(KeyReqStruct *req_struct,
    }
    regOperPtr->op_struct.op_type = ZINSERT;
  }
  else if (save == ZINSERT)
  /**
   * Tuple was not created but last op is INSERT.
   * This is possible only on DELETE + INSERT
   */
  {
    regOperPtr->op_struct.op_type = ZUPDATE;
  }

  ndbrequire(regOperPtr->is_first_operation());
  triggerList.first(trigPtr);
+3 −3
Original line number Diff line number Diff line
@@ -376,10 +376,10 @@ NdbEventOperationImpl::receive_event()
	     AttributeHeader(*aAttrPtr).getAttributeId());
      receive_data(tAttr, aDataPtr, tDataSz);
      if (is_update)
      {
	receive_data(tAttr1, aDataPtr, tDataSz);
      else
        tAttr1->setUNDEFINED(); // do not leave unspecified
      tAttr1= tAttr1->next();
      }
      // next
      aAttrPtr++;
      aDataPtr+= (tDataSz + 3) >> 2;
+100 −70
Original line number Diff line number Diff line
@@ -47,8 +47,8 @@
 * There are 5 ways (ignoring NUL operand) to compose 2 ops:
 *                      5.0 bugs        5.1 bugs
 * INS o DEL = NUL
 * INS o UPD = INS                       5.1
 * DEL o INS = UPD      type=INS         5.1
 * INS o UPD = INS                      type=INS
 * DEL o INS = UPD      type=INS        type=INS
 * UPD o DEL = DEL      no event
 * UPD o UPD = UPD
 */
@@ -78,6 +78,7 @@ static NdbOperation* g_op = 0;
static const char* g_tabname = "tem1";
static const char* g_evtname = "tem1ev1";
static const uint g_charlen = 5;
static const char* g_charval = "abcde";
static const char* g_csname = "latin1_swedish_ci";

static const NdbDictionary::Table* g_tab = 0;
@@ -229,9 +230,9 @@ createtable()
    chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
    chkdb(g_op->insertTuple() == 0);
    Uint32 pk1;
    char pk2[g_charlen];
    char pk2[g_charlen + 1];
    pk1 = g_maxpk;
    memset(pk2, 0x20, g_charlen);
    sprintf(pk2, "%-*u", g_charlen, pk1);
    chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
    chkdb(g_op->equal("pk2", (char*)&pk2[0]) == 0);
    chkdb(g_con->execute(Commit) == 0);
@@ -256,9 +257,9 @@ droptable()

struct Data {
  Uint32 pk1;
  char pk2[g_charlen];
  char pk2[g_charlen + 1];
  Uint32 seq;
  char cc1[g_charlen];
  char cc1[g_charlen + 1];
  void* ptr[g_ncol];
  int ind[g_ncol]; // -1 = no data, 1 = NULL, 0 = not NULL
  void init() {
@@ -276,6 +277,30 @@ struct Data {
  }
};

static int
cmpdata(const Data& d1, const Data& d2)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = getcol(i);
    if (d1.ind[i] != d2.ind[i])
      return 1;
    if (d1.ind[i] == 0 && memcmp(d1.ptr[i], d2.ptr[i], c.size) != 0)
      return 1;
  }
  return 0;
}

static int
cmpdata(const Data (&d1)[2], const Data (&d2)[2])
{
  if (cmpdata(d1[0], d2[0]) != 0)
    return 1;
  if (cmpdata(d1[1], d2[1]) != 0)
    return 1;
  return 0;
}

static NdbOut&
operator<<(NdbOut& out, const Data& d)
{
@@ -304,7 +329,7 @@ operator<<(NdbOut& out, const Data& d)
            break;
          n--;
        }
        out << buf;
        out << "'" << buf << "'";
      }
      break;
    default:
@@ -370,7 +395,7 @@ operator<<(NdbOut& out, Op::Type t)
static NdbOut&
operator<<(NdbOut& out, const Op& op)
{
  out << "t=" << op.type;
  out << op.type;
  out << " " << op.data[0];
  out << " [" << op.data[1] << "]";
  return out;
@@ -504,13 +529,9 @@ checkop(const Op* op, Uint32& pk1)
        if (t == Op::INS) {
          chkrc(d[1].ind[i] == -1);
        } else if (t == Op::DEL) {
#ifdef ndb_event_cares_about_pk_pre_data
          chkrc(d[1].ind[i] == -1);
#endif
        } else {
#ifdef ndb_event_cares_about_pk_pre_data
          chkrc(d[1].ind[i] == 0);
#endif
        }
      } else {
        if (t == Op::INS) {
@@ -547,43 +568,24 @@ copycol(const Col& c, const Data& d1, Data& d3)
}

static void
copykeys(const Data& d1, Data& d3)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = g_col[i];
    if (c.pk)
      copycol(c, d1, d3);
  }
}

static void
copydata(const Data& d1, Data& d3)
copydata(const Data& d1, Data& d3, bool pk, bool nonpk)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = g_col[i];
    if (c.pk && pk || ! c.pk && nonpk)
      copycol(c, d1, d3);
  }
}

static void
copyop(const Op* op1, Op* op3)
{
  op3->type = op1->type;
  copydata(op1->data[0], op3->data[0]);
  copydata(op1->data[1], op3->data[1]);
  Uint32 pk1_tmp;
  reqrc(checkop(op3, pk1_tmp) == 0);
}

// not needed for ops
static void
compdata(const Data& d1, const Data& d2, Data& d3) // d2 overrides d1
compdata(const Data& d1, const Data& d2, Data& d3, bool pk, bool nonpk)
{
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = g_col[i];
    if (c.pk && pk || ! c.pk && nonpk) {
      const Data* d = 0;
      if (d1.ind[i] == -1 && d2.ind[i] == -1)
        d3.ind[i] = -1;
@@ -597,6 +599,17 @@ compdata(const Data& d1, const Data& d2, Data& d3) // d2 overrides d1
        copycol(c, *d, d3);
    }
  }
}

static void
copyop(const Op* op1, Op* op3)
{
  op3->type = op1->type;
  copydata(op1->data[0], op3->data[0], true, true);
  copydata(op1->data[1], op3->data[1], true, true);
  Uint32 pk1_tmp;
  reqrc(checkop(op3, pk1_tmp) == 0);
}

static int
compop(const Op* op1, const Op* op2, Op* op3) // op1 o op2 = op3
@@ -612,11 +625,14 @@ compop(const Op* op1, const Op* op2, Op* op3) // op1 o op2 = op3
  }
  chkrc((comp = comptype(op1->type, op2->type)) != 0);
  op3->type = comp->t3;
  copykeys(op2->data[0], op3->data[0]);
  // sucks
  copydata(op2->data[0], op3->data[0], true, false);
  if (op3->type != Op::DEL)
    copydata(op2->data[0], op3->data[0]);
    copydata(op2->data[0], op3->data[0], false, true);
  if (op3->type != Op::INS)
    copydata(op1->data[1], op3->data[1]);
    copydata(op1->data[1], op3->data[1], false, true);
  if (op3->type == Op::UPD)
    copydata(op2->data[0], op3->data[1], true, false);
  Uint32 pk1_tmp;
  reqrc(checkop(op3, pk1_tmp) == 0);
  // not eliminating identical post-pre fields
@@ -705,9 +721,9 @@ waitgci() // wait for event to be installed and for at least 1 GCI to pass
    chkdb((g_con = g_ndb->startTransaction()) != 0);
    { // forced to exec a dummy op
      Uint32 pk1;
      char pk2[g_charlen];
      char pk2[g_charlen + 1];
      pk1 = g_maxpk;
      memset(pk2, 0x20, g_charlen);
      sprintf(pk2, "%-*u", g_charlen, pk1);
      chkdb((g_op = g_con->getNdbOperation(g_tabname)) != 0);
      chkdb(g_op->readTuple() == 0);
      chkdb(g_op->equal("pk1", (char*)&pk1) == 0);
@@ -723,6 +739,7 @@ waitgci() // wait for event to be installed and for at least 1 GCI to pass
      break;
    }
    i = 1;
    sleep(1);
  }
  return 0;
}
@@ -732,11 +749,14 @@ makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
{
  op->type = t;
  if (t != Op::INS)
    copydata(prev_op->data[0], op->data[1]);
    copydata(prev_op->data[0], op->data[1], true, true);
  uint i;
  for (i = 0; i < g_ncol; i++) {
    const Col& c = getcol(i);
    Data (&d)[2] = op->data;
    if (c.pk && t == Op::DEL) {
      d[1].ind[i] = -1;
    }
    if (i == getcol("pk1").no) {
      d[0].pk1 = pk1;
      d[0].ind[i] = 0;
@@ -758,7 +778,7 @@ makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
    }
    uint u;
    u = urandom(100);
    if (c.nullable && u < 20) {
    if (c.nullable && u < 10) {
      d[0].ind[i] = 1;
      continue;
    }
@@ -776,8 +796,8 @@ makeop(Op* op, Uint32 pk1, Op::Type t, const Op* prev_op)
        char* p = (char*)d[0].ptr[i];
        uint j;
        for (j = 0; j < g_charlen; j++) {
          uint v = urandom(3);
          p[j] = j < u ? "abcde"[v] : 0x20;
          uint v = urandom(strlen(g_charval));
          p[j] = j < u ? g_charval[v] : 0x20;
        }
      }
      break;
@@ -993,9 +1013,9 @@ static int
matchevent(Op* ev)
{
  Op::Type t = ev->type;
  Data (&d)[2] = ev->data;
  Data (&d2)[2] = ev->data;
  // get PK
  Uint32 pk1 = d[0].pk1;
  Uint32 pk1 = d2[0].pk1;
  chkrc(pk1 < g_opts.maxpk);
  // on error repeat and print details
  uint loop = 0;
@@ -1016,30 +1036,35 @@ matchevent(Op* ev)
        op = op->next_op;
      }
      if (com_op->type != Op::NUL) {
        if (com_op->type == t) {
          const Data (&d2)[2] = com_op->data;
          if (t == Op::INS && d2[0].seq == d[0].seq ||
              t == Op::DEL && d2[1].seq == d[1].seq ||
              t == Op::UPD && d2[0].seq == d[0].seq) {
            if (cnt == g_ev_cnt[pk1]) {
              if (! com_op->match) {
                ll2("match pos " << cnt);
        const Data (&d1)[2] = com_op->data;
        if (cmpdata(d1, d2) == 0) {
          bool tmpok = true;
          if (com_op->type != t) {
            ll2("***: wrong type " << com_op->type << " != " << t);
            tmpok = false;
          }
          if (com_op->match) {
            ll2("***: duplicate match");
            tmpok = false;
          }
          if (cnt != g_ev_cnt[pk1]) {
            ll2("***: wrong pos " << cnt << " != " << g_ev_cnt[pk1]);
            tmpok = false;
          }
          if (tmpok) {
            ok = com_op->match = true;
              } else {
                ll2("duplicate match");
              }
            } else {
              ll2("match bad pos event=" << g_ev_cnt[pk1] << " op=" << cnt);
            }
            ll2("===: match");
          }
        }
        cnt++;
      }
      com_op = com_op->next_com;
    }
    if (ok)
    if (ok) {
      ll1("matchevent: match");
      return 0;
    ll2("no match");
    }
    ll1("matchevent: ERROR: no match");
    if (g_loglevel >= 2)
      return -1;
    loop++;
@@ -1100,12 +1125,13 @@ runevents()
{
  ll1("runevents");
  NdbEventOperation* evt_op;
  uint npoll = 3;
  uint mspoll = 1000;
  uint npoll = 7; // strangely long delay
  while (npoll != 0) {
    npoll--;
    int ret;
    ll1("poll");
    ret = g_ndb->pollEvents(1000);
    ret = g_ndb->pollEvents(mspoll);
    if (ret <= 0)
      continue;
    while (1) {
@@ -1180,7 +1206,7 @@ runtest()
  chkrc(createtable() == 0);
  chkrc(createevent() == 0);
  uint n;
  for (n = 0; n < g_opts.loop; n++) {
  for (n = 0; g_opts.loop == 0 || n < g_opts.loop; n++) {
    ll0("loop " << n);
    setseed(n);
    resetmem();
@@ -1280,6 +1306,10 @@ main(int argc, char** argv)
        return NDBT_ProgramExit(NDBT_OK);
    }
  }
  if (g_evt_op != 0) {
    (void)dropeventop();
    g_evt_op = 0;
  }
  delete g_ndb;
  delete g_ncc;
  return NDBT_ProgramExit(NDBT_FAILED);