Commit d8be3113 authored by unknown's avatar unknown
Browse files

BUG#19459 (BINLOG RBR command does not lock tables correctly causing

crash for, e.g., NDB):

Before, mysqlbinlog printed table map events as a separate statement, so
when executing the event, the opened table was subsequently closed
when the statement ended. Instead, the row-based events that make up
a statement are now printed as *one* BINLOG statement, which means
that the table maps and the following *_rows_log_event events are
executed fully before the statement ends.

Changing implementation of BINLOG statement to be able to read the 
emitted format, which now consists of several chunks of BASE64-encoded
data.


client/mysqlbinlog.cc:
  Using IO_CACHE to print events instead of directly to file.
  Factoring out code to write event header and base64 representation into
  separate function.
mysys/mf_iocache2.c:
  Correcting name in documentation.
sql/log_event.cc:
  Adding class Write_on_release_cache that holds an IO_CACHE and that
  will write contents of IO_CACHE to a designated file on destruction.
  
  Changing signature of event printing functions print_header() and print_base64()
  to write to IO_CACHE and changing *all* calls in those functions in accordance.
  This means that all printing functions now print to an IO_CACHE instead of to a file,
  and that the IO_CACHE is then copied to the file.
  
  The print() function have the same signature as before, but since it is
  using print_header() and print_base64(), the data will now be printed
  to an IO_CACHE and then copied to the file.
  
  Changing row-based replication events to incrementally build one
  BINLOG statement for all events making up a statement.
sql/log_event.h:
  Changing signature of event printing functions print_header() and
  print_base64() to write to an IO_CACHE instead of a file.
  
  Changing row-based replication events to incrementally build one
  BINLOG statement for all events making up a statement.
  
  Adding a head_cache and a body_cache to cache statement comment 
  and statement body respectively. In addition, the head_cache is used
  when printing other events than the RBR events.
sql/sql_binlog.cc:
  Changing code to be able to decode several pieces of base64-encoded data
  for a BINLOG statement. The BINLOG statement now consists of several pieces
  of BASE64-encoded data, so once a block has been decoded and executed, the
  next block has to be read from the statement until there is no more
  data to read.
parent d9b29280
Loading
Loading
Loading
Loading
+29 −6
Original line number Diff line number Diff line
@@ -475,6 +475,30 @@ static bool check_database(const char *log_dbname)
}



static int
write_event_header_and_base64(Log_event *ev, FILE *result_file,
                              PRINT_EVENT_INFO *print_event_info)
{
  DBUG_ENTER("write_event_header_and_base64");
  /* Write header and base64 output to cache */
  IO_CACHE result_cache;
  if (init_io_cache(&result_cache, -1, 0, WRITE_CACHE, 0L, FALSE,
                    MYF(MY_WME | MY_NABP)))
  {
    return 1;
  }

  ev->print_header(&result_cache, print_event_info, FALSE);
  ev->print_base64(&result_cache, print_event_info, FALSE);

  /* Read data from cache and write to result file */
  my_b_copy_to_file(&result_cache, result_file);
  end_io_cache(&result_cache);
  DBUG_RETURN(0);
}


/*
  Process an event

@@ -537,18 +561,18 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,

    print_event_info->base64_output= opt_base64_output;

    DBUG_PRINT("debug", ("event_type: %s", ev->get_type_str()));

    switch (ev_type) {
    case QUERY_EVENT:
      if (check_database(((Query_log_event*)ev)->db))
        goto end;
      if (opt_base64_output)
      {
        ev->print_header(result_file, print_event_info);
        ev->print_base64(result_file, print_event_info);
      }
        write_event_header_and_base64(ev, result_file, print_event_info);
      else
        ev->print(result_file, print_event_info);
      break;

    case CREATE_FILE_EVENT:
    {
      Create_file_log_event* ce= (Create_file_log_event*)ev;
@@ -569,8 +593,7 @@ int process_event(PRINT_EVENT_INFO *print_event_info, Log_event *ev,
      */
      if (opt_base64_output)
      {
        ce->print_header(result_file, print_event_info);
        ce->print_base64(result_file, print_event_info);
        write_event_header_and_base64(ce, result_file, print_event_info);
      }
      else
        ce->print(result_file, print_event_info, TRUE);
+1 −1
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@
  Copy contents of an IO_CACHE to a file.

  SYNOPSIS
    copy_io_cache_to_file()
    my_b_copy_to_file()
    cache  IO_CACHE to copy from
    file   File to copy to

+370 −205

File changed.

Preview size limit exceeded, changes collapsed.

+26 −4
Original line number Diff line number Diff line
@@ -519,14 +519,30 @@ typedef struct st_print_event_info
      bzero(db, sizeof(db));
      bzero(charset, sizeof(charset));
      bzero(time_zone_str, sizeof(time_zone_str));
      uint const flags = MYF(MY_WME | MY_NABP);
      init_io_cache(&head_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags);
      init_io_cache(&body_cache, -1, 0, WRITE_CACHE, 0L, FALSE, flags);
    }

  ~st_print_event_info() {
    end_io_cache(&head_cache);
    end_io_cache(&body_cache);
  }


  /* Settings on how to print the events */
  bool short_form;
  bool base64_output;
  my_off_t hexdump_from;
  uint8 common_header_len;

  /*
     These two caches are used by the row-based replication events to
     collect the header information and the main body of the events
     making up a statement.
   */
  IO_CACHE head_cache;
  IO_CACHE body_cache;
} PRINT_EVENT_INFO;
#endif

@@ -637,9 +653,11 @@ class Log_event
                                   const Format_description_log_event *description_event);
  /* print*() functions are used by mysqlbinlog */
  virtual void print(FILE* file, PRINT_EVENT_INFO* print_event_info) = 0;
  void print_timestamp(FILE* file, time_t *ts = 0);
  void print_header(FILE* file, PRINT_EVENT_INFO* print_event_info);
  void print_base64(FILE* file, PRINT_EVENT_INFO* print_event_info);
  void print_timestamp(IO_CACHE* file, time_t *ts = 0);
  void print_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
                    bool is_more);
  void print_base64(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info,
                    bool is_more);
#endif

  static void *operator new(size_t size)
@@ -804,7 +822,7 @@ class Query_log_event: public Log_event
                 uint32 q_len_arg);
#endif /* HAVE_REPLICATION */
#else
  void print_query_header(FILE* file, PRINT_EVENT_INFO* print_event_info);
  void print_query_header(IO_CACHE* file, PRINT_EVENT_INFO* print_event_info);
  void print(FILE* file, PRINT_EVENT_INFO* print_event_info);
#endif

@@ -1843,6 +1861,10 @@ class Rows_log_event : public Log_event
		 Log_event_type event_type,
		 const Format_description_log_event *description_event);

#ifdef MYSQL_CLIENT
  void print_helper(FILE *, PRINT_EVENT_INFO *, char const *const name);
#endif

#ifndef MYSQL_CLIENT
  virtual int do_add_row_data(byte *data, my_size_t length);
#endif
+92 −41
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@

void mysql_client_binlog_statement(THD* thd)
{
  DBUG_ENTER("mysql_client_binlog_statement");
  DBUG_PRINT("info",("binlog base64: '%*s'",
                     (thd->lex->comment.length < 2048 ?
                      thd->lex->comment.length : 2048),
@@ -43,8 +44,8 @@ void mysql_client_binlog_statement(THD* thd)
  my_bool nsok= thd->net.no_send_ok;
  thd->net.no_send_ok= TRUE;

  const my_size_t coded_len= thd->lex->comment.length + 1;
  const my_size_t event_len= base64_needed_decoded_length(coded_len);
  my_size_t coded_len= thd->lex->comment.length + 1;
  my_size_t decoded_len= base64_needed_decoded_length(coded_len);
  DBUG_ASSERT(coded_len > 0);

  /*
@@ -57,9 +58,8 @@ void mysql_client_binlog_statement(THD* thd)
    new Format_description_log_event(4);

  const char *error= 0;
  char *buf= (char *) my_malloc(event_len, MYF(MY_WME));
  char *buf= (char *) my_malloc(decoded_len, MYF(MY_WME));
  Log_event *ev = 0;
  int res;

  /*
    Out of memory check
@@ -73,45 +73,99 @@ void mysql_client_binlog_statement(THD* thd)
  thd->rli_fake->sql_thd= thd;
  thd->rli_fake->no_storage= TRUE;

  res= base64_decode(thd->lex->comment.str, coded_len, buf);
  for (char const *strptr= thd->lex->comment.str ;
       strptr < thd->lex->comment.str + thd->lex->comment.length ; )
  {
    char const *endptr= 0;
    int bytes_decoded= base64_decode(strptr, coded_len, buf, &endptr);

    DBUG_PRINT("info",
               ("bytes_decoded=%d; strptr=0x%lu; endptr=0x%lu ('%c':%d)",
                bytes_decoded, strptr, endptr, *endptr, *endptr));

    if (bytes_decoded < 0)
    {
      my_error(ER_BASE64_DECODE_ERROR, MYF(0));
      goto end;
    }
    else if (bytes_decoded == 0)
      break; // If no bytes where read, the string contained only whitespace

    DBUG_ASSERT(bytes_decoded > 0);
    DBUG_ASSERT(endptr > strptr);
    coded_len-= endptr - strptr;
    strptr= endptr;

    /*
      Now we have one or more events stored in the buffer. The size of
      the buffer is computed based on how much base64-encoded data
      there were, so there should be ample space for the data (maybe
      even too much, since a statement can consist of a considerable
      number of events).

      TODO: Switch to use a stream-based base64 encoder/decoder in
      order to be able to read exactly what is necessary.
    */

    DBUG_PRINT("info",("binlog base64 decoded_len=%d, bytes_decoded=%d",
                       decoded_len, bytes_decoded));

  DBUG_PRINT("info",("binlog base64 decoded_len=%d, event_len=%d\n",
                     res, uint4korr(buf + EVENT_LEN_OFFSET)));
    /*
    Note that 'res' is the correct event length, 'event_len' was
    calculated based on the base64-string that possibly contained
    extra spaces, so it can be longer than the real event.
      Now we start to read events of the buffer, until there are no
      more.
    */
  if (res < EVENT_LEN_OFFSET
      || (uint) res != uint4korr(buf+EVENT_LEN_OFFSET))
    for (char *bufptr= buf ; bytes_decoded > 0 ; )
    {
      /*
        Checking that the first event in the buffer is not truncated.
      */
      ulong event_len= uint4korr(bufptr + EVENT_LEN_OFFSET);
      DBUG_PRINT("info", ("event_len=%lu, bytes_decoded=%d",
                          event_len, bytes_decoded));
      if (bytes_decoded < EVENT_LEN_OFFSET || (uint) bytes_decoded < event_len)
      {
        my_error(ER_SYNTAX_ERROR, MYF(0));
        goto end;
      }

  ev= Log_event::read_log_event(buf, res, &error, desc);
      ev= Log_event::read_log_event(bufptr, event_len, &error, desc);

      DBUG_PRINT("info",("binlog base64 err=%s", error));
      if (!ev)
      {
        /*
      This could actually be an out-of-memory, but it is more
      likely causes by a bad statement
          This could actually be an out-of-memory, but it is more likely
          causes by a bad statement
        */
        my_error(ER_SYNTAX_ERROR, MYF(0));
        goto end;
      }

  DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
  DBUG_PRINT("info",("buf+EVENT_TYPE_OFFSET=%d", buf+EVENT_TYPE_OFFSET));
      bytes_decoded -= event_len;
      bufptr += event_len;

      DBUG_PRINT("info",("ev->get_type_code()=%d", ev->get_type_code()));
      DBUG_PRINT("info",("bufptr+EVENT_TYPE_OFFSET=0x%lx",
                         bufptr+EVENT_TYPE_OFFSET));
      DBUG_PRINT("info", ("bytes_decoded=%d; bufptr=0x%lx; buf[EVENT_LEN_OFFSET]=%u",
                          bytes_decoded, bufptr, uint4korr(bufptr+EVENT_LEN_OFFSET)));
      ev->thd= thd;
  if (ev->exec_event(thd->rli_fake))
      if (int err= ev->exec_event(thd->rli_fake))
      {
        DBUG_PRINT("info", ("exec_event() - error=%d", error));
        /*
          TODO: Maybe a better error message since the BINLOG statement
          now contains several events.
        */
        my_error(ER_UNKNOWN_ERROR, MYF(0), "Error executing BINLOG statement");
        goto end;
      }

      delete ev;
      ev= 0;
    }
  }

  /*
    Restore setting of no_send_ok
  */
@@ -126,10 +180,7 @@ void mysql_client_binlog_statement(THD* thd)
  */
  thd->net.no_send_ok= nsok;

  if (ev)
    delete ev;
  if (desc)
  delete desc;
  if (buf)
    my_free(buf, MYF(0));
  my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
  DBUG_VOID_RETURN;
}