Commit 6ed9fc6b authored by unknown's avatar unknown
Browse files

BUG#25688 (RBR: circular replication may cause STMT_END_F flags to be

skipped):

By moving statement end actions from Rows_log_event::do_apply_event() to
Rows_log_event::do_update_pos() they will always be executed, even if
Rows_log_event::do_apply_event() is skipped because the event originated
at the same server. This because Rows_log_event::do_update_pos() is always
executed (unless Rows_log_event::do_apply_event() failed with an error,
in which case the slave stops with an error anyway). 

Adding test case.

Fixing logic to detect if inside a group. If a rotate event occured
when an initial prefix of events for a statement, but for which the
table did contain a key, last_event_start_time is set to zero, causing
rotate to end the group but without unlocking any tables. This left a
lock hanging around, which subsequently triggered an assertion when a
second attempt was made to lock the same sequence of tables.

In order to solve the above problem, a new flag was added to the relay
log info structure that is used to indicate that the replication thread
is currently executing a statement. Using this flag, the replication
thread is in a group if it is either in a statement or inside a trans-
action.

The patch also eliminates some gratuitous header file inclusions that
were not needed (and caused compile errors) and replaced them with
forward definitions.


sql/item_func.cc:
  Including definition of MASTER_INFO.
sql/log.cc:
  Including definition of RELAY_LOG_INFO since it is used in the file.
sql/log_event.cc:
  Moving statement end actions from Rows_log_event::do_apply_event() to
  Rows_log_event::do_update_pos().
  Factoring out code to update group positions and event positions into
  relay log info structure.
  ---
  Adding debugging printouts.
  Fixing logic to detect if inside a group.
sql/log_event.h:
  Adding Rows_log_event::do_update_pos().
sql/mysqld.cc:
  Including definition of MASTER_INFO.
sql/repl_failsafe.cc:
  Including definition of MASTER_INFO.
sql/rpl_mi.h:
  Including definition of RELAY_LOG_INFO since it is used in the file.
sql/rpl_rli.cc:
  Adding member function stmt_done() to do after-statement updates of the
  relay log info structure.
sql/rpl_rli.h:
  Adding member function stmt_done() to do after-statement updates of the
  relay log info structure.
sql/set_var.cc:
  Including definition of MASTER_INFO.
sql/slave.cc:
  Adding debuging printouts.
sql/slave.h:
  Removing inclusion definitions of MASTER_INFO and RELAY_LOG_INFO and
  replacing them with forward declarations since the classes are not
  used in the file. The gratuitous inclusion lead to compile errors in
  the two classes above in files that used neither.
sql/sql_binlog.cc:
  Including definition of RELAY_LOG_INFO since it is used in the file.
sql/sql_class.cc:
  Including definition of RELAY_LOG_INFO since it is used in the file.
sql/sql_class.h:
  Removing inclusion definitions of RELAY_LOG_INFO and replacing it
  with forward declaration since the class is not used in the file.
  The gratuitous inclusion lead to compile errors in the class above
  in files didn't use the class.
sql/sql_insert.cc:
  Including definition of MASTER_INFO.
sql/sql_repl.cc:
  Including definition of MASTER_INFO.
mysql-test/r/rpl_ndb_circular_simplex.result:
  New BitKeeper file ``mysql-test/r/rpl_ndb_circular_simplex.result''
mysql-test/t/rpl_ndb_circular_simplex.test:
  New BitKeeper file ``mysql-test/t/rpl_ndb_circular_simplex.test''
parent 6d9d64ba
Loading
Loading
Loading
Loading
+61 −0
Original line number Diff line number Diff line
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
CREATE TABLE t1 (a int key, b int) ENGINE=NDB;
SHOW TABLES;
Tables_in_test
t1
RESET MASTER;
INSERT INTO t1 VALUES (1,2);
INSERT INTO t1 VALUES (2,3);
STOP SLAVE;
CHANGE MASTER TO MASTER_HOST="127.0.0.1",MASTER_PORT=SLAVE_PORT,MASTER_USER="root";
RESET MASTER;
START SLAVE;
SHOW SLAVE STATUS;
Slave_IO_State	#
Master_Host	127.0.0.1
Master_User	root
Master_Port	9308
Connect_Retry	60
Master_Log_File	slave-bin.000001
Read_Master_Log_Pos	468
Relay_Log_File	#
Relay_Log_Pos	#
Relay_Master_Log_File	slave-bin.000001
Slave_IO_Running	Yes
Slave_SQL_Running	Yes
Replicate_Do_DB	
Replicate_Ignore_DB	
Replicate_Do_Table	
Replicate_Ignore_Table	
Replicate_Wild_Do_Table	
Replicate_Wild_Ignore_Table	
Last_Errno	0
Last_Error	
Skip_Counter	0
Exec_Master_Log_Pos	468
Relay_Log_Space	#
Until_Condition	None
Until_Log_File	
Until_Log_Pos	0
Master_SSL_Allowed	No
Master_SSL_CA_File	
Master_SSL_CA_Path	
Master_SSL_Cert	
Master_SSL_Cipher	
Master_SSL_Key	
Seconds_Behind_Master	#
SELECT * FROM t1 ORDER BY a;
a	b
1	2
2	3
STOP SLAVE;
START SLAVE;
SELECT * FROM t1 ORDER BY a;
a	b
1	2
2	3
+77 −0
Original line number Diff line number Diff line
--source include/have_ndb.inc
--source include/have_binlog_format_row.inc
--source include/master-slave.inc

connection master;
CREATE TABLE t1 (a int key, b int) ENGINE=NDB;
sync_slave_with_master;
SHOW TABLES;

# Lose the events from the slave binary log: there is no
# need to re-create the table on the master.
connection slave;
RESET MASTER;

# Insert some values on the slave and master
connection master;
INSERT INTO t1 VALUES (1,2);
# Switch to slave once event is applied and insert a row
sync_slave_with_master;
connection slave;
INSERT INTO t1 VALUES (2,3);

# ... it is now very probable that we have a mixed event in the binary
# log.  If we don't, the test should still pass, but will not test the
# mixed event situation.

# The statement is disabled since it cannot reliably show the same
# info all the time.  Use it for debug purposes.

#SHOW BINLOG EVENTS;

# Replicate back to the master to test this mixed event on the master
STOP SLAVE;

connection master;
--replace_result $SLAVE_MYPORT SLAVE_PORT
eval CHANGE MASTER TO MASTER_HOST="127.0.0.1",MASTER_PORT=$SLAVE_MYPORT,MASTER_USER="root";

RESET MASTER;
START SLAVE;

connection slave;
save_master_pos;
connection master;
sync_with_master;

# The statement is disabled since it cannot reliably show the same
# info all the time.  Use it for debug purposes.

#SHOW BINLOG EVENTS;

# Check that there is no error in replication
--replace_result $MASTER_MYPORT MASTER_PORT
--replace_column 1 # 8 # 9 # 23 # 33 #
query_vertical SHOW SLAVE STATUS;

# Check that we have the data on the master
SELECT * FROM t1 ORDER BY a;

# We should now have another mixed event, likely with "slave" server
# id last, and with the STMT_END_F flag set.

# The statement is disabled since it cannot reliably show the same
# info all the time.  Use it for debug purposes.

#SHOW BINLOG EVENTS;

# now lets see that this data is applied correctly on the slave
STOP SLAVE;
save_master_pos;

connection slave;
START SLAVE;

# check that we have the data on the slave
sync_with_master;
SELECT * FROM t1 ORDER BY a;
+1 −0
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@

#include "mysql_priv.h"
#include "slave.h"				// for wait_for_master_pos
#include "rpl_mi.h"
#include <m_ctype.h>
#include <hash.h>
#include <time.h>
+1 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@
#include "mysql_priv.h"
#include "sql_repl.h"
#include "rpl_filter.h"
#include "rpl_rli.h"

#include <my_dir.h>
#include <stdarg.h>
+96 −86
Original line number Diff line number Diff line
@@ -22,6 +22,8 @@

#include "mysql_priv.h"
#include "slave.h"
#include "rpl_rli.h"
#include "rpl_mi.h"
#include "rpl_filter.h"
#include "rpl_utility.h"
#include <my_dir.h>
@@ -31,6 +33,8 @@

#define log_cs	&my_charset_latin1

#define FLAGSTR(V,F) ((V)&(F)?#F" ":"")

/*
  Cache that will automatically be written to a dedicated file on
  destruction.
@@ -547,49 +551,7 @@ int Log_event::do_update_pos(RELAY_LOG_INFO *rli)
    Matz: I don't think we will need this check with this refactoring.
  */
  if (rli)
  {
    /*
      If in a transaction, and if the slave supports transactions, just
      inc_event_relay_log_pos(). We only have to check for OPTION_BEGIN
      (not OPTION_NOT_AUTOCOMMIT) as transactions are logged with
      BEGIN/COMMIT, not with SET AUTOCOMMIT= .

      CAUTION: opt_using_transactions means
      innodb || bdb ; suppose the master supports InnoDB and BDB,
      but the slave supports only BDB, problems
      will arise:
      - suppose an InnoDB table is created on the master,
      - then it will be MyISAM on the slave
      - but as opt_using_transactions is true, the slave will believe he
      is transactional with the MyISAM table. And problems will come
      when one does START SLAVE; STOP SLAVE; START SLAVE; (the slave
      will resume at BEGIN whereas there has not been any rollback).
      This is the problem of using opt_using_transactions instead of a
      finer "does the slave support
      _the_transactional_handler_used_on_the_master_".

      More generally, we'll have problems when a query mixes a
      transactional handler and MyISAM and STOP SLAVE is issued in the
      middle of the "transaction". START SLAVE will resume at BEGIN
      while the MyISAM table has already been updated.
    */
    if ((thd->options & OPTION_BEGIN) && opt_using_transactions)
      rli->inc_event_relay_log_pos();
    else
    {
      rli->inc_group_relay_log_pos(log_pos);
      flush_relay_log_info(rli);
      /*
         Note that Rotate_log_event::do_apply_event() does not call
         this function, so there is no chance that a fake rotate event
         resets last_master_timestamp.  Note that we update without
         mutex (probably ok - except in some very rare cases, only
         consequence is that value may take some time to display in
         Seconds_Behind_Master - not critical).
      */
      rli->last_master_timestamp= when;
    }
  }
    rli->stmt_done(log_pos, when);

  return 0;                                   // Cannot fail currently
}
@@ -1024,6 +986,10 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
    break;
  }

  DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)",
                            ev ? ev->get_type_str() : "<unknown>",
                            buf[EVENT_TYPE_OFFSET],
                            event_len));
  /*
    is_valid() are small event-specific sanity tests which are
    important; for example there are some my_malloc() in constructors
@@ -3578,17 +3544,6 @@ bool Rotate_log_event::write(IO_CACHE* file)
}
#endif

/**
   Helper function to detect if the event is inside a group.
 */
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
static bool is_in_group(THD *const thd, RELAY_LOG_INFO *const rli)
{
  return (thd->options & OPTION_BEGIN) != 0 ||
         (rli->last_event_start_time > 0);
}
#endif


/*
  Rotate_log_event::do_apply_event()
@@ -3639,7 +3594,7 @@ int Rotate_log_event::do_update_pos(RELAY_LOG_INFO *rli)
    relay log, which shall not change the group positions.
  */
  if ((server_id != ::server_id || rli->replicate_same_server_id) &&
      !is_in_group(thd, rli))
      !rli->is_in_group())
  {
    DBUG_PRINT("info", ("old group_master_log_name: '%s'  "
                        "old group_master_log_pos: %lu",
@@ -3803,6 +3758,12 @@ void Intvar_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION)&& !defined(MYSQL_CLIENT)
int Intvar_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
  /*
    We are now in a statement until the associated query log event has
    been processed.
   */
  const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);

  switch (type) {
  case LAST_INSERT_ID_EVENT:
    thd->stmt_depends_on_first_successful_insert_id_in_prev_stmt= 1;
@@ -3903,6 +3864,12 @@ void Rand_log_event::print(FILE* file, PRINT_EVENT_INFO* print_event_info)
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
int Rand_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
{
  /*
    We are now in a statement until the associated query log event has
    been processed.
   */
  const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);

  thd->rand.seed1= (ulong) seed1;
  thd->rand.seed2= (ulong) seed2;
  return 0;
@@ -4297,6 +4264,12 @@ int User_var_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
  double real_val;
  longlong int_val;

  /*
    We are now in a statement until the associated query log event has
    been processed.
   */
  const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);

  if (is_null)
  {
    it= new Item_null();
@@ -6171,6 +6144,17 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
    /* A small test to verify that objects have consistent types */
    DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));

    /*
      Now we are in a statement and will stay in a statement until we
      see a STMT_END_F.

      We set this flag here, before actually applying any rows, in
      case the SQL thread is stopped and we need to detect that we're
      inside a statement and halting abruptly might cause problems
      when restarting.
     */
    const_cast<RELAY_LOG_INFO*>(rli)->set_flag(RELAY_LOG_INFO::IN_STMT);

    error= do_before_row_operations(table);
    while (error == 0 && row_start < (const char*) m_rows_end)
    {
@@ -6243,6 +6227,45 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
    DBUG_RETURN(error);
  }

  /*
    This code would ideally be placed in do_update_pos() instead, but
    since we have no access to table there, we do the setting of
    last_event_start_time here instead.
  */
  if (table && (table->s->primary_key == MAX_KEY) &&
      !cache_stmt && get_flags(STMT_END_F) == RLE_NO_FLAGS)
  {
    /*
      ------------ Temporary fix until WL#2975 is implemented ---------

      This event is not the last one (no STMT_END_F). If we stop now
      (in case of terminate_slave_thread()), how will we restart? We
      have to restart from Table_map_log_event, but as this table is
      not transactional, the rows already inserted will still be
      present, and idempotency is not guaranteed (no PK) so we risk
      that repeating leads to double insert. So we desperately try to
      continue, hope we'll eventually leave this buggy situation (by
      executing the final Rows_log_event). If we are in a hopeless
      wait (reached end of last relay log and nothing gets appended
      there), we timeout after one minute, and notify DBA about the
      problem.  When WL#2975 is implemented, just remove the member
      st_relay_log_info::last_event_start_time and all its occurences.
    */
    const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
  }

  DBUG_RETURN(0);
}

int
Rows_log_event::do_update_pos(RELAY_LOG_INFO *rli)
{
  DBUG_ENTER("Rows_log_event::do_update_pos");
  int error= 0;

  DBUG_PRINT("info", ("flags: %s",
                      get_flags(STMT_END_F) ? "STMT_END_F " : ""));

  if (get_flags(STMT_END_F))
  {
    /*
@@ -6261,6 +6284,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
      replicate-ignore rules).
    */
    thd->binlog_flush_pending_rows_event(true);

    /*
      If this event is not in a transaction, the call below will, if some
      transactional storage engines are involved, commit the statement into
@@ -6271,6 +6295,7 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
      binlog.
    */
    error= ha_autocommit_or_rollback(thd, 0);

    /*
      Now what if this is not a transactional engine? we still need to
      flush the pending event to the binlog; we did it with
@@ -6282,10 +6307,16 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
    */

    thd->reset_current_stmt_binlog_row_based();
    const_cast<RELAY_LOG_INFO*>(rli)->cleanup_context(thd, 0);

    rli->cleanup_context(thd, 0);
    if (error == 0)
    {
      /*
        Indicate that a statement is finished.
        Step the group log position if we are not in a transaction,
        otherwise increase the event log position.
       */
      rli->stmt_done(log_pos, when);

      /*
        Clear any errors pushed in thd->net.last_err* if for example "no key
        found" (as this is allowed). This is a safety measure; apparently
@@ -6298,38 +6329,15 @@ int Rows_log_event::do_apply_event(RELAY_LOG_INFO const *rli)
    }
    else
      slave_print_msg(ERROR_LEVEL, rli, error,
                      "Error in %s event: commit of row events failed, "
                      "table `%s`.`%s`",
                      get_type_str(), table->s->db.str, 
                      table->s->table_name.str);
    DBUG_RETURN(error);
                      "Error in %s event: commit of row events failed",
                      get_type_str());
  }

  if (table && (table->s->primary_key == MAX_KEY) && !cache_stmt)
  else
  {
    /*
      ------------ Temporary fix until WL#2975 is implemented ---------

      This event is not the last one (no STMT_END_F). If we stop now
      (in case of terminate_slave_thread()), how will we restart? We
      have to restart from Table_map_log_event, but as this table is
      not transactional, the rows already inserted will still be
      present, and idempotency is not guaranteed (no PK) so we risk
      that repeating leads to double insert. So we desperately try to
      continue, hope we'll eventually leave this buggy situation (by
      executing the final Rows_log_event). If we are in a hopeless
      wait (reached end of last relay log and nothing gets appended
      there), we timeout after one minute, and notify DBA about the
      problem.  When WL#2975 is implemented, just remove the member
      st_relay_log_info::last_event_start_time and all its occurences.
    */
    const_cast<RELAY_LOG_INFO*>(rli)->last_event_start_time= time(0);
    rli->inc_event_relay_log_pos();
  }

  DBUG_ASSERT(error == 0);
  thd->clear_error();

  DBUG_RETURN(0);
  DBUG_RETURN(error);
}

#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
@@ -6413,7 +6421,9 @@ void Rows_log_event::print_helper(FILE *file,
  {
    bool const last_stmt_event= get_flags(STMT_END_F);
    print_header(head, print_event_info, !last_stmt_event);
    my_b_printf(head, "\t%s: table id %lu\n", name, m_table_id);
    my_b_printf(head, "\t%s: table id %lu%s\n",
                name, m_table_id,
                last_stmt_event ? " flags: STMT_END_F" : "");
    print_base64(body, print_event_info, !last_stmt_event);
  }

Loading