Loading ndb/src/common/transporter/Packer.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -21,7 +21,12 @@ #include <TransporterCallback.hpp> #include <RefConvert.hpp> #ifdef ERROR_INSERT Uint32 MAX_RECEIVED_SIGNALS = 1024; #else #define MAX_RECEIVED_SIGNALS 1024 #endif Uint32 TransporterRegistry::unpack(Uint32 * readPtr, Uint32 sizeOfData, Loading ndb/src/common/transporter/TCP_Transporter.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -100,6 +100,10 @@ private: virtual void updateReceiveDataPtr(Uint32 bytesRead); virtual Uint32 get_free_buffer() const; inline bool hasReceiveData () const { return receiveBuffer.sizeOfData > 0; } protected: /** * Setup client/server and perform connect/accept Loading ndb/src/common/transporter/TransporterRegistry.cpp +39 −34 Original line number Diff line number Diff line Loading @@ -807,28 +807,13 @@ TransporterRegistry::poll_OSE(Uint32 timeOutMillis) Uint32 TransporterRegistry::poll_TCP(Uint32 timeOutMillis) { bool hasdata = false; if (false && nTCPTransporters == 0) { tcpReadSelectReply = 0; return 0; } struct timeval timeout; #ifdef NDB_OSE // Return directly if there are no TCP transporters configured if(timeOutMillis <= 1){ timeout.tv_sec = 0; timeout.tv_usec = 1025; } else { timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; } #else timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; #endif NDB_SOCKET_TYPE maxSocketValue = -1; // Needed for TCP/IP connections Loading @@ -851,8 +836,27 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis) // Put the connected transporters in the socket read-set FD_SET(socket, &tcpReadset); } hasdata |= t->hasReceiveData(); } timeOutMillis = hasdata ? 0 : timeOutMillis; struct timeval timeout; #ifdef NDB_OSE // Return directly if there are no TCP transporters configured if(timeOutMillis <= 1){ timeout.tv_sec = 0; timeout.tv_usec = 1025; } else { timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; } #else timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; #endif // The highest socket value plus one maxSocketValue++; Loading @@ -867,7 +871,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis) } #endif return tcpReadSelectReply; return tcpReadSelectReply || hasdata; } #endif Loading Loading @@ -902,8 +906,6 @@ TransporterRegistry::performReceive() #endif #ifdef NDB_TCP_TRANSPORTER if(tcpReadSelectReply > 0) { for (int i=0; i<nTCPTransporters; i++) { checkJobBuffer(); Loading @@ -911,10 +913,14 @@ TransporterRegistry::performReceive() const NodeId nodeId = t->getRemoteNodeId(); const NDB_SOCKET_TYPE socket = t->getSocket(); if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) if(t->isConnected()) { const int receiveSize = t->doReceive(); if(receiveSize > 0) if (FD_ISSET(socket, &tcpReadset)) { t->doReceive(); } if (t->hasReceiveData()) { Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); Loading @@ -924,7 +930,6 @@ TransporterRegistry::performReceive() } } } } #endif #ifdef NDB_SCI_TRANSPORTER Loading ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp +17 −0 Original line number Diff line number Diff line Loading @@ -135,6 +135,7 @@ Cmvmi::~Cmvmi() #ifdef ERROR_INSERT NodeBitmask c_error_9000_nodes_mask; extern Uint32 MAX_RECEIVED_SIGNALS; #endif void Cmvmi::execNDB_TAMPER(Signal* signal) Loading Loading @@ -164,6 +165,22 @@ void Cmvmi::execNDB_TAMPER(Signal* signal) kill(getpid(), SIGABRT); } #endif #ifdef ERROR_INSERT if (signal->theData[0] == 9003) { if (MAX_RECEIVED_SIGNALS < 1024) { MAX_RECEIVED_SIGNALS = 1024; } else { MAX_RECEIVED_SIGNALS = 1 + (rand() % 128); } ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS); CLEAR_ERROR_INSERT_VALUE; } #endif }//execNDB_TAMPER() void Cmvmi::execSET_LOGLEVELORD(Signal* signal) Loading ndb/test/ndbapi/testNdbApi.cpp +34 −0 Original line number Diff line number Diff line Loading @@ -1131,7 +1131,36 @@ int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){ return result; } int runBug28443(NDBT_Context* ctx, NDBT_Step* step) { int result = NDBT_OK; int records = ctx->getNumRecords(); NdbRestarter restarter; restarter.insertErrorInAllNodes(9003); for (Uint32 i = 0; i<ctx->getNumLoops(); i++) { HugoTransactions hugoTrans(*ctx->getTab()); if (hugoTrans.loadTable(GETNDB(step), records, 2048) != 0) { result = NDBT_FAILED; goto done; } if (runClearTable(ctx, step) != 0) { result = NDBT_FAILED; goto done; } } done: restarter.insertErrorInAllNodes(9003); return result; } NDBT_TESTSUITE(testNdbApi); TESTCASE("MaxNdb", Loading Loading @@ -1212,6 +1241,11 @@ TESTCASE("Bug_11133", INITIALIZER(runBug_11133); FINALIZER(runClearTable); } TESTCASE("Bug28443", ""){ INITIALIZER(runBug28443); FINALIZER(runClearTable); } NDBT_TESTSUITE_END(testNdbApi); int main(int argc, const char** argv){ Loading Loading
ndb/src/common/transporter/Packer.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -21,7 +21,12 @@ #include <TransporterCallback.hpp> #include <RefConvert.hpp> #ifdef ERROR_INSERT Uint32 MAX_RECEIVED_SIGNALS = 1024; #else #define MAX_RECEIVED_SIGNALS 1024 #endif Uint32 TransporterRegistry::unpack(Uint32 * readPtr, Uint32 sizeOfData, Loading
ndb/src/common/transporter/TCP_Transporter.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -100,6 +100,10 @@ private: virtual void updateReceiveDataPtr(Uint32 bytesRead); virtual Uint32 get_free_buffer() const; inline bool hasReceiveData () const { return receiveBuffer.sizeOfData > 0; } protected: /** * Setup client/server and perform connect/accept Loading
ndb/src/common/transporter/TransporterRegistry.cpp +39 −34 Original line number Diff line number Diff line Loading @@ -807,28 +807,13 @@ TransporterRegistry::poll_OSE(Uint32 timeOutMillis) Uint32 TransporterRegistry::poll_TCP(Uint32 timeOutMillis) { bool hasdata = false; if (false && nTCPTransporters == 0) { tcpReadSelectReply = 0; return 0; } struct timeval timeout; #ifdef NDB_OSE // Return directly if there are no TCP transporters configured if(timeOutMillis <= 1){ timeout.tv_sec = 0; timeout.tv_usec = 1025; } else { timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; } #else timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; #endif NDB_SOCKET_TYPE maxSocketValue = -1; // Needed for TCP/IP connections Loading @@ -851,8 +836,27 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis) // Put the connected transporters in the socket read-set FD_SET(socket, &tcpReadset); } hasdata |= t->hasReceiveData(); } timeOutMillis = hasdata ? 0 : timeOutMillis; struct timeval timeout; #ifdef NDB_OSE // Return directly if there are no TCP transporters configured if(timeOutMillis <= 1){ timeout.tv_sec = 0; timeout.tv_usec = 1025; } else { timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; } #else timeout.tv_sec = timeOutMillis / 1000; timeout.tv_usec = (timeOutMillis % 1000) * 1000; #endif // The highest socket value plus one maxSocketValue++; Loading @@ -867,7 +871,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis) } #endif return tcpReadSelectReply; return tcpReadSelectReply || hasdata; } #endif Loading Loading @@ -902,8 +906,6 @@ TransporterRegistry::performReceive() #endif #ifdef NDB_TCP_TRANSPORTER if(tcpReadSelectReply > 0) { for (int i=0; i<nTCPTransporters; i++) { checkJobBuffer(); Loading @@ -911,10 +913,14 @@ TransporterRegistry::performReceive() const NodeId nodeId = t->getRemoteNodeId(); const NDB_SOCKET_TYPE socket = t->getSocket(); if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) if(t->isConnected()) { const int receiveSize = t->doReceive(); if(receiveSize > 0) if (FD_ISSET(socket, &tcpReadset)) { t->doReceive(); } if (t->hasReceiveData()) { Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); Loading @@ -924,7 +930,6 @@ TransporterRegistry::performReceive() } } } } #endif #ifdef NDB_SCI_TRANSPORTER Loading
ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp +17 −0 Original line number Diff line number Diff line Loading @@ -135,6 +135,7 @@ Cmvmi::~Cmvmi() #ifdef ERROR_INSERT NodeBitmask c_error_9000_nodes_mask; extern Uint32 MAX_RECEIVED_SIGNALS; #endif void Cmvmi::execNDB_TAMPER(Signal* signal) Loading Loading @@ -164,6 +165,22 @@ void Cmvmi::execNDB_TAMPER(Signal* signal) kill(getpid(), SIGABRT); } #endif #ifdef ERROR_INSERT if (signal->theData[0] == 9003) { if (MAX_RECEIVED_SIGNALS < 1024) { MAX_RECEIVED_SIGNALS = 1024; } else { MAX_RECEIVED_SIGNALS = 1 + (rand() % 128); } ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS); CLEAR_ERROR_INSERT_VALUE; } #endif }//execNDB_TAMPER() void Cmvmi::execSET_LOGLEVELORD(Signal* signal) Loading
ndb/test/ndbapi/testNdbApi.cpp +34 −0 Original line number Diff line number Diff line Loading @@ -1131,7 +1131,36 @@ int runBug_11133(NDBT_Context* ctx, NDBT_Step* step){ return result; } int runBug28443(NDBT_Context* ctx, NDBT_Step* step) { int result = NDBT_OK; int records = ctx->getNumRecords(); NdbRestarter restarter; restarter.insertErrorInAllNodes(9003); for (Uint32 i = 0; i<ctx->getNumLoops(); i++) { HugoTransactions hugoTrans(*ctx->getTab()); if (hugoTrans.loadTable(GETNDB(step), records, 2048) != 0) { result = NDBT_FAILED; goto done; } if (runClearTable(ctx, step) != 0) { result = NDBT_FAILED; goto done; } } done: restarter.insertErrorInAllNodes(9003); return result; } NDBT_TESTSUITE(testNdbApi); TESTCASE("MaxNdb", Loading Loading @@ -1212,6 +1241,11 @@ TESTCASE("Bug_11133", INITIALIZER(runBug_11133); FINALIZER(runClearTable); } TESTCASE("Bug28443", ""){ INITIALIZER(runBug28443); FINALIZER(runClearTable); } NDBT_TESTSUITE_END(testNdbApi); int main(int argc, const char** argv){ Loading