Loading sql/mysql_priv.h +1 −0 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ char *sql_strdup(const char *str); char *sql_strmake(const char *str,uint len); gptr sql_memdup(const void * ptr,unsigned size); void sql_element_free(void *ptr); void kill_one_thread(THD *thd, ulong id); #define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); } #define safeFree(x) { if(x) { my_free((gptr) x,MYF(0)); x = NULL; } } Loading sql/slave.cc +31 −7 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ MASTER_INFO glob_mi; extern bool opt_log_slave_updates ; static inline void skip_load_data_infile(NET* net); static inline bool slave_killed(THD* thd); static int init_slave_thread(THD* thd); int init_master_info(MASTER_INFO* mi); Loading @@ -53,6 +54,15 @@ static inline bool slave_killed(THD* thd) return abort_slave || abort_loop || thd->killed; } static inline void skip_load_data_infile(NET* net) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); (void)my_net_read(net); // discard response send_ok(net); // the master expects it } int db_ok(const char* db, I_List<i_string> &do_list, I_List<i_string> &ignore_list ) { Loading Loading @@ -553,9 +563,26 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) { Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, event_len); if (ev) { switch(ev->get_type_code()) int type_code = ev->get_type_code(); if(ev->server_id == ::server_id) { if(type_code == LOAD_EVENT) skip_load_data_infile(net); mi->inc_pos(event_len); flush_master_info(mi); delete ev; return 0; // avoid infinite update loops } thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query ev->when = time(NULL); switch(type_code) { case QUERY_EVENT: { Loading Loading @@ -706,10 +733,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) else // we will just ask the master to send us /dev/null if we do not want to // load the data :-) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); (void)my_net_read(net); // discard response send_ok(net); // the master expects it skip_load_data_infile(net); } thd->net.vio = 0; Loading Loading @@ -799,14 +823,14 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) if(!server_id) { sql_print_error("Server id not set, will not start slave"); pthread_exit(1); pthread_exit((void*)1); } pthread_mutex_lock(&LOCK_slave); if(slave_running) { pthread_mutex_unlock(&LOCK_slave); pthread_exit(1); // safety just in case pthread_exit((void*)1); // safety just in case } slave_running = 1; abort_slave = 0; Loading sql/sql_parse.cc +4 −2 Original line number Diff line number Diff line Loading @@ -39,7 +39,6 @@ static bool check_dup(THD *thd,const char *db,const char *name, TABLE_LIST *tables); static void mysql_init_query(THD *thd); static void remove_escape(char *name); static void kill_one_thread(THD *thd, ulong thread); static void refresh_status(void); const char *any_db="*any*"; // Special symbol for check_access Loading Loading @@ -712,6 +711,9 @@ bool do_command(THD *thd) thd->server_id = slave_server_id; pthread_mutex_unlock(&LOCK_server_id); mysql_binlog_send(thd, strdup(packet + 11), pos, flags); // fake COM_QUIT -- if we get here, the thread needs to terminate error = TRUE; net->error = 0; break; } case COM_REFRESH: Loading Loading @@ -2451,7 +2453,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables) } static void kill_one_thread(THD *thd, ulong id) void kill_one_thread(THD *thd, ulong id) { VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list I_List_iterator<THD> it(threads); Loading sql/sql_repl.cc +33 −2 Original line number Diff line number Diff line Loading @@ -225,6 +225,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) if(read_packet) { thd->proc_info = "sending update to slave"; if(my_net_write(net, (char*)packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; Loading Loading @@ -257,7 +258,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) else { bool loop_breaker = 0; // need this to break out of the for loop from switch thd->proc_info = "switching to next log"; switch(mysql_bin_log.find_next_log(&linfo)) { case LOG_INFO_EOF: Loading Loading @@ -309,8 +310,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) (void)my_fclose(log, MYF(MY_WME)); send_eof(&thd->net); thd->proc_info = "waiting to finalize termination"; DBUG_VOID_RETURN; err: thd->proc_info = "waiting to finalize termination"; if(log) (void) my_fclose(log, MYF(MY_WME)); send_error(&thd->net, 0, errmsg); Loading Loading @@ -408,6 +411,34 @@ void reset_slave() void kill_zombie_dump_threads(uint32 slave_server_id) { pthread_mutex_lock(&LOCK_thread_count); I_List_iterator<THD> it(threads); THD *tmp; while((tmp=it++)) { if(tmp->command == COM_BINLOG_DUMP && tmp->server_id == slave_server_id) { // here we do not call kill_one_thread() // it will be slow because it will iterate through the list // again. Plus it double-locks LOCK_thread_count, which // make safe_mutex complain and abort // so we just to our own thread murder thr_alarm_kill(tmp->real_id); tmp->killed = 1; pthread_mutex_lock(&tmp->mysys_var->mutex); tmp->mysys_var->abort = 1; if(tmp->mysys_var->current_mutex) { pthread_mutex_lock(tmp->mysys_var->current_mutex); pthread_cond_broadcast(tmp->mysys_var->current_cond); pthread_mutex_unlock(tmp->mysys_var->current_mutex); } pthread_mutex_unlock(&tmp->mysys_var->mutex); } } pthread_mutex_unlock(&LOCK_thread_count); } Loading Loading
sql/mysql_priv.h +1 −0 Original line number Diff line number Diff line Loading @@ -41,6 +41,7 @@ char *sql_strdup(const char *str); char *sql_strmake(const char *str,uint len); gptr sql_memdup(const void * ptr,unsigned size); void sql_element_free(void *ptr); void kill_one_thread(THD *thd, ulong id); #define x_free(A) { my_free((gptr) (A),MYF(MY_WME | MY_FAE | MY_ALLOW_ZERO_PTR)); } #define safeFree(x) { if(x) { my_free((gptr) x,MYF(0)); x = NULL; } } Loading
sql/slave.cc +31 −7 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ MASTER_INFO glob_mi; extern bool opt_log_slave_updates ; static inline void skip_load_data_infile(NET* net); static inline bool slave_killed(THD* thd); static int init_slave_thread(THD* thd); int init_master_info(MASTER_INFO* mi); Loading @@ -53,6 +54,15 @@ static inline bool slave_killed(THD* thd) return abort_slave || abort_loop || thd->killed; } static inline void skip_load_data_infile(NET* net) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); (void)my_net_read(net); // discard response send_ok(net); // the master expects it } int db_ok(const char* db, I_List<i_string> &do_list, I_List<i_string> &ignore_list ) { Loading Loading @@ -553,9 +563,26 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) { Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1, event_len); if (ev) { switch(ev->get_type_code()) int type_code = ev->get_type_code(); if(ev->server_id == ::server_id) { if(type_code == LOAD_EVENT) skip_load_data_infile(net); mi->inc_pos(event_len); flush_master_info(mi); delete ev; return 0; // avoid infinite update loops } thd->server_id = ev->server_id; // use the original server id for logging thd->set_time(); // time the query ev->when = time(NULL); switch(type_code) { case QUERY_EVENT: { Loading Loading @@ -706,10 +733,7 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) else // we will just ask the master to send us /dev/null if we do not want to // load the data :-) { (void)my_net_write(net, "\xfb/dev/null", 10); (void)net_flush(net); (void)my_net_read(net); // discard response send_ok(net); // the master expects it skip_load_data_infile(net); } thd->net.vio = 0; Loading Loading @@ -799,14 +823,14 @@ pthread_handler_decl(handle_slave,arg __attribute__((unused))) if(!server_id) { sql_print_error("Server id not set, will not start slave"); pthread_exit(1); pthread_exit((void*)1); } pthread_mutex_lock(&LOCK_slave); if(slave_running) { pthread_mutex_unlock(&LOCK_slave); pthread_exit(1); // safety just in case pthread_exit((void*)1); // safety just in case } slave_running = 1; abort_slave = 0; Loading
sql/sql_parse.cc +4 −2 Original line number Diff line number Diff line Loading @@ -39,7 +39,6 @@ static bool check_dup(THD *thd,const char *db,const char *name, TABLE_LIST *tables); static void mysql_init_query(THD *thd); static void remove_escape(char *name); static void kill_one_thread(THD *thd, ulong thread); static void refresh_status(void); const char *any_db="*any*"; // Special symbol for check_access Loading Loading @@ -712,6 +711,9 @@ bool do_command(THD *thd) thd->server_id = slave_server_id; pthread_mutex_unlock(&LOCK_server_id); mysql_binlog_send(thd, strdup(packet + 11), pos, flags); // fake COM_QUIT -- if we get here, the thread needs to terminate error = TRUE; net->error = 0; break; } case COM_REFRESH: Loading Loading @@ -2451,7 +2453,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables) } static void kill_one_thread(THD *thd, ulong id) void kill_one_thread(THD *thd, ulong id) { VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list I_List_iterator<THD> it(threads); Loading
sql/sql_repl.cc +33 −2 Original line number Diff line number Diff line Loading @@ -225,6 +225,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) if(read_packet) { thd->proc_info = "sending update to slave"; if(my_net_write(net, (char*)packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; Loading Loading @@ -257,7 +258,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) else { bool loop_breaker = 0; // need this to break out of the for loop from switch thd->proc_info = "switching to next log"; switch(mysql_bin_log.find_next_log(&linfo)) { case LOG_INFO_EOF: Loading Loading @@ -309,8 +310,10 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) (void)my_fclose(log, MYF(MY_WME)); send_eof(&thd->net); thd->proc_info = "waiting to finalize termination"; DBUG_VOID_RETURN; err: thd->proc_info = "waiting to finalize termination"; if(log) (void) my_fclose(log, MYF(MY_WME)); send_error(&thd->net, 0, errmsg); Loading Loading @@ -408,6 +411,34 @@ void reset_slave() void kill_zombie_dump_threads(uint32 slave_server_id) { pthread_mutex_lock(&LOCK_thread_count); I_List_iterator<THD> it(threads); THD *tmp; while((tmp=it++)) { if(tmp->command == COM_BINLOG_DUMP && tmp->server_id == slave_server_id) { // here we do not call kill_one_thread() // it will be slow because it will iterate through the list // again. Plus it double-locks LOCK_thread_count, which // make safe_mutex complain and abort // so we just to our own thread murder thr_alarm_kill(tmp->real_id); tmp->killed = 1; pthread_mutex_lock(&tmp->mysys_var->mutex); tmp->mysys_var->abort = 1; if(tmp->mysys_var->current_mutex) { pthread_mutex_lock(tmp->mysys_var->current_mutex); pthread_cond_broadcast(tmp->mysys_var->current_cond); pthread_mutex_unlock(tmp->mysys_var->current_mutex); } pthread_mutex_unlock(&tmp->mysys_var->mutex); } } pthread_mutex_unlock(&LOCK_thread_count); } Loading