Loading sql/log.cc +21 −0 Original line number Diff line number Diff line Loading @@ -532,6 +532,15 @@ void MYSQL_LOG::new_file() */ Rotate_log_event r(new_name+dirname_length(new_name)); r.write(&log_file); // if we have a master, record current master info in a slave // event if(glob_mi.inited) { Slave_log_event s(current_thd, &glob_mi); if(s.master_host) s.write(&log_file); } VOID(pthread_cond_broadcast(&COND_binlog_update)); } name=0; Loading Loading @@ -626,6 +635,18 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, /* Write to binary log in a format to be used for replication */ bool MYSQL_LOG::write(Slave_log_event* event_info) { bool error; if (!inited) // Can't use mutex if not init return 0; VOID(pthread_mutex_lock(&LOCK_log)); error = event_info->write(&log_file); VOID(pthread_mutex_unlock(&LOCK_log)); return error; } bool MYSQL_LOG::write(Query_log_event* event_info) { /* In most cases this is only called if 'is_open()' is true */ Loading sql/log_event.cc +144 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ #pragma implementation // gcc: Class implementation #endif #include "mysql_priv.h" #include "slave.h" #endif /* MYSQL_CLIENT */ Loading Loading @@ -154,6 +155,18 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) return l; } case SLAVE_EVENT: { Slave_log_event* l = new Slave_log_event(file, timestamp, server_id); if(log_lock) pthread_mutex_unlock(log_lock); if (!l->master_host) { delete l; l=NULL; } return l; } case ROTATE_EVENT: { Loading Loading @@ -222,6 +235,18 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) return q; } case SLAVE_EVENT: { Slave_log_event* s = new Slave_log_event(buf, event_len); if (!s->master_host) { delete s; return NULL; } return s; } case LOAD_EVENT: { Load_log_event* l = new Load_log_event(buf, event_len); Loading Loading @@ -729,4 +754,123 @@ void Load_log_event::set_fields(List<Item> &fields) } Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi): Log_event(thd_arg->start_time, 0, 1, thd_arg->server_id), mem_pool(0),master_host(0) { if(!mi->inited) return; pthread_mutex_lock(&mi->lock); master_host_len = strlen(mi->host); master_log_len = strlen(mi->log_file_name); // on OOM, just do not initialize the structure and print the error if((mem_pool = (char*)my_malloc(get_data_size() + 1, MYF(MY_WME)))) { master_host = mem_pool + sizeof(uint32) + sizeof(ulonglong) + sizeof(uint16); memcpy(master_host, mi->host, master_host_len + 1); master_log = master_host + master_host_len + 1; memcpy(master_log, mi->log_file_name, master_log_len + 1); master_port = mi->port; master_pos = mi->pos; } else sql_print_error("Out of memory while recording slave event"); pthread_mutex_unlock(&mi->lock); } #endif Slave_log_event::~Slave_log_event() { my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR)); } void Slave_log_event::print(FILE* file, bool short_form = 0, char* last_db = 0) { char llbuff[22]; if(short_form) return; print_header(file); fputc('\n', file); fprintf(file, "Slave: master_host='%s' master_port=%d \ master_log=%s master_pos=%s\n", master_host, master_port, master_log, llstr(master_pos, llbuff)); } int Slave_log_event::get_data_size() { return master_host_len + master_log_len + 1 + sizeof(uint32) + sizeof(ulonglong) + sizeof(uint16); } int Slave_log_event::write_data(IO_CACHE* file) { int data_size = get_data_size(); int4store(mem_pool, data_size); int8store(mem_pool + 4, master_pos); int2store(mem_pool + 12, master_port); // log and host are already there return my_b_write(file, (byte*)mem_pool, data_size); } Slave_log_event::Slave_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg): Log_event(when,0,0,server_id),master_host(0) { char buf[4]; if(my_b_read(file, (byte*)buf, 4)) return; uint32 data_size; data_size = uint4korr(buf); if(data_size > max_allowed_packet) return; // safety if(!(mem_pool = (char*)my_malloc(data_size + 1, MYF(MY_WME)))) return; if(my_b_read(file, (byte*)mem_pool + 4, data_size - 4)) return; mem_pool[data_size] = 0; init_from_mem_pool(data_size); } void Slave_log_event::init_from_mem_pool(int data_size) { master_pos = uint8korr(mem_pool + 4); master_port = uint2korr(mem_pool + 12); master_host = mem_pool + 14; master_host_len = strlen(master_host); // safety master_log = master_host + master_host_len; if(master_log >= mem_pool + data_size) { master_host = 0; return; } master_log_len = strlen(master_log); } Slave_log_event::Slave_log_event(const char* buf, int event_len): Log_event(buf),mem_pool(0),master_host(0) { if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) return; memcpy(mem_pool, buf, event_len); mem_pool[event_len] = 0; init_from_mem_pool(event_len); } sql/log_event.h +30 −1 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, LOAD_EVENT=6}; LOAD_EVENT=6, SLAVE_EVENT=7}; enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2 }; Loading @@ -61,6 +61,8 @@ class String; extern uint32 server_id; struct st_master_info; class Log_event { public: Loading Loading @@ -172,6 +174,33 @@ class Query_log_event: public Log_event void print(FILE* file, bool short_form = 0, char* last_db = 0); }; class Slave_log_event: public Log_event { protected: char* mem_pool; void init_from_mem_pool(int data_size); public: char* master_host; int master_host_len; uint16 master_port; char* master_log; int master_log_len; ulonglong master_pos; #ifndef MYSQL_CLIENT Slave_log_event(THD* thd_arg, struct st_master_info* mi); #endif Slave_log_event(const char* buf, int event_len); Slave_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg); ~Slave_log_event(); int get_data_size(); Log_event_type get_type_code() { return SLAVE_EVENT; } void print(FILE* file, bool short_form = 0, char* last_db = 0); int write_data(IO_CACHE* file ); }; #define DUMPFILE_FLAG 0x1 #define OPT_ENCLOSED_FLAG 0x2 #define REPLACE_FLAG 0x4 Loading sql/slave.cc +20 −1 Original line number Diff line number Diff line Loading @@ -1015,6 +1015,18 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) break; } case SLAVE_EVENT: { if(mysql_bin_log.is_open()) { Slave_log_event *sev = (Slave_log_event*)ev; mysql_bin_log.write(sev); } delete ev; break; } case LOAD_EVENT: { Load_log_event* lev = (Load_log_event*)ev; Loading Loading @@ -1168,6 +1180,13 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) if(abort_slave_event_count) ++events_till_abort; #endif if(mysql_bin_log.is_open()) { mysql_bin_log.new_file(); Slave_log_event sev(slave_thd, mi); if(sev.master_host) mysql_bin_log.write(&sev); } delete ev; break; } Loading sql/sql_class.h +2 −0 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ class Query_log_event; class Load_log_event; class Slave_log_event; enum enum_enable_or_disable { LEAVE_AS_IS, ENABLE, DISABLE }; enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY }; Loading Loading @@ -83,6 +84,7 @@ class MYSQL_LOG { time_t query_start=0); bool write(Query_log_event* event_info); // binary log write bool write(Load_log_event* event_info); bool write(Slave_log_event* event_info); bool write(IO_CACHE *cache); int generate_new_name(char *new_name,const char *old_name); void make_log_name(char* buf, const char* log_ident); Loading Loading
sql/log.cc +21 −0 Original line number Diff line number Diff line Loading @@ -532,6 +532,15 @@ void MYSQL_LOG::new_file() */ Rotate_log_event r(new_name+dirname_length(new_name)); r.write(&log_file); // if we have a master, record current master info in a slave // event if(glob_mi.inited) { Slave_log_event s(current_thd, &glob_mi); if(s.master_host) s.write(&log_file); } VOID(pthread_cond_broadcast(&COND_binlog_update)); } name=0; Loading Loading @@ -626,6 +635,18 @@ bool MYSQL_LOG::write(THD *thd,enum enum_server_command command, /* Write to binary log in a format to be used for replication */ bool MYSQL_LOG::write(Slave_log_event* event_info) { bool error; if (!inited) // Can't use mutex if not init return 0; VOID(pthread_mutex_lock(&LOCK_log)); error = event_info->write(&log_file); VOID(pthread_mutex_unlock(&LOCK_log)); return error; } bool MYSQL_LOG::write(Query_log_event* event_info) { /* In most cases this is only called if 'is_open()' is true */ Loading
sql/log_event.cc +144 −0 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ #pragma implementation // gcc: Class implementation #endif #include "mysql_priv.h" #include "slave.h" #endif /* MYSQL_CLIENT */ Loading Loading @@ -154,6 +155,18 @@ Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock) return l; } case SLAVE_EVENT: { Slave_log_event* l = new Slave_log_event(file, timestamp, server_id); if(log_lock) pthread_mutex_unlock(log_lock); if (!l->master_host) { delete l; l=NULL; } return l; } case ROTATE_EVENT: { Loading Loading @@ -222,6 +235,18 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len) return q; } case SLAVE_EVENT: { Slave_log_event* s = new Slave_log_event(buf, event_len); if (!s->master_host) { delete s; return NULL; } return s; } case LOAD_EVENT: { Load_log_event* l = new Load_log_event(buf, event_len); Loading Loading @@ -729,4 +754,123 @@ void Load_log_event::set_fields(List<Item> &fields) } Slave_log_event::Slave_log_event(THD* thd_arg,MASTER_INFO* mi): Log_event(thd_arg->start_time, 0, 1, thd_arg->server_id), mem_pool(0),master_host(0) { if(!mi->inited) return; pthread_mutex_lock(&mi->lock); master_host_len = strlen(mi->host); master_log_len = strlen(mi->log_file_name); // on OOM, just do not initialize the structure and print the error if((mem_pool = (char*)my_malloc(get_data_size() + 1, MYF(MY_WME)))) { master_host = mem_pool + sizeof(uint32) + sizeof(ulonglong) + sizeof(uint16); memcpy(master_host, mi->host, master_host_len + 1); master_log = master_host + master_host_len + 1; memcpy(master_log, mi->log_file_name, master_log_len + 1); master_port = mi->port; master_pos = mi->pos; } else sql_print_error("Out of memory while recording slave event"); pthread_mutex_unlock(&mi->lock); } #endif Slave_log_event::~Slave_log_event() { my_free(mem_pool, MYF(MY_ALLOW_ZERO_PTR)); } void Slave_log_event::print(FILE* file, bool short_form = 0, char* last_db = 0) { char llbuff[22]; if(short_form) return; print_header(file); fputc('\n', file); fprintf(file, "Slave: master_host='%s' master_port=%d \ master_log=%s master_pos=%s\n", master_host, master_port, master_log, llstr(master_pos, llbuff)); } int Slave_log_event::get_data_size() { return master_host_len + master_log_len + 1 + sizeof(uint32) + sizeof(ulonglong) + sizeof(uint16); } int Slave_log_event::write_data(IO_CACHE* file) { int data_size = get_data_size(); int4store(mem_pool, data_size); int8store(mem_pool + 4, master_pos); int2store(mem_pool + 12, master_port); // log and host are already there return my_b_write(file, (byte*)mem_pool, data_size); } Slave_log_event::Slave_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg): Log_event(when,0,0,server_id),master_host(0) { char buf[4]; if(my_b_read(file, (byte*)buf, 4)) return; uint32 data_size; data_size = uint4korr(buf); if(data_size > max_allowed_packet) return; // safety if(!(mem_pool = (char*)my_malloc(data_size + 1, MYF(MY_WME)))) return; if(my_b_read(file, (byte*)mem_pool + 4, data_size - 4)) return; mem_pool[data_size] = 0; init_from_mem_pool(data_size); } void Slave_log_event::init_from_mem_pool(int data_size) { master_pos = uint8korr(mem_pool + 4); master_port = uint2korr(mem_pool + 12); master_host = mem_pool + 14; master_host_len = strlen(master_host); // safety master_log = master_host + master_host_len; if(master_log >= mem_pool + data_size) { master_host = 0; return; } master_log_len = strlen(master_log); } Slave_log_event::Slave_log_event(const char* buf, int event_len): Log_event(buf),mem_pool(0),master_host(0) { if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) return; memcpy(mem_pool, buf, event_len); mem_pool[event_len] = 0; init_from_mem_pool(event_len); }
sql/log_event.h +30 −1 Original line number Diff line number Diff line Loading @@ -51,7 +51,7 @@ enum Log_event_type { START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4, INTVAR_EVENT=5, LOAD_EVENT=6}; LOAD_EVENT=6, SLAVE_EVENT=7}; enum Int_event_type { INVALID_INT_EVENT = 0, LAST_INSERT_ID_EVENT = 1, INSERT_ID_EVENT = 2 }; Loading @@ -61,6 +61,8 @@ class String; extern uint32 server_id; struct st_master_info; class Log_event { public: Loading Loading @@ -172,6 +174,33 @@ class Query_log_event: public Log_event void print(FILE* file, bool short_form = 0, char* last_db = 0); }; class Slave_log_event: public Log_event { protected: char* mem_pool; void init_from_mem_pool(int data_size); public: char* master_host; int master_host_len; uint16 master_port; char* master_log; int master_log_len; ulonglong master_pos; #ifndef MYSQL_CLIENT Slave_log_event(THD* thd_arg, struct st_master_info* mi); #endif Slave_log_event(const char* buf, int event_len); Slave_log_event(IO_CACHE* file, time_t when, uint32 server_id_arg); ~Slave_log_event(); int get_data_size(); Log_event_type get_type_code() { return SLAVE_EVENT; } void print(FILE* file, bool short_form = 0, char* last_db = 0); int write_data(IO_CACHE* file ); }; #define DUMPFILE_FLAG 0x1 #define OPT_ENCLOSED_FLAG 0x2 #define REPLACE_FLAG 0x4 Loading
sql/slave.cc +20 −1 Original line number Diff line number Diff line Loading @@ -1015,6 +1015,18 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) break; } case SLAVE_EVENT: { if(mysql_bin_log.is_open()) { Slave_log_event *sev = (Slave_log_event*)ev; mysql_bin_log.write(sev); } delete ev; break; } case LOAD_EVENT: { Load_log_event* lev = (Load_log_event*)ev; Loading Loading @@ -1168,6 +1180,13 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len) if(abort_slave_event_count) ++events_till_abort; #endif if(mysql_bin_log.is_open()) { mysql_bin_log.new_file(); Slave_log_event sev(slave_thd, mi); if(sev.master_host) mysql_bin_log.write(&sev); } delete ev; break; } Loading
sql/sql_class.h +2 −0 Original line number Diff line number Diff line Loading @@ -23,6 +23,7 @@ class Query_log_event; class Load_log_event; class Slave_log_event; enum enum_enable_or_disable { LEAVE_AS_IS, ENABLE, DISABLE }; enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY }; Loading Loading @@ -83,6 +84,7 @@ class MYSQL_LOG { time_t query_start=0); bool write(Query_log_event* event_info); // binary log write bool write(Load_log_event* event_info); bool write(Slave_log_event* event_info); bool write(IO_CACHE *cache); int generate_new_name(char *new_name,const char *old_name); void make_log_name(char* buf, const char* log_ident); Loading