Commit 53def7bd authored by unknown's avatar unknown
Browse files

BUG#20821 (INSERT DELAYED fails to write some rows to binlog):

Reverting to old behaviour of writing the query before all rows
have been written.


mysql-test/r/rpl_row_delayed_ins.result:
  Result change
sql/sql_class.cc:
  Adding debug message to binlog_query()
sql/sql_insert.cc:
  - Changing write_delayed() to use a LEX_STRING for the query.
  - Adding query string to class delayed_row.
  - Removing query string from class delayed_insert.
  - Adding code to copy query string and delete it when the row
    is executed.
  - Logging query at first row instead of after all rows are 
    inserted (reverting to old behaviour).
  - Flushing the pending row event after all rows have been inserted.
    This is necessary since binlog_query() is called before all rows
    instead of after.
mysql-test/r/rpl_insert.result:
  New BitKeeper file ``mysql-test/r/rpl_insert.result''
mysql-test/t/rpl_insert.test:
  New BitKeeper file ``mysql-test/t/rpl_insert.test''
parent a042f188
Loading
Loading
Loading
Loading
+16 −0
Original line number Diff line number Diff line
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
CREATE SCHEMA IF NOT EXISTS mysqlslap;
USE mysqlslap;
CREATE TABLE t1 (id INT, name VARCHAR(64));
SELECT COUNT(*) FROM mysqlslap.t1;
COUNT(*)
20000
SELECT COUNT(*) FROM mysqlslap.t1;
COUNT(*)
20000
DROP SCHEMA IF EXISTS mysqlslap;
+4 −2
Original line number Diff line number Diff line
@@ -17,8 +17,10 @@ Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001	4	Format_desc	1	102	Server ver: VERSION, Binlog ver: 4
master-bin.000001	102	Query	1	222	use `test`; create table t1(a int not null primary key) engine=myisam
master-bin.000001	222	Table_map	1	261	table_id: # (test.t1)
master-bin.000001	261	Write_rows	1	305	table_id: # flags: STMT_END_F
master-bin.000001	305	Query	1	380	use `test`; flush tables
master-bin.000001	261	Write_rows	1	295	table_id: # flags: STMT_END_F
master-bin.000001	295	Table_map	1	334	table_id: # (test.t1)
master-bin.000001	334	Write_rows	1	373	table_id: # flags: STMT_END_F
master-bin.000001	373	Query	1	448	use `test`; flush tables
SELECT * FROM t1 ORDER BY a;
a
1
+27 −0
Original line number Diff line number Diff line

#
# Bug#20821: INSERT DELAYED fails to write some rows to binlog
#

--source include/master-slave.inc
--source include/not_embedded.inc
--source include/not_windows.inc

--disable_warnings
CREATE SCHEMA IF NOT EXISTS mysqlslap;
USE mysqlslap;
--enable_warnings

CREATE TABLE t1 (id INT, name VARCHAR(64));

let $query = "INSERT INTO t1 VALUES (1, 'Dr. No'), (2, 'From Russia With Love'), (3, 'Goldfinger'), (4, 'Thunderball'), (5, 'You Only Live Twice')";
--exec $MYSQL_SLAP --silent --concurrency=20 --iterations=200 --query=$query --delimiter=";"

SELECT COUNT(*) FROM mysqlslap.t1;
sync_slave_with_master;
SELECT COUNT(*) FROM mysqlslap.t1;

connection master;
DROP SCHEMA IF EXISTS mysqlslap;
sync_slave_with_master;
+1 −0
Original line number Diff line number Diff line
@@ -2717,6 +2717,7 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype,
                      bool is_trans, bool suppress_use)
{
  DBUG_ENTER("THD::binlog_query");
  DBUG_PRINT("enter", ("qtype=%d, query='%s'", qtype, query));
  DBUG_ASSERT(query && mysql_bin_log.is_open());

  switch (qtype) {
+58 −40
Original line number Diff line number Diff line
@@ -26,8 +26,8 @@
static int check_null_fields(THD *thd,TABLE *entry);
#ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
			 char *query, uint query_length, bool log_on);
static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
                         LEX_STRING query, bool ignore, bool log_on);
static void end_delayed_insert(THD *thd);
pthread_handler_t handle_delayed_insert(void *arg);
static void unlink_blobs(register TABLE *table);
@@ -511,7 +511,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
#ifndef EMBEDDED_LIBRARY
    if (lock_type == TL_WRITE_DELAYED)
    {
      error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on);
      LEX_STRING const st_query = { query, thd->query_length };
      error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
      query=0;
    }
    else
@@ -1237,11 +1238,16 @@ class delayed_row :public ilink {
  bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
  ulonglong last_insert_id;
  timestamp_auto_set_type timestamp_field_type;
  LEX_STRING query;

  delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
    :record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
  delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
              bool ignore_arg, bool log_query_arg)
    : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
      query(query_arg)
    {}
  ~delayed_row()
  {
    x_free(query.str);
    x_free(record);
  }
};
@@ -1249,9 +1255,6 @@ class delayed_row :public ilink {

class delayed_insert :public ilink {
  uint locks_in_memory;
  char *query;
  ulong query_length;
  ulong query_allocated;
public:
  THD thd;
  TABLE *table;
@@ -1265,7 +1268,7 @@ class delayed_insert :public ilink {
  TABLE_LIST table_list;			// Argument

  delayed_insert()
    :locks_in_memory(0), query(0), query_length(0), query_allocated(0),
    :locks_in_memory(0),
     table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
     group_count(0)
  {
@@ -1291,7 +1294,6 @@ class delayed_insert :public ilink {
  }
  ~delayed_insert()
  {
    my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
    /* The following is not really needed, but just for safety */
    delayed_row *row;
    while ((row=rows.get()))
@@ -1311,25 +1313,6 @@ class delayed_insert :public ilink {
    VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
  }

  int set_query(char const *q, ulong qlen) {
    if (q && qlen > 0)
    {
      if (query_allocated < qlen + 1)
      {
        ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
        query= my_realloc(query, qlen + 1, MYF(flags));
        if (query == 0)
          return HA_ERR_OUT_OF_MEM;
        query_allocated= qlen;
      }
      query_length= qlen;
      memcpy(query, q, qlen + 1);
    }
    else
      query_length= 0;
    return 0;
  }

  /* The following is for checking when we can delete ourselves */
  inline void lock()
  {
@@ -1616,13 +1599,14 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)

/* Put a question in queue */

static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
                         bool ignore, char *query, uint query_length,
                         bool log_on)
static int
write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
              LEX_STRING query, bool ignore, bool log_on)
{
  delayed_row *row=0;
  delayed_row *row;
  delayed_insert *di=thd->di;
  DBUG_ENTER("write_delayed");
  DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));

  thd->proc_info="waiting for handler insert";
  pthread_mutex_lock(&di->mutex);
@@ -1630,13 +1614,28 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
    pthread_cond_wait(&di->cond_client,&di->mutex);
  thd->proc_info="storing row into queue";

  if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
  if (thd->killed)
    goto err;

  /*
    Take a copy of the query string, if there is any. The string will
    be free'ed when the row is destroyed. If there is no query string,
    we don't do anything special.
   */

  if (query.str)
    if (!(query.str= my_strndup(query.str, MYF(MY_WME), query.length)))
      goto err;
  row= new delayed_row(query, duplic, ignore, log_on);
  if (row == NULL)
  {
    my_free(query.str, MYF(MY_WME));
    goto err;
  }

  if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
    goto err;
  memcpy(row->record, table->record[0], table->s->reclength);
  di->set_query(query, query_length);
  row->start_time=		thd->start_time;
  row->query_start_used=	thd->query_start_used;
  row->last_insert_id_used=	thd->last_insert_id_used;
@@ -1995,7 +1994,7 @@ bool delayed_insert::handle_inserts(void)
  if (thd.killed || table->s->version != refresh_version)
  {
    thd.killed= THD::KILL_CONNECTION;
    max_rows= ~(ulong)0;                        // Do as much as possible
    max_rows= ULONG_MAX;                     // Do as much as possible
  }

  /*
@@ -2042,11 +2041,18 @@ bool delayed_insert::handle_inserts(void)
      thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
      row->log_query = 0;
    }

    if (using_ignore)
    {
      using_ignore=0;
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
    }

    if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
      thd.binlog_query(THD::ROW_QUERY_TYPE,
                       row->query.str, row->query.length,
                       FALSE, FALSE);

    if (table->s->blob_fields)
      free_delayed_insert_blobs(table);
    thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
@@ -2093,13 +2099,25 @@ bool delayed_insert::handle_inserts(void)
	pthread_cond_broadcast(&cond_client);	// If waiting clients
    }
  }

  thd.proc_info=0;
  pthread_mutex_unlock(&mutex);

  /* After releasing the mutex, to prevent deadlocks. */
  if (mysql_bin_log.is_open())
    thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE);
#ifdef HAVE_ROW_BASED_REPLICATION
  /*
    We need to flush the pending event when using row-based
    replication since the flushing normally done in binlog_query() is
    not done last in the statement: for delayed inserts, the insert
    statement is logged *before* all rows are inserted.

    We can flush the pending event without checking the thd->lock
    since the delayed insert *thread* is not inside a stored function
    or trigger.

    TODO: Move the logging to last in the sequence of rows.
   */
  if (thd.current_stmt_binlog_row_based)
    thd.binlog_flush_pending_rows_event(TRUE);
#endif /* HAVE_ROW_BASED_REPLICATION */

  if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
  {						// This shouldn't happen