Loading ndb/src/mgmclient/CommandInterpreter.cpp +28 −6 Original line number Diff line number Diff line Loading @@ -455,11 +455,13 @@ static int do_event_thread; static void* event_thread_run(void* m) { DBUG_ENTER("event_thread_run"); NdbMgmHandle handle= *(NdbMgmHandle*)m; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 }; int fd = ndb_mgm_listen_event(handle, filter); if (fd > 0) if (fd != NDB_INVALID_SOCKET) { do_event_thread= 1; char *tmp= 0; Loading @@ -468,20 +470,26 @@ event_thread_run(void* m) do { if (tmp == 0) NdbSleep_MilliSleep(10); if((tmp = in.gets(buf, 1024))) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) ndbout << tmp; } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); } else { do_event_thread= -1; } return NULL; DBUG_RETURN(NULL); } bool CommandInterpreter::connect() { DBUG_ENTER("CommandInterpreter::connect"); if(!m_connected) { if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1)) Loading Loading @@ -512,8 +520,19 @@ CommandInterpreter::connect() do_event_thread == 0 || do_event_thread == -1) { printf("Warning, event thread startup failed, degraded printouts as result\n"); DBUG_PRINT("info",("Warning, event thread startup failed, " "degraded printouts as result, errno=%d", errno)); printf("Warning, event thread startup failed, " "degraded printouts as result, errno=%d\n", errno); do_event_thread= 0; if (m_event_thread) { void *res; NdbThread_WaitFor(m_event_thread, &res); NdbThread_Destroy(&m_event_thread); } ndb_mgm_disconnect(m_mgmsrv2); } } else Loading @@ -521,6 +540,8 @@ CommandInterpreter::connect() printf("Warning, event connect failed, degraded printouts as result\n"); } m_connected= true; DBUG_PRINT("info",("Connected to Management Server at: %s:%d", host,port)); if (m_verbose) { printf("Connected to Management Server at: %s:%d\n", Loading @@ -528,12 +549,13 @@ CommandInterpreter::connect() } } } return m_connected; DBUG_RETURN(m_connected); } bool CommandInterpreter::disconnect() { DBUG_ENTER("CommandInterpreter::disconnect"); if (m_event_thread) { void *res; do_event_thread= 0; Loading @@ -550,7 +572,7 @@ CommandInterpreter::disconnect() } m_connected= false; } return true; DBUG_RETURN(true); } //***************************************************************************** Loading ndb/src/mgmsrv/MgmtSrvr.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ public: } void add_listener(const Event_listener&); void check_listeners(); void update_max_log_level(const LogLevel&); void update_log_level(const LogLevel&); Loading ndb/src/mgmsrv/Services.cpp +76 −17 Original line number Diff line number Diff line Loading @@ -253,15 +253,19 @@ ParserRow<MgmApiSession> commands[] = { }; MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) : SocketServer::Session(sock), m_mgmsrv(mgm) { : SocketServer::Session(sock), m_mgmsrv(mgm) { DBUG_ENTER("MgmApiSession::MgmApiSession"); m_input = new SocketInputStream(sock); m_output = new SocketOutputStream(sock); m_parser = new Parser_t(commands, *m_input, true, true, true); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); DBUG_VOID_RETURN; } MgmApiSession::~MgmApiSession() { DBUG_ENTER("MgmApiSession::~MgmApiSession"); if (m_input) delete m_input; if (m_output) Loading @@ -270,10 +274,19 @@ MgmApiSession::~MgmApiSession() delete m_parser; if (m_allocated_resources) delete m_allocated_resources; if(m_socket != NDB_INVALID_SOCKET) { NDB_CLOSE_SOCKET(m_socket); m_socket= NDB_INVALID_SOCKET; } DBUG_VOID_RETURN; } void MgmApiSession::runSession() { MgmApiSession::runSession() { DBUG_ENTER("MgmApiSession::runSession"); Parser_t::Context ctx; while(!m_stop) { m_parser->run(ctx, *this); Loading Loading @@ -301,8 +314,13 @@ MgmApiSession::runSession() { break; } } if(m_socket >= 0) if(m_socket != NDB_INVALID_SOCKET) { NDB_CLOSE_SOCKET(m_socket); m_socket= NDB_INVALID_SOCKET; } DBUG_VOID_RETURN; } #ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT Loading Loading @@ -1236,7 +1254,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) Uint32 threshold; LogLevel::EventCategory cat; Logger::LoggerLevel severity; int i; int i, n; DBUG_ENTER("Ndb_mgmd_event_service::log"); DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId)); Loading @@ -1248,28 +1266,30 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) Vector<NDB_SOCKET_TYPE> copy; m_clients.lock(); for(i = m_clients.size() - 1; i >= 0; i--){ if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)){ if(m_clients[i].m_socket != NDB_INVALID_SOCKET && println_socket(m_clients[i].m_socket, MAX_WRITE_TIMEOUT, m_text) == -1){ copy.push_back(m_clients[i].m_socket); for(i = m_clients.size() - 1; i >= 0; i--) { if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)) { int fd= m_clients[i].m_socket; if(fd != NDB_INVALID_SOCKET && println_socket(fd, MAX_WRITE_TIMEOUT, m_text) == -1) { copy.push_back(fd); m_clients.erase(i, false); } } } m_clients.unlock(); for(i = 0; (unsigned)i < copy.size(); i++){ if ((n= (int)copy.size())) { for(i= 0; i < n; i++) NDB_CLOSE_SOCKET(copy[i]); } if(copy.size()){ LogLevel tmp; tmp.clear(); m_clients.lock(); for(i = 0; (unsigned)i < m_clients.size(); i++){ for(i= m_clients.size() - 1; i >= 0; i--) tmp.set_max(m_clients[i].m_logLevel); } m_clients.unlock(); update_log_level(tmp); } Loading Loading @@ -1297,9 +1317,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp) } void Ndb_mgmd_event_service::add_listener(const Event_listener& client){ Ndb_mgmd_event_service::check_listeners() { int i, n= 0; DBUG_ENTER("Ndb_mgmd_event_service::check_listeners"); m_clients.lock(); for(i= m_clients.size() - 1; i >= 0; i--) { int fd= m_clients[i].m_socket; DBUG_PRINT("info",("%d %d",i,fd)); char buf[1]; buf[0]=0; if (fd != NDB_INVALID_SOCKET && println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1) { NDB_CLOSE_SOCKET(fd); m_clients.erase(i, false); n=1; } } if (n) { LogLevel tmp; tmp.clear(); for(i= m_clients.size() - 1; i >= 0; i--) tmp.set_max(m_clients[i].m_logLevel); update_log_level(tmp); } m_clients.unlock(); DBUG_VOID_RETURN; } void Ndb_mgmd_event_service::add_listener(const Event_listener& client) { DBUG_ENTER("Ndb_mgmd_event_service::add_listener"); DBUG_PRINT("enter",("client.m_socket: %d", client.m_socket)); check_listeners(); m_clients.push_back(client); update_max_log_level(client.m_logLevel); DBUG_VOID_RETURN; } void Loading Loading
ndb/src/mgmclient/CommandInterpreter.cpp +28 −6 Original line number Diff line number Diff line Loading @@ -455,11 +455,13 @@ static int do_event_thread; static void* event_thread_run(void* m) { DBUG_ENTER("event_thread_run"); NdbMgmHandle handle= *(NdbMgmHandle*)m; int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP, 0 }; int fd = ndb_mgm_listen_event(handle, filter); if (fd > 0) if (fd != NDB_INVALID_SOCKET) { do_event_thread= 1; char *tmp= 0; Loading @@ -468,20 +470,26 @@ event_thread_run(void* m) do { if (tmp == 0) NdbSleep_MilliSleep(10); if((tmp = in.gets(buf, 1024))) { const char ping_token[]= "<PING>"; if (memcmp(ping_token,tmp,sizeof(ping_token)-1)) ndbout << tmp; } } while(do_event_thread); NDB_CLOSE_SOCKET(fd); } else { do_event_thread= -1; } return NULL; DBUG_RETURN(NULL); } bool CommandInterpreter::connect() { DBUG_ENTER("CommandInterpreter::connect"); if(!m_connected) { if(!ndb_mgm_connect(m_mgmsrv, try_reconnect-1, 5, 1)) Loading Loading @@ -512,8 +520,19 @@ CommandInterpreter::connect() do_event_thread == 0 || do_event_thread == -1) { printf("Warning, event thread startup failed, degraded printouts as result\n"); DBUG_PRINT("info",("Warning, event thread startup failed, " "degraded printouts as result, errno=%d", errno)); printf("Warning, event thread startup failed, " "degraded printouts as result, errno=%d\n", errno); do_event_thread= 0; if (m_event_thread) { void *res; NdbThread_WaitFor(m_event_thread, &res); NdbThread_Destroy(&m_event_thread); } ndb_mgm_disconnect(m_mgmsrv2); } } else Loading @@ -521,6 +540,8 @@ CommandInterpreter::connect() printf("Warning, event connect failed, degraded printouts as result\n"); } m_connected= true; DBUG_PRINT("info",("Connected to Management Server at: %s:%d", host,port)); if (m_verbose) { printf("Connected to Management Server at: %s:%d\n", Loading @@ -528,12 +549,13 @@ CommandInterpreter::connect() } } } return m_connected; DBUG_RETURN(m_connected); } bool CommandInterpreter::disconnect() { DBUG_ENTER("CommandInterpreter::disconnect"); if (m_event_thread) { void *res; do_event_thread= 0; Loading @@ -550,7 +572,7 @@ CommandInterpreter::disconnect() } m_connected= false; } return true; DBUG_RETURN(true); } //***************************************************************************** Loading
ndb/src/mgmsrv/MgmtSrvr.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -60,6 +60,7 @@ public: } void add_listener(const Event_listener&); void check_listeners(); void update_max_log_level(const LogLevel&); void update_log_level(const LogLevel&); Loading
ndb/src/mgmsrv/Services.cpp +76 −17 Original line number Diff line number Diff line Loading @@ -253,15 +253,19 @@ ParserRow<MgmApiSession> commands[] = { }; MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) : SocketServer::Session(sock), m_mgmsrv(mgm) { : SocketServer::Session(sock), m_mgmsrv(mgm) { DBUG_ENTER("MgmApiSession::MgmApiSession"); m_input = new SocketInputStream(sock); m_output = new SocketOutputStream(sock); m_parser = new Parser_t(commands, *m_input, true, true, true); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); DBUG_VOID_RETURN; } MgmApiSession::~MgmApiSession() { DBUG_ENTER("MgmApiSession::~MgmApiSession"); if (m_input) delete m_input; if (m_output) Loading @@ -270,10 +274,19 @@ MgmApiSession::~MgmApiSession() delete m_parser; if (m_allocated_resources) delete m_allocated_resources; if(m_socket != NDB_INVALID_SOCKET) { NDB_CLOSE_SOCKET(m_socket); m_socket= NDB_INVALID_SOCKET; } DBUG_VOID_RETURN; } void MgmApiSession::runSession() { MgmApiSession::runSession() { DBUG_ENTER("MgmApiSession::runSession"); Parser_t::Context ctx; while(!m_stop) { m_parser->run(ctx, *this); Loading Loading @@ -301,8 +314,13 @@ MgmApiSession::runSession() { break; } } if(m_socket >= 0) if(m_socket != NDB_INVALID_SOCKET) { NDB_CLOSE_SOCKET(m_socket); m_socket= NDB_INVALID_SOCKET; } DBUG_VOID_RETURN; } #ifdef MGM_GET_CONFIG_BACKWARDS_COMPAT Loading Loading @@ -1236,7 +1254,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) Uint32 threshold; LogLevel::EventCategory cat; Logger::LoggerLevel severity; int i; int i, n; DBUG_ENTER("Ndb_mgmd_event_service::log"); DBUG_PRINT("enter",("eventType=%d, nodeid=%d", eventType, nodeId)); Loading @@ -1248,28 +1266,30 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) Vector<NDB_SOCKET_TYPE> copy; m_clients.lock(); for(i = m_clients.size() - 1; i >= 0; i--){ if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)){ if(m_clients[i].m_socket != NDB_INVALID_SOCKET && println_socket(m_clients[i].m_socket, MAX_WRITE_TIMEOUT, m_text) == -1){ copy.push_back(m_clients[i].m_socket); for(i = m_clients.size() - 1; i >= 0; i--) { if(threshold <= m_clients[i].m_logLevel.getLogLevel(cat)) { int fd= m_clients[i].m_socket; if(fd != NDB_INVALID_SOCKET && println_socket(fd, MAX_WRITE_TIMEOUT, m_text) == -1) { copy.push_back(fd); m_clients.erase(i, false); } } } m_clients.unlock(); for(i = 0; (unsigned)i < copy.size(); i++){ if ((n= (int)copy.size())) { for(i= 0; i < n; i++) NDB_CLOSE_SOCKET(copy[i]); } if(copy.size()){ LogLevel tmp; tmp.clear(); m_clients.lock(); for(i = 0; (unsigned)i < m_clients.size(); i++){ for(i= m_clients.size() - 1; i >= 0; i--) tmp.set_max(m_clients[i].m_logLevel); } m_clients.unlock(); update_log_level(tmp); } Loading Loading @@ -1297,9 +1317,48 @@ Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp) } void Ndb_mgmd_event_service::add_listener(const Event_listener& client){ Ndb_mgmd_event_service::check_listeners() { int i, n= 0; DBUG_ENTER("Ndb_mgmd_event_service::check_listeners"); m_clients.lock(); for(i= m_clients.size() - 1; i >= 0; i--) { int fd= m_clients[i].m_socket; DBUG_PRINT("info",("%d %d",i,fd)); char buf[1]; buf[0]=0; if (fd != NDB_INVALID_SOCKET && println_socket(fd,MAX_WRITE_TIMEOUT,"<PING>") == -1) { NDB_CLOSE_SOCKET(fd); m_clients.erase(i, false); n=1; } } if (n) { LogLevel tmp; tmp.clear(); for(i= m_clients.size() - 1; i >= 0; i--) tmp.set_max(m_clients[i].m_logLevel); update_log_level(tmp); } m_clients.unlock(); DBUG_VOID_RETURN; } void Ndb_mgmd_event_service::add_listener(const Event_listener& client) { DBUG_ENTER("Ndb_mgmd_event_service::add_listener"); DBUG_PRINT("enter",("client.m_socket: %d", client.m_socket)); check_listeners(); m_clients.push_back(client); update_max_log_level(client.m_logLevel); DBUG_VOID_RETURN; } void Loading