Commit 7412739f authored by unknown's avatar unknown
Browse files

Bug #18491 cluster: node restart with pending dropped events causes failed restart


storage/ndb/src/kernel/blocks/suma/Suma.cpp:
  Bug #18491 cluster: node restart with pending dropped events causes failed restart
  - recreate subscriptions in correct state (dropped if dropped...)
  - handle recreated subscribers with dropped table (tab_inforef)
parent c04cf137
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@ struct SubCreateReq {
  
  friend bool printSUB_CREATE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
  STATIC_CONST( SignalLength = 6 );
  STATIC_CONST( SignalLength2 = 7 );
  
  enum SubscriptionType {
    SingleTableScan  = 1,  // 
@@ -50,6 +51,7 @@ struct SubCreateReq {
  Uint32 subscriptionKey;
  Uint32 subscriptionType;
  Uint32 tableId;
  Uint32 state;
};

struct SubCreateRef {
+118 −13
Original line number Diff line number Diff line
@@ -1040,6 +1040,15 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
  const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
    Subscription::REPORT_SUBSCRIBE : 0;
  const Uint32 tableId = req.tableId;
  Subscription::State state = (Subscription::State) req.state;
  if (signal->getLength() != SubCreateReq::SignalLength2)
  {
    /*
      api or restarted by older version
      if restarted by old version, do the best we can
    */
    state = Subscription::DEFINED;
  }

  Subscription key;
  key.m_subscriptionId  = subId;
@@ -1067,6 +1076,17 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
      addTableId(req.tableId, subPtr, 0);
    }
  } else {
    if (c_startup.m_restart_server_node_id && 
        refToNode(subRef) != c_startup.m_restart_server_node_id)
    {
      /**
       * only allow "restart_server" Suma's to come through 
       * for restart purposes
       */
      jam();
      sendSubStartRef(signal, 1405);
      DBUG_VOID_RETURN;
    }
    // Check that id/key is unique
    if(c_subscriptions.find(subPtr, key)) {
      jam();
@@ -1090,7 +1110,7 @@ Suma::execSUB_CREATE_REQ(Signal* signal)
    subPtr.p->m_options          = reportSubscribe | reportAll;
    subPtr.p->m_tableId          = tableId;
    subPtr.p->m_table_ptrI       = RNIL;
    subPtr.p->m_state            = Subscription::DEFINED;
    subPtr.p->m_state            = state;
    subPtr.p->n_subscribers      = 0;
    subPtr.p->m_current_sync_ptrI = RNIL;

@@ -1446,7 +1466,9 @@ Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbP
  jam();
  DBUG_ENTER("Suma::completeOneSubscriber");

  if (tabPtr.p->m_error)
  if (tabPtr.p->m_error &&
      (c_startup.m_restart_server_node_id == 0 ||
       tabPtr.p->m_state != Table::DROPPED))
  {
    sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
		    SubscriptionData::TableData);
@@ -1531,9 +1553,45 @@ Suma::completeInitTable(Signal *signal, TablePtr tabPtr)
/**
 * Handle GET_TABINFOREF: a table-info request was refused.
 *
 * The table record is looked up in c_tablePool using the senderData echoed
 * back in the signal. If the error code is Busy, the same request is sent
 * again to DBDICT after a delay and nothing else happens. For the other
 * error codes the table is marked DROPPED, the error code is stored on the
 * table record, and the pending subscribers and the table initialisation
 * are completed.
 */
void
Suma::execGET_TABINFOREF(Signal* signal){
  jamEntry();
  /* Pull the fields we need out of the REF before the signal buffer is reused */
  GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
  Uint32 tableId = ref->tableId;
  Uint32 senderData = ref->senderData;
  GetTabInfoRef::ErrorCode errorCode =
    (GetTabInfoRef::ErrorCode) ref->errorCode;
  int do_resend_request = 0;
  TablePtr tabPtr;
  // senderData is used as the index of the table record in c_tablePool
  c_tablePool.getPtr(tabPtr, senderData);
  switch (errorCode)
  {
  case GetTabInfoRef::TableNotDefined:
    // wrong state
    break;
  case GetTabInfoRef::InvalidTableId:
    // no such table
    break;
  case GetTabInfoRef::Busy:
    // the only error that triggers a retry
    do_resend_request = 1;
    break;
  case GetTabInfoRef::TableNameTooLong:
    // NOTE(review): presumably impossible because the request is made by
    // table id, not by name - confirm
    ndbrequire(false);
  }
  // Error codes without a case above fall through to the "dropped" handling
  if (do_resend_request)
  {
    // Rebuild the by-id request in the send buffer and retry later
    GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
    req->senderRef = reference();
    req->senderData = senderData;
    req->requestType = 
      GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
    req->tableId = tableId;
    // delay argument is 30 - presumably milliseconds, TODO confirm
    sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
                        30, GetTabInfoReq::SignalLength);
    return;
  }
  // Record the table as dropped together with the reason
  tabPtr.p->m_state = Table::DROPPED;
  tabPtr.p->m_error = errorCode;
  // Finish the subscribers first, then the table initialisation
  completeAllSubscribers(signal, tabPtr);
  completeInitTable(signal, tabPtr);
}

void
Suma::execGET_TABINFO_CONF(Signal* signal){
@@ -2173,13 +2231,24 @@ Suma::execSUB_START_REQ(Signal* signal){
    DBUG_VOID_RETURN;
  }
  
  if (subPtr.p->m_state != Subscription::DEFINED) {
  if (subPtr.p->m_state == Subscription::LOCKED) {
    jam();
    DBUG_PRINT("info",("Locked"));
    sendSubStartRef(signal, 1411);
    DBUG_VOID_RETURN;
  }

  if (subPtr.p->m_state == Subscription::DROPPED &&
      c_startup.m_restart_server_node_id == 0) {
    jam();
    DBUG_PRINT("info",("Dropped"));
    sendSubStartRef(signal, 1418);
    DBUG_VOID_RETURN;
  }

  ndbrequire(subPtr.p->m_state == Subscription::DEFINED ||
             c_startup.m_restart_server_node_id);

  SubscriberPtr subbPtr;
  if(!c_subscriberPool.seize(subbPtr)){
    jam();
@@ -2193,6 +2262,7 @@ Suma::execSUB_START_REQ(Signal* signal){
  c_subscriber_nodes.set(refToNode(subscriberRef));

  // setup subscription record
  if (subPtr.p->m_state == Subscription::DEFINED)
    subPtr.p->m_state = Subscription::LOCKED;
  // store these here for later use
  subPtr.p->m_senderRef  = senderRef;
@@ -2241,8 +2311,14 @@ Suma::sendSubStartComplete(Signal* signal,

  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
  ndbrequire( subPtr.p->m_state == Subscription::LOCKED )
  ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
             (subPtr.p->m_state == Subscription::DROPPED &&
              c_startup.m_restart_server_node_id));
  if (subPtr.p->m_state == Subscription::LOCKED)
  {
    jam();
    subPtr.p->m_state = Subscription::DEFINED;
  }
  subPtr.p->n_subscribers++;

  DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
@@ -2293,8 +2369,14 @@ Suma::sendSubStartRef(Signal* signal,
  SubscriptionPtr subPtr;
  c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);

  ndbrequire( subPtr.p->m_state == Subscription::LOCKED );
  ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
             (subPtr.p->m_state == Subscription::DROPPED &&
              c_startup.m_restart_server_node_id));
  if (subPtr.p->m_state == Subscription::LOCKED)
  {
    jam();
    subPtr.p->m_state = Subscription::DEFINED;
  }

  SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
  ref->senderRef        = reference();
@@ -2360,6 +2442,18 @@ Suma::execSUB_STOP_REQ(Signal* signal){
    DBUG_VOID_RETURN;
  }
  
  if (c_startup.m_restart_server_node_id && 
      refToNode(senderRef) != c_startup.m_restart_server_node_id)
  {
    /**
     * only allow "restart_server" Suma's to come through 
     * for restart purposes
     */
    jam();
    sendSubStopRef(signal, 1405);
    DBUG_VOID_RETURN;
  }

  if (subPtr.p->m_state == Subscription::LOCKED) {
    jam();
    DBUG_PRINT("error", ("locked"));
@@ -3668,7 +3762,17 @@ Suma::execSUB_REMOVE_REQ(Signal* signal)
    sendSubRemoveRef(signal, req, 1413);
    DBUG_VOID_RETURN;
  }
  if (subPtr.p->m_state == Subscription::DROPPED)
  {
    /**
     * already dropped
     */
    jam();
    sendSubRemoveRef(signal, req, 1419);
    DBUG_VOID_RETURN;
  }

  ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
  DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers));

  if (subPtr.p->n_subscribers == 0)
@@ -3981,8 +4085,9 @@ Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef)
  case SubCreateReq::TableEvent:
    jam();
    req->tableId = subPtr.p->m_tableId;
    req->state = subPtr.p->m_state;
    suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
		    SubCreateReq::SignalLength, JBB);
		    SubCreateReq::SignalLength2, JBB);
    DBUG_VOID_RETURN;
  case SubCreateReq::SingleTableScan:
    jam();
+2 −0
Original line number Diff line number Diff line
@@ -475,6 +475,8 @@ ErrorBundle ErrorCodes[] = {
  { 1415, DMEC, SE, "Subscription not unique in subscriber manager" },
  { 1416, DMEC, IS, "Can't accept more subscriptions, out of space in pool" },
  { 1417, DMEC, SE, "Table in suscription not defined, probably dropped" },
  { 1418, DMEC, SE, "Subscription dropped, no new subscribers allowed" },
  { 1419, DMEC, SE, "Subscription already dropped" },

  { 4004, DMEC, AE, "Attribute name not found in the Table" },
  
+145 −1
Original line number Diff line number Diff line
@@ -101,6 +101,40 @@ static int dropEvent(Ndb *pNdb, const NdbDictionary::Table &tab)
  return NDBT_OK;
}
 
/**
 * Create and start an event operation on the event named "<table>_EVENT".
 *
 * Every column of the table is subscribed to with both its current value
 * (getValue) and its previous value (getPreValue), after which the
 * operation is executed.
 *
 * @param ndb              Ndb object the operation is created on
 * @param tab              table whose name and columns are used
 * @param do_report_error  when non-zero, failures are written to g_err
 * @return the started event operation, or 0 on failure. If execute()
 *         fails the operation is dropped again before returning.
 */
static
NdbEventOperation *createEventOperation(Ndb *ndb,
                                        const NdbDictionary::Table &tab,
                                        int do_report_error = 1)
{
  char buf[1024];
  // Bounded formatting: a very long table name must not overrun buf.
  const int len= snprintf(buf, sizeof(buf), "%s_EVENT", tab.getName());
  if (len < 0 || len >= (int)sizeof(buf))
  {
    if (do_report_error)
      g_err << "createEventOperation: event name too long for table "
            << tab.getName() << endl;
    return 0;
  }
  NdbEventOperation *pOp= ndb->createEventOperation(buf);
  if (pOp == 0)
  {
    if (do_report_error)
      g_err << "createEventOperation: "
            << ndb->getNdbError().code << " "
            << ndb->getNdbError().message << endl;
    return 0;
  }
  int n_columns= tab.getNoOfColumns();
  for (int j = 0; j < n_columns; j++)
  {
    // Look the column name up once and use it for both subscriptions
    const char *colName= tab.getColumn(j)->getName();
    pOp->getValue(colName);
    pOp->getPreValue(colName);
  }
  if ( pOp->execute() )
  {
    if (do_report_error)
      g_err << "pOp->execute(): "
            << pOp->getNdbError().code << " "
            << pOp->getNdbError().message << endl;
    // Do not leak the operation that could not be started
    ndb->dropEventOperation(pOp);
    return 0;
  }
  return pOp;
}

static int runCreateEvent(NDBT_Context* ctx, NDBT_Step* step)
{
  if (createEvent(GETNDB(step),* ctx->getTab()) != 0){
@@ -870,7 +904,7 @@ static int createAllEvents(NDBT_Context* ctx, NDBT_Step* step)

static int dropAllEvents(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("createAllEvents");
  DBUG_ENTER("dropAllEvents");
  Ndb * ndb= GETNDB(step);
  int i;

@@ -1212,6 +1246,18 @@ static int createEventOperations(Ndb * ndb)
  DBUG_RETURN(NDBT_OK);
}

/**
 * Test-step adapter around createEventOperations(): runs it on the step's
 * Ndb object and maps any result other than NDBT_OK to NDBT_FAILED.
 */
static int createAllEventOperations(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("createAllEventOperations");
  Ndb * pNdb= GETNDB(step);
  // Run the helper before DBUG_RETURN so the trace nesting is unchanged
  const int rc= createEventOperations(pNdb);
  DBUG_RETURN(rc == NDBT_OK ? NDBT_OK : NDBT_FAILED);
}

static int dropEventOperations(Ndb * ndb)
{
  DBUG_ENTER("dropEventOperations");
@@ -1228,6 +1274,18 @@ static int dropEventOperations(Ndb * ndb)
  DBUG_RETURN(NDBT_OK);
}

/**
 * Test-step adapter around dropEventOperations(): runs it on the step's
 * Ndb object and maps any result other than NDBT_OK to NDBT_FAILED.
 */
static int dropAllEventOperations(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("dropAllEventOperations");
  Ndb * pNdb= GETNDB(step);
  // Run the helper before DBUG_RETURN so the trace nesting is unchanged
  const int rc= dropEventOperations(pNdb);
  DBUG_RETURN(rc == NDBT_OK ? NDBT_OK : NDBT_FAILED);
}

static int runMulti(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("runMulti");
@@ -1409,6 +1467,87 @@ static int runMulti_NR(NDBT_Context* ctx, NDBT_Step* step)
  DBUG_RETURN(NDBT_OK);
}

/**
 * Restart the data nodes one at a time, in restarter index order, waiting
 * for the cluster to report started after each restart.
 *
 * @return 0 when the index has wrapped around after the last node,
 *         1 as soon as a restart or the wait for cluster start fails.
 */
static int restartAllNodes()
{
  NdbRestarter restarter;
  int next = 0;
  for (;;)
  {
    const int nodeId = restarter.getDbNodeId(next);
    next++;
    ndbout << "Restart node " << nodeId << endl;
    if (restarter.restartOneDbNode(nodeId, false, false, true) != 0)
    {
      g_err << "Failed to restartNextDbNode" << endl;
      return 1;
    }
    if (restarter.waitClusterStarted(60) != 0)
    {
      g_err << "Cluster failed to start" << endl;
      return 1;
    }
    // Wrapping back to index 0 means every node has been restarted
    next %= restarter.getNumDbNodes();
    if (next == 0)
      return 0;
  }
}

/**
 * Test body for "CreateDropNR".
 *
 * Each loop iteration:
 *   1. creates the event for the test table and an event operation on it,
 *   2. drops the event while the operation is still active,
 *   3. restarts every data node,
 *   4. drops the table and restarts every data node again,
 *   5. drops the event operation and re-creates the table.
 *
 * The test is stopped and reported as NDBT_OK when fewer than two data
 * nodes are available.
 *
 * @return NDBT_OK when every iteration completes, NDBT_FAILED as soon as
 *         any step of an iteration fails.
 */
static int runCreateDropNR(NDBT_Context* ctx, NDBT_Step* step)
{
  DBUG_ENTER("runCreateDropNR");
  Ndb * ndb= GETNDB(step);
  int result = NDBT_OK;
  NdbRestarter restarter;
  int loops = ctx->getNumLoops();

  if (restarter.getNumDbNodes() < 2)
  {
    ctx->stopTest();
    // Must leave through DBUG_RETURN to keep the DBUG call stack balanced
    DBUG_RETURN(NDBT_OK);
  }
  do
  {
    // Assume failure; only reaching the end of the iteration clears it
    result = NDBT_FAILED;
    const NdbDictionary::Table* pTab = ctx->getTab();
    if (createEvent(ndb, *pTab))
    {
      g_err << "createEvent failed" << endl;
      break;
    }
    NdbEventOperation *pOp= createEventOperation(ndb, *pTab);
    if (pOp == 0)
    {
      g_err << "Failed to createEventOperation" << endl;
      break;
    }
    if (dropEvent(ndb, *pTab))
    {
      g_err << "Failed to dropEvent()" << endl;
      break;
    }
    ndbout << "Restarting with dropped events with subscribers" << endl;
    if (restartAllNodes())
      break;
    if (ndb->getDictionary()->dropTable(pTab->getName()) != 0){
      g_err << "Failed to drop " << pTab->getName() <<" in db" << endl;
      break;
    }
    ndbout << "Restarting with dropped events and dropped "
           << "table with subscribers" << endl;
    if (restartAllNodes())
      break;
    if (ndb->dropEventOperation(pOp))
    {
      g_err << "Failed dropEventOperation" << endl;
      break;
    }
    // Re-create the table so the next iteration (and later tests) find it
    NdbDictionary::Table tmp(*pTab);
    tmp.setNodeGroupIds(0, 0);
    if (ndb->getDictionary()->createTable(tmp) != 0){
      g_err << "createTable failed: "
            << ndb->getDictionary()->getNdbError() << endl;
      break;
    }
    result = NDBT_OK;
  } while (--loops > 0);  // "> 0" so a loop count <= 0 still terminates

  DBUG_RETURN(result);
}

NDBT_TESTSUITE(test_event);
TESTCASE("BasicEventOperation", 
@@ -1492,6 +1631,11 @@ TESTCASE("Multi_NR",
  FINALIZER(dropAllShadows);
  FINALIZER(dropAllEvents);
}
// Registers the CreateDropNR test case; runCreateDropNR is its only step.
// The trailing space in the first literal keeps the two concatenated
// description strings from running together.
TESTCASE("CreateDropNR", 
	 "Verify that we can Create and Drop in any order "
	 "NOTE! No errors are allowed!" ){
  FINALIZER(runCreateDropNR);
}
NDBT_TESTSUITE_END(test_event);

int main(int argc, const char** argv){
+5 −0
Original line number Diff line number Diff line
@@ -218,6 +218,11 @@ max-time: 2500
cmd: test_event
args: -n Multi

#
max-time: 2500
cmd: test_event
args: -n CreateDropNR -l 2

max-time: 600
cmd: testBasic
args: -n PkRead T1