Loading ndb/src/kernel/blocks/backup/Backup.cpp +41 −23 Original line number Diff line number Diff line Loading @@ -274,8 +274,18 @@ Backup::execCONTINUEB(Signal* signal) BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptr_I); if (tabPtr_I == RNIL) { closeFiles(signal, ptr); return; } jam(); TablePtr tabPtr; ptr.p->tables.getPtr(tabPtr, tabPtr_I); jam(); if(tabPtr.p->fragments.getSize()) { FragmentPtr fragPtr; tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I); Loading Loading @@ -304,6 +314,8 @@ Backup::execCONTINUEB(Signal* signal) filePtr.p->operation.dataBuffer.updateWritePtr(sz); fragPtr_I++; } if (fragPtr_I == tabPtr.p->fragments.getSize()) { signal->theData[0] = tabPtr.p->tableId; Loading Loading @@ -4243,6 +4255,12 @@ Backup::execSTOP_BACKUP_REQ(Signal* signal) TablePtr tabPtr; ptr.p->tables.first(tabPtr); if (tabPtr.i == RNIL) { closeFiles(signal, ptr); return; } signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO; signal->theData[1] = ptr.i; signal->theData[2] = tabPtr.i; Loading ndb/src/mgmapi/mgmapi.cpp +6 −5 Original line number Diff line number Diff line Loading @@ -1424,6 +1424,7 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[], close(sockfd); CHECK_REPLY(reply, -1); } delete reply; return sockfd; } Loading ndb/src/mgmclient/CommandInterpreter.cpp +27 −4 Original line number Diff line number Diff line Loading @@ -173,8 +173,15 @@ class CommandInterpreter { bool rep_connected; #endif struct NdbThread* m_event_thread; NdbMutex *m_print_mutex; }; struct event_thread_param { NdbMgmHandle *m; NdbMutex **p; }; NdbMutex* print_mutex; /* * Facade object for CommandInterpreter Loading Loading @@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) m_connected= false; m_event_thread= 0; try_reconnect = 0; m_print_mutex= NdbMutex_Create(); #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; Loading @@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) CommandInterpreter::~CommandInterpreter() { disconnect(); NdbMutex_Destroy(m_print_mutex); } static bool Loading Loading @@ -444,11 +453,13 @@ CommandInterpreter::printError() static int do_event_thread; static void* event_thread_run(void* m) event_thread_run(void* p) { DBUG_ENTER("event_thread_run"); NdbMgmHandle handle= *(NdbMgmHandle*)m; struct event_thread_param param= *(struct event_thread_param*)p; NdbMgmHandle handle= *(param.m); NdbMutex* printmutex= *(param.p); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, Loading @@ -466,8 +477,12 @@ event_thread_run(void* m) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) if(tmp && strlen(tmp)) { Guard g(printmutex); ndbout << tmp; } } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); } Loading Loading @@ -519,8 +534,11 @@ CommandInterpreter::connect() assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; struct event_thread_param p; p.m= &m_mgmsrv2; p.p= &m_print_mutex; m_event_thread = NdbThread_Create(event_thread_run, (void**)&m_mgmsrv2, (void**)&p, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); Loading Loading @@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, int result= execute_impl(_line); if (error) *error= m_error; return result; } Loading Loading @@ -686,6 +705,7 @@ CommandInterpreter::execute_impl(const char *_line) DBUG_RETURN(true); if (strcasecmp(firstToken, "SHOW") == 0) { Guard g(m_print_mutex); executeShow(allAfterFirstToken); DBUG_RETURN(true); } Loading Loading @@ -920,6 +940,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ndbout_c("Trying to start all nodes of system."); ndbout_c("Use ALL STATUS to see the system start-up phases."); } else { Guard g(m_print_mutex); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); if(cl == 0){ ndbout_c("Unable get status from management server"); Loading Loading @@ -1224,6 +1245,7 @@ CommandInterpreter::executeShow(char* parameters) if(it == 0){ ndbout_c("Unable to create config iterator"); ndb_mgm_destroy_configuration(conf); return; } NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it); Loading Loading @@ -1270,6 +1292,7 @@ CommandInterpreter::executeShow(char* parameters) print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0); print_nodes(state, it, "mysqld", api_nodes, NDB_MGM_NODE_TYPE_API, 0); // ndbout << helpTextShow; ndb_mgm_destroy_configuration(conf); return; } else if (strcasecmp(parameters, "PROPERTIES") == 0 || strcasecmp(parameters, "PROP") == 0) { Loading ndb/src/mgmsrv/MgmtSrvr.cpp +8 −3 Original line number Diff line number Diff line Loading @@ -77,7 +77,6 @@ }\ } extern int global_flag_send_heartbeat_now; extern int g_no_nodeid_checks; extern my_bool opt_core; Loading Loading @@ -1455,6 +1454,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) #include <ClusterMgr.hpp> void MgmtSrvr::updateStatus() { theFacade->theClusterMgr->forceHB(); } int MgmtSrvr::status(int nodeId, ndb_mgm_node_status * _status, Loading Loading @@ -2153,7 +2158,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, if (found_matching_type && !found_free_node) { // we have a temporary error which might be due to that // we have got the latest connect status from db-nodes. Force update. global_flag_send_heartbeat_now= 1; updateStatus(); } BaseString type_string, type_c_string; Loading Loading @@ -2507,7 +2512,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() if (!m_reserved_nodes.isclear()) { m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); // node has been reserved, force update signal to ndb nodes global_flag_send_heartbeat_now= 1; m_mgmsrv.updateStatus(); char tmp_str[128]; m_mgmsrv.m_reserved_nodes.getText(tmp_str); Loading ndb/src/mgmsrv/MgmtSrvr.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -490,6 +490,8 @@ public: void get_connected_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } void updateStatus(); //************************************************************************** private: //************************************************************************** Loading Loading
ndb/src/kernel/blocks/backup/Backup.cpp +41 −23 Original line number Diff line number Diff line Loading @@ -274,8 +274,18 @@ Backup::execCONTINUEB(Signal* signal) BackupRecordPtr ptr; c_backupPool.getPtr(ptr, ptr_I); if (tabPtr_I == RNIL) { closeFiles(signal, ptr); return; } jam(); TablePtr tabPtr; ptr.p->tables.getPtr(tabPtr, tabPtr_I); jam(); if(tabPtr.p->fragments.getSize()) { FragmentPtr fragPtr; tabPtr.p->fragments.getPtr(fragPtr, fragPtr_I); Loading Loading @@ -304,6 +314,8 @@ Backup::execCONTINUEB(Signal* signal) filePtr.p->operation.dataBuffer.updateWritePtr(sz); fragPtr_I++; } if (fragPtr_I == tabPtr.p->fragments.getSize()) { signal->theData[0] = tabPtr.p->tableId; Loading Loading @@ -4243,6 +4255,12 @@ Backup::execSTOP_BACKUP_REQ(Signal* signal) TablePtr tabPtr; ptr.p->tables.first(tabPtr); if (tabPtr.i == RNIL) { closeFiles(signal, ptr); return; } signal->theData[0] = BackupContinueB::BACKUP_FRAGMENT_INFO; signal->theData[1] = ptr.i; signal->theData[2] = tabPtr.i; Loading
ndb/src/mgmapi/mgmapi.cpp +6 −5 Original line number Diff line number Diff line Loading @@ -1424,6 +1424,7 @@ ndb_mgm_listen_event_internal(NdbMgmHandle handle, const int filter[], close(sockfd); CHECK_REPLY(reply, -1); } delete reply; return sockfd; } Loading
ndb/src/mgmclient/CommandInterpreter.cpp +27 −4 Original line number Diff line number Diff line Loading @@ -173,8 +173,15 @@ class CommandInterpreter { bool rep_connected; #endif struct NdbThread* m_event_thread; NdbMutex *m_print_mutex; }; struct event_thread_param { NdbMgmHandle *m; NdbMutex **p; }; NdbMutex* print_mutex; /* * Facade object for CommandInterpreter Loading Loading @@ -395,6 +402,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) m_connected= false; m_event_thread= 0; try_reconnect = 0; m_print_mutex= NdbMutex_Create(); #ifdef HAVE_GLOBAL_REPLICATION rep_host = NULL; m_repserver = NULL; Loading @@ -408,6 +416,7 @@ CommandInterpreter::CommandInterpreter(const char *_host,int verbose) CommandInterpreter::~CommandInterpreter() { disconnect(); NdbMutex_Destroy(m_print_mutex); } static bool Loading Loading @@ -444,11 +453,13 @@ CommandInterpreter::printError() static int do_event_thread; static void* event_thread_run(void* m) event_thread_run(void* p) { DBUG_ENTER("event_thread_run"); NdbMgmHandle handle= *(NdbMgmHandle*)m; struct event_thread_param param= *(struct event_thread_param*)p; NdbMgmHandle handle= *(param.m); NdbMutex* printmutex= *(param.p); int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 1, NDB_MGM_EVENT_CATEGORY_STARTUP, Loading @@ -466,8 +477,12 @@ event_thread_run(void* m) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) if(tmp && strlen(tmp)) { Guard g(printmutex); ndbout << tmp; } } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); } Loading Loading @@ -519,8 +534,11 @@ CommandInterpreter::connect() assert(m_event_thread == 0); assert(do_event_thread == 0); do_event_thread= 0; struct event_thread_param p; p.m= &m_mgmsrv2; p.p= &m_print_mutex; m_event_thread = NdbThread_Create(event_thread_run, (void**)&m_mgmsrv2, (void**)&p, 32768, "CommandInterpreted_event_thread", NDB_THREAD_PRIO_LOW); Loading Loading @@ -607,6 +625,7 @@ CommandInterpreter::execute(const char *_line, int _try_reconnect, int result= execute_impl(_line); if (error) *error= m_error; return result; } Loading Loading @@ -686,6 +705,7 @@ CommandInterpreter::execute_impl(const char *_line) DBUG_RETURN(true); if (strcasecmp(firstToken, "SHOW") == 0) { Guard g(m_print_mutex); executeShow(allAfterFirstToken); DBUG_RETURN(true); } Loading Loading @@ -920,6 +940,7 @@ CommandInterpreter::executeForAll(const char * cmd, ExecuteFunction fun, ndbout_c("Trying to start all nodes of system."); ndbout_c("Use ALL STATUS to see the system start-up phases."); } else { Guard g(m_print_mutex); struct ndb_mgm_cluster_state *cl= ndb_mgm_get_status(m_mgmsrv); if(cl == 0){ ndbout_c("Unable get status from management server"); Loading Loading @@ -1224,6 +1245,7 @@ CommandInterpreter::executeShow(char* parameters) if(it == 0){ ndbout_c("Unable to create config iterator"); ndb_mgm_destroy_configuration(conf); return; } NdbAutoPtr<ndb_mgm_configuration_iterator> ptr(it); Loading Loading @@ -1270,6 +1292,7 @@ CommandInterpreter::executeShow(char* parameters) print_nodes(state, it, "ndb_mgmd", mgm_nodes, NDB_MGM_NODE_TYPE_MGM, 0); print_nodes(state, it, "mysqld", api_nodes, NDB_MGM_NODE_TYPE_API, 0); // ndbout << helpTextShow; ndb_mgm_destroy_configuration(conf); return; } else if (strcasecmp(parameters, "PROPERTIES") == 0 || strcasecmp(parameters, "PROP") == 0) { Loading
ndb/src/mgmsrv/MgmtSrvr.cpp +8 −3 Original line number Diff line number Diff line Loading @@ -77,7 +77,6 @@ }\ } extern int global_flag_send_heartbeat_now; extern int g_no_nodeid_checks; extern my_bool opt_core; Loading Loading @@ -1455,6 +1454,12 @@ MgmtSrvr::exitSingleUser(int * stopCount, bool abort) #include <ClusterMgr.hpp> void MgmtSrvr::updateStatus() { theFacade->theClusterMgr->forceHB(); } int MgmtSrvr::status(int nodeId, ndb_mgm_node_status * _status, Loading Loading @@ -2153,7 +2158,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, if (found_matching_type && !found_free_node) { // we have a temporary error which might be due to that // we have got the latest connect status from db-nodes. Force update. global_flag_send_heartbeat_now= 1; updateStatus(); } BaseString type_string, type_c_string; Loading Loading @@ -2507,7 +2512,7 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() if (!m_reserved_nodes.isclear()) { m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes); // node has been reserved, force update signal to ndb nodes global_flag_send_heartbeat_now= 1; m_mgmsrv.updateStatus(); char tmp_str[128]; m_mgmsrv.m_reserved_nodes.getText(tmp_str); Loading
ndb/src/mgmsrv/MgmtSrvr.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -490,6 +490,8 @@ public: void get_connected_nodes(NodeBitmask &connected_nodes) const; SocketServer *get_socket_server() { return m_socket_server; } void updateStatus(); //************************************************************************** private: //************************************************************************** Loading