Commit cabebacd authored by unknown's avatar unknown
Browse files

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new

parents d56ca9c7 63aba707
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
--let $binlog_start=102
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 #
--eval show binlog events from $binlog_start
+67 −2
Original line number Diff line number Diff line
drop database if exists mysqltest;
drop table if exists t1,t2;
drop table if exists t1,t2,t3;
drop database if exists mysqltest;
drop table if exists t1,t2;
drop table if exists t1,t2,t3;
reset master;
reset master;
create database mysqltest;
@@ -123,3 +123,68 @@ master-bin1.000001 # Table_map # # cluster.apply_status
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Query	#	#	COMMIT
master-bin1.000001	#	Query	#	#	use `test`; drop table `t1`
reset master;
show tables;
Tables_in_test
reset master;
show tables;
Tables_in_test
create table t1 (a int key) engine=ndb;
create table t2 (a int key) engine=ndb;
create table t3 (a int key) engine=ndb;
rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
show binlog events from <binlog_start>;
Log_name	Pos	Event_type	Server_id	End_log_pos	Info
master-bin1.000001	#	Query	#	#	use `test`; create table t1 (a int key) engine=ndb
master-bin1.000001	#	Query	#	#	use `test`; create table t2 (a int key) engine=ndb
master-bin1.000001	#	Query	#	#	use `test`; create table t3 (a int key) engine=ndb
master-bin1.000001	#	Query	#	#	BEGIN
master-bin1.000001	#	Table_map	#	#	cluster.apply_status
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Query	#	#	COMMIT
master-bin1.000001	#	Query	#	#	use `test`; rename table `test.t3` to `test.t4`
master-bin1.000001	#	Query	#	#	BEGIN
master-bin1.000001	#	Table_map	#	#	cluster.apply_status
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Query	#	#	COMMIT
master-bin1.000001	#	Query	#	#	use `test`; rename table `test.t2` to `test.t3`
master-bin1.000001	#	Query	#	#	BEGIN
master-bin1.000001	#	Table_map	#	#	cluster.apply_status
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Query	#	#	COMMIT
master-bin1.000001	#	Query	#	#	use `test`; rename table `test.t1` to `test.t2`
master-bin1.000001	#	Query	#	#	BEGIN
master-bin1.000001	#	Table_map	#	#	cluster.apply_status
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Query	#	#	COMMIT
master-bin1.000001	#	Query	#	#	use `test`; rename table `test.t4` to `test.t1`
drop table t1;
drop table t2;
drop table t3;
reset master;
show tables;
Tables_in_test
reset master;
show tables;
Tables_in_test
create table t1 (a int key) engine=ndb;
insert into t1 values(1);
rename table t1 to t2;
insert into t2 values(2);
show binlog events from <binlog_start>;
Log_name	Pos	Event_type	Server_id	End_log_pos	Info
master-bin1.000001	#	Query	#	#	use `test`; create table t1 (a int key) engine=ndb
master-bin1.000001	#	Query	#	#	BEGIN
master-bin1.000001	#	Table_map	#	#	cluster.apply_status
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Table_map	#	#	test.t1
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Query	#	#	COMMIT
master-bin1.000001	#	Query	#	#	use `test`; rename table `test.t1` to `test.t2`
master-bin1.000001	#	Query	#	#	BEGIN
master-bin1.000001	#	Table_map	#	#	cluster.apply_status
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Table_map	#	#	test.t2
master-bin1.000001	#	Write_rows	#	#	
master-bin1.000001	#	Query	#	#	COMMIT
drop table t2;
+54 −20
Original line number Diff line number Diff line
@@ -5,18 +5,16 @@
--disable_warnings
connection server2;
drop database if exists mysqltest;
drop table if exists t1,t2;
drop table if exists t1,t2,t3;
connection server1;
drop database if exists mysqltest;
drop table if exists t1,t2;
drop table if exists t1,t2,t3;
--connection server1
reset master;
--connection server2
reset master;
--enable_warnings

--let $binlog_start=102

#
# basic test to see if ddl distribution works across
# multiple binlogs
@@ -33,15 +31,10 @@ create table t1 (a int primary key) engine=ndb;

--connection server2
create table t2 (a int primary key) engine=ndb;
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 # 
--eval show binlog events from $binlog_start
--source include/show_binlog_events.inc

--connection server1
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 # 
--eval show binlog events from $binlog_start

-- source include/show_binlog_events.inc

# alter table
--connection server1
@@ -53,9 +46,7 @@ reset master;
alter table t2 add column (b int);

--connections server1
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 # 
--eval show binlog events from $binlog_start
--source include/show_binlog_events.inc


# alter database
@@ -91,9 +82,7 @@ drop database mysqltest;
create table t1 (a int primary key) engine=ndb;

--connection server2
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 # 
--eval show binlog events from $binlog_start
--source include/show_binlog_events.inc

--connection server2
drop table t2;
@@ -144,6 +133,51 @@ ENGINE =NDB;
drop table t1;

--connection server2
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 # 
--eval show binlog events from $binlog_start
--source include/show_binlog_events.inc

#
# Bug #17827 cluster: rename of several tables in one statement,
# gets multiply logged
#
--connection server1
reset master;
show tables;
--connection server2
reset master;
show tables;

--connection server1
create table t1 (a int key) engine=ndb;
create table t2 (a int key) engine=ndb;
create table t3 (a int key) engine=ndb;
rename table t3 to t4, t2 to t3, t1 to t2, t4 to t1;
--connection server2
--source include/show_binlog_events.inc

drop table t1;
drop table t2;
drop table t3;

#
# Bug #17838 binlog not setup on seconday master after rename
#
#
--connection server1
reset master;
show tables;
--connection server2
reset master;
show tables;

--connection server1
create table t1 (a int key) engine=ndb;
insert into t1 values(1);
rename table t1 to t2;
insert into t2 values(2);

# now we should see data in table t1 _and_ t2
# prior to bug fix, data was missing for t2
--connection server2
--source include/show_binlog_events.inc

drop table t2;
+202 −94
Original line number Diff line number Diff line
@@ -230,12 +230,25 @@ static void run_query(THD *thd, char *buf, char *end,
  }
}

int
static void
ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
{
  DBUG_ENTER("ndbcluster_binlog_close_table");
  if (share->table_share)
  {
    free_table_share(share->table_share);
    share->table_share= 0;
    share->table= 0;
  }
  DBUG_ASSERT(share->table == 0);
  DBUG_VOID_RETURN;
}

static int
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
                             TABLE_SHARE *table_share, TABLE *table)
{
  int error;
  MEM_ROOT *mem_root= &share->mem_root;
  DBUG_ENTER("ndbcluster_binlog_open_table");
  
  init_tmp_table_share(table_share, share->db, 0, share->table_name, 
@@ -274,22 +287,13 @@ ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
  table->s->table_name.str= share->table_name;
  table->s->table_name.length= strlen(share->table_name);
  
  DBUG_ASSERT(share->table_share == 0);
  share->table_share= table_share;
  DBUG_ASSERT(share->table == 0);
  share->table= table;
#ifndef DBUG_OFF
  dbug_print_table("table", table);
#endif
  /*
    ! do not touch the contents of the table
    it may be in use by the injector thread
  */
  share->ndb_value[0]= (NdbValue*)
    alloc_root(mem_root, sizeof(NdbValue) *
               (table->s->fields + 2 /*extra for hidden key and part key*/));
  share->ndb_value[1]= (NdbValue*)
    alloc_root(mem_root, sizeof(NdbValue) *
               (table->s->fields + 2 /*extra for hidden key and part key*/));

  DBUG_RETURN(0);
}

@@ -351,6 +355,18 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
    TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
      break;
    /*
      ! do not touch the contents of the table
      it may be in use by the injector thread
    */
    MEM_ROOT *mem_root= &share->mem_root;
    share->ndb_value[0]= (NdbValue*)
      alloc_root(mem_root, sizeof(NdbValue) *
                 (table->s->fields + 2 /*extra for hidden key and part key*/));
    share->ndb_value[1]= (NdbValue*)
      alloc_root(mem_root, sizeof(NdbValue) *
                 (table->s->fields + 2 /*extra for hidden key and part key*/));

    if (table->s->primary_key == MAX_KEY)
      share->flags|= NSF_HIDDEN_PK;
    if (table->s->blob_fields != 0)
@@ -1156,8 +1172,11 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
    (void) pthread_mutex_unlock(&share->mutex);
  }

  if (get_a_share)
  if (get_a_share && share)
  {
    free_share(&share);
    share= 0;
  }

  DBUG_RETURN(0);
}
@@ -1314,22 +1333,35 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
                         NDB_SHARE *share)
{
  DBUG_ENTER("ndb_handle_schema_change");
  int remote_drop_table= 0, do_close_cached_tables= 0;
  const char *dbname= share->table->s->db.str;
  const char *tabname= share->table->s->table_name.str;
  bool online_alter_table= (pOp->getEventType() == NDBEVENT::TE_ALTER &&
                            pOp->tableFrmChanged());
  bool do_close_cached_tables= FALSE;
  bool is_online_alter_table= FALSE;
  bool is_rename_table= FALSE;
  bool is_remote_change=
    (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();

  if (pOp->getEventType() == NDBEVENT::TE_ALTER)
  {
    if (pOp->tableFrmChanged())
    {
      is_online_alter_table= TRUE;
    }
    else
    {
      DBUG_ASSERT(pOp->tableNameChanged());
      is_rename_table= TRUE;
    }
  }

  if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
      (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
  if (is_remote_change) /* includes CLUSTER_FAILURE */
  {
    TABLE_SHARE *table_share= share->table->s;
    TABLE* table= share->table;
    TABLE_SHARE *table_share= table->s;
    const char *dbname= table_share->db.str;
    
    /* 
       Invalidate table and all it's indexes
    */
    ndb->setDatabaseName(share->table->s->db.str);
    ndb->setDatabaseName(dbname);
    Thd_ndb *thd_ndb= get_thd_ndb(thd);
    DBUG_ASSERT(thd_ndb != NULL);
    Ndb* old_ndb= thd_ndb->ndb;
@@ -1341,8 +1373,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
    table_handler.invalidate_dictionary_cache(TRUE);
    thd_ndb->ndb= old_ndb;
    
    if (online_alter_table)
    if (is_online_alter_table)
    {
      const char *tabname= table_share->table_name.str;
      char key[FN_REFLEN];
      const void *data= 0, *pack_data= 0;
      uint length, pack_length;
@@ -1375,6 +1408,7 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
          sql_print_information("NDB: Failed write frm for %s.%s, error %d",
                                dbname, tabname, error);
        }
        ndbcluster_binlog_close_table(thd, share);
        close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
        if ((error= ndbcluster_binlog_open_table(thd, share, 
                                                 table_share, table)))
@@ -1383,11 +1417,10 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
        pthread_mutex_unlock(&LOCK_open);
      }
    }
    remote_drop_table= 1;
  }

  // If only frm was changed continue replicating
  if (online_alter_table)
  if (is_online_alter_table)
  {
    /* Signal ha_ndbcluster::alter_table that drop is done */
    (void) pthread_cond_signal(&injector_cond);
@@ -1395,6 +1428,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
  }

  (void) pthread_mutex_lock(&share->mutex);
  if (is_rename_table && !is_remote_change)
  {
    DBUG_PRINT("info", ("Detected name change of table %s.%s",
                        share->db, share->table_name));
    /* ToDo: remove printout */
    if (ndb_extra_logging)
      sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
                            share_prefix, share->table->s->db.str,
                            share->table->s->table_name.str,
                            share->key);
    /* do the rename of the table in the share */
    share->table->s->db.str= share->db;
    share->table->s->db.length= strlen(share->db);
    share->table->s->table_name.str= share->table_name;
    share->table->s->table_name.length= strlen(share->table_name);
  }
  DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
  if (share->op_old == pOp)
    share->op_old= 0;
@@ -1408,11 +1457,11 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,

  pthread_mutex_lock(&ndbcluster_mutex);
  free_share(&share, TRUE);
  if (remote_drop_table && share && share->state != NSS_DROPPED)
  if (is_remote_change && share && share->state != NSS_DROPPED)
  {
    DBUG_PRINT("info", ("remote drop table"));
    DBUG_PRINT("info", ("remote change"));
    if (share->use_count != 1)
      do_close_cached_tables= 1;
      do_close_cached_tables= TRUE;
    share->state= NSS_DROPPED;
    free_share(&share, TRUE);
  }
@@ -1464,24 +1513,45 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
        int log_query= 0;
        DBUG_PRINT("info", ("log query_length: %d  query: '%s'",
                            schema->query_length, schema->query));
        char key[FN_REFLEN];
        build_table_filename(key, sizeof(key), schema->db, schema->name, "");
        NDB_SHARE *share= get_share(key, 0, false, false);

        switch ((enum SCHEMA_OP_TYPE)schema->type)
        {
        case SOT_DROP_TABLE:
          /* binlog dropping table after any table operations */
          if (ndb_binlog_running)
          if (share && share->op)
            post_epoch_log_list->push_back(schema, mem_root);
          log_query= 0;
          break;
        case SOT_RENAME_TABLE:
          /* fall through */
          if (share && share->op)
          {
            log_query= 0;
            post_epoch_log_list->push_back(schema, mem_root);
            break; /* discovery will be handled by binlog */
          }
          goto sot_create_table;
        case SOT_ALTER_TABLE:
          if (ndb_binlog_running)
          if (share && share->op)
          {
            log_query= 1;
            log_query= 0;
            post_epoch_log_list->push_back(schema, mem_root);
            break; /* discovery will be handled by binlog */
          }
          /* fall through */
          goto sot_create_table;
        case SOT_CREATE_TABLE:
      sot_create_table:
          /*
            we need to free any share here as command below
            may need to call handle_trailing_share
          */
          if (share)
          {
            free_share(&share);
            share= 0;
          }
          pthread_mutex_lock(&LOCK_open);
          if (ndb_create_table_from_engine(thd, schema->db, schema->name))
          {
@@ -1514,10 +1584,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
          break;
        case SOT_CLEAR_SLOCK:
        {
          char key[FN_REFLEN];
          build_table_filename(key, sizeof(key),
                               schema->db, schema->name, "");
          NDB_SHARE *share= get_share(key, 0, false, false);
          if (share)
          {
            pthread_mutex_lock(&share->mutex);
@@ -1528,6 +1594,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
            pthread_mutex_unlock(&share->mutex);
            pthread_cond_signal(&injector_cond);
            free_share(&share);
            share= 0;
          }
          DBUG_RETURN(0);
        }
@@ -1536,7 +1603,11 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
          log_query= 1;
          break;
        }

        if (share)
        {
          free_share(&share);
          share= 0;
        }
        /* signal that schema operation has been handled */
        if ((enum SCHEMA_OP_TYPE)schema->type != SOT_CLEAR_SLOCK)
        {
@@ -1571,23 +1642,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
    case NDBEVENT::TE_DELETE:
      // skip
      break;
    case NDBEVENT::TE_ALTER:
      if (pOp->tableNameChanged())
      {  
        DBUG_PRINT("info", ("Detected name change of table %s.%s",
                            share->db, share->table_name));
        /* do the rename of the table in the share */
        share->table->s->db.str= share->db;
        share->table->s->db.length= strlen(share->db);
        share->table->s->table_name.str= share->table_name;
        share->table->s->table_name.length= strlen(share->table_name);
      }
      ndb_handle_schema_change(thd, ndb, pOp, share);
      break;
    case NDBEVENT::TE_CLUSTER_FAILURE:
    case NDBEVENT::TE_DROP:
      free_share(&schema_share);
      schema_share= 0;
      // fall through
    case NDBEVENT::TE_ALTER:
      ndb_handle_schema_change(thd, ndb, pOp, share);
      break;
    case NDBEVENT::TE_NODE_FAILURE:
@@ -1659,6 +1719,72 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
  DBUG_RETURN(0);
}

/*
  process any operations that should be done after
  the epoch is complete
*/
static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
                                                 List<Cluster_replication_schema>
                                                 *post_epoch_log_list,
                                                 List<Cluster_replication_schema>
                                                 *post_epoch_unlock_list)
{
  DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
  Cluster_replication_schema *schema;
  while ((schema= post_epoch_log_list->pop()))
  {
    DBUG_PRINT("info", ("log query_length: %d  query: '%s'",
                        schema->query_length, schema->query));
    {
      char key[FN_REFLEN];
      build_table_filename(key, sizeof(key), schema->db, schema->name, "");
      NDB_SHARE *share= get_share(key, 0, false, false);
      switch ((enum SCHEMA_OP_TYPE)schema->type)
      {
      case SOT_DROP_DB:
      case SOT_DROP_TABLE:
        break;
      case SOT_RENAME_TABLE:
      case SOT_ALTER_TABLE:
        if (share && share->op)
        {
          break; /* discovery handled by binlog */
        }
        pthread_mutex_lock(&LOCK_open);
        if (ndb_create_table_from_engine(thd, schema->db, schema->name))
        {
          sql_print_error("Could not discover table '%s.%s' from "
                          "binlog schema event '%s' from node %d",
                          schema->db, schema->name, schema->query,
                          schema->node_id);
        }
        pthread_mutex_unlock(&LOCK_open);
      default:
        DBUG_ASSERT(false);
      }
      if (share)
      {
        free_share(&share);
        share= 0;
      }
    }
    {
      char *thd_db_save= thd->db;
      thd->db= schema->db;
      thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
                        schema->query_length, FALSE,
                        schema->name[0] == 0);
      thd->db= thd_db_save;
    }
  }
  while ((schema= post_epoch_unlock_list->pop()))
  {
    ndbcluster_update_slock(thd, schema->db, schema->name);
  }
  DBUG_VOID_RETURN;
}

/*
  Timer class for doing performance measurements
*/
@@ -2206,6 +2332,10 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
    if (share->flags & NSF_BLOB_FLAG)
      op->mergeEvents(true); // currently not inherited from event

    DBUG_PRINT("info", ("share->ndb_value[0]: 0x%x",
                        share->ndb_value[0]));
    DBUG_PRINT("info", ("share->ndb_value[1]: 0x%x",
                        share->ndb_value[1]));
    int n_columns= ndbtab->getNoOfColumns();
    int n_fields= table ? table->s->fields : 0; // XXX ???
    for (int j= 0; j < n_columns; j++)
@@ -2258,6 +2388,12 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
      }
      share->ndb_value[0][j].ptr= attr0.ptr;
      share->ndb_value[1][j].ptr= attr1.ptr;
      DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%x  "
                          "share->ndb_value[0][%d]: 0x%x",
                          j, &share->ndb_value[0][j], j, attr0.ptr));
      DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%x  "
                          "share->ndb_value[1][%d]: 0x%x",
                          j, &share->ndb_value[0][j], j, attr1.ptr));
    }
    op->setCustomData((void *) share); // set before execute
    share->op= op; // assign op in NDB_SHARE
@@ -2468,24 +2604,6 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
                        "op_old: %lx",
                       share->key, share, pOp, share->op, share->op_old));
    break;
  case NDBEVENT::TE_ALTER:
    if (pOp->tableNameChanged())
    {
      DBUG_PRINT("info", ("Detected name change of table %s.%s",
                          share->db, share->table_name));
      /* ToDo: remove printout */
      if (ndb_extra_logging)
        sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
                              share_prefix, share->table->s->db.str,
                              share->table->s->table_name.str,
                              share->key);
      /* do the rename of the table in the share */
      share->table->s->db.str= share->db;
      share->table->s->db.length= strlen(share->db);
      share->table->s->table_name.str= share->table_name;
      share->table->s->table_name.length= strlen(share->table_name);
    }
    goto drop_alter_common;
  case NDBEVENT::TE_DROP:
    if (apply_status_share == share)
    {
@@ -2495,7 +2613,8 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
    /* ToDo: remove printout */
    if (ndb_extra_logging)
      sql_print_information("NDB Binlog: drop table %s.", share->key);
drop_alter_common:
    // fall through
  case NDBEVENT::TE_ALTER:
    row.n_schemaops++;
    DBUG_PRINT("info", ("TABLE %s EVENT: %s  received share: 0x%lx  op: %lx  "
                        "share op: %lx  op_old: %lx",
@@ -3075,26 +3194,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
      }
    }

    /*
      process any operations that should be done after
      the epoch is complete
    */
    {
      Cluster_replication_schema *schema;
      while ((schema= post_epoch_unlock_list.pop()))
      {
        ndbcluster_update_slock(thd, schema->db, schema->name);
      }      
      while ((schema= post_epoch_log_list.pop()))
      {
        char *thd_db_save= thd->db;
        thd->db= schema->db;
        thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
                          schema->query_length, FALSE,
                          schema->name[0] == 0);
        thd->db= thd_db_save;
      }
    }
    ndb_binlog_thread_handle_schema_event_post_epoch(thd,
                                                     &post_epoch_log_list,
                                                     &post_epoch_unlock_list);
    free_root(&mem_root, MYF(0));
    *root_ptr= old_root;
    ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
@@ -3110,9 +3212,15 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
  sql_print_information("Stopping Cluster Binlog");

  if (apply_status_share)
  {
    free_share(&apply_status_share);
    apply_status_share= 0;
  }
  if (schema_share)
  {
    free_share(&schema_share);
    schema_share= 0;
  }

  /* remove all event operations */
  if (ndb)
+38 −2
Original line number Diff line number Diff line
@@ -689,9 +689,45 @@ NdbEventOperationImpl::receive_event()
                                error.code));
      DBUG_RETURN_EVENT(1);
    }
    if ( m_eventImpl->m_tableImpl) 
      delete m_eventImpl->m_tableImpl;

    NdbTableImpl *tmp_table_impl= m_eventImpl->m_tableImpl;
    m_eventImpl->m_tableImpl = at;

    DBUG_PRINT("info", ("switching table impl 0x%x -> 0x%x",
                        tmp_table_impl, at));

    // change the rec attrs to refer to the new table object
    int i;
    for (i = 0; i < 2; i++)
    {
      NdbRecAttr *p = theFirstPkAttrs[i];
      while (p)
      {
        int no = p->getColumn()->getColumnNo();
        NdbColumnImpl *tAttrInfo = at->getColumn(no);
        DBUG_PRINT("info", ("rec_attr: 0x%x  "
                            "switching column impl 0x%x -> 0x%x",
                            p, p->m_column, tAttrInfo));
        p->m_column = tAttrInfo;
        p = p->next();
      }
    }
    for (i = 0; i < 2; i++)
    {
      NdbRecAttr *p = theFirstDataAttrs[i];
      while (p)
      {
        int no = p->getColumn()->getColumnNo();
        NdbColumnImpl *tAttrInfo = at->getColumn(no);
        DBUG_PRINT("info", ("rec_attr: 0x%x  "
                            "switching column impl 0x%x -> 0x%x",
                            p, p->m_column, tAttrInfo));
        p->m_column = tAttrInfo;
        p = p->next();
      }
    }
    if (tmp_table_impl) 
      delete tmp_table_impl;
  }

  if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))