Loading storage/ndb/src/kernel/blocks/suma/Suma.cpp +50 −26 Original line number Diff line number Diff line Loading @@ -758,6 +758,17 @@ Suma::execNODE_FAILREP(Signal* signal){ Restart.resetRestart(signal); } if (ERROR_INSERTED(13032)) { Uint32 node = c_subscriber_nodes.find(0); if (node != NodeBitmask::NotFound) { ndbout_c("Inserting API_FAILREQ node: %u", node); signal->theData[0] = node; EXECUTE_DIRECT(QMGR, GSN_API_FAILREQ, signal, 1); } } signal->theData[0] = SumaContinueB::RESEND_BUCKET; NdbNodeBitmask tmp; Loading Loading @@ -3463,6 +3474,9 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) f_bufferLock = 0; b_bufferLock = 0; ndbrequire((tabPtr.p = c_tablePool.getPtr(tabPtr.i)) != 0); Uint32 tableId = tabPtr.p->m_tableId; Uint32 bucket= hashValue % c_no_of_buckets; m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); if(m_active_buckets.get(bucket) || Loading @@ -3485,7 +3499,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; data->gci = gci; data->tableId = tabPtr.p->m_tableId; data->tableId = tableId; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, event); data->logType = 0; Loading @@ -3508,10 +3522,11 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) else { Uint32* dst; Uint32 sz = f_trigBufferSize + b_trigBufferSize + 2; Uint32 sz = f_trigBufferSize + b_trigBufferSize + 3; if((dst = get_buffer_ptr(signal, bucket, gci, sz))) { * dst++ = tabPtr.i; * dst++ = tableId; * dst++ = tabPtr.p->m_schemaVersion; * dst++ = (event << 16) | f_trigBufferSize; memcpy(dst, f_buffer, f_trigBufferSize << 2); dst += f_trigBufferSize; Loading Loading @@ -3643,6 +3658,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* signal) if(c_buckets[i].m_buffer_tail != RNIL) { //Uint32* dst; get_buffer_ptr(signal, i, gci, 0); } } Loading Loading @@ -3987,6 +4003,9 @@ void Suma::completeSubRemove(SubscriptionPtr subPtr) { DBUG_ENTER("Suma::completeSubRemove"); //Uint32 subscriptionId = subPtr.p->m_subscriptionId; //Uint32 subscriptionKey = subPtr.p->m_subscriptionKey; c_subscriptions.release(subPtr); DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d", c_subscriptionPool.getSize(), Loading Loading @@ -5003,27 +5022,30 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, { g_cnt++; Uint32 table = * src++ ; Uint32 schemaVersion = * src++; Uint32 event = * src >> 16; Uint32 sz_1 = (* src ++) & 0xFFFF; ndbassert(sz - 2 >= sz_1); ndbassert(sz - 3 >= sz_1); LinearSectionPtr ptr[3]; const Uint32 nptr= reformat(signal, ptr, src, sz_1, src + sz_1, sz - 2 - sz_1); src + sz_1, sz - 3 - sz_1); Uint32 ptrLen= 0; for(Uint32 i =0; i < nptr; i++) ptrLen+= ptr[i].sz; /** * Signal to subscriber(s) */ Ptr<Table> tabPtr; ndbrequire((tabPtr.p = c_tablePool.getPtr(table)) != 0); if (c_tables.find(tabPtr, table) && tabPtr.p->m_schemaVersion == schemaVersion) { SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; data->gci = last_gci; data->tableId = tabPtr.p->m_tableId; data->tableId = table; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, event); data->logType = 0; Loading @@ -5031,7 +5053,8 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, data->totalLen = ptrLen; { LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers); LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers); SubscriberPtr subbPtr; for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr)) { Loading @@ -5043,6 +5066,7 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, } } } } break; } Loading storage/ndb/test/tools/listen.cpp +7 −1 Original line number Diff line number Diff line Loading @@ -168,9 +168,15 @@ main(int argc, const char** argv){ break; case NdbDictionary::Event::TE_DROP: break; case NdbDictionary::Event::TE_NODE_FAILURE: break; case NdbDictionary::Event::TE_SUBSCRIBE: case NdbDictionary::Event::TE_UNSUBSCRIBE: break; default: /* We should REALLY never get here. */ ndbout_c("Error: unknown event type"); ndbout_c("Error: unknown event type: %u", (Uint32)pOp->getEventType()); abort(); } } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI()); Loading Loading
storage/ndb/src/kernel/blocks/suma/Suma.cpp +50 −26 Original line number Diff line number Diff line Loading @@ -758,6 +758,17 @@ Suma::execNODE_FAILREP(Signal* signal){ Restart.resetRestart(signal); } if (ERROR_INSERTED(13032)) { Uint32 node = c_subscriber_nodes.find(0); if (node != NodeBitmask::NotFound) { ndbout_c("Inserting API_FAILREQ node: %u", node); signal->theData[0] = node; EXECUTE_DIRECT(QMGR, GSN_API_FAILREQ, signal, 1); } } signal->theData[0] = SumaContinueB::RESEND_BUCKET; NdbNodeBitmask tmp; Loading Loading @@ -3463,6 +3474,9 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) f_bufferLock = 0; b_bufferLock = 0; ndbrequire((tabPtr.p = c_tablePool.getPtr(tabPtr.i)) != 0); Uint32 tableId = tabPtr.p->m_tableId; Uint32 bucket= hashValue % c_no_of_buckets; m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci); if(m_active_buckets.get(bucket) || Loading @@ -3485,7 +3499,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; data->gci = gci; data->tableId = tabPtr.p->m_tableId; data->tableId = tableId; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, event); data->logType = 0; Loading @@ -3508,10 +3522,11 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) else { Uint32* dst; Uint32 sz = f_trigBufferSize + b_trigBufferSize + 2; Uint32 sz = f_trigBufferSize + b_trigBufferSize + 3; if((dst = get_buffer_ptr(signal, bucket, gci, sz))) { * dst++ = tabPtr.i; * dst++ = tableId; * dst++ = tabPtr.p->m_schemaVersion; * dst++ = (event << 16) | f_trigBufferSize; memcpy(dst, f_buffer, f_trigBufferSize << 2); dst += f_trigBufferSize; Loading Loading @@ -3643,6 +3658,7 @@ Suma::execSUB_GCP_COMPLETE_REP(Signal* signal) if(c_buckets[i].m_buffer_tail != RNIL) { //Uint32* dst; get_buffer_ptr(signal, i, gci, 0); } } Loading Loading @@ -3987,6 +4003,9 @@ void Suma::completeSubRemove(SubscriptionPtr subPtr) { DBUG_ENTER("Suma::completeSubRemove"); //Uint32 subscriptionId = subPtr.p->m_subscriptionId; //Uint32 subscriptionKey = subPtr.p->m_subscriptionKey; c_subscriptions.release(subPtr); DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d", c_subscriptionPool.getSize(), Loading Loading @@ -5003,27 +5022,30 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, { g_cnt++; Uint32 table = * src++ ; Uint32 schemaVersion = * src++; Uint32 event = * src >> 16; Uint32 sz_1 = (* src ++) & 0xFFFF; ndbassert(sz - 2 >= sz_1); ndbassert(sz - 3 >= sz_1); LinearSectionPtr ptr[3]; const Uint32 nptr= reformat(signal, ptr, src, sz_1, src + sz_1, sz - 2 - sz_1); src + sz_1, sz - 3 - sz_1); Uint32 ptrLen= 0; for(Uint32 i =0; i < nptr; i++) ptrLen+= ptr[i].sz; /** * Signal to subscriber(s) */ Ptr<Table> tabPtr; ndbrequire((tabPtr.p = c_tablePool.getPtr(table)) != 0); if (c_tables.find(tabPtr, table) && tabPtr.p->m_schemaVersion == schemaVersion) { SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg; data->gci = last_gci; data->tableId = tabPtr.p->m_tableId; data->tableId = table; data->requestInfo = 0; SubTableData::setOperation(data->requestInfo, event); data->logType = 0; Loading @@ -5031,7 +5053,8 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, data->totalLen = ptrLen; { LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers); LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers); SubscriberPtr subbPtr; for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr)) { Loading @@ -5043,6 +5066,7 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, } } } } break; } Loading
storage/ndb/test/tools/listen.cpp +7 −1 Original line number Diff line number Diff line Loading @@ -168,9 +168,15 @@ main(int argc, const char** argv){ break; case NdbDictionary::Event::TE_DROP: break; case NdbDictionary::Event::TE_NODE_FAILURE: break; case NdbDictionary::Event::TE_SUBSCRIBE: case NdbDictionary::Event::TE_UNSUBSCRIBE: break; default: /* We should REALLY never get here. */ ndbout_c("Error: unknown event type"); ndbout_c("Error: unknown event type: %u", (Uint32)pOp->getEventType()); abort(); } } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI()); Loading