Loading ndb/include/ndbapi/Ndb.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -1218,12 +1218,12 @@ public: /** * Drop a subscription to an event * * @param eventName * unique identifier of the event * @param eventOp * Event operation * * @return 0 on success */ int dropEventOperation(NdbEventOperation* eventName); int dropEventOperation(NdbEventOperation* eventOp); /** * Wait for an event to occur. Will return as soon as an event Loading ndb/src/kernel/blocks/suma/Suma.cpp +46 −36 Original line number Diff line number Diff line Loading @@ -50,6 +50,17 @@ //#define EVENT_DEBUG //#define EVENT_PH3_DEBUG //#define EVENT_DEBUG2 #if 0 #undef DBUG_ENTER #undef DBUG_PRINT #undef DBUG_RETURN #undef DBUG_VOID_RETURN #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);} #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;} #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); } #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; } #endif /** * @todo: Loading Loading @@ -113,14 +124,11 @@ void Suma::execSTTOR(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSTTOR"); const Uint32 startphase = signal->theData[1]; const Uint32 typeOfStart = signal->theData[7]; #ifdef NODEFAIL_DEBUG ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u", startphase, typeOfStart); #endif DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart)); if(startphase == 1){ jam(); Loading Loading @@ -155,7 +163,7 @@ Suma::execSTTOR(Signal* signal) { g_subPtrI = subPtr.i; // sendSTTORRY(signal); #endif return; DBUG_VOID_RETURN; } if(startphase == 5) { Loading @@ -178,9 +186,7 @@ Suma::execSTTOR(Signal* signal) { for( int i = 0; i < NO_OF_BUCKETS; i++) { if (getResponsibleSumaNodeId(i) == refToNode(reference())) { // I'm running this bucket #ifdef EVENT_DEBUG ndbout_c("bucket %u set to true", i); #endif DBUG_PRINT("info",("bucket %u set to true", i)); c_buckets[i].active = true; } } Loading @@ -190,32 +196,31 @@ Suma::execSTTOR(Signal* signal) { c_masterNodeId == getOwnNodeId()) { jam(); createSequence(signal); return; DBUG_VOID_RETURN; }//if }//if sendSTTORRY(signal); return; DBUG_VOID_RETURN; } void Suma::createSequence(Signal* signal) { jam(); DBUG_ENTER("Suma::createSequence"); UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend(); req->senderData = RNIL; req->sequenceId = SUMA_SEQUENCE; req->requestType = UtilSequenceReq::Create; #ifdef DEBUG_SUMA_SEQUENCE ndbout_c("SUMA: Create sequence"); #endif sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB); // execUTIL_SEQUENCE_CONF will call createSequenceReply() DBUG_VOID_RETURN; } void Loading Loading @@ -379,7 +384,7 @@ SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId) } void SumaParticipant::sendSubStopReq(Signal *signal){ SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){ DBUG_ENTER("SumaParticipant::sendSubStopReq"); static bool remove_lock = false; jam(); Loading @@ -399,7 +404,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){ DBUG_VOID_RETURN; } if(remove_lock) { if(remove_lock && !unlock) { jam(); DBUG_VOID_RETURN; } Loading @@ -424,6 +429,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){ void SumaParticipant::execSUB_STOP_CONF(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF"); SubStopConf * const conf = (SubStopConf*)signal->getDataPtr(); Loading @@ -449,16 +455,17 @@ SumaParticipant::execSUB_STOP_CONF(Signal* signal){ } } sendSubStopReq(signal); sendSubStopReq(signal,true); DBUG_VOID_RETURN; } void SumaParticipant::execSUB_STOP_REF(Signal* signal){ jamEntry(); SubStopRef * const ref = (SubStopRef*)signal->getDataPtr(); DBUG_ENTER("SumaParticipant::execSUB_STOP_REF"); SubStopRef * const ref = (SubStopRef*)signal->getDataPtr(); Uint32 subscriptionId = ref->subscriptionId; Uint32 subscriptionKey = ref->subscriptionKey; Uint32 part = ref->part; Loading Loading @@ -845,16 +852,14 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF"); CRASH_INSERTION(13002); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); #ifdef DEBUG_SUMA_SEQUENCE ndbout_c("SUMA: Create sequence conf"); #endif if(conf->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, conf, NULL); return; DBUG_VOID_RETURN; } Uint64 subId; Loading @@ -874,18 +879,21 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) CreateSubscriptionIdConf::SignalLength, JBB); c_subscriberPool.release(subbPtr); DBUG_VOID_RETURN; } void Suma::execUTIL_SEQUENCE_REF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF"); UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); if(ref->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, NULL, ref); return; DBUG_VOID_RETURN; } Uint32 subData = ref->senderData; Loading @@ -894,7 +902,7 @@ Suma::execUTIL_SEQUENCE_REF(Signal* signal) c_subscriberPool.getPtr(subbPtr,subData); sendSubIdRef(signal, GrepError::SEQUENCE_ERROR); c_subscriberPool.release(subbPtr); return; DBUG_VOID_RETURN; }//execUTIL_SEQUENCE_REF() Loading Loading @@ -2091,9 +2099,7 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){ void SumaParticipant::execSUB_START_REQ(Signal* signal){ jamEntry(); #ifdef NODEFAIL_DEBUG ndbout_c("Suma::execSUB_START_REQ"); #endif DBUG_ENTER("SumaParticipant::execSUB_START_REQ"); CRASH_INSERTION(13013); Loading @@ -2103,7 +2109,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { jam(); sendSubStartRef(signal, /** Error Code */ 0, true); return; DBUG_VOID_RETURN; } // only allow other Suma's in the nodegroup to come through for restart purposes } Loading @@ -2124,7 +2130,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ if(!c_subscriptions.find(subPtr, key)){ jam(); sendSubStartRef(signal, /** Error Code */ 0); return; DBUG_VOID_RETURN; } Ptr<SyncRecord> syncPtr; Loading @@ -2135,7 +2141,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ndbout_c("Locked"); #endif sendSubStartRef(signal, /** Error Code */ 0, true); return; DBUG_VOID_RETURN; } syncPtr.p->m_locked = true; Loading @@ -2144,7 +2150,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ jam(); syncPtr.p->m_locked = false; sendSubStartRef(signal, /** Error Code */ 0); return; DBUG_VOID_RETURN; } Uint32 type = subPtr.p->m_subscriptionType; Loading Loading @@ -2211,6 +2217,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ break; } ndbrequire(ok); DBUG_VOID_RETURN; } void Loading Loading @@ -2963,6 +2970,7 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){ } } #endif DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref))); sendSignal(ref, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB, ptr, nptr); data->logType = tmp; Loading Loading @@ -3263,6 +3271,7 @@ bool SumaParticipant::FailoverBuffer::nodeFailRep() void SumaParticipant::execSUB_STOP_REQ(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ"); CRASH_INSERTION(13019); Loading Loading @@ -3292,7 +3301,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ SubStopConf::SignalLength, JBB); removeSubscribersOnNode(signal, refToNode(subscriberRef)); return; DBUG_VOID_RETURN; } if(!c_subscriptions.find(subPtr, key)){ Loading Loading @@ -3333,7 +3342,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ if (!found) { jam(); sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND); return; DBUG_VOID_RETURN; } } Loading @@ -3346,11 +3355,12 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ if (syncPtr.p->m_locked) { jam(); sendSubStopRef(signal, /** Error Code */ 0, true); return; DBUG_VOID_RETURN; } syncPtr.p->m_locked = true; syncPtr.p->startDropTrigger(signal); DBUG_VOID_RETURN; } void Loading ndb/src/kernel/blocks/suma/Suma.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -376,7 +376,7 @@ public: void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, SubscriptionData::Part); void sendSubStopComplete(Signal*, SubscriberPtr); void sendSubStopReq(Signal* signal); void sendSubStopReq(Signal* signal, bool unlock= false); void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr); Loading Loading
ndb/include/ndbapi/Ndb.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -1218,12 +1218,12 @@ public: /** * Drop a subscription to an event * * @param eventName * unique identifier of the event * @param eventOp * Event operation * * @return 0 on success */ int dropEventOperation(NdbEventOperation* eventName); int dropEventOperation(NdbEventOperation* eventOp); /** * Wait for an event to occur. Will return as soon as an event Loading
ndb/src/kernel/blocks/suma/Suma.cpp +46 −36 Original line number Diff line number Diff line Loading @@ -50,6 +50,17 @@ //#define EVENT_DEBUG //#define EVENT_PH3_DEBUG //#define EVENT_DEBUG2 #if 0 #undef DBUG_ENTER #undef DBUG_PRINT #undef DBUG_RETURN #undef DBUG_VOID_RETURN #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);} #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;} #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); } #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; } #endif /** * @todo: Loading Loading @@ -113,14 +124,11 @@ void Suma::execSTTOR(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execSTTOR"); const Uint32 startphase = signal->theData[1]; const Uint32 typeOfStart = signal->theData[7]; #ifdef NODEFAIL_DEBUG ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u", startphase, typeOfStart); #endif DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart)); if(startphase == 1){ jam(); Loading Loading @@ -155,7 +163,7 @@ Suma::execSTTOR(Signal* signal) { g_subPtrI = subPtr.i; // sendSTTORRY(signal); #endif return; DBUG_VOID_RETURN; } if(startphase == 5) { Loading @@ -178,9 +186,7 @@ Suma::execSTTOR(Signal* signal) { for( int i = 0; i < NO_OF_BUCKETS; i++) { if (getResponsibleSumaNodeId(i) == refToNode(reference())) { // I'm running this bucket #ifdef EVENT_DEBUG ndbout_c("bucket %u set to true", i); #endif DBUG_PRINT("info",("bucket %u set to true", i)); c_buckets[i].active = true; } } Loading @@ -190,32 +196,31 @@ Suma::execSTTOR(Signal* signal) { c_masterNodeId == getOwnNodeId()) { jam(); createSequence(signal); return; DBUG_VOID_RETURN; }//if }//if sendSTTORRY(signal); return; DBUG_VOID_RETURN; } void Suma::createSequence(Signal* signal) { jam(); DBUG_ENTER("Suma::createSequence"); UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend(); req->senderData = RNIL; req->sequenceId = SUMA_SEQUENCE; req->requestType = UtilSequenceReq::Create; #ifdef DEBUG_SUMA_SEQUENCE ndbout_c("SUMA: Create sequence"); #endif sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, signal, UtilSequenceReq::SignalLength, JBB); // execUTIL_SEQUENCE_CONF will call createSequenceReply() DBUG_VOID_RETURN; } void Loading Loading @@ -379,7 +384,7 @@ SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId) } void SumaParticipant::sendSubStopReq(Signal *signal){ SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){ DBUG_ENTER("SumaParticipant::sendSubStopReq"); static bool remove_lock = false; jam(); Loading @@ -399,7 +404,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){ DBUG_VOID_RETURN; } if(remove_lock) { if(remove_lock && !unlock) { jam(); DBUG_VOID_RETURN; } Loading @@ -424,6 +429,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){ void SumaParticipant::execSUB_STOP_CONF(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF"); SubStopConf * const conf = (SubStopConf*)signal->getDataPtr(); Loading @@ -449,16 +455,17 @@ SumaParticipant::execSUB_STOP_CONF(Signal* signal){ } } sendSubStopReq(signal); sendSubStopReq(signal,true); DBUG_VOID_RETURN; } void SumaParticipant::execSUB_STOP_REF(Signal* signal){ jamEntry(); SubStopRef * const ref = (SubStopRef*)signal->getDataPtr(); DBUG_ENTER("SumaParticipant::execSUB_STOP_REF"); SubStopRef * const ref = (SubStopRef*)signal->getDataPtr(); Uint32 subscriptionId = ref->subscriptionId; Uint32 subscriptionKey = ref->subscriptionKey; Uint32 part = ref->part; Loading Loading @@ -845,16 +852,14 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF"); CRASH_INSERTION(13002); UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr(); #ifdef DEBUG_SUMA_SEQUENCE ndbout_c("SUMA: Create sequence conf"); #endif if(conf->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, conf, NULL); return; DBUG_VOID_RETURN; } Uint64 subId; Loading @@ -874,18 +879,21 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal) CreateSubscriptionIdConf::SignalLength, JBB); c_subscriberPool.release(subbPtr); DBUG_VOID_RETURN; } void Suma::execUTIL_SEQUENCE_REF(Signal* signal) { jamEntry(); DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF"); UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr(); if(ref->requestType == UtilSequenceReq::Create) { jam(); createSequenceReply(signal, NULL, ref); return; DBUG_VOID_RETURN; } Uint32 subData = ref->senderData; Loading @@ -894,7 +902,7 @@ Suma::execUTIL_SEQUENCE_REF(Signal* signal) c_subscriberPool.getPtr(subbPtr,subData); sendSubIdRef(signal, GrepError::SEQUENCE_ERROR); c_subscriberPool.release(subbPtr); return; DBUG_VOID_RETURN; }//execUTIL_SEQUENCE_REF() Loading Loading @@ -2091,9 +2099,7 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){ void SumaParticipant::execSUB_START_REQ(Signal* signal){ jamEntry(); #ifdef NODEFAIL_DEBUG ndbout_c("Suma::execSUB_START_REQ"); #endif DBUG_ENTER("SumaParticipant::execSUB_START_REQ"); CRASH_INSERTION(13013); Loading @@ -2103,7 +2109,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ if (RtoI(signal->getSendersBlockRef(), false) == RNIL) { jam(); sendSubStartRef(signal, /** Error Code */ 0, true); return; DBUG_VOID_RETURN; } // only allow other Suma's in the nodegroup to come through for restart purposes } Loading @@ -2124,7 +2130,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ if(!c_subscriptions.find(subPtr, key)){ jam(); sendSubStartRef(signal, /** Error Code */ 0); return; DBUG_VOID_RETURN; } Ptr<SyncRecord> syncPtr; Loading @@ -2135,7 +2141,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ ndbout_c("Locked"); #endif sendSubStartRef(signal, /** Error Code */ 0, true); return; DBUG_VOID_RETURN; } syncPtr.p->m_locked = true; Loading @@ -2144,7 +2150,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ jam(); syncPtr.p->m_locked = false; sendSubStartRef(signal, /** Error Code */ 0); return; DBUG_VOID_RETURN; } Uint32 type = subPtr.p->m_subscriptionType; Loading Loading @@ -2211,6 +2217,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){ break; } ndbrequire(ok); DBUG_VOID_RETURN; } void Loading Loading @@ -2963,6 +2970,7 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){ } } #endif DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref))); sendSignal(ref, GSN_SUB_TABLE_DATA, signal, SubTableData::SignalLength, JBB, ptr, nptr); data->logType = tmp; Loading Loading @@ -3263,6 +3271,7 @@ bool SumaParticipant::FailoverBuffer::nodeFailRep() void SumaParticipant::execSUB_STOP_REQ(Signal* signal){ jamEntry(); DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ"); CRASH_INSERTION(13019); Loading Loading @@ -3292,7 +3301,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ SubStopConf::SignalLength, JBB); removeSubscribersOnNode(signal, refToNode(subscriberRef)); return; DBUG_VOID_RETURN; } if(!c_subscriptions.find(subPtr, key)){ Loading Loading @@ -3333,7 +3342,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ if (!found) { jam(); sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND); return; DBUG_VOID_RETURN; } } Loading @@ -3346,11 +3355,12 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){ if (syncPtr.p->m_locked) { jam(); sendSubStopRef(signal, /** Error Code */ 0, true); return; DBUG_VOID_RETURN; } syncPtr.p->m_locked = true; syncPtr.p->startDropTrigger(signal); DBUG_VOID_RETURN; } void Loading
ndb/src/kernel/blocks/suma/Suma.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -376,7 +376,7 @@ public: void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, SubscriptionData::Part); void sendSubStopComplete(Signal*, SubscriberPtr); void sendSubStopReq(Signal* signal); void sendSubStopReq(Signal* signal, bool unlock= false); void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr); Loading