Commit d9ca0909 authored by sasha@mysql.sashanet.com's avatar sasha@mysql.sashanet.com
Browse files

Merge work:/home/bk/mysql

into mysql.sashanet.com:/home/sasha/src/bk/mysql
parents f3382efd 204ae847
Loading
Loading
Loading
Loading
+60 −16
Original line number Diff line number Diff line
@@ -71,23 +71,39 @@ int Log_event::write_header(FILE* file)

#ifndef MYSQL_CLIENT

int Log_event::read_log_event(FILE* file, String* packet)
int Log_event::read_log_event(FILE* file, String* packet,
			      pthread_mutex_t* log_lock)
{
  ulong data_len;
  char buf[LOG_EVENT_HEADER_LEN];
  if(log_lock)
    pthread_mutex_lock(log_lock);
  if (my_fread(file, (byte*)buf, sizeof(buf), MYF(MY_NABP)))
    {
      if(log_lock) pthread_mutex_unlock(log_lock);
      return feof(file) ? LOG_READ_EOF: LOG_READ_IO;
  
    }
  data_len = uint4korr(buf + EVENT_LEN_OFFSET);
  if (data_len < LOG_EVENT_HEADER_LEN || data_len > MAX_EVENT_LEN)
    {
      if(log_lock) pthread_mutex_unlock(log_lock);
      return LOG_READ_BOGUS;

    }
  packet->append(buf, sizeof(buf));
  data_len -= LOG_EVENT_HEADER_LEN;
  if (!data_len)
    {
      if(log_lock) pthread_mutex_unlock(log_lock);
      return 0; // the event does not have a data section
    }
  if (packet->append(file, data_len, MYF(MY_WME|MY_NABP)))
    {
      if(log_lock)
	pthread_mutex_unlock(log_lock);
      return feof(file) ? LOG_READ_BOGUS: LOG_READ_IO;
    }

  if(log_lock) pthread_mutex_unlock(log_lock);
  return 0;
}

@@ -95,14 +111,18 @@ int Log_event::read_log_event(FILE* file, String* packet)

// allocates memory - the caller is responsible for clean-up

Log_event* Log_event::read_log_event(FILE* file)
Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock)
{
  time_t timestamp;
  uint32 server_id;
  
  char buf[LOG_EVENT_HEADER_LEN-4];
  if(log_lock) pthread_mutex_lock(log_lock);
  if (my_fread(file, (byte *) buf, sizeof(buf), MY_NABP))
    {
      if(log_lock) pthread_mutex_unlock(log_lock);
      return NULL;
    }
  timestamp = uint4korr(buf);
  server_id = uint4korr(buf + 5);
  
@@ -111,6 +131,8 @@ Log_event* Log_event::read_log_event(FILE* file)
  case QUERY_EVENT:
  {
    Query_log_event* q = new Query_log_event(file, timestamp, server_id);
    if(log_lock) pthread_mutex_unlock(log_lock);
    
    if (!q->query)
    {
      delete q;
@@ -123,6 +145,8 @@ Log_event* Log_event::read_log_event(FILE* file)
  case LOAD_EVENT:
  {
    Load_log_event* l = new Load_log_event(file, timestamp, server_id);
    if(log_lock) pthread_mutex_unlock(log_lock);
    
    if (!l->table_name)
    {
      delete l;
@@ -136,6 +160,8 @@ Log_event* Log_event::read_log_event(FILE* file)
  case ROTATE_EVENT:
  {
    Rotate_log_event* r = new Rotate_log_event(file, timestamp, server_id);
    if(log_lock) pthread_mutex_unlock(log_lock);
    
    if (!r->new_log_ident)
    {
      delete r;
@@ -148,6 +174,8 @@ Log_event* Log_event::read_log_event(FILE* file)
  case INTVAR_EVENT:
  {
    Intvar_log_event* e = new Intvar_log_event(file, timestamp, server_id);
    if(log_lock) pthread_mutex_unlock(log_lock);
    
    if (e->type == INVALID_INT_EVENT)
    {
      delete e;
@@ -157,12 +185,25 @@ Log_event* Log_event::read_log_event(FILE* file)
    return e;
  }
  
  case START_EVENT: return new Start_log_event(file, timestamp, server_id);
  case STOP_EVENT: return new Stop_log_event(file, timestamp, server_id);
  default: return NULL;
  case START_EVENT:
    {
      Start_log_event* e = new Start_log_event(file, timestamp, server_id);
      if(log_lock) pthread_mutex_unlock(log_lock);
      return e;
    }	  
  case STOP_EVENT:
    {
      Stop_log_event* e = new Stop_log_event(file, timestamp, server_id);
      if(log_lock) pthread_mutex_unlock(log_lock);
      return e;
    }
  default:
    if(log_lock) pthread_mutex_unlock(log_lock);
    return NULL;
  }

  //impossible
  if(log_lock) pthread_mutex_unlock(log_lock);
  return NULL;
}

@@ -356,6 +397,7 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg,
  data_len -= QUERY_EVENT_OVERHEAD;
  exec_time = uint4korr(buf + 8);
  db_len = (uint)buf[12];
  error_code = uint2korr(buf + 13);
  
  if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
    return;
@@ -384,11 +426,12 @@ Query_log_event::Query_log_event(const char* buf, int max_buf):

  data_len -= QUERY_EVENT_OVERHEAD;
  exec_time = uint4korr(buf + 8);
  error_code = uint2korr(buf + 13);

  if (!(data_buf = (char*) my_malloc( data_len + 1, MYF(MY_WME))))
    return;

  memcpy(data_buf, buf + 13, data_len);
  memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len);
  thread_id = uint4korr(buf + 4);
  db = data_buf;
  db_len = (uint)buf[12];
@@ -402,8 +445,8 @@ void Query_log_event::print(FILE* file, bool short_form)
  if (!short_form)
  {
    print_header(file);
    fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\n",
	    (ulong) thread_id, (ulong) exec_time);
    fprintf(file, "\tQuery\tthread_id=%lu\texec_time=%lu\terror_code=%d\n",
	    (ulong) thread_id, (ulong) exec_time, error_code);
  }

  if(db && db[0])
@@ -423,7 +466,8 @@ int Query_log_event::write_data(FILE* file)
  int4store(pos, exec_time);
  pos += 4;
  *pos++ = (char)db_len;
    
  int2store(pos, error_code);
  pos += 2;

  if (my_fwrite(file, (byte*) buf, (uint)(pos - buf), MYF(MY_NABP | MY_WME)) ||
      my_fwrite(file, (db) ? (byte*) db : (byte*)"",
+9 −3
Original line number Diff line number Diff line
@@ -31,7 +31,8 @@
#define BINLOG_VERSION    1

#define LOG_EVENT_HEADER_LEN 13
#define QUERY_HEADER_LEN     (sizeof(uint32) + sizeof(uint32) + sizeof(uchar))
#define QUERY_HEADER_LEN     (sizeof(uint32) + sizeof(uint32) + \
 sizeof(uchar) + sizeof(uint16))
#define LOAD_HEADER_LEN      (sizeof(uint32) + sizeof(uint32) + \
  + sizeof(uint32) + 2 + sizeof(uint32))
#define EVENT_LEN_OFFSET     9
@@ -88,11 +89,13 @@ class Log_event
  void print_timestamp(FILE* file, time_t *ts = 0);
  void print_header(FILE* file);

  static Log_event* read_log_event(FILE* file);
  // if mutex is 0, the read will proceed without mutex
  static Log_event* read_log_event(FILE* file, pthread_mutex_t* log_lock);
  static Log_event* read_log_event(const char* buf, int max_buf);

#ifndef MYSQL_CLIENT
  static int read_log_event(FILE* file, String* packet);
  static int read_log_event(FILE* file, String* packet,
			    pthread_mutex_t* log_lock);
#endif
  
};
@@ -109,12 +112,14 @@ class Query_log_event: public Log_event
  // we pass it here, so we would not have to call strlen()
  // otherwise, set it to 0, in which case, we compute it with strlen()
  uint32 db_len;
  uint16 error_code;
  int thread_id;
#if !defined(MYSQL_CLIENT)
  THD* thd;
  Query_log_event(THD* thd_arg, const char* query_arg):
    Log_event(thd_arg->start_time,0,0,thd_arg->server_id), data_buf(0),
    query(query_arg),  db(thd_arg->db), q_len(thd_arg->query_length),
    error_code(thd_arg->net.last_errno),
    thread_id(thd_arg->thread_id), thd(thd_arg)
  {
    time_t end_time;
@@ -142,6 +147,7 @@ class Query_log_event: public Log_event
    return q_len + db_len + 2 +
      sizeof(uint32) // thread_id
      + sizeof(uint32) // exec_time
      + sizeof(uint16) // error_code
      ;
  }

+4 −2
Original line number Diff line number Diff line
@@ -284,7 +284,9 @@ static void dump_remote_log_entries(const char* logname)
      break; // end of data
    DBUG_PRINT("info",( "len= %u, net->read_pos[5] = %d\n",
			len, net->read_pos[5]));
    Log_event * ev = Log_event::read_log_event((const char*) net->read_pos + 1 , len);
    Log_event * ev = Log_event::read_log_event(
					  (const char*) net->read_pos + 1 ,
					  len);
    if(ev)
    {
      ev->print(stdout, short_form);
@@ -315,7 +317,7 @@ static void dump_local_log_entries(const char* logname)
 
 while(1)
   {
     Log_event* ev = Log_event::read_log_event(file);
     Log_event* ev = Log_event::read_log_event(file, 0);
     if(!ev)
       if(!feof(file))
	die("Could not read entry at offset %ld : Error in log format or \
+15 −28
Original line number Diff line number Diff line
@@ -599,36 +599,23 @@ static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
	thd->query_id = query_id++;
	VOID(pthread_mutex_unlock(&LOCK_thread_count));
	thd->last_nx_table = thd->last_nx_db = 0;
	for(;;)
	{
	thd->query_error = 0; // clear error
	  thd->last_nx_table = thd->last_nx_db  = 0;
	thd->net.last_errno = 0;
	thd->net.last_error[0] = 0;
	  mysql_parse(thd, thd->query, q_len); // try query
	  if(!thd->query_error || slave_killed(thd)) // break if ok
	    break;
	  // if not ok
	  if(thd->last_nx_table && thd->last_nx_db)
	  {
	    // for now, let's just fail if the table is not
	    // there, and not try to be a smart alec...
			
	    // if table was not there
	    //if(fetch_nx_table(thd,&glob_mi))
	    // try to to fetch from master
	    break;  // if we can't, just break
	  }
	  else
	    break; // if failed for some other reason, bail out

	  // if fetched the table from master successfully
	  // we need to restore query info in thd because
	  // fetch_nx_table executes create table
	  thd->query = (char*)qev->query;
	  thd->set_time((time_t)qev->when);
	  thd->current_tablenr = 0;
	mysql_parse(thd, thd->query, q_len);
	int expected_error,actual_error;
	if((expected_error = qev->error_code) !=
	   (actual_error = thd->net.last_errno) && expected_error)
	  {
	    sql_print_error("Slave: did not get the expected error\
 running query from master - expected: '%s', got '%s'",
			    ER(expected_error),
			    actual_error ? ER(actual_error):"no error"
			    );
	    thd->query_error = 1;
	  }
	else if(expected_error == actual_error)
	  thd->query_error = 0;
      }
      thd->db = 0;// prevent db from being freed
      thd->query = 0; // just to be sure
+15 −12
Original line number Diff line number Diff line
@@ -121,7 +121,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
      errmsg = "Could not find first log";
      goto err;
    }
  log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME));
  log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME));

  if(!log)
    {
@@ -143,14 +143,17 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)

  while(!net->error && net->vio != 0 && !thd->killed)
    {
      while(!(error = Log_event::read_log_event(log, packet)))
      pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
      
      while(!(error = Log_event::read_log_event(log, packet, log_lock)))
	{
	  if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
	    {
	      errmsg = "Failed on my_net_write()";
	      goto err;
	    }
	  DBUG_PRINT("info", ("log event code %d",(*packet)[LOG_EVENT_OFFSET+1] ));
	  DBUG_PRINT("info", ("log event code %d",
			      (*packet)[LOG_EVENT_OFFSET+1] ));
	  if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
	    {
	      if(send_file(thd))
@@ -168,7 +171,8 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
	  goto err;
	}

      if(!(flags & BINLOG_DUMP_NON_BLOCK) &&  mysql_bin_log.is_active(log_file_name))
      if(!(flags & BINLOG_DUMP_NON_BLOCK) &&
	 mysql_bin_log.is_active(log_file_name))
	// block until there is more data in the log
	// unless non-blocking mode requested
	{
@@ -183,7 +187,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
	  // if we did not miss anything, we just wait for other threads
	  // to signal us
          {
	    pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
	    clearerr(log);

            // tell the kill thread how to wake us up
@@ -196,18 +199,19 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)

	    bool read_packet = 0, fatal_error = 0;

	    pthread_mutex_lock(log_lock); // no one will update the log while we are reading
	    // no one will update the log while we are reading
	    // now, but we'll be quick and just read one record


	    switch(Log_event::read_log_event(log, packet))
	    switch(Log_event::read_log_event(log, packet, log_lock))
	      {
	      case 0:
		read_packet = 1; // we read successfully, so we'll need to send it to the
		read_packet = 1;
		// we read successfully, so we'll need to send it to the
		// slave
		break;
	      case LOG_READ_EOF:
		pthread_mutex_lock(log_lock);
	        pthread_cond_wait(&COND_binlog_update, log_lock);
		pthread_mutex_unlock(log_lock);
		break;

	      default:
@@ -215,7 +219,6 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
		break;
	      }

	    pthread_mutex_unlock(log_lock);

	    pthread_mutex_lock(&thd->mysys_var->mutex);
	    thd->mysys_var->current_mutex= 0;
@@ -275,7 +278,7 @@ void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
	   break;

	  (void) my_fclose(log, MYF(MY_WME));
	  log = my_fopen(log_file_name, O_RDONLY, MYF(MY_WME));
	  log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME));
	  if(!log)
	    goto err;
	  // fake Rotate_log event just in case it did not make it to the log