Commit 2f333aae authored by unknown's avatar unknown
Browse files

Bug #18947 CRBR: order in binlog of create table and insert (on different table) not determ

- wait for schema event also if mysqld is a single one in cluster to ensure serialization with data events
+ some current_thd removals
+ enabling kill of sql thread during shema sync wait


mysql-test/r/rpl_ndb_log.result:
  Bug #18947  	CRBR: order in binlog of create table and insert (on different table) not determ
mysql-test/t/disabled.def:
  Bug #18947  	CRBR: order in binlog of create table and insert (on different table) not determ
parent 49c664d3
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -47,6 +47,10 @@ master-bin.000001 # Table_map 1 # table_id: # (test.t1)
flush logs;
create table t3 (a int)ENGINE=NDB;
start slave;

let $result_pattern= '%127.0.0.1%root%master-bin.000002%slave-relay-bin.000005%Yes%Yes%0%0%None%' ;

--source include/wait_slave_status.inc
flush logs;
stop slave;
create table t2 (n int)ENGINE=NDB;
+1 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ rpl_ndb_commit_afterflush : BUG#19328 2006-05-04 tomas Slave timeout with COM_RE
rpl_ndb_dd_partitions    : BUG#19259 2006-04-21 rpl_ndb_dd_partitions fails on s/AMD
rpl_ndb_ddl              : BUG#18946 result file needs update + test needs to checked
rpl_ndb_innodb2ndb       : Bug #19710  	Cluster replication to partition table fails on DELETE FROM statement
rpl_ndb_log              : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ
#rpl_ndb_log              : BUG#18947 2006-03-21 tomas CRBR: order in binlog of create table and insert (on different table) not determ
rpl_ndb_myisam2ndb       : Bug #19710  	Cluster replication to partition table fails on DELETE FROM statement
rpl_switch_stm_row_mixed : BUG#18590 2006-03-28 brian
rpl_row_blob_innodb      : BUG#18980 2006-04-10 kent    Test fails randomly
+9 −6
Original line number Diff line number Diff line
@@ -5054,6 +5054,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
                            const char *db,
                            const char *table_name)
{
  THD *thd= current_thd;
  DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
  NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG
@@ -5085,7 +5086,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
      ndb_table_version= h->m_table->getObjectVersion();
    }
#endif
    h->release_metadata(current_thd, ndb);
    h->release_metadata(thd, ndb);
  }
  else
  {
@@ -5151,8 +5152,8 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,

  if (!IS_TMP_PREFIX(table_name) && share)
  {
    ndbcluster_log_schema_op(current_thd, share,
                             current_thd->query, current_thd->query_length,
    ndbcluster_log_schema_op(thd, share,
                             thd->query, thd->query_length,
                             share->db, share->table_name,
                             ndb_table_id, ndb_table_version,
                             SOT_DROP_TABLE, 0, 0, 1);
@@ -5733,6 +5734,7 @@ int ndbcluster_drop_database_impl(const char *path)

static void ndbcluster_drop_database(char *path)
{
  THD *thd= current_thd;
  DBUG_ENTER("ndbcluster_drop_database");
#ifdef HAVE_NDB_BINLOG
  /*
@@ -5750,8 +5752,8 @@ static void ndbcluster_drop_database(char *path)
#ifdef HAVE_NDB_BINLOG
  char db[FN_REFLEN];
  ha_ndbcluster::set_dbname(path, db);
  ndbcluster_log_schema_op(current_thd, 0,
                           current_thd->query, current_thd->query_length,
  ndbcluster_log_schema_op(thd, 0,
                           thd->query, thd->query_length,
                           db, "", 0, 0, SOT_DROP_DB, 0, 0, 0);
#endif
  DBUG_VOID_RETURN;
@@ -6827,6 +6829,7 @@ static void dbug_print_open_tables()
*/
int handle_trailing_share(NDB_SHARE *share)
{
  THD *thd= current_thd;
  static ulong trailing_share_id= 0;
  DBUG_ENTER("handle_trailing_share");

@@ -6837,7 +6840,7 @@ int handle_trailing_share(NDB_SHARE *share)
  bzero((char*) &table_list,sizeof(table_list));
  table_list.db= share->db;
  table_list.alias= table_list.table_name= share->table_name;
  close_cached_tables(current_thd, 0, &table_list, TRUE);
  close_cached_tables(thd, 0, &table_list, TRUE);

  pthread_mutex_lock(&ndbcluster_mutex);
  if (!--share->use_count)
+215 −191
Original line number Diff line number Diff line
@@ -972,6 +972,154 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
  return buf;
}

/*
  acknowledge handling of schema operation
*/
static int
ndbcluster_update_slock(THD *thd,
                        const char *db,
                        const char *table_name)
{
  DBUG_ENTER("ndbcluster_update_slock");
  if (!schema_share)
  {
    DBUG_RETURN(0);
  }

  const NdbError *ndb_error= 0;
  uint32 node_id= g_ndb_cluster_connection->node_id();
  Ndb *ndb= check_ndb_in_thd(thd);
  char save_db[FN_HEADLEN];
  strcpy(save_db, ndb->getDatabaseName());

  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
  const NDBTAB *ndbtab= ndbtab_g.get_table();
  NdbTransaction *trans= 0;
  int retries= 100;
  const NDBCOL *col[SCHEMA_SIZE];
  unsigned sz[SCHEMA_SIZE];

  MY_BITMAP slock;
  uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
  bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);

  if (ndbtab == 0)
  {
    abort();
    DBUG_RETURN(0);
  }

  {
    uint i;
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  while (1)
  {
    if ((trans= ndb->startTransaction()) == 0)
      goto err;
    {
      NdbOperation *op= 0;
      int r= 0;

      /* read the bitmap exlusive */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->readTupleExclusive();
      DBUG_ASSERT(r == 0);
    
      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
      DBUG_ASSERT(r == 0);
    }
    if (trans->execute(NdbTransaction::NoCommit))
      goto err;
    bitmap_clear_bit(&slock, node_id);
    {
      NdbOperation *op= 0;
      int r= 0;

      /* now update the tuple */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->updateTuple();
      DBUG_ASSERT(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
      DBUG_ASSERT(r == 0);
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      DBUG_ASSERT(r == 0);
      /* type */
      r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
      DBUG_ASSERT(r == 0);
    }
    if (trans->execute(NdbTransaction::Commit) == 0)
    {
      dict->forceGCPWait();
      DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
                          node_id, db, table_name));
      break;
    }
  err:
    const NdbError *this_error= trans ?
      &trans->getNdbError() : &ndb->getNdbError();
    if (this_error->status == NdbError::TemporaryError)
    {
      if (retries--)
      {
        if (trans)
          ndb->closeTransaction(trans);
        continue; // retry
      }
    }
    ndb_error= this_error;
    break;
  }
end:
  if (ndb_error)
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code,
                        ndb_error->message,
                        "Could not release lock on '%s.%s'",
                        db, table_name);
  if (trans)
    ndb->closeTransaction(trans);
  ndb->setDatabaseName(save_db);
  DBUG_RETURN(0);
}

/*
  log query in schema table
*/
@@ -1088,8 +1236,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
  Uint64 epoch= 0;
  MY_BITMAP schema_subscribers;
  uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
  uint32 bitbuf_e[sizeof(bitbuf)];
  bzero((char *)bitbuf_e, sizeof(bitbuf_e));
  char bitbuf_e[sizeof(bitbuf)];
  bzero(bitbuf_e, sizeof(bitbuf_e));
  {
    int i, updated= 0;
    int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
@@ -1108,7 +1256,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
    }
    (void) pthread_mutex_unlock(&schema_share->mutex);
    if (updated)
    {
      bitmap_clear_bit(&schema_subscribers, node_id);
      /*
        if setting own acknowledge bit it is important that
        no other mysqld's are registred, as subsequent code
        will cause the original event to be hidden (by blob
        merge event code)
      */
      if (bitmap_is_clear_all(&schema_subscribers))
          bitmap_set_bit(&schema_subscribers, node_id);
    }
    else
      bitmap_clear_all(&schema_subscribers);

@@ -1221,7 +1379,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
      {
        log_db= new_db;
        log_tab= new_table_name;
        log_subscribers= (const char *)bitbuf_e; // no ack expected on this
        log_subscribers= bitbuf_e; // no ack expected on this
        log_type= (uint32)SOT_RENAME_TABLE_NEW;
        continue;
      }
@@ -1229,7 +1387,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
    }
    if (trans->execute(NdbTransaction::Commit) == 0)
    {
      dict->forceGCPWait();
      DBUG_PRINT("info", ("logged: %s", query));
      break;
    }
@@ -1250,7 +1407,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
  }
end:
  if (ndb_error)
    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code,
                        ndb_error->message,
@@ -1266,6 +1423,15 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
  if (ndb_error == 0 &&
      !bitmap_is_clear_all(&schema_subscribers))
  {
    /*
      if own nodeid is set we are a single mysqld registred
      as an optimization we update the slock directly
    */
    if (bitmap_is_set(&schema_subscribers, node_id))
      ndbcluster_update_slock(thd, db, table_name);
    else
      dict->forceGCPWait();

    int max_timeout= opt_ndb_sync_timeout;
    (void) pthread_mutex_lock(&ndb_schema_object->mutex);
    if (have_lock_open)
@@ -1282,7 +1448,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
      int ret= pthread_cond_timedwait(&injector_cond,
                                      &ndb_schema_object->mutex,
                                      &abstime);

      if (thd->killed)
        break;
      (void) pthread_mutex_lock(&schema_share->mutex);
      for (i= 0; i < no_storage_nodes; i++)
      {
@@ -1330,154 +1497,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
  DBUG_RETURN(0);
}

/*
  acknowledge handling of schema operation
*/
static int
ndbcluster_update_slock(THD *thd,
                        const char *db,
                        const char *table_name)
{
  DBUG_ENTER("ndbcluster_update_slock");
  if (!schema_share)
  {
    DBUG_RETURN(0);
  }

  const NdbError *ndb_error= 0;
  uint32 node_id= g_ndb_cluster_connection->node_id();
  Ndb *ndb= check_ndb_in_thd(thd);
  char save_db[FN_HEADLEN];
  strcpy(save_db, ndb->getDatabaseName());

  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
  const NDBTAB *ndbtab= ndbtab_g.get_table();
  NdbTransaction *trans= 0;
  int retries= 100;
  const NDBCOL *col[SCHEMA_SIZE];
  unsigned sz[SCHEMA_SIZE];

  MY_BITMAP slock;
  uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
  bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);

  if (ndbtab == 0)
  {
    abort();
    DBUG_RETURN(0);
  }

  {
    uint i;
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  while (1)
  {
    if ((trans= ndb->startTransaction()) == 0)
      goto err;
    {
      NdbOperation *op= 0;
      int r= 0;

      /* read the bitmap exlusive */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->readTupleExclusive();
      DBUG_ASSERT(r == 0);
    
      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
      DBUG_ASSERT(r == 0);
    }
    if (trans->execute(NdbTransaction::NoCommit))
      goto err;
    bitmap_clear_bit(&slock, node_id);
    {
      NdbOperation *op= 0;
      int r= 0;

      /* now update the tuple */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->updateTuple();
      DBUG_ASSERT(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
      DBUG_ASSERT(r == 0);
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      DBUG_ASSERT(r == 0);
      /* type */
      r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
      DBUG_ASSERT(r == 0);
    }
    if (trans->execute(NdbTransaction::Commit) == 0)
    {
      dict->forceGCPWait();
      DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
                          node_id, db, table_name));
      break;
    }
  err:
    const NdbError *this_error= trans ?
      &trans->getNdbError() : &ndb->getNdbError();
    if (this_error->status == NdbError::TemporaryError)
    {
      if (retries--)
      {
        if (trans)
          ndb->closeTransaction(trans);
        continue; // retry
      }
    }
    ndb_error= this_error;
    break;
  }
end:
  if (ndb_error)
    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code,
                        ndb_error->message,
                        "Could not release lock on '%s.%s'",
                        db, table_name);
  if (trans)
    ndb->closeTransaction(trans);
  ndb->setDatabaseName(save_db);
  DBUG_RETURN(0);
}

/*
  Handle _non_ data events from the storage nodes
*/
@@ -1701,9 +1720,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
      uint node_id= g_ndb_cluster_connection->node_id();
      ndbcluster_get_schema(tmp_share, schema);
      if (schema->node_id != node_id)
      {
        int log_query= 0, post_epoch_unlock= 0;
      DBUG_PRINT("info",
                 ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                  schema->db, schema->name,
@@ -1711,6 +1727,29 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
                  schema->type));
      char key[FN_REFLEN];
      build_table_filename(key, sizeof(key), schema->db, schema->name, "");
      if ((enum SCHEMA_OP_TYPE)schema->type == SOT_CLEAR_SLOCK)
      {
        pthread_mutex_lock(&ndbcluster_mutex);
        NDB_SCHEMA_OBJECT *ndb_schema_object=
          (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
                                           (byte*) key, strlen(key));
        if (ndb_schema_object)
        {
          pthread_mutex_lock(&ndb_schema_object->mutex);
          memcpy(ndb_schema_object->slock, schema->slock,
                 sizeof(ndb_schema_object->slock));
          DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
                    (char*)ndb_schema_object->slock_bitmap.bitmap,
                    no_bytes_in_map(&ndb_schema_object->slock_bitmap));
          pthread_mutex_unlock(&ndb_schema_object->mutex);
          pthread_cond_signal(&injector_cond);
        }
        pthread_mutex_unlock(&ndbcluster_mutex);
        DBUG_RETURN(0);
      }
      if (schema->node_id != node_id)
      {
        int log_query= 0, post_epoch_unlock= 0;
        switch ((enum SCHEMA_OP_TYPE)schema->type)
        {
        case SOT_DROP_TABLE:
@@ -1759,30 +1798,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
                    TRUE,    /* print error */
                    FALSE);  /* binlog the query */
          break;
        case SOT_CLEAR_SLOCK:
        {
          pthread_mutex_lock(&ndbcluster_mutex);
          NDB_SCHEMA_OBJECT *ndb_schema_object=
            (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects,
                                             (byte*) key, strlen(key));
          if (ndb_schema_object)
          {
            pthread_mutex_lock(&ndb_schema_object->mutex);
            memcpy(ndb_schema_object->slock, schema->slock,
                   sizeof(ndb_schema_object->slock));
            DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
                      (char*)ndb_schema_object->slock_bitmap.bitmap,
                      no_bytes_in_map(&ndb_schema_object->slock_bitmap));
            pthread_mutex_unlock(&ndb_schema_object->mutex);
            pthread_cond_signal(&injector_cond);
          }
          pthread_mutex_unlock(&ndbcluster_mutex);
          DBUG_RETURN(0);
        }
        case SOT_TABLESPACE:
        case SOT_LOGFILE_GROUP:
          log_query= 1;
          break;
        case SOT_CLEAR_SLOCK:
          abort();
        }
        if (log_query && ndb_binlog_running)
        {
@@ -2349,6 +2370,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
                        const char *event_name, NDB_SHARE *share,
                        int push_warning)
{
  THD *thd= current_thd;
  DBUG_ENTER("ndbcluster_create_event");
  DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
                      ndbtab->getName(), ndbtab->getObjectVersion(),
@@ -2378,7 +2400,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
                      "with BLOB attribute and no PK is not supported",
                      share->key);
      if (push_warning)
        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                            ER_ILLEGAL_HA_CREATE_OPTION,
                            ER(ER_ILLEGAL_HA_CREATE_OPTION),
                            ndbcluster_hton.name,
@@ -2422,7 +2444,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
        failed, print a warning
      */
      if (push_warning)
        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
@@ -2450,7 +2472,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
        dict->dropEvent(my_event.getName()))
    {
      if (push_warning)
        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
@@ -2469,7 +2491,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
    if (dict->createEvent(my_event))
    {
      if (push_warning)
        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
@@ -2482,7 +2504,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
      DBUG_RETURN(-1);
    }
#ifdef NDB_BINLOG_EXTRA_WARNINGS
    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        0, "NDB Binlog: Removed trailing event",
                        "NDB");
@@ -2511,6 +2533,7 @@ int
ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
                            const char *event_name)
{
  THD *thd= current_thd;
  /*
    we are in either create table or rename table so table should be
    locked, hence we can work with the share without locks
@@ -2584,7 +2607,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
    {
      sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
                      " %s",event_name);
      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                          ndb->getNdbError().code,
                          ndb->getNdbError().message,
@@ -2634,7 +2657,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
            sql_print_error("NDB Binlog: Creating NdbEventOperation"
                            " blob field %u handles failed (code=%d) for %s",
                            j, op->getNdbError().code, event_name);
            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
            push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                                ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                                op->getNdbError().code,
                                op->getNdbError().message,
@@ -2671,7 +2694,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
        retries= 0;
      if (retries == 0)
      {
        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 
                            op->getNdbError().code, op->getNdbError().message,
                            "NDB");
@@ -2727,7 +2750,7 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
    if (dict->getNdbError().code != 4710)
    {
      /* drop event failed for some reason, issue a warning */
      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                          dict->getNdbError().code,
                          dict->getNdbError().message, "NDB");
@@ -2780,7 +2803,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
    int ret= pthread_cond_timedwait(&injector_cond,
                                    &share->mutex,
                                    &abstime);
    if (share->op == 0)
    if (thd->killed ||
        share->op == 0)
      break;
    if (ret)
    {