Loading ndb/src/mgmsrv/MgmtSrvr.cpp +16 −1 Original line number Diff line number Diff line Loading @@ -2378,6 +2378,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted) MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m) : m_mgmsrv(m) { m_reserved_nodes.clear(); m_alloc_timeout= 0; } MgmtSrvr::Allocated_resources::~Allocated_resources() Loading @@ -2396,9 +2398,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() } void MgmtSrvr::Allocated_resources::reserve_node(NodeId id) MgmtSrvr::Allocated_resources::reserve_node(NodeId id, NDB_TICKS timeout) { m_reserved_nodes.set(id); m_alloc_timeout= NdbTick_CurrentMillisecond() + timeout; } bool MgmtSrvr::Allocated_resources::is_timed_out(NDB_TICKS tick) { if (m_alloc_timeout && tick > m_alloc_timeout) { g_eventLogger.info("Mgmt server state: nodeid %d timed out.", get_nodeid()); return true; } return false; } NodeId Loading ndb/src/mgmsrv/MgmtSrvr.hpp +3 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,8 @@ public: ~Allocated_resources(); // methods to reserve/allocate resources which // will be freed when running destructor void reserve_node(NodeId id); void reserve_node(NodeId id, NDB_TICKS timeout); bool is_timed_out(NDB_TICKS tick); bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); } bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); } bool isclear() { return m_reserved_nodes.isclear(); } Loading @@ -114,6 +115,7 @@ public: private: MgmtSrvr &m_mgmsrv; NodeBitmask m_reserved_nodes; NDB_TICKS m_alloc_timeout; }; NdbMutex *m_node_id_mutex; Loading ndb/src/mgmsrv/Services.cpp +45 −12 Original line number Diff line number Diff line Loading @@ -137,6 +137,7 @@ ParserRow<MgmApiSession> commands[] = { MGM_ARG("public key", String, Mandatory, "Public key"), MGM_ARG("endian", String, Optional, "Endianness"), MGM_ARG("name", String, Optional, "Name of connection"), MGM_ARG("timeout", Int, Optional, "Timeout in seconds"), MGM_CMD("get version", &MgmApiSession::getVersion, ""), Loading Loading @@ -259,6 +260,15 @@ ParserRow<MgmApiSession> commands[] = { MGM_END() }; struct PurgeStruct { NodeBitmask free_nodes;/* free nodes as reported * by ndbd in apiRegReqConf */ BaseString *str; NDB_TICKS tick; }; MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) : SocketServer::Session(sock), m_mgmsrv(mgm) { Loading Loading @@ -408,6 +418,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, { const char *cmd= "get nodeid reply"; Uint32 version, nodeid= 0, nodetype= 0xff; Uint32 timeout= 20; // default seconds timeout const char * transporter; const char * user; const char * password; Loading @@ -425,6 +436,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, args.get("public key", &public_key); args.get("endian", &endian); args.get("name", &name); args.get("timeout", &timeout); endian_check.l = 1; if(endian Loading Loading @@ -464,8 +476,24 @@ MgmApiSession::get_nodeid(Parser_t::Context &, NodeId tmp= nodeid; if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){ BaseString error_string; if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, &addr, &addrlen, error_string)){ NDB_TICKS tick= 0; while (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, &addr, &addrlen, error_string)) { if (tick == 0) { // attempt to free any timed out reservations tick= NdbTick_CurrentMillisecond(); struct PurgeStruct ps; m_mgmsrv.get_connected_nodes(ps.free_nodes); // invert connected_nodes to get free nodes ps.free_nodes.bitXORC(NodeBitmask()); ps.str= 0; ps.tick= tick; m_mgmsrv.get_socket_server()-> foreachSession(stop_session_if_timed_out,&ps); continue; } const char *alias; const char *str; alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type) Loading @@ -491,7 +519,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, m_output->println("nodeid: %u", tmp); m_output->println("result: Ok"); m_output->println(""); m_allocated_resources->reserve_node(tmp); m_allocated_resources->reserve_node(tmp, timeout*1000); if (name) g_eventLogger.info("Node %d: %s", tmp, name); Loading Loading @@ -1480,14 +1508,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx, m_output->println(""); } struct PurgeStruct { NodeBitmask free_nodes;/* free nodes as reported * by ndbd in apiRegReqConf */ BaseString *str; }; void MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data) { Loading @@ -1495,11 +1515,24 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da struct PurgeStruct &ps= *(struct PurgeStruct *)data; if (s->m_allocated_resources->is_reserved(ps.free_nodes)) { if (ps.str) ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); s->stopSession(); } } void MgmApiSession::stop_session_if_timed_out(SocketServer::Session *_s, void *data) { MgmApiSession *s= (MgmApiSession *)_s; struct PurgeStruct &ps= *(struct PurgeStruct *)data; if (s->m_allocated_resources->is_reserved(ps.free_nodes) && s->m_allocated_resources->is_timed_out(ps.tick)) { s->stopSession(); } } void MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args) Loading ndb/src/mgmsrv/Services.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ class MgmApiSession : public SocketServer::Session { static void stop_session_if_timed_out(SocketServer::Session *_s, void *data); static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); private: typedef Parser<MgmApiSession> Parser_t; Loading Loading
ndb/src/mgmsrv/MgmtSrvr.cpp +16 −1 Original line number Diff line number Diff line Loading @@ -2378,6 +2378,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted) MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m) : m_mgmsrv(m) { m_reserved_nodes.clear(); m_alloc_timeout= 0; } MgmtSrvr::Allocated_resources::~Allocated_resources() Loading @@ -2396,9 +2398,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() } void MgmtSrvr::Allocated_resources::reserve_node(NodeId id) MgmtSrvr::Allocated_resources::reserve_node(NodeId id, NDB_TICKS timeout) { m_reserved_nodes.set(id); m_alloc_timeout= NdbTick_CurrentMillisecond() + timeout; } bool MgmtSrvr::Allocated_resources::is_timed_out(NDB_TICKS tick) { if (m_alloc_timeout && tick > m_alloc_timeout) { g_eventLogger.info("Mgmt server state: nodeid %d timed out.", get_nodeid()); return true; } return false; } NodeId Loading
ndb/src/mgmsrv/MgmtSrvr.hpp +3 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,8 @@ public: ~Allocated_resources(); // methods to reserve/allocate resources which // will be freed when running destructor void reserve_node(NodeId id); void reserve_node(NodeId id, NDB_TICKS timeout); bool is_timed_out(NDB_TICKS tick); bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); } bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); } bool isclear() { return m_reserved_nodes.isclear(); } Loading @@ -114,6 +115,7 @@ public: private: MgmtSrvr &m_mgmsrv; NodeBitmask m_reserved_nodes; NDB_TICKS m_alloc_timeout; }; NdbMutex *m_node_id_mutex; Loading
ndb/src/mgmsrv/Services.cpp +45 −12 Original line number Diff line number Diff line Loading @@ -137,6 +137,7 @@ ParserRow<MgmApiSession> commands[] = { MGM_ARG("public key", String, Mandatory, "Public key"), MGM_ARG("endian", String, Optional, "Endianness"), MGM_ARG("name", String, Optional, "Name of connection"), MGM_ARG("timeout", Int, Optional, "Timeout in seconds"), MGM_CMD("get version", &MgmApiSession::getVersion, ""), Loading Loading @@ -259,6 +260,15 @@ ParserRow<MgmApiSession> commands[] = { MGM_END() }; struct PurgeStruct { NodeBitmask free_nodes;/* free nodes as reported * by ndbd in apiRegReqConf */ BaseString *str; NDB_TICKS tick; }; MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) : SocketServer::Session(sock), m_mgmsrv(mgm) { Loading Loading @@ -408,6 +418,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, { const char *cmd= "get nodeid reply"; Uint32 version, nodeid= 0, nodetype= 0xff; Uint32 timeout= 20; // default seconds timeout const char * transporter; const char * user; const char * password; Loading @@ -425,6 +436,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, args.get("public key", &public_key); args.get("endian", &endian); args.get("name", &name); args.get("timeout", &timeout); endian_check.l = 1; if(endian Loading Loading @@ -464,8 +476,24 @@ MgmApiSession::get_nodeid(Parser_t::Context &, NodeId tmp= nodeid; if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){ BaseString error_string; if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, &addr, &addrlen, error_string)){ NDB_TICKS tick= 0; while (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, &addr, &addrlen, error_string)) { if (tick == 0) { // attempt to free any timed out reservations tick= NdbTick_CurrentMillisecond(); struct PurgeStruct ps; m_mgmsrv.get_connected_nodes(ps.free_nodes); // invert connected_nodes to get free nodes ps.free_nodes.bitXORC(NodeBitmask()); ps.str= 0; ps.tick= tick; m_mgmsrv.get_socket_server()-> foreachSession(stop_session_if_timed_out,&ps); continue; } const char *alias; const char *str; alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type) Loading @@ -491,7 +519,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, m_output->println("nodeid: %u", tmp); m_output->println("result: Ok"); m_output->println(""); m_allocated_resources->reserve_node(tmp); m_allocated_resources->reserve_node(tmp, timeout*1000); if (name) g_eventLogger.info("Node %d: %s", tmp, name); Loading Loading @@ -1480,14 +1508,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx, m_output->println(""); } struct PurgeStruct { NodeBitmask free_nodes;/* free nodes as reported * by ndbd in apiRegReqConf */ BaseString *str; }; void MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data) { Loading @@ -1495,11 +1515,24 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da struct PurgeStruct &ps= *(struct PurgeStruct *)data; if (s->m_allocated_resources->is_reserved(ps.free_nodes)) { if (ps.str) ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); s->stopSession(); } } void MgmApiSession::stop_session_if_timed_out(SocketServer::Session *_s, void *data) { MgmApiSession *s= (MgmApiSession *)_s; struct PurgeStruct &ps= *(struct PurgeStruct *)data; if (s->m_allocated_resources->is_reserved(ps.free_nodes) && s->m_allocated_resources->is_timed_out(ps.tick)) { s->stopSession(); } } void MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, const class Properties &args) Loading
ndb/src/mgmsrv/Services.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -30,6 +30,7 @@ class MgmApiSession : public SocketServer::Session { static void stop_session_if_timed_out(SocketServer::Session *_s, void *data); static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); private: typedef Parser<MgmApiSession> Parser_t; Loading