Loading ndb/docs/wl2077.txt +25 −12 Original line number Diff line number Diff line Loading @@ -10,11 +10,17 @@ results in 1000 rows / sec wl2077-read committed 6.4 (+30%) 10.8 (+45%) wl2077-read hold lock 4.6 (-1%) 6.7 (+ 0%) -- Comparision e) serial pk: 10.9' batched (1000): 59' serial uniq index: 8.4' batched (1000): 33' 5.0-ndb batch read committed f) 50' (+680%) g) 50' (+360%) 5.0-ndb batch read hold lock h) 12' (+160%) i) 13' (+79%) shm-mem read committed (cmp. wl2077) a) 9.5' (+48%) b) 14' (+30%) read hold lock c) 6.7' (+45%) d) 9.8' (+46%) -- Comparision e) shm serial pk: 10.9' 20' (+83%) batched (1000): 59' 62' (+5%) serial uniq index: 8.4' 14' (+66%) batched (1000): 33' 36' (+9%) index range (1000): 186' ---- Loading @@ -25,6 +31,8 @@ b) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 1 T1 c) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 0 T1 d) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 1 T1 e) testReadPerf -i 25 -c 0 -d 0 T1 f) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 3 -q 0 -m 1000 -i 10 T1 g) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 3 -q 1 -m 1000 -i 10 T1 --- music join 1db-co 2db-co Loading @@ -33,3 +41,8 @@ e) testReadPerf -i 25 -c 0 -d 0 T1 wl2077 12s 14s wl2077 wo/ blobs 1.2s (-30%) 2.5s (-22%) pekka-blob-fix 1.3s shm 1.2s 2.0s shm wo/ blobs 1.1s 2.0s ndb/include/transporter/TransporterRegistry.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -87,6 +87,7 @@ public: */ class TransporterRegistry { friend class OSE_Receiver; friend class SHM_Transporter; friend class Transporter; friend class TransporterService; public: Loading Loading @@ -312,6 +313,8 @@ private: Uint32 poll_TCP(Uint32 timeOutMillis); Uint32 poll_SCI(Uint32 timeOutMillis); Uint32 poll_SHM(Uint32 timeOutMillis); int m_shm_own_pid; }; #endif // Define of TransporterRegistry_H ndb/src/common/transporter/SHM_Transporter.cpp +41 −44 Original line number Diff line number Diff line Loading @@ -26,6 +26,8 @@ #include <InputStream.hpp> #include <OutputStream.hpp> extern int g_shm_pid; SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, const char *lHostName, const char *rHostName, Loading @@ -52,6 +54,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, #ifdef DEBUG_TRANSPORTER printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey); #endif m_signal_threshold = 4096; } SHM_Transporter::~SHM_Transporter(){ Loading Loading @@ -182,42 +185,6 @@ SHM_Transporter::setupBuffers(){ #endif } #if 0 SendStatus SHM_Transporter::prepareSend(const SignalHeader * const signalHeader, Uint8 prio, const Uint32 * const signalData, const LinearSegmentPtr ptr[3], bool force){ if(isConnected()){ const Uint32 lenBytes = m_packer.getMessageLength(signalHeader, ptr); Uint32 * insertPtr = (Uint32 *)writer->getWritePtr(lenBytes); if(insertPtr != 0){ m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr); /** * Do funky membar stuff */ writer->updateWritePtr(lenBytes); return SEND_OK; } else { // NdbSleep_MilliSleep(3); //goto tryagain; return SEND_BUFFER_FULL; } } return SEND_DISCONNECTED; } #endif bool SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) { Loading Loading @@ -247,10 +214,18 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) } // Send ok to client s_output.println("shm server 1 ok"); s_output.println("shm server 1 ok: %d", m_transporter_registry.m_shm_own_pid); // Wait for ok from client if (s_input.gets(buf, 256) == 0) { if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } Loading Loading @@ -289,6 +264,12 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) DBUG_RETURN(false); } if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } // Create if(!_shmSegCreated){ if (!ndb_shm_get()) { Loading @@ -313,7 +294,8 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) } // Send ok to server s_output.println("shm client 1 ok"); s_output.println("shm client 1 ok: %d", m_transporter_registry.m_shm_own_pid); int r= connect_common(sockfd); Loading Loading @@ -344,18 +326,33 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) return false; } if(!setupBuffersDone) { if(!setupBuffersDone) { setupBuffers(); setupBuffersDone=true; } if(setupBuffersDone) { if(setupBuffersDone) { NdbSleep_MilliSleep(m_timeOutMillis); if(*serverStatusFlag == 1 && *clientStatusFlag == 1) { m_last_signal = 0; return true; } } DBUG_PRINT("error", ("Failed to set up buffers to node %d", remoteNodeId)); return false; } void SHM_Transporter::doSend() { if(m_last_signal) { m_last_signal = 0; kill(m_remote_pid, SIGUSR1); } } ndb/src/common/transporter/SHM_Transporter.hpp +24 −9 Original line number Diff line number Diff line Loading @@ -53,12 +53,19 @@ public: */ bool initTransporter(); Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio){ Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio) { return (Uint32 *)writer->getWritePtr(lenBytes); } void updateWritePtr(Uint32 lenBytes, Uint32 prio){ void updateWritePtr(Uint32 lenBytes, Uint32 prio) { writer->updateWritePtr(lenBytes); m_last_signal += lenBytes; if(m_last_signal >= m_signal_threshold) { doSend(); } } void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){ Loading Loading @@ -123,6 +130,14 @@ protected: */ void setupBuffers(); /** * doSend (i.e signal receiver) */ void doSend(); int m_remote_pid; Uint32 m_last_signal; Uint32 m_signal_threshold; private: bool _shmSegCreated; bool _attached; Loading ndb/src/common/transporter/TransporterRegistry.cpp +209 −184 Original line number Diff line number Diff line Loading @@ -47,6 +47,8 @@ #include <InputStream.hpp> #include <OutputStream.hpp> int g_shm_pid = 0; SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) { DBUG_ENTER("SocketServer::Session * TransporterService::newSession"); Loading Loading @@ -622,11 +624,28 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ return retVal; #endif if((nSHMTransporters+nSCITransporters) > 0) if((nSCITransporters) > 0) { timeOutMillis=0; } #ifdef NDB_SHM_TRANSPORTER if(nSHMTransporters > 0) { Uint32 res = poll_SHM(0); if(res) { retVal |= res; timeOutMillis = 0; } } #endif #ifdef NDB_TCP_TRANSPORTER if(nTCPTransporters > 0) if(nTCPTransporters > 0 || retVal == 0) { retVal |= poll_TCP(timeOutMillis); } else tcpReadSelectReply = 0; #endif Loading @@ -635,8 +654,11 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ retVal |= poll_SCI(timeOutMillis); #endif #ifdef NDB_SHM_TRANSPORTER if(nSHMTransporters > 0) retVal |= poll_SHM(timeOutMillis); if(nSHMTransporters > 0 && retVal == 0) { int res = poll_SHM(0); retVal |= res; } #endif return retVal; } Loading @@ -644,8 +666,8 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ #ifdef NDB_SCI_TRANSPORTER Uint32 TransporterRegistry::poll_SCI(Uint32 timeOutMillis){ TransporterRegistry::poll_SCI(Uint32 timeOutMillis) { for (int i=0; i<nSCITransporters; i++) { SCI_Transporter * t = theSCITransporters[i]; if (t->isConnected()) { Loading @@ -659,73 +681,29 @@ TransporterRegistry::poll_SCI(Uint32 timeOutMillis){ #ifdef NDB_SHM_TRANSPORTER static int g_shm_counter = 0; Uint32 TransporterRegistry::poll_SHM(Uint32 timeOutMillis) { for(int j=0; j < 20; j++) for (int i=0; i<nSHMTransporters; i++) { SHM_Transporter * t = theSHMTransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) { return 1; } } } /** * @note: granularity of linux/i386 timer is not good enough. * Can't sleep if using SHM as it is now. */ /* if(timeOutMillis > 0) NdbSleep_MilliSleep(timeOutMillis); else NdbSleep_MilliSleep(1); */ return 0; #if 0 NDB_TICKS startTime = NdbTick_CurrentMillisecond(); for(int i=0; i<100; i++) { for(int j=0; j < 100; j++) { for (int i=0; i<nSHMTransporters; i++) { SHM_Transporter * t = theSHMTransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) { return 1; } else continue; } else continue; } if(NdbTick_CurrentMillisecond() > (startTime +timeOutMillis)) return 0; } NdbSleep_MilliSleep(5); return 0; #endif #if 0 for(int j=0; j < 100; j++) { for (int i=0; i<nSHMTransporters; i++) { SHM_Transporter * t = theSHMTransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) return 1; } } } return 0; #endif } #endif #ifdef NDB_OSE_TRANSPORTER Uint32 TransporterRegistry::poll_OSE(Uint32 timeOutMillis){ TransporterRegistry::poll_OSE(Uint32 timeOutMillis) { if(theOSEReceiver != NULL){ return theOSEReceiver->doReceive(timeOutMillis); } Loading @@ -736,16 +714,16 @@ TransporterRegistry::poll_OSE(Uint32 timeOutMillis){ #ifdef NDB_TCP_TRANSPORTER Uint32 TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ if (nTCPTransporters == 0){ TransporterRegistry::poll_TCP(Uint32 timeOutMillis) { if (false && nTCPTransporters == 0) { tcpReadSelectReply = 0; return 0; } struct timeval timeout; #ifdef NDB_OSE // Return directly if there are no TCP transporters configured if(timeOutMillis <= 1){ Loading @@ -760,7 +738,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ timeout.tv_usec = (timeOutMillis % 1000) * 1000; #endif NDB_SOCKET_TYPE maxSocketValue = 0; NDB_SOCKET_TYPE maxSocketValue = -1; // Needed for TCP/IP connections // The read- and writeset are used by select Loading Loading @@ -788,6 +766,9 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ maxSocketValue++; tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout); if(false && tcpReadSelectReply == -1 && errno == EINTR) ndbout_c("woke-up by signal"); #ifdef NDB_WIN32 if(tcpReadSelectReply == SOCKET_ERROR) { Loading @@ -801,10 +782,13 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ void TransporterRegistry::performReceive(){ TransporterRegistry::performReceive() { #ifdef NDB_OSE_TRANSPORTER if(theOSEReceiver != 0){ while(theOSEReceiver->hasData()){ if(theOSEReceiver != 0) { while(theOSEReceiver->hasData()) { NodeId remoteNodeId; Uint32 * readPtr; Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr); Loading @@ -827,16 +811,20 @@ TransporterRegistry::performReceive(){ #endif #ifdef NDB_TCP_TRANSPORTER if(tcpReadSelectReply > 0){ for (int i=0; i<nTCPTransporters; i++) { if(tcpReadSelectReply > 0) { for (int i=0; i<nTCPTransporters; i++) { checkJobBuffer(); TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const NDB_SOCKET_TYPE socket = t->getSocket(); if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { const int receiveSize = t->doReceive(); if(receiveSize > 0){ if(receiveSize > 0) { Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]); Loading @@ -848,16 +836,18 @@ TransporterRegistry::performReceive(){ } #endif #ifdef NDB_SCI_TRANSPORTER //performReceive //do prepareReceive on the SCI transporters (prepareReceive(t,,,,)) for (int i=0; i<nSCITransporters; i++) { for (int i=0; i<nSCITransporters; i++) { checkJobBuffer(); SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ if(is_connected(nodeId)) { if(t->isConnected() && t->checkConnected()) { Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); Loading @@ -867,12 +857,14 @@ TransporterRegistry::performReceive(){ } #endif #ifdef NDB_SHM_TRANSPORTER for (int i=0; i<nSHMTransporters; i++) { for (int i=0; i<nSHMTransporters; i++) { checkJobBuffer(); SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ if(t->isConnected() && t->checkConnected()) { Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); Loading @@ -885,15 +877,17 @@ TransporterRegistry::performReceive(){ static int x = 0; void TransporterRegistry::performSend(){ TransporterRegistry::performSend() { int i; sendCounter = 1; #ifdef NDB_OSE_TRANSPORTER for (int i = 0; i < nOSETransporters; i++){ for (int i = 0; i < nOSETransporters; i++) { OSE_Transporter *t = theOSETransporters[i]; if((is_connected(t->getRemoteNodeId()) && (t->isConnected())) { if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected())) { t->doSend(); }//if }//for Loading Loading @@ -932,7 +926,8 @@ TransporterRegistry::performSend(){ struct timeval timeout = { 0, 1025 }; Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout); if (tmp == 0) { if (tmp == 0) { return; }//if for (i = 0; i < nTCPTransporters; i++) { Loading @@ -948,24 +943,24 @@ TransporterRegistry::performSend(){ } #endif #ifdef NDB_TCP_TRANSPORTER for (i = x; i < nTCPTransporters; i++) { for (i = x; i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && (t->hasDataToSend()) && (t->isConnected()) && (is_connected(t->getRemoteNodeId()))) { if (t && t->hasDataToSend() && t->isConnected() && is_connected(t->getRemoteNodeId())) { t->doSend(); }//if }//for for (i = 0; i < x && i < nTCPTransporters; i++) { } } for (i = 0; i < x && i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && (t->hasDataToSend()) && (t->isConnected()) && (is_connected(t->getRemoteNodeId()))) { if (t && t->hasDataToSend() && t->isConnected() && is_connected(t->getRemoteNodeId())) { t->doSend(); }//if }//for } } x++; if (x == nTCPTransporters) x = 0; #endif Loading @@ -977,12 +972,28 @@ TransporterRegistry::performSend(){ SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(is_connected(nodeId)) { if(t->isConnected() && t->hasDataToSend()) { t->doSend(); } //if } //if } //if } #endif #ifdef NDB_SHM_TRANSPORTER for (i=0; i<nSHMTransporters; i++) { SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)) { if(t->isConnected()) { t->doSend(); } } } #endif } Loading Loading @@ -1232,6 +1243,15 @@ TransporterRegistry::start_service(SocketServer& socket_server) return true; } #ifdef NDB_SHM_TRANSPORTER static RETSIGTYPE shm_sig_handler(int signo) { g_shm_counter++; } #endif void TransporterRegistry::startReceiving() { Loading @@ -1250,6 +1270,11 @@ TransporterRegistry::startReceiving() for(int i = 0; i<nTCPTransporters; i++) theTCPTransporters[i]->theReceiverPid = theReceiverPid; #endif #ifdef NDB_SHM_TRANSPORTER m_shm_own_pid = getpid(); signal(SIGUSR1, shm_sig_handler); #endif } void Loading Loading
ndb/docs/wl2077.txt +25 −12 Original line number Diff line number Diff line Loading @@ -10,11 +10,17 @@ results in 1000 rows / sec wl2077-read committed 6.4 (+30%) 10.8 (+45%) wl2077-read hold lock 4.6 (-1%) 6.7 (+ 0%) -- Comparision e) serial pk: 10.9' batched (1000): 59' serial uniq index: 8.4' batched (1000): 33' 5.0-ndb batch read committed f) 50' (+680%) g) 50' (+360%) 5.0-ndb batch read hold lock h) 12' (+160%) i) 13' (+79%) shm-mem read committed (cmp. wl2077) a) 9.5' (+48%) b) 14' (+30%) read hold lock c) 6.7' (+45%) d) 9.8' (+46%) -- Comparision e) shm serial pk: 10.9' 20' (+83%) batched (1000): 59' 62' (+5%) serial uniq index: 8.4' 14' (+66%) batched (1000): 33' 36' (+9%) index range (1000): 186' ---- Loading @@ -25,6 +31,8 @@ b) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 2 -q 1 T1 c) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 0 T1 d) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 1 -r 2 -q 1 T1 e) testReadPerf -i 25 -c 0 -d 0 T1 f) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 3 -q 0 -m 1000 -i 10 T1 g) testScanPerf -s 100000 -c 0 -d 0 -a 1 -l 0 -r 3 -q 1 -m 1000 -i 10 T1 --- music join 1db-co 2db-co Loading @@ -33,3 +41,8 @@ e) testReadPerf -i 25 -c 0 -d 0 T1 wl2077 12s 14s wl2077 wo/ blobs 1.2s (-30%) 2.5s (-22%) pekka-blob-fix 1.3s shm 1.2s 2.0s shm wo/ blobs 1.1s 2.0s
ndb/include/transporter/TransporterRegistry.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -87,6 +87,7 @@ public: */ class TransporterRegistry { friend class OSE_Receiver; friend class SHM_Transporter; friend class Transporter; friend class TransporterService; public: Loading Loading @@ -312,6 +313,8 @@ private: Uint32 poll_TCP(Uint32 timeOutMillis); Uint32 poll_SCI(Uint32 timeOutMillis); Uint32 poll_SHM(Uint32 timeOutMillis); int m_shm_own_pid; }; #endif // Define of TransporterRegistry_H
ndb/src/common/transporter/SHM_Transporter.cpp +41 −44 Original line number Diff line number Diff line Loading @@ -26,6 +26,8 @@ #include <InputStream.hpp> #include <OutputStream.hpp> extern int g_shm_pid; SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, const char *lHostName, const char *rHostName, Loading @@ -52,6 +54,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg, #ifdef DEBUG_TRANSPORTER printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey); #endif m_signal_threshold = 4096; } SHM_Transporter::~SHM_Transporter(){ Loading Loading @@ -182,42 +185,6 @@ SHM_Transporter::setupBuffers(){ #endif } #if 0 SendStatus SHM_Transporter::prepareSend(const SignalHeader * const signalHeader, Uint8 prio, const Uint32 * const signalData, const LinearSegmentPtr ptr[3], bool force){ if(isConnected()){ const Uint32 lenBytes = m_packer.getMessageLength(signalHeader, ptr); Uint32 * insertPtr = (Uint32 *)writer->getWritePtr(lenBytes); if(insertPtr != 0){ m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr); /** * Do funky membar stuff */ writer->updateWritePtr(lenBytes); return SEND_OK; } else { // NdbSleep_MilliSleep(3); //goto tryagain; return SEND_BUFFER_FULL; } } return SEND_DISCONNECTED; } #endif bool SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) { Loading Loading @@ -247,10 +214,18 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd) } // Send ok to client s_output.println("shm server 1 ok"); s_output.println("shm server 1 ok: %d", m_transporter_registry.m_shm_own_pid); // Wait for ok from client if (s_input.gets(buf, 256) == 0) { if (s_input.gets(buf, 256) == 0) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } Loading Loading @@ -289,6 +264,12 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) DBUG_RETURN(false); } if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1) { NDB_CLOSE_SOCKET(sockfd); DBUG_RETURN(false); } // Create if(!_shmSegCreated){ if (!ndb_shm_get()) { Loading @@ -313,7 +294,8 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd) } // Send ok to server s_output.println("shm client 1 ok"); s_output.println("shm client 1 ok: %d", m_transporter_registry.m_shm_own_pid); int r= connect_common(sockfd); Loading Loading @@ -344,18 +326,33 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd) return false; } if(!setupBuffersDone) { if(!setupBuffersDone) { setupBuffers(); setupBuffersDone=true; } if(setupBuffersDone) { if(setupBuffersDone) { NdbSleep_MilliSleep(m_timeOutMillis); if(*serverStatusFlag == 1 && *clientStatusFlag == 1) { m_last_signal = 0; return true; } } DBUG_PRINT("error", ("Failed to set up buffers to node %d", remoteNodeId)); return false; } void SHM_Transporter::doSend() { if(m_last_signal) { m_last_signal = 0; kill(m_remote_pid, SIGUSR1); } }
ndb/src/common/transporter/SHM_Transporter.hpp +24 −9 Original line number Diff line number Diff line Loading @@ -53,12 +53,19 @@ public: */ bool initTransporter(); Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio){ Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio) { return (Uint32 *)writer->getWritePtr(lenBytes); } void updateWritePtr(Uint32 lenBytes, Uint32 prio){ void updateWritePtr(Uint32 lenBytes, Uint32 prio) { writer->updateWritePtr(lenBytes); m_last_signal += lenBytes; if(m_last_signal >= m_signal_threshold) { doSend(); } } void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){ Loading Loading @@ -123,6 +130,14 @@ protected: */ void setupBuffers(); /** * doSend (i.e signal receiver) */ void doSend(); int m_remote_pid; Uint32 m_last_signal; Uint32 m_signal_threshold; private: bool _shmSegCreated; bool _attached; Loading
ndb/src/common/transporter/TransporterRegistry.cpp +209 −184 Original line number Diff line number Diff line Loading @@ -47,6 +47,8 @@ #include <InputStream.hpp> #include <OutputStream.hpp> int g_shm_pid = 0; SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd) { DBUG_ENTER("SocketServer::Session * TransporterService::newSession"); Loading Loading @@ -622,11 +624,28 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ return retVal; #endif if((nSHMTransporters+nSCITransporters) > 0) if((nSCITransporters) > 0) { timeOutMillis=0; } #ifdef NDB_SHM_TRANSPORTER if(nSHMTransporters > 0) { Uint32 res = poll_SHM(0); if(res) { retVal |= res; timeOutMillis = 0; } } #endif #ifdef NDB_TCP_TRANSPORTER if(nTCPTransporters > 0) if(nTCPTransporters > 0 || retVal == 0) { retVal |= poll_TCP(timeOutMillis); } else tcpReadSelectReply = 0; #endif Loading @@ -635,8 +654,11 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ retVal |= poll_SCI(timeOutMillis); #endif #ifdef NDB_SHM_TRANSPORTER if(nSHMTransporters > 0) retVal |= poll_SHM(timeOutMillis); if(nSHMTransporters > 0 && retVal == 0) { int res = poll_SHM(0); retVal |= res; } #endif return retVal; } Loading @@ -644,8 +666,8 @@ TransporterRegistry::pollReceive(Uint32 timeOutMillis){ #ifdef NDB_SCI_TRANSPORTER Uint32 TransporterRegistry::poll_SCI(Uint32 timeOutMillis){ TransporterRegistry::poll_SCI(Uint32 timeOutMillis) { for (int i=0; i<nSCITransporters; i++) { SCI_Transporter * t = theSCITransporters[i]; if (t->isConnected()) { Loading @@ -659,73 +681,29 @@ TransporterRegistry::poll_SCI(Uint32 timeOutMillis){ #ifdef NDB_SHM_TRANSPORTER static int g_shm_counter = 0; Uint32 TransporterRegistry::poll_SHM(Uint32 timeOutMillis) { for(int j=0; j < 20; j++) for (int i=0; i<nSHMTransporters; i++) { SHM_Transporter * t = theSHMTransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) { return 1; } } } /** * @note: granularity of linux/i386 timer is not good enough. * Can't sleep if using SHM as it is now. */ /* if(timeOutMillis > 0) NdbSleep_MilliSleep(timeOutMillis); else NdbSleep_MilliSleep(1); */ return 0; #if 0 NDB_TICKS startTime = NdbTick_CurrentMillisecond(); for(int i=0; i<100; i++) { for(int j=0; j < 100; j++) { for (int i=0; i<nSHMTransporters; i++) { SHM_Transporter * t = theSHMTransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) { return 1; } else continue; } else continue; } if(NdbTick_CurrentMillisecond() > (startTime +timeOutMillis)) return 0; } NdbSleep_MilliSleep(5); return 0; #endif #if 0 for(int j=0; j < 100; j++) { for (int i=0; i<nSHMTransporters; i++) { SHM_Transporter * t = theSHMTransporters[i]; if (t->isConnected()) { if(t->hasDataToRead()) return 1; } } } return 0; #endif } #endif #ifdef NDB_OSE_TRANSPORTER Uint32 TransporterRegistry::poll_OSE(Uint32 timeOutMillis){ TransporterRegistry::poll_OSE(Uint32 timeOutMillis) { if(theOSEReceiver != NULL){ return theOSEReceiver->doReceive(timeOutMillis); } Loading @@ -736,16 +714,16 @@ TransporterRegistry::poll_OSE(Uint32 timeOutMillis){ #ifdef NDB_TCP_TRANSPORTER Uint32 TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ if (nTCPTransporters == 0){ TransporterRegistry::poll_TCP(Uint32 timeOutMillis) { if (false && nTCPTransporters == 0) { tcpReadSelectReply = 0; return 0; } struct timeval timeout; #ifdef NDB_OSE // Return directly if there are no TCP transporters configured if(timeOutMillis <= 1){ Loading @@ -760,7 +738,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ timeout.tv_usec = (timeOutMillis % 1000) * 1000; #endif NDB_SOCKET_TYPE maxSocketValue = 0; NDB_SOCKET_TYPE maxSocketValue = -1; // Needed for TCP/IP connections // The read- and writeset are used by select Loading Loading @@ -788,6 +766,9 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ maxSocketValue++; tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout); if(false && tcpReadSelectReply == -1 && errno == EINTR) ndbout_c("woke-up by signal"); #ifdef NDB_WIN32 if(tcpReadSelectReply == SOCKET_ERROR) { Loading @@ -801,10 +782,13 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis){ void TransporterRegistry::performReceive(){ TransporterRegistry::performReceive() { #ifdef NDB_OSE_TRANSPORTER if(theOSEReceiver != 0){ while(theOSEReceiver->hasData()){ if(theOSEReceiver != 0) { while(theOSEReceiver->hasData()) { NodeId remoteNodeId; Uint32 * readPtr; Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr); Loading @@ -827,16 +811,20 @@ TransporterRegistry::performReceive(){ #endif #ifdef NDB_TCP_TRANSPORTER if(tcpReadSelectReply > 0){ for (int i=0; i<nTCPTransporters; i++) { if(tcpReadSelectReply > 0) { for (int i=0; i<nTCPTransporters; i++) { checkJobBuffer(); TCP_Transporter *t = theTCPTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); const NDB_SOCKET_TYPE socket = t->getSocket(); if(is_connected(nodeId)){ if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) { const int receiveSize = t->doReceive(); if(receiveSize > 0){ if(receiveSize > 0) { Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]); Loading @@ -848,16 +836,18 @@ TransporterRegistry::performReceive(){ } #endif #ifdef NDB_SCI_TRANSPORTER //performReceive //do prepareReceive on the SCI transporters (prepareReceive(t,,,,)) for (int i=0; i<nSCITransporters; i++) { for (int i=0; i<nSCITransporters; i++) { checkJobBuffer(); SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ if(is_connected(nodeId)) { if(t->isConnected() && t->checkConnected()) { Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); Loading @@ -867,12 +857,14 @@ TransporterRegistry::performReceive(){ } #endif #ifdef NDB_SHM_TRANSPORTER for (int i=0; i<nSHMTransporters; i++) { for (int i=0; i<nSHMTransporters; i++) { checkJobBuffer(); SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()){ if(t->isConnected() && t->checkConnected()) { Uint32 * readPtr, * eodPtr; t->getReceivePtr(&readPtr, &eodPtr); Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]); Loading @@ -885,15 +877,17 @@ TransporterRegistry::performReceive(){ static int x = 0; void TransporterRegistry::performSend(){ TransporterRegistry::performSend() { int i; sendCounter = 1; #ifdef NDB_OSE_TRANSPORTER for (int i = 0; i < nOSETransporters; i++){ for (int i = 0; i < nOSETransporters; i++) { OSE_Transporter *t = theOSETransporters[i]; if((is_connected(t->getRemoteNodeId()) && (t->isConnected())) { if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected())) { t->doSend(); }//if }//for Loading Loading @@ -932,7 +926,8 @@ TransporterRegistry::performSend(){ struct timeval timeout = { 0, 1025 }; Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout); if (tmp == 0) { if (tmp == 0) { return; }//if for (i = 0; i < nTCPTransporters; i++) { Loading @@ -948,24 +943,24 @@ TransporterRegistry::performSend(){ } #endif #ifdef NDB_TCP_TRANSPORTER for (i = x; i < nTCPTransporters; i++) { for (i = x; i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && (t->hasDataToSend()) && (t->isConnected()) && (is_connected(t->getRemoteNodeId()))) { if (t && t->hasDataToSend() && t->isConnected() && is_connected(t->getRemoteNodeId())) { t->doSend(); }//if }//for for (i = 0; i < x && i < nTCPTransporters; i++) { } } for (i = 0; i < x && i < nTCPTransporters; i++) { TCP_Transporter *t = theTCPTransporters[i]; if (t && (t->hasDataToSend()) && (t->isConnected()) && (is_connected(t->getRemoteNodeId()))) { if (t && t->hasDataToSend() && t->isConnected() && is_connected(t->getRemoteNodeId())) { t->doSend(); }//if }//for } } x++; if (x == nTCPTransporters) x = 0; #endif Loading @@ -977,12 +972,28 @@ TransporterRegistry::performSend(){ SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)){ if(is_connected(nodeId)) { if(t->isConnected() && t->hasDataToSend()) { t->doSend(); } //if } //if } //if } #endif #ifdef NDB_SHM_TRANSPORTER for (i=0; i<nSHMTransporters; i++) { SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); if(is_connected(nodeId)) { if(t->isConnected()) { t->doSend(); } } } #endif } Loading Loading @@ -1232,6 +1243,15 @@ TransporterRegistry::start_service(SocketServer& socket_server) return true; } #ifdef NDB_SHM_TRANSPORTER static RETSIGTYPE shm_sig_handler(int signo) { g_shm_counter++; } #endif void TransporterRegistry::startReceiving() { Loading @@ -1250,6 +1270,11 @@ TransporterRegistry::startReceiving() for(int i = 0; i<nTCPTransporters; i++) theTCPTransporters[i]->theReceiverPid = theReceiverPid; #endif #ifdef NDB_SHM_TRANSPORTER m_shm_own_pid = getpid(); signal(SIGUSR1, shm_sig_handler); #endif } void Loading