Commit 661390b1 authored by unknown's avatar unknown
Browse files

fixed mem leak in subscriction handling in Suma

    debug printouts
    ndb documentation update


ndb/include/ndbapi/Ndb.hpp:
  ndb documentation update
ndb/src/kernel/blocks/suma/Suma.cpp:
  fixed mem leak in subscriction handling in Suma
  debug printouts
ndb/src/kernel/blocks/suma/Suma.hpp:
  fixed mem leak in subscriction handling in Suma
parent 7ec1e170
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -1218,12 +1218,12 @@ public:
  /**
   * Drop a subscription to an event
   *
   * @param eventName
   *        unique identifier of the event
   * @param eventOp
   *        Event operation
   *
   * @return 0 on success
   */
  int dropEventOperation(NdbEventOperation* eventName);
  int dropEventOperation(NdbEventOperation* eventOp);

  /**
   * Wait for an event to occur. Will return as soon as an event
+46 −36
Original line number Diff line number Diff line
@@ -50,6 +50,17 @@
//#define EVENT_DEBUG
//#define EVENT_PH3_DEBUG
//#define EVENT_DEBUG2
#if 0
#undef DBUG_ENTER
#undef DBUG_PRINT
#undef DBUG_RETURN
#undef DBUG_VOID_RETURN

#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
#endif

/**
 * @todo:
@@ -113,14 +124,11 @@ void
Suma::execSTTOR(Signal* signal) {
  jamEntry();                            

  DBUG_ENTER("Suma::execSTTOR");
  const Uint32 startphase  = signal->theData[1];
  const Uint32 typeOfStart = signal->theData[7];

#ifdef NODEFAIL_DEBUG
  ndbout_c ("SUMA::execSTTOR startphase = %u, typeOfStart = %u",
	    startphase, typeOfStart);

#endif
  DBUG_PRINT("info",("startphase = %u, typeOfStart = %u", startphase, typeOfStart));

  if(startphase == 1){
    jam();
@@ -155,7 +163,7 @@ Suma::execSTTOR(Signal* signal) {
    g_subPtrI = subPtr.i;
    //    sendSTTORRY(signal);
#endif    
    return;
    DBUG_VOID_RETURN;
  }

  if(startphase == 5) {
@@ -178,9 +186,7 @@ Suma::execSTTOR(Signal* signal) {
      for( int i = 0; i < NO_OF_BUCKETS; i++) {
	if (getResponsibleSumaNodeId(i) == refToNode(reference())) {
	  // I'm running this bucket
#ifdef EVENT_DEBUG
	  ndbout_c("bucket %u set to true", i);
#endif
	  DBUG_PRINT("info",("bucket %u set to true", i));
	  c_buckets[i].active = true;
	}
      }
@@ -190,32 +196,31 @@ Suma::execSTTOR(Signal* signal) {
       c_masterNodeId == getOwnNodeId()) {
      jam();
      createSequence(signal);
      return;
      DBUG_VOID_RETURN;
    }//if
  }//if
  

  sendSTTORRY(signal);
  
  return;
  DBUG_VOID_RETURN;
}

void
Suma::createSequence(Signal* signal)
{
  jam();
  DBUG_ENTER("Suma::createSequence");

  UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
  
  req->senderData  = RNIL;
  req->sequenceId  = SUMA_SEQUENCE;
  req->requestType = UtilSequenceReq::Create;
#ifdef DEBUG_SUMA_SEQUENCE
  ndbout_c("SUMA: Create sequence");
#endif
  sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ, 
	     signal, UtilSequenceReq::SignalLength, JBB);
  // execUTIL_SEQUENCE_CONF will call createSequenceReply()
  DBUG_VOID_RETURN;
}

void
@@ -379,7 +384,7 @@ SumaParticipant::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
}

void
SumaParticipant::sendSubStopReq(Signal *signal){
SumaParticipant::sendSubStopReq(Signal *signal, bool unlock){
  DBUG_ENTER("SumaParticipant::sendSubStopReq");
  static bool remove_lock = false;
  jam();
@@ -399,7 +404,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){
    DBUG_VOID_RETURN;
  }

  if(remove_lock) {
  if(remove_lock && !unlock) {
    jam();
    DBUG_VOID_RETURN;
  }
@@ -424,6 +429,7 @@ SumaParticipant::sendSubStopReq(Signal *signal){
void
SumaParticipant::execSUB_STOP_CONF(Signal* signal){
  jamEntry();
  DBUG_ENTER("SumaParticipant::execSUB_STOP_CONF");

  SubStopConf * const conf = (SubStopConf*)signal->getDataPtr();

@@ -449,16 +455,17 @@ SumaParticipant::execSUB_STOP_CONF(Signal* signal){
    }
  }

  sendSubStopReq(signal);
  sendSubStopReq(signal,true);
  DBUG_VOID_RETURN;
}

void
SumaParticipant::execSUB_STOP_REF(Signal* signal){
  jamEntry();
  SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();

  DBUG_ENTER("SumaParticipant::execSUB_STOP_REF");

  SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();

  Uint32 subscriptionId = ref->subscriptionId;
  Uint32 subscriptionKey = ref->subscriptionKey;
  Uint32 part = ref->part;
@@ -845,16 +852,14 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
{
  jamEntry();

  DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
  CRASH_INSERTION(13002);

  UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
#ifdef DEBUG_SUMA_SEQUENCE
  ndbout_c("SUMA: Create sequence conf");
#endif
  if(conf->requestType == UtilSequenceReq::Create) {
    jam();
    createSequenceReply(signal, conf, NULL);
    return;
    DBUG_VOID_RETURN;
  }

  Uint64 subId;
@@ -874,18 +879,21 @@ Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
	     CreateSubscriptionIdConf::SignalLength, JBB);

  c_subscriberPool.release(subbPtr);

  DBUG_VOID_RETURN;
}

void
Suma::execUTIL_SEQUENCE_REF(Signal* signal)
{
  jamEntry();
  DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
  UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();

  if(ref->requestType == UtilSequenceReq::Create) {
    jam();
    createSequenceReply(signal, NULL, ref);
    return;
    DBUG_VOID_RETURN;
  }

  Uint32 subData = ref->senderData;
@@ -894,7 +902,7 @@ Suma::execUTIL_SEQUENCE_REF(Signal* signal)
  c_subscriberPool.getPtr(subbPtr,subData);
  sendSubIdRef(signal, GrepError::SEQUENCE_ERROR);
  c_subscriberPool.release(subbPtr);
  return;
  DBUG_VOID_RETURN;
}//execUTIL_SEQUENCE_REF()


@@ -2091,9 +2099,7 @@ SumaParticipant::execSCAN_HBREP(Signal* signal){
void
SumaParticipant::execSUB_START_REQ(Signal* signal){
  jamEntry();
#ifdef NODEFAIL_DEBUG
  ndbout_c("Suma::execSUB_START_REQ");
#endif
  DBUG_ENTER("SumaParticipant::execSUB_START_REQ");

  CRASH_INSERTION(13013);

@@ -2103,7 +2109,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
    if (RtoI(signal->getSendersBlockRef(), false) == RNIL) {
      jam();
      sendSubStartRef(signal, /** Error Code */ 0, true);
      return;
      DBUG_VOID_RETURN;
    }
    // only allow other Suma's in the nodegroup to come through for restart purposes
  }
@@ -2124,7 +2130,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
  if(!c_subscriptions.find(subPtr, key)){
    jam();
    sendSubStartRef(signal, /** Error Code */ 0);
    return;
    DBUG_VOID_RETURN;
  }
  
  Ptr<SyncRecord> syncPtr;
@@ -2135,7 +2141,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
    ndbout_c("Locked");
#endif
    sendSubStartRef(signal, /** Error Code */ 0, true);
    return;
    DBUG_VOID_RETURN;
  }
  syncPtr.p->m_locked = true;

@@ -2144,7 +2150,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
    jam();
    syncPtr.p->m_locked = false;
    sendSubStartRef(signal, /** Error Code */ 0);
    return;
    DBUG_VOID_RETURN;
  }

  Uint32 type = subPtr.p->m_subscriptionType;
@@ -2211,6 +2217,7 @@ SumaParticipant::execSUB_START_REQ(Signal* signal){
    break;
  }
  ndbrequire(ok);
  DBUG_VOID_RETURN;
}

void
@@ -2963,6 +2970,7 @@ SumaParticipant::execFIRE_TRIG_ORD(Signal* signal){
	}
      }
#endif
      DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d", refToNode(ref)));
      sendSignal(ref, GSN_SUB_TABLE_DATA, signal,
                 SubTableData::SignalLength, JBB, ptr, nptr);
      data->logType = tmp;
@@ -3263,6 +3271,7 @@ bool SumaParticipant::FailoverBuffer::nodeFailRep()
void
SumaParticipant::execSUB_STOP_REQ(Signal* signal){
  jamEntry();
  DBUG_ENTER("SumaParticipant::execSUB_STOP_REQ");
  
  CRASH_INSERTION(13019);

@@ -3292,7 +3301,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
	       SubStopConf::SignalLength, JBB);

    removeSubscribersOnNode(signal, refToNode(subscriberRef));
    return;
    DBUG_VOID_RETURN;
  }

  if(!c_subscriptions.find(subPtr, key)){
@@ -3333,7 +3342,7 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
    if (!found) {
      jam();
      sendSubStopRef(signal, GrepError::SUBSCRIBER_NOT_FOUND);
      return;
      DBUG_VOID_RETURN;
    }
  }

@@ -3346,11 +3355,12 @@ SumaParticipant::execSUB_STOP_REQ(Signal* signal){
  if (syncPtr.p->m_locked) {
    jam();
    sendSubStopRef(signal, /** Error Code */ 0, true);
    return;
    DBUG_VOID_RETURN;
  }
  syncPtr.p->m_locked = true;

  syncPtr.p->startDropTrigger(signal);
  DBUG_VOID_RETURN;
}

void
+1 −1
Original line number Diff line number Diff line
@@ -376,7 +376,7 @@ public:
  void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, 
			    SubscriptionData::Part);
  void sendSubStopComplete(Signal*, SubscriberPtr);
  void sendSubStopReq(Signal* signal);
  void sendSubStopReq(Signal* signal, bool unlock= false);

  void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr);