Loading ndb/src/mgmclient/CommandInterpreter.cpp +24 −4 Original line number Diff line number Diff line Loading @@ -173,8 +173,15 @@ class CommandInterpreter { bool rep_connected; #endif struct NdbThread* m_event_thread; NdbMutex *m_print_mutex; }; struct event_thread_param { NdbMgmHandle *m; NdbMutex **p; }; NdbMutex* print_mutex; /* * Facade object for CommandInterpreter Loading Loading @@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) m_connected= false; m_event_thread= 0; try_reconnect = 0; m_print_mutex= NdbMutex_Create(); #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; Loading @@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) CommandInterpreter::~CommandInterpreter() { disconnect(); NdbMutex_Destroy(m_print_mutex); } static bool Loading Loading @@ -444,11 +453,13 @@ CommandInterpreter::printError() static int do_event_thread; static void* event_thread_run(void* m) event_thread_run(void* p) { DBUG_ENTER("event_thread_run"); NdbMgmHandle handle= *(NdbMgmHandle*)m; struct event_thread_param param= *(struct event_thread_param*)p; NdbMgmHandle handle= *(param.m); NdbMutex* printmutex= *(param.p); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, Loading @@ -466,8 +477,12 @@ event_thread_run(void* m) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) if(tmp && strlen(tmp)) { Guard g(printmutex); ndbout << tmp; } } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); } Loading Loading @@ -519,8 +534,11 @@ CommandInterpreter::connect() assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; struct event_thread_param p; p.m= &m_mgmsrv2; p.p= &m_print_mutex; m_event_thread = NdbThread_Create(event_thread_run, (void**)&m_mgmsrv2, (void**)&p, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); Loading Loading @@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, int result= execute_impl(_line); if (error) *error= m_error; return result; } Loading Loading @@ -920,6 +939,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ndbout_c("Trying to start all nodes of system."); ndbout_c("Use ALL STATUS to see the system start-up phases."); } else { Guard g(m_print_mutex); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); if(cl == 0){ ndbout_c("Unable get status from management server"); Loading ndb/src/mgmsrv/MgmtSrvr.cpp +25 −0 Original line number Diff line number Diff line Loading @@ -1455,6 +1455,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) #include <ClusterMgr.hpp> void MgmtSrvr::updateStatus(NodeBitmask nodes) { theFacade->theClusterMgr->forceHB(nodes); } int MgmtSrvr::status(int nodeId, ndb_mgm_node_status * _status, Loading Loading @@ -1979,6 +1985,25 @@ MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const } } void MgmtSrvr::get_connected_ndb_nodes(NodeBitmask &connected_nodes) const { NodeBitmask ndb_nodes; if (theFacade && theFacade->theClusterMgr) { for(Uint32 i = 0; i < MAX_NODES; i++) { if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) { ndb_nodes.set(i); const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); connected_nodes.bitOR(node.m_state.m_connected_nodes); } } } connected_nodes.bitAND(ndb_nodes); } bool MgmtSrvr::alloc_node_id(NodeId * nodeId, enum ndb_mgm_node_type type, Loading ndb/src/mgmsrv/MgmtSrvr.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -488,8 +488,11 @@ public: const char *get_connect_address(Uint32 node_id); void get_connected_nodes(NodeBitmask &connected_nodes) const; void get_connected_ndb_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } void updateStatus(NodeBitmask nodes); //************************************************************************** private: //************************************************************************** Loading ndb/src/mgmsrv/Services.cpp +3 −0 Original line number Diff line number Diff line Loading @@ -982,6 +982,9 @@ printNodeStatus(OutputStream *output, MgmtSrvr &mgmsrv, enum ndb_mgm_node_type type) { NodeId nodeId = 0; NodeBitmask hbnodes; mgmsrv.get_connected_ndb_nodes(hbnodes); mgmsrv.updateStatus(hbnodes); while(mgmsrv.getNextNodeId(&nodeId, type)) { enum ndb_mgm_node_status status; Uint32 startPhase = 0, Loading ndb/src/ndbapi/ClusterMgr.cpp +74 −4 Original line number Diff line number Diff line Loading @@ -39,6 +39,8 @@ int global_flag_send_heartbeat_now= 0; //#define DEBUG_REG // Just a C wrapper for threadMain extern "C" void* Loading Loading @@ -67,6 +69,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): DBUG_ENTER("ClusterMgr::ClusterMgr"); ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); waitForHBCond= NdbCondition_Create(); waitingForHB= false; noOfAliveNodes= 0; noOfConnectedNodes= 0; theClusterMgrThread= 0; Loading @@ -78,6 +82,7 @@ ClusterMgr::~ClusterMgr() { DBUG_ENTER("ClusterMgr::~ClusterMgr"); doStop(); NdbCondition_Destroy(waitForHBCond); NdbMutex_Destroy(clusterMgrThreadMutex); DBUG_VOID_RETURN; } Loading Loading @@ -163,6 +168,56 @@ ClusterMgr::doStop( ){ DBUG_VOID_RETURN; } void ClusterMgr::forceHB(NodeBitmask waitFor) { theFacade.lock_mutex(); if(waitingForHB) { NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); theFacade.unlock_mutex(); return; } global_flag_send_heartbeat_now= 1; waitingForHB= true; waitForHBFromNodes= waitFor; #ifdef DEBUG_REG char buf[128]; ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; #endif NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGREQ; signal.theReceiversBlockNumber = QMGR; signal.theTrace = 0; signal.theLength = ApiRegReq::SignalLength; ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); req->version = NDB_VERSION; int nodeId= 0; for(int i=0; NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i)); i= nodeId+1) { #ifdef DEBUG_REG ndbout << "FORCE HB to " << nodeId << endl; #endif theFacade.sendSignalUnCond(&signal, nodeId); } NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); waitingForHB= false; #ifdef DEBUG_REG ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; #endif theFacade.unlock_mutex(); } void ClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); Loading Loading @@ -226,7 +281,7 @@ ClusterMgr::threadMain( ){ if (theNode.m_info.m_type == NodeInfo::REP) { signal.theReceiversBlockNumber = API_CLUSTERMGR; } #if 0 #ifdef DEBUG_REG ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif theFacade.sendSignalUnCond(&signal, nodeId); Loading Loading @@ -278,7 +333,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref); #if 0 #ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); #endif Loading Loading @@ -319,7 +374,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); #if 0 #ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); #endif Loading Loading @@ -351,6 +406,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ if (node.m_info.m_type != NodeInfo::REP) { node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; } if(waitingForHB) { waitForHBFromNodes.clear(nodeId); if(waitForHBFromNodes.isclear()) { waitingForHB= false; NdbCondition_Broadcast(waitForHBCond); } } } void Loading Loading @@ -379,6 +445,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){ default: break; } waitForHBFromNodes.clear(nodeId); if(waitForHBFromNodes.isclear()) NdbCondition_Signal(waitForHBCond); } void Loading Loading
ndb/src/mgmclient/CommandInterpreter.cpp +24 −4 Original line number Diff line number Diff line Loading @@ -173,8 +173,15 @@ class CommandInterpreter { bool rep_connected; #endif struct NdbThread* m_event_thread; NdbMutex *m_print_mutex; }; struct event_thread_param { NdbMgmHandle *m; NdbMutex **p; }; NdbMutex* print_mutex; /* * Facade object for CommandInterpreter Loading Loading @@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) m_connected= false; m_event_thread= 0; try_reconnect = 0; m_print_mutex= NdbMutex_Create(); #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; Loading @@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) CommandInterpreter::~CommandInterpreter() { disconnect(); NdbMutex_Destroy(m_print_mutex); } static bool Loading Loading @@ -444,11 +453,13 @@ CommandInterpreter::printError() static int do_event_thread; static void* event_thread_run(void* m) event_thread_run(void* p) { DBUG_ENTER("event_thread_run"); NdbMgmHandle handle= *(NdbMgmHandle*)m; struct event_thread_param param= *(struct event_thread_param*)p; NdbMgmHandle handle= *(param.m); NdbMutex* printmutex= *(param.p); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, Loading @@ -466,8 +477,12 @@ event_thread_run(void* m) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) if(tmp && strlen(tmp)) { Guard g(printmutex); ndbout << tmp; } } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); } Loading Loading @@ -519,8 +534,11 @@ CommandInterpreter::connect() assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; struct event_thread_param p; p.m= &m_mgmsrv2; p.p= &m_print_mutex; m_event_thread = NdbThread_Create(event_thread_run, (void**)&m_mgmsrv2, (void**)&p, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); Loading Loading @@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, int result= execute_impl(_line); if (error) *error= m_error; return result; } Loading Loading @@ -920,6 +939,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ndbout_c("Trying to start all nodes of system."); ndbout_c("Use ALL STATUS to see the system start-up phases."); } else { Guard g(m_print_mutex); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); if(cl == 0){ ndbout_c("Unable get status from management server"); Loading
ndb/src/mgmsrv/MgmtSrvr.cpp +25 −0 Original line number Diff line number Diff line Loading @@ -1455,6 +1455,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) #include <ClusterMgr.hpp> void MgmtSrvr::updateStatus(NodeBitmask nodes) { theFacade->theClusterMgr->forceHB(nodes); } int MgmtSrvr::status(int nodeId, ndb_mgm_node_status * _status, Loading Loading @@ -1979,6 +1985,25 @@ MgmtSrvr::get_connected_nodes(NodeBitmask &connected_nodes) const } } void MgmtSrvr::get_connected_ndb_nodes(NodeBitmask &connected_nodes) const { NodeBitmask ndb_nodes; if (theFacade && theFacade->theClusterMgr) { for(Uint32 i = 0; i < MAX_NODES; i++) { if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) { ndb_nodes.set(i); const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i); connected_nodes.bitOR(node.m_state.m_connected_nodes); } } } connected_nodes.bitAND(ndb_nodes); } bool MgmtSrvr::alloc_node_id(NodeId * nodeId, enum ndb_mgm_node_type type, Loading
ndb/src/mgmsrv/MgmtSrvr.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -488,8 +488,11 @@ public: const char *get_connect_address(Uint32 node_id); void get_connected_nodes(NodeBitmask &connected_nodes) const; void get_connected_ndb_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } void updateStatus(NodeBitmask nodes); //************************************************************************** private: //************************************************************************** Loading
ndb/src/mgmsrv/Services.cpp +3 −0 Original line number Diff line number Diff line Loading @@ -982,6 +982,9 @@ printNodeStatus(OutputStream *output, MgmtSrvr &mgmsrv, enum ndb_mgm_node_type type) { NodeId nodeId = 0; NodeBitmask hbnodes; mgmsrv.get_connected_ndb_nodes(hbnodes); mgmsrv.updateStatus(hbnodes); while(mgmsrv.getNextNodeId(&nodeId, type)) { enum ndb_mgm_node_status status; Uint32 startPhase = 0, Loading
ndb/src/ndbapi/ClusterMgr.cpp +74 −4 Original line number Diff line number Diff line Loading @@ -39,6 +39,8 @@ int global_flag_send_heartbeat_now= 0; //#define DEBUG_REG // Just a C wrapper for threadMain extern "C" void* Loading Loading @@ -67,6 +69,8 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): DBUG_ENTER("ClusterMgr::ClusterMgr"); ndbSetOwnVersion(); clusterMgrThreadMutex = NdbMutex_Create(); waitForHBCond= NdbCondition_Create(); waitingForHB= false; noOfAliveNodes= 0; noOfConnectedNodes= 0; theClusterMgrThread= 0; Loading @@ -78,6 +82,7 @@ ClusterMgr::~ClusterMgr() { DBUG_ENTER("ClusterMgr::~ClusterMgr"); doStop(); NdbCondition_Destroy(waitForHBCond); NdbMutex_Destroy(clusterMgrThreadMutex); DBUG_VOID_RETURN; } Loading Loading @@ -163,6 +168,56 @@ ClusterMgr::doStop( ){ DBUG_VOID_RETURN; } void ClusterMgr::forceHB(NodeBitmask waitFor) { theFacade.lock_mutex(); if(waitingForHB) { NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); theFacade.unlock_mutex(); return; } global_flag_send_heartbeat_now= 1; waitingForHB= true; waitForHBFromNodes= waitFor; #ifdef DEBUG_REG char buf[128]; ndbout << "Waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; #endif NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); signal.theVerId_signalNumber = GSN_API_REGREQ; signal.theReceiversBlockNumber = QMGR; signal.theTrace = 0; signal.theLength = ApiRegReq::SignalLength; ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend()); req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId()); req->version = NDB_VERSION; int nodeId= 0; for(int i=0; NodeBitmask::NotFound!=(nodeId= waitForHBFromNodes.find(i)); i= nodeId+1) { #ifdef DEBUG_REG ndbout << "FORCE HB to " << nodeId << endl; #endif theFacade.sendSignalUnCond(&signal, nodeId); } NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000); waitingForHB= false; #ifdef DEBUG_REG ndbout << "Still waiting for HB from " << waitForHBFromNodes.getText(buf) << endl; #endif theFacade.unlock_mutex(); } void ClusterMgr::threadMain( ){ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId())); Loading Loading @@ -226,7 +281,7 @@ ClusterMgr::threadMain( ){ if (theNode.m_info.m_type == NodeInfo::REP) { signal.theReceiversBlockNumber = API_CLUSTERMGR; } #if 0 #ifdef DEBUG_REG ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId); #endif theFacade.sendSignalUnCond(&signal, nodeId); Loading Loading @@ -278,7 +333,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){ const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0]; const NodeId nodeId = refToNode(apiRegReq->ref); #if 0 #ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId); #endif Loading Loading @@ -319,7 +374,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0]; const NodeId nodeId = refToNode(apiRegConf->qmgrRef); #if 0 #ifdef DEBUG_REG ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId); #endif Loading Loading @@ -351,6 +406,17 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){ if (node.m_info.m_type != NodeInfo::REP) { node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50; } if(waitingForHB) { waitForHBFromNodes.clear(nodeId); if(waitForHBFromNodes.isclear()) { waitingForHB= false; NdbCondition_Broadcast(waitForHBCond); } } } void Loading Loading @@ -379,6 +445,10 @@ ClusterMgr::execAPI_REGREF(const Uint32 * theData){ default: break; } waitForHBFromNodes.clear(nodeId); if(waitForHBFromNodes.isclear()) NdbCondition_Signal(waitForHBCond); } void Loading