Loading ndb/include/util/SocketServer.hpp +5 −3 Original line number Diff line number Diff line Loading @@ -105,6 +105,7 @@ public: void stopSessions(bool wait = false); void foreachSession(void (*f)(Session*, void*), void *data); void checkSessions(); private: struct SessionInstance { Loading @@ -116,12 +117,13 @@ private: Service * m_service; NDB_SOCKET_TYPE m_socket; }; MutexVector<SessionInstance> m_sessions; NdbLockable m_session_mutex; Vector<SessionInstance> m_sessions; MutexVector<ServiceInstance> m_services; unsigned m_maxSessions; void doAccept(); void checkSessions(); void checkSessionsImpl(); void startSession(SessionInstance &); /** Loading ndb/src/common/util/SocketServer.cpp +35 −9 Original line number Diff line number Diff line Loading @@ -184,9 +184,12 @@ SocketServer::doAccept(){ SessionInstance s; s.m_service = si.m_service; s.m_session = si.m_service->newSession(childSock); if(s.m_session != 0){ if(s.m_session != 0) { m_session_mutex.lock(); m_sessions.push_back(s); startSession(m_sessions.back()); m_session_mutex.unlock(); } continue; Loading Loading @@ -240,10 +243,13 @@ void SocketServer::doRun(){ while(!m_stopThread){ checkSessions(); m_session_mutex.lock(); checkSessionsImpl(); if(m_sessions.size() < m_maxSessions){ m_session_mutex.unlock(); doAccept(); } else { m_session_mutex.unlock(); NdbSleep_MilliSleep(200); } } Loading Loading @@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){ void SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data) { m_session_mutex.lock(); for(int i = m_sessions.size() - 1; i >= 0; i--){ (*func)(m_sessions[i].m_session, data); } checkSessions(); m_session_mutex.unlock(); } void SocketServer::checkSessions(){ for(int i = m_sessions.size() - 1; i >= 0; i--){ if(m_sessions[i].m_session->m_stopped){ if(m_sessions[i].m_thread != 0){ SocketServer::checkSessions() { m_session_mutex.lock(); checkSessionsImpl(); m_session_mutex.unlock(); } void SocketServer::checkSessionsImpl() { for(int i = m_sessions.size() - 1; i >= 0; i--) { if(m_sessions[i].m_session->m_stopped) { if(m_sessions[i].m_thread != 0) { void* ret; NdbThread_WaitFor(m_sessions[i].m_thread, &ret); NdbThread_Destroy(&m_sessions[i].m_thread); Loading @@ -301,19 +320,26 @@ SocketServer::checkSessions(){ void SocketServer::stopSessions(bool wait){ int i; m_session_mutex.lock(); for(i = m_sessions.size() - 1; i>=0; i--) { m_sessions[i].m_session->stopSession(); m_sessions[i].m_session->m_stop = true; // to make sure } m_session_mutex.unlock(); for(i = m_services.size() - 1; i>=0; i--) m_services[i].m_service->stopSessions(); if(wait){ m_session_mutex.lock(); while(m_sessions.size() > 0){ checkSessions(); checkSessionsImpl(); m_session_mutex.unlock(); NdbSleep_MilliSleep(100); m_session_mutex.lock(); } m_session_mutex.unlock(); } } Loading Loading @@ -348,4 +374,4 @@ sessionThread_C(void* _sc){ } template class MutexVector<SocketServer::ServiceInstance>; template class MutexVector<SocketServer::SessionInstance>; template class Vector<SocketServer::SessionInstance>; ndb/src/kernel/vm/Configuration.cpp +5 −3 Original line number Diff line number Diff line Loading @@ -49,7 +49,9 @@ extern EventLogger g_eventLogger; enum ndbd_options { OPT_INITIAL = NDB_STD_OPTIONS_LAST, OPT_NODAEMON, OPT_FOREGROUND OPT_FOREGROUND, OPT_NOWAIT_NODES, OPT_INITIAL_START }; NDB_STD_OPTS_VARS; Loading Loading @@ -88,11 +90,11 @@ static struct my_option my_long_options[] = " (implies --nodaemon)", (gptr*) &_foreground, (gptr*) &_foreground, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "nowait-nodes", NO_ARG, { "nowait-nodes", OPT_NOWAIT_NODES, "Nodes that will not be waited for during start", (gptr*) &_nowait_nodes, (gptr*) &_nowait_nodes, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "initial-start", NO_ARG, { "initial-start", OPT_INITIAL_START, "Perform initial start", (gptr*) &_initialstart, (gptr*) &_initialstart, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, Loading ndb/src/mgmsrv/Services.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -501,6 +501,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ps.tick= tick; m_mgmsrv.get_socket_server()-> foreachSession(stop_session_if_timed_out,&ps); m_mgmsrv.get_socket_server()->checkSessions(); error_string = ""; continue; } Loading Loading @@ -1558,6 +1559,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps); m_mgmsrv.get_socket_server()->checkSessions(); m_output->println("purge stale sessions reply"); if (str.length() > 0) Loading Loading
ndb/include/util/SocketServer.hpp +5 −3 Original line number Diff line number Diff line Loading @@ -105,6 +105,7 @@ public: void stopSessions(bool wait = false); void foreachSession(void (*f)(Session*, void*), void *data); void checkSessions(); private: struct SessionInstance { Loading @@ -116,12 +117,13 @@ private: Service * m_service; NDB_SOCKET_TYPE m_socket; }; MutexVector<SessionInstance> m_sessions; NdbLockable m_session_mutex; Vector<SessionInstance> m_sessions; MutexVector<ServiceInstance> m_services; unsigned m_maxSessions; void doAccept(); void checkSessions(); void checkSessionsImpl(); void startSession(SessionInstance &); /** Loading
ndb/src/common/util/SocketServer.cpp +35 −9 Original line number Diff line number Diff line Loading @@ -184,9 +184,12 @@ SocketServer::doAccept(){ SessionInstance s; s.m_service = si.m_service; s.m_session = si.m_service->newSession(childSock); if(s.m_session != 0){ if(s.m_session != 0) { m_session_mutex.lock(); m_sessions.push_back(s); startSession(m_sessions.back()); m_session_mutex.unlock(); } continue; Loading Loading @@ -240,10 +243,13 @@ void SocketServer::doRun(){ while(!m_stopThread){ checkSessions(); m_session_mutex.lock(); checkSessionsImpl(); if(m_sessions.size() < m_maxSessions){ m_session_mutex.unlock(); doAccept(); } else { m_session_mutex.unlock(); NdbSleep_MilliSleep(200); } } Loading Loading @@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){ void SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data) { m_session_mutex.lock(); for(int i = m_sessions.size() - 1; i >= 0; i--){ (*func)(m_sessions[i].m_session, data); } checkSessions(); m_session_mutex.unlock(); } void SocketServer::checkSessions(){ for(int i = m_sessions.size() - 1; i >= 0; i--){ if(m_sessions[i].m_session->m_stopped){ if(m_sessions[i].m_thread != 0){ SocketServer::checkSessions() { m_session_mutex.lock(); checkSessionsImpl(); m_session_mutex.unlock(); } void SocketServer::checkSessionsImpl() { for(int i = m_sessions.size() - 1; i >= 0; i--) { if(m_sessions[i].m_session->m_stopped) { if(m_sessions[i].m_thread != 0) { void* ret; NdbThread_WaitFor(m_sessions[i].m_thread, &ret); NdbThread_Destroy(&m_sessions[i].m_thread); Loading @@ -301,19 +320,26 @@ SocketServer::checkSessions(){ void SocketServer::stopSessions(bool wait){ int i; m_session_mutex.lock(); for(i = m_sessions.size() - 1; i>=0; i--) { m_sessions[i].m_session->stopSession(); m_sessions[i].m_session->m_stop = true; // to make sure } m_session_mutex.unlock(); for(i = m_services.size() - 1; i>=0; i--) m_services[i].m_service->stopSessions(); if(wait){ m_session_mutex.lock(); while(m_sessions.size() > 0){ checkSessions(); checkSessionsImpl(); m_session_mutex.unlock(); NdbSleep_MilliSleep(100); m_session_mutex.lock(); } m_session_mutex.unlock(); } } Loading Loading @@ -348,4 +374,4 @@ sessionThread_C(void* _sc){ } template class MutexVector<SocketServer::ServiceInstance>; template class MutexVector<SocketServer::SessionInstance>; template class Vector<SocketServer::SessionInstance>;
ndb/src/kernel/vm/Configuration.cpp +5 −3 Original line number Diff line number Diff line Loading @@ -49,7 +49,9 @@ extern EventLogger g_eventLogger; enum ndbd_options { OPT_INITIAL = NDB_STD_OPTIONS_LAST, OPT_NODAEMON, OPT_FOREGROUND OPT_FOREGROUND, OPT_NOWAIT_NODES, OPT_INITIAL_START }; NDB_STD_OPTS_VARS; Loading Loading @@ -88,11 +90,11 @@ static struct my_option my_long_options[] = " (implies --nodaemon)", (gptr*) &_foreground, (gptr*) &_foreground, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, { "nowait-nodes", NO_ARG, { "nowait-nodes", OPT_NOWAIT_NODES, "Nodes that will not be waited for during start", (gptr*) &_nowait_nodes, (gptr*) &_nowait_nodes, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { "initial-start", NO_ARG, { "initial-start", OPT_INITIAL_START, "Perform initial start", (gptr*) &_initialstart, (gptr*) &_initialstart, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, Loading
ndb/src/mgmsrv/Services.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -501,6 +501,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ps.tick= tick; m_mgmsrv.get_socket_server()-> foreachSession(stop_session_if_timed_out,&ps); m_mgmsrv.get_socket_server()->checkSessions(); error_string = ""; continue; } Loading Loading @@ -1558,6 +1559,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps); m_mgmsrv.get_socket_server()->checkSessions(); m_output->println("purge stale sessions reply"); if (str.length() > 0) Loading