Loading sql/ha_ndbcluster.cc +1 −7 Original line number Diff line number Diff line Loading @@ -4770,13 +4770,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) "Creating event for logging table failed. " "See error log for details."); } if (is_old_table_tmpfile) ndbcluster_log_schema_op(current_thd, share, current_thd->query, current_thd->query_length, m_dbname, new_tabname, 0, 0, SOT_ALTER_TABLE); else if (!is_old_table_tmpfile) ndbcluster_log_schema_op(current_thd, share, current_thd->query, current_thd->query_length, m_dbname, new_tabname, Loading sql/ha_ndbcluster_binlog.cc +56 −14 Original line number Diff line number Diff line Loading @@ -441,6 +441,7 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, break; case LOGCOM_ALTER_TABLE: type= SOT_ALTER_TABLE; log= 1; break; case LOGCOM_RENAME_TABLE: type= SOT_RENAME_TABLE; Loading @@ -461,8 +462,10 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, break; } if (log) { ndbcluster_log_schema_op(thd, 0, query, query_length, db, table_name, 0, 0, type); } DBUG_VOID_RETURN; } Loading Loading @@ -891,6 +894,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } char tmp_buf2[FN_REFLEN]; int get_a_share= 0; switch (type) { case SOT_DROP_TABLE: Loading @@ -901,12 +905,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "drop table `", table_name, "`", NullS) - tmp_buf2); break; // fall through case SOT_CREATE_TABLE: break; // fall through case SOT_RENAME_TABLE: break; // fall through case SOT_ALTER_TABLE: if (!share) get_a_share= 1; break; case SOT_DROP_DB: break; Loading @@ -922,6 +928,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, abort(); /* should not happen, programming error */ } if (get_a_share) { char key[FN_REFLEN]; (void)strxnmov(key, FN_REFLEN, share_prefix, db, "/", table_name, NullS); share= get_share(key, 0, false, false); } const NdbError *ndb_error= 0; uint32 node_id= g_ndb_cluster_connection->node_id(); Uint64 epoch= 0; Loading Loading @@ -956,7 +970,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } Ndb *ndb= thd_ndb->ndb; char old_db[128]; char old_db[FN_REFLEN]; strcpy(old_db, ndb->getDatabaseName()); char tmp_buf[SCHEMA_QUERY_SIZE]; Loading @@ -974,9 +988,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, strcmp(NDB_SCHEMA_TABLE, table_name)) { ndb_error= &dict->getNdbError(); goto end; } DBUG_RETURN(0); goto end; } { Loading Loading @@ -1119,6 +1132,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } (void) pthread_mutex_unlock(&share->mutex); } if (get_a_share) free_share(&share); DBUG_RETURN(0); } Loading Loading @@ -1328,7 +1345,10 @@ static int ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, NdbEventOperation *pOp, List<Cluster_replication_schema> *schema_list, MEM_ROOT *mem_root) *post_epoch_log_list, List<Cluster_replication_schema> *post_epoch_unlock_list, MEM_ROOT *mem_root) { DBUG_ENTER("ndb_binlog_thread_handle_schema_event"); NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); Loading Loading @@ -1357,7 +1377,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, { case SOT_DROP_TABLE: /* binlog dropping table after any table operations */ schema_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root); log_query= 0; break; case SOT_RENAME_TABLE: Loading Loading @@ -1389,7 +1409,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, TRUE, /* print error */ TRUE); /* don't binlog the query */ /* binlog dropping database after any table operations */ schema_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root); log_query= 0; break; case SOT_CREATE_DB: Loading Loading @@ -1431,8 +1451,19 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, { DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); if (bitmap_is_set(&slock, node_id)) { /* If it is an SOT_ALTER_TABLE we need to acknowledge the schema operation _after_ all the events have been processed so that all schema events coming through the event operation has been processed */ if ((enum SCHEMA_OP_TYPE)schema->type == SOT_ALTER_TABLE) post_epoch_unlock_list->push_back(schema, mem_root); else ndbcluster_update_slock(thd, schema->db, schema->name); } } if (log_query) { Loading Loading @@ -2738,7 +2769,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) MEM_ROOT *old_root= *root_ptr; MEM_ROOT mem_root; init_sql_alloc(&mem_root, 4096, 0); List<Cluster_replication_schema> schema_list; List<Cluster_replication_schema> post_epoch_log_list; List<Cluster_replication_schema> post_epoch_unlock_list; *root_ptr= &mem_root; if (unlikely(schema_res > 0)) Loading @@ -2751,7 +2783,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) { if (!pOp->hasError()) ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp, &schema_list, &mem_root); &post_epoch_log_list, &post_epoch_unlock_list, &mem_root); else sql_print_error("NDB: error %lu (%s) on handling " "binlog schema event", Loading Loading @@ -2878,9 +2912,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } } /* process any operations that should be done after the epoch is complete */ { Cluster_replication_schema *schema; while ((schema= schema_list.pop())) while ((schema= post_epoch_unlock_list.pop())) { ndbcluster_update_slock(thd, schema->db, schema->name); } while ((schema= post_epoch_log_list.pop())) { char *thd_db_save= thd->db; thd->db= schema->db; Loading sql/ha_ndbcluster_binlog.h +16 −10 Original line number Diff line number Diff line Loading @@ -29,18 +29,24 @@ extern ulong ndb_extra_logging; #define INJECTOR_EVENT_LEN 200 /* The numbers below must not change as they are passed between mysql servers, and if changed would break compatablility. Add new numbers to the end. */ enum SCHEMA_OP_TYPE { SOT_DROP_TABLE, SOT_CREATE_TABLE, SOT_RENAME_TABLE, SOT_ALTER_TABLE, SOT_DROP_DB, SOT_CREATE_DB, SOT_ALTER_DB, SOT_CLEAR_SLOCK, SOT_TABLESPACE, SOT_LOGFILE_GROUP SOT_DROP_TABLE= 0, SOT_CREATE_TABLE= 1, SOT_RENAME_TABLE= 2, SOT_ALTER_TABLE= 3, SOT_DROP_DB= 4, SOT_CREATE_DB= 5, SOT_ALTER_DB= 6, SOT_CLEAR_SLOCK= 7, SOT_TABLESPACE= 8, SOT_LOGFILE_GROUP= 9 }; const uint max_ndb_nodes= 64; /* multiple of 32 */ Loading sql/handler.cc +17 −7 Original line number Diff line number Diff line Loading @@ -2529,12 +2529,11 @@ struct binlog_log_query_st const char *table_name; }; static my_bool binlog_log_query_handlerton(THD *thd, st_plugin_int *plugin, static my_bool binlog_log_query_handlerton2(THD *thd, const handlerton *hton, void *args) { struct binlog_log_query_st *b= (struct binlog_log_query_st*)args; handlerton *hton= (handlerton *) plugin->plugin->info; if (hton->state == SHOW_OPTION_YES && hton->binlog_log_query) hton->binlog_log_query(thd, b->binlog_command, Loading @@ -2545,7 +2544,15 @@ static my_bool binlog_log_query_handlerton(THD *thd, return FALSE; } void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, static my_bool binlog_log_query_handlerton(THD *thd, st_plugin_int *plugin, void *args) { return binlog_log_query_handlerton2(thd, (const handlerton *) plugin->plugin->info, args); } void ha_binlog_log_query(THD *thd, const handlerton *hton, enum_binlog_command binlog_command, const char *query, uint query_length, const char *db, const char *table_name) { Loading @@ -2555,8 +2562,11 @@ void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, b.query_length= query_length; b.db= db; b.table_name= table_name; if (hton == 0) plugin_foreach(thd, binlog_log_query_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &b); else binlog_log_query_handlerton2(thd, hton, &b); } #endif Loading sql/handler.h +2 −1 Original line number Diff line number Diff line Loading @@ -2020,7 +2020,8 @@ int ha_repl_report_replication_stop(THD *thd); int ha_reset_logs(THD *thd); int ha_binlog_index_purge_file(THD *thd, const char *file); void ha_reset_slave(THD *thd); void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, void ha_binlog_log_query(THD *thd, const handlerton *db_type, enum_binlog_command binlog_command, const char *query, uint query_length, const char *db, const char *table_name); void ha_binlog_wait(THD *thd); Loading Loading
sql/ha_ndbcluster.cc +1 −7 Original line number Diff line number Diff line Loading @@ -4770,13 +4770,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) "Creating event for logging table failed. " "See error log for details."); } if (is_old_table_tmpfile) ndbcluster_log_schema_op(current_thd, share, current_thd->query, current_thd->query_length, m_dbname, new_tabname, 0, 0, SOT_ALTER_TABLE); else if (!is_old_table_tmpfile) ndbcluster_log_schema_op(current_thd, share, current_thd->query, current_thd->query_length, m_dbname, new_tabname, Loading
sql/ha_ndbcluster_binlog.cc +56 −14 Original line number Diff line number Diff line Loading @@ -441,6 +441,7 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, break; case LOGCOM_ALTER_TABLE: type= SOT_ALTER_TABLE; log= 1; break; case LOGCOM_RENAME_TABLE: type= SOT_RENAME_TABLE; Loading @@ -461,8 +462,10 @@ ndbcluster_binlog_log_query(THD *thd, enum_binlog_command binlog_command, break; } if (log) { ndbcluster_log_schema_op(thd, 0, query, query_length, db, table_name, 0, 0, type); } DBUG_VOID_RETURN; } Loading Loading @@ -891,6 +894,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } char tmp_buf2[FN_REFLEN]; int get_a_share= 0; switch (type) { case SOT_DROP_TABLE: Loading @@ -901,12 +905,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "drop table `", table_name, "`", NullS) - tmp_buf2); break; // fall through case SOT_CREATE_TABLE: break; // fall through case SOT_RENAME_TABLE: break; // fall through case SOT_ALTER_TABLE: if (!share) get_a_share= 1; break; case SOT_DROP_DB: break; Loading @@ -922,6 +928,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, abort(); /* should not happen, programming error */ } if (get_a_share) { char key[FN_REFLEN]; (void)strxnmov(key, FN_REFLEN, share_prefix, db, "/", table_name, NullS); share= get_share(key, 0, false, false); } const NdbError *ndb_error= 0; uint32 node_id= g_ndb_cluster_connection->node_id(); Uint64 epoch= 0; Loading Loading @@ -956,7 +970,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } Ndb *ndb= thd_ndb->ndb; char old_db[128]; char old_db[FN_REFLEN]; strcpy(old_db, ndb->getDatabaseName()); char tmp_buf[SCHEMA_QUERY_SIZE]; Loading @@ -974,9 +988,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, strcmp(NDB_SCHEMA_TABLE, table_name)) { ndb_error= &dict->getNdbError(); goto end; } DBUG_RETURN(0); goto end; } { Loading Loading @@ -1119,6 +1132,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } (void) pthread_mutex_unlock(&share->mutex); } if (get_a_share) free_share(&share); DBUG_RETURN(0); } Loading Loading @@ -1328,7 +1345,10 @@ static int ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, NdbEventOperation *pOp, List<Cluster_replication_schema> *schema_list, MEM_ROOT *mem_root) *post_epoch_log_list, List<Cluster_replication_schema> *post_epoch_unlock_list, MEM_ROOT *mem_root) { DBUG_ENTER("ndb_binlog_thread_handle_schema_event"); NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); Loading Loading @@ -1357,7 +1377,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, { case SOT_DROP_TABLE: /* binlog dropping table after any table operations */ schema_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root); log_query= 0; break; case SOT_RENAME_TABLE: Loading Loading @@ -1389,7 +1409,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, TRUE, /* print error */ TRUE); /* don't binlog the query */ /* binlog dropping database after any table operations */ schema_list->push_back(schema, mem_root); post_epoch_log_list->push_back(schema, mem_root); log_query= 0; break; case SOT_CREATE_DB: Loading Loading @@ -1431,8 +1451,19 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, { DBUG_DUMP("slock", (char*)schema->slock, schema->slock_length); if (bitmap_is_set(&slock, node_id)) { /* If it is an SOT_ALTER_TABLE we need to acknowledge the schema operation _after_ all the events have been processed so that all schema events coming through the event operation has been processed */ if ((enum SCHEMA_OP_TYPE)schema->type == SOT_ALTER_TABLE) post_epoch_unlock_list->push_back(schema, mem_root); else ndbcluster_update_slock(thd, schema->db, schema->name); } } if (log_query) { Loading Loading @@ -2738,7 +2769,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) MEM_ROOT *old_root= *root_ptr; MEM_ROOT mem_root; init_sql_alloc(&mem_root, 4096, 0); List<Cluster_replication_schema> schema_list; List<Cluster_replication_schema> post_epoch_log_list; List<Cluster_replication_schema> post_epoch_unlock_list; *root_ptr= &mem_root; if (unlikely(schema_res > 0)) Loading @@ -2751,7 +2783,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) { if (!pOp->hasError()) ndb_binlog_thread_handle_schema_event(thd, schema_ndb, pOp, &schema_list, &mem_root); &post_epoch_log_list, &post_epoch_unlock_list, &mem_root); else sql_print_error("NDB: error %lu (%s) on handling " "binlog schema event", Loading Loading @@ -2878,9 +2912,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } } /* process any operations that should be done after the epoch is complete */ { Cluster_replication_schema *schema; while ((schema= schema_list.pop())) while ((schema= post_epoch_unlock_list.pop())) { ndbcluster_update_slock(thd, schema->db, schema->name); } while ((schema= post_epoch_log_list.pop())) { char *thd_db_save= thd->db; thd->db= schema->db; Loading
sql/ha_ndbcluster_binlog.h +16 −10 Original line number Diff line number Diff line Loading @@ -29,18 +29,24 @@ extern ulong ndb_extra_logging; #define INJECTOR_EVENT_LEN 200 /* The numbers below must not change as they are passed between mysql servers, and if changed would break compatablility. Add new numbers to the end. */ enum SCHEMA_OP_TYPE { SOT_DROP_TABLE, SOT_CREATE_TABLE, SOT_RENAME_TABLE, SOT_ALTER_TABLE, SOT_DROP_DB, SOT_CREATE_DB, SOT_ALTER_DB, SOT_CLEAR_SLOCK, SOT_TABLESPACE, SOT_LOGFILE_GROUP SOT_DROP_TABLE= 0, SOT_CREATE_TABLE= 1, SOT_RENAME_TABLE= 2, SOT_ALTER_TABLE= 3, SOT_DROP_DB= 4, SOT_CREATE_DB= 5, SOT_ALTER_DB= 6, SOT_CLEAR_SLOCK= 7, SOT_TABLESPACE= 8, SOT_LOGFILE_GROUP= 9 }; const uint max_ndb_nodes= 64; /* multiple of 32 */ Loading
sql/handler.cc +17 −7 Original line number Diff line number Diff line Loading @@ -2529,12 +2529,11 @@ struct binlog_log_query_st const char *table_name; }; static my_bool binlog_log_query_handlerton(THD *thd, st_plugin_int *plugin, static my_bool binlog_log_query_handlerton2(THD *thd, const handlerton *hton, void *args) { struct binlog_log_query_st *b= (struct binlog_log_query_st*)args; handlerton *hton= (handlerton *) plugin->plugin->info; if (hton->state == SHOW_OPTION_YES && hton->binlog_log_query) hton->binlog_log_query(thd, b->binlog_command, Loading @@ -2545,7 +2544,15 @@ static my_bool binlog_log_query_handlerton(THD *thd, return FALSE; } void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, static my_bool binlog_log_query_handlerton(THD *thd, st_plugin_int *plugin, void *args) { return binlog_log_query_handlerton2(thd, (const handlerton *) plugin->plugin->info, args); } void ha_binlog_log_query(THD *thd, const handlerton *hton, enum_binlog_command binlog_command, const char *query, uint query_length, const char *db, const char *table_name) { Loading @@ -2555,8 +2562,11 @@ void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, b.query_length= query_length; b.db= db; b.table_name= table_name; if (hton == 0) plugin_foreach(thd, binlog_log_query_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &b); else binlog_log_query_handlerton2(thd, hton, &b); } #endif Loading
sql/handler.h +2 −1 Original line number Diff line number Diff line Loading @@ -2020,7 +2020,8 @@ int ha_repl_report_replication_stop(THD *thd); int ha_reset_logs(THD *thd); int ha_binlog_index_purge_file(THD *thd, const char *file); void ha_reset_slave(THD *thd); void ha_binlog_log_query(THD *thd, enum_binlog_command binlog_command, void ha_binlog_log_query(THD *thd, const handlerton *db_type, enum_binlog_command binlog_command, const char *query, uint query_length, const char *db, const char *table_name); void ha_binlog_wait(THD *thd); Loading