Loading ndb/src/ndbapi/TransporterFacade.cpp +106 −7 Original line number Diff line number Diff line Loading @@ -475,7 +475,8 @@ TransporterFacade::TransporterFacade() : theTransporterRegistry(0), theStopReceive(0), theSendThread(NULL), theReceiveThread(NULL) theReceiveThread(NULL), m_fragmented_signal_id(0) { theOwnId = 0; Loading Loading @@ -833,6 +834,105 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ return (ss == SEND_OK ? 0 : -1); } #define CHUNK_SZ 100u int TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, LinearSectionPtr ptr[3], Uint32 secs){ NdbApiSignal tmp_signal(*(SignalHeader*)aSignal); LinearSectionPtr tmp_ptr[3]; Uint32 unique_id= m_fragmented_signal_id++; // next unique id unsigned i; for (i= 0; i < secs; i++) tmp_ptr[i]= ptr[i]; unsigned start_i= 0; unsigned chunk_sz= 0; unsigned fragment_info= 0; Uint32 *tmp_data= tmp_signal.getDataPtrSend(); for (i= 0; i < secs;) { unsigned save_sz= tmp_ptr[i].sz; tmp_data[i-start_i]= i; if (chunk_sz + save_sz > CHUNK_SZ) { // truncate unsigned send_sz= CHUNK_SZ - chunk_sz; tmp_ptr[i].sz= send_sz; if (fragment_info < 2) fragment_info++; // send tmp_signal tmp_data[i-start_i+1]= unique_id; tmp_signal.setLength(i-start_i+2); tmp_signal.m_fragmentInfo= fragment_info; // do prepare send { int ret; if(getIsNodeSendable(aNode) == true){ SendStatus ss = theTransporterRegistry->prepareSend (&tmp_signal, 1, // JBB tmp_signal.getDataPtrSend(), aNode, &ptr[start_i]); assert(ss != SEND_MESSAGE_TOO_BIG); ret = (ss == SEND_OK ? 0 : -1); } else ret = -1; if (ret != SEND_OK) return ret; } // setup variables for next signal start_i= i; chunk_sz= 0; tmp_ptr[i].sz= save_sz-send_sz; tmp_ptr[i].p+= send_sz; } else { chunk_sz+= save_sz; i++; } } unsigned a_sz= aSignal->getLength(); if (fragment_info > 0) { // update the original signal to include section info Uint32 *a_data= aSignal->getDataPtrSend(); unsigned tmp_sz= i-start_i; memcpy(a_data+a_sz, tmp_data, tmp_sz*sizeof(Uint32)); a_data[a_sz+tmp_sz]= unique_id; aSignal->setLength(a_sz+tmp_sz+1); // send last fragment aSignal->m_fragmentInfo= 3; aSignal->m_noOfSections= i-start_i; } else { aSignal->m_noOfSections= secs; } // send aSignal int ret; if(getIsNodeSendable(aNode) == true){ SendStatus ss = theTransporterRegistry->prepareSend (aSignal, 1, // JBB aSignal->getDataPtrSend(), aNode, &ptr[start_i]); assert(ss != SEND_MESSAGE_TOO_BIG); ret = (ss == SEND_OK ? 0 : -1); } else ret = -1; aSignal->m_noOfSections = 0; aSignal->m_fragmentInfo = 0; aSignal->setLength(a_sz); return ret; } #if 0 int TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, LinearSectionPtr ptr[3], Uint32 secs){ Loading Loading @@ -864,7 +964,7 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, aSignal->m_noOfSections = 0; return -1; } #endif int Loading @@ -886,11 +986,10 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, aSignal->theSendersBlockRef = tmp; } #endif SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 1, // JBB SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 1, // JBB aSignal->getDataPtrSend(), aNode, ptr); aNode, ptr); assert(ss != SEND_MESSAGE_TOO_BIG); aSignal->m_noOfSections = 0; return (ss == SEND_OK ? 0 : -1); Loading ndb/src/ndbapi/TransporterFacade.hpp +2 −1 Original line number Diff line number Diff line Loading @@ -224,6 +224,7 @@ private: } m_threads; Uint32 m_max_trans_id; Uint32 m_fragmented_signal_id; /** * execute function Loading Loading
ndb/src/ndbapi/TransporterFacade.cpp +106 −7 Original line number Diff line number Diff line Loading @@ -475,7 +475,8 @@ TransporterFacade::TransporterFacade() : theTransporterRegistry(0), theStopReceive(0), theSendThread(NULL), theReceiveThread(NULL) theReceiveThread(NULL), m_fragmented_signal_id(0) { theOwnId = 0; Loading Loading @@ -833,6 +834,105 @@ TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){ return (ss == SEND_OK ? 0 : -1); } #define CHUNK_SZ 100u int TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, LinearSectionPtr ptr[3], Uint32 secs){ NdbApiSignal tmp_signal(*(SignalHeader*)aSignal); LinearSectionPtr tmp_ptr[3]; Uint32 unique_id= m_fragmented_signal_id++; // next unique id unsigned i; for (i= 0; i < secs; i++) tmp_ptr[i]= ptr[i]; unsigned start_i= 0; unsigned chunk_sz= 0; unsigned fragment_info= 0; Uint32 *tmp_data= tmp_signal.getDataPtrSend(); for (i= 0; i < secs;) { unsigned save_sz= tmp_ptr[i].sz; tmp_data[i-start_i]= i; if (chunk_sz + save_sz > CHUNK_SZ) { // truncate unsigned send_sz= CHUNK_SZ - chunk_sz; tmp_ptr[i].sz= send_sz; if (fragment_info < 2) fragment_info++; // send tmp_signal tmp_data[i-start_i+1]= unique_id; tmp_signal.setLength(i-start_i+2); tmp_signal.m_fragmentInfo= fragment_info; // do prepare send { int ret; if(getIsNodeSendable(aNode) == true){ SendStatus ss = theTransporterRegistry->prepareSend (&tmp_signal, 1, // JBB tmp_signal.getDataPtrSend(), aNode, &ptr[start_i]); assert(ss != SEND_MESSAGE_TOO_BIG); ret = (ss == SEND_OK ? 0 : -1); } else ret = -1; if (ret != SEND_OK) return ret; } // setup variables for next signal start_i= i; chunk_sz= 0; tmp_ptr[i].sz= save_sz-send_sz; tmp_ptr[i].p+= send_sz; } else { chunk_sz+= save_sz; i++; } } unsigned a_sz= aSignal->getLength(); if (fragment_info > 0) { // update the original signal to include section info Uint32 *a_data= aSignal->getDataPtrSend(); unsigned tmp_sz= i-start_i; memcpy(a_data+a_sz, tmp_data, tmp_sz*sizeof(Uint32)); a_data[a_sz+tmp_sz]= unique_id; aSignal->setLength(a_sz+tmp_sz+1); // send last fragment aSignal->m_fragmentInfo= 3; aSignal->m_noOfSections= i-start_i; } else { aSignal->m_noOfSections= secs; } // send aSignal int ret; if(getIsNodeSendable(aNode) == true){ SendStatus ss = theTransporterRegistry->prepareSend (aSignal, 1, // JBB aSignal->getDataPtrSend(), aNode, &ptr[start_i]); assert(ss != SEND_MESSAGE_TOO_BIG); ret = (ss == SEND_OK ? 0 : -1); } else ret = -1; aSignal->m_noOfSections = 0; aSignal->m_fragmentInfo = 0; aSignal->setLength(a_sz); return ret; } #if 0 int TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, LinearSectionPtr ptr[3], Uint32 secs){ Loading Loading @@ -864,7 +964,7 @@ TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, aSignal->m_noOfSections = 0; return -1; } #endif int Loading @@ -886,11 +986,10 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal, aSignal->theSendersBlockRef = tmp; } #endif SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 1, // JBB SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 1, // JBB aSignal->getDataPtrSend(), aNode, ptr); aNode, ptr); assert(ss != SEND_MESSAGE_TOO_BIG); aSignal->m_noOfSections = 0; return (ss == SEND_OK ? 0 : -1); Loading
ndb/src/ndbapi/TransporterFacade.hpp +2 −1 Original line number Diff line number Diff line Loading @@ -224,6 +224,7 @@ private: } m_threads; Uint32 m_max_trans_id; Uint32 m_fragmented_signal_id; /** * execute function Loading