Loading mysql-test/extra/rpl_tests/rpl_row_blob.test +6 −0 Original line number Diff line number Diff line Loading @@ -183,3 +183,9 @@ connection master; DROP TABLE IF EXISTS test.t1; DROP TABLE IF EXISTS test.t2; # ensure cleanup on slave as well: # ndb blob tables consist of several tables # if cluster is shutdown while not all tables are # properly dropped, the table becomes inconsistent # and wrecks later test cases --sync_slave_with_master sql/ha_ndbcluster.cc +73 −20 Original line number Diff line number Diff line Loading @@ -4480,6 +4480,21 @@ int ha_ndbcluster::create(const char *name, DBUG_RETURN(my_errno); } #ifdef HAVE_NDB_BINLOG /* Don't allow table creation unless schema distribution table is setup ( unless it is a creation of the schema dist table itself ) */ if (!schema_share && !(strcmp(m_dbname, NDB_REP_DB) == 0 && strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0)) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_RETURN(HA_ERR_NO_CONNECTION); } #endif /* HAVE_NDB_BINLOG */ DBUG_PRINT("table", ("name: %s", m_tabname)); tab.setName(m_tabname); tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE)); Loading Loading @@ -5004,7 +5019,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) is_old_table_tmpfile= 0; String event_name(INJECTOR_EVENT_LEN); ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0); ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share); ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share, "rename table"); } if (!result && !IS_TMP_PREFIX(new_tabname)) Loading Loading @@ -5088,6 +5104,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); NDBDICT *dict= ndb->getDictionary(); #ifdef HAVE_NDB_BINLOG /* Don't allow drop table unless schema distribution table is setup */ if (!schema_share) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_RETURN(HA_ERR_NO_CONNECTION); } NDB_SHARE *share= get_share(path, 0, false); #endif Loading Loading @@ -5156,7 +5181,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0); ndbcluster_handle_drop_table(ndb, table_dropped ? event_name.c_ptr() : 0, share); share, "delete table"); } if (share) Loading Loading @@ -5185,6 +5210,18 @@ int ha_ndbcluster::delete_table(const char *name) set_dbname(name); set_tabname(name); #ifdef HAVE_NDB_BINLOG /* Don't allow drop table unless schema distribution table is setup */ if (!schema_share) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_RETURN(HA_ERR_NO_CONNECTION); } #endif if (check_ndb_connection()) DBUG_RETURN(HA_ERR_NO_CONNECTION); Loading Loading @@ -5406,6 +5443,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) if (!res) info(HA_STATUS_VARIABLE | HA_STATUS_CONST); #ifdef HAVE_NDB_BINLOG if (!ndb_binlog_tables_inited && ndb_binlog_running) table->db_stat|= HA_READ_ONLY; #endif DBUG_RETURN(res); } Loading Loading @@ -5704,6 +5746,19 @@ int ndbcluster_drop_database_impl(const char *path) static void ndbcluster_drop_database(char *path) { DBUG_ENTER("ndbcluster_drop_database"); #ifdef HAVE_NDB_BINLOG /* Don't allow drop database unless schema distribution table is setup */ if (!schema_share) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_VOID_RETURN; //DBUG_RETURN(HA_ERR_NO_CONNECTION); } #endif ndbcluster_drop_database_impl(path); #ifdef HAVE_NDB_BINLOG char db[FN_REFLEN]; Loading @@ -5712,6 +5767,7 @@ static void ndbcluster_drop_database(char *path) current_thd->query, current_thd->query_length, db, "", 0, 0, SOT_DROP_DB); #endif DBUG_VOID_RETURN; } /* find all tables in ndb and discover those needed Loading Loading @@ -5740,29 +5796,30 @@ int ndbcluster_find_all_files(THD *thd) NDBDICT *dict= ndb->getDictionary(); int unhandled, retries= 5; int unhandled, retries= 5, skipped; do { if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) ERR_RETURN(dict->getNdbError()); unhandled= 0; skipped= 0; retries--; for (uint i= 0 ; i < list.count ; i++) { NDBDICT::List::Element& elmt= list.elements[i]; int do_handle_table= 0; if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name)) { DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name)); continue; } DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name)); if (elmt.state == NDBOBJ::StateOnline || elmt.state == NDBOBJ::StateBackup) do_handle_table= 1; else if (!(elmt.state == NDBOBJ::StateBuilding)) if (elmt.state != NDBOBJ::StateOnline && elmt.state != NDBOBJ::StateBackup && elmt.state != NDBOBJ::StateBuilding) { sql_print_information("NDB: skipping setup table %s.%s, in state %d", elmt.database, elmt.name, elmt.state); skipped++; continue; } Loading @@ -5771,7 +5828,7 @@ int ndbcluster_find_all_files(THD *thd) if (!(ndbtab= dict->getTable(elmt.name))) { if (do_handle_table) if (retries == 0) sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", elmt.database, elmt.name, dict->getNdbError().code, Loading Loading @@ -5840,9 +5897,9 @@ int ndbcluster_find_all_files(THD *thd) pthread_mutex_unlock(&LOCK_open); } } while (unhandled && retries--); while (unhandled && retries); DBUG_RETURN(0); DBUG_RETURN(-(skipped + unhandled)); } int ndbcluster_find_files(THD *thd,const char *db,const char *path, Loading Loading @@ -7706,6 +7763,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started); ndbcluster_util_inited= 1; /* Wait for cluster to start */ Loading Loading @@ -7737,6 +7796,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) } #ifdef HAVE_NDB_BINLOG if (ndb_extra_logging && ndb_binlog_running) sql_print_information("NDB Binlog: Ndb tables initially read only."); /* create tables needed by the replication */ ndbcluster_setup_binlog_table_shares(thd); #else Loading @@ -7746,17 +7807,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ndbcluster_find_all_files(thd); #endif ndbcluster_util_inited= 1; #ifdef HAVE_NDB_BINLOG /* Signal injector thread that all is setup */ pthread_cond_signal(&injector_cond); #endif set_timespec(abstime, 0); for (;!abort_loop;) { pthread_mutex_lock(&LOCK_ndb_util_thread); pthread_cond_timedwait(&COND_ndb_util_thread, &LOCK_ndb_util_thread, Loading @@ -7774,7 +7827,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) Check that the apply_status_share and schema_share has been created. If not try to create it */ if (!apply_status_share || !schema_share) if (!ndb_binlog_tables_inited) ndbcluster_setup_binlog_table_shares(thd); #endif Loading sql/ha_ndbcluster_binlog.cc +103 −29 Original line number Diff line number Diff line Loading @@ -48,6 +48,7 @@ int ndb_binlog_thread_running= 0; FALSE if not */ my_bool ndb_binlog_running= FALSE; my_bool ndb_binlog_tables_inited= FALSE; /* Global reference to the ndb injector thread THD oject Loading Loading @@ -775,33 +776,51 @@ static int ndbcluster_create_schema_table(THD *thd) DBUG_RETURN(0); } void ndbcluster_setup_binlog_table_shares(THD *thd) int ndbcluster_setup_binlog_table_shares(THD *thd) { int done_find_all_files= 0; if (!schema_share && ndbcluster_check_schema_share() == 0) { if (!done_find_all_files) pthread_mutex_lock(&LOCK_open); ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE); pthread_mutex_unlock(&LOCK_open); if (!schema_share) { ndbcluster_find_all_files(thd); done_find_all_files= 1; } ndbcluster_create_schema_table(thd); // always make sure we create the 'schema' first if (!schema_share) return; return 1; } } if (!apply_status_share && ndbcluster_check_apply_status_share() == 0) { if (!done_find_all_files) pthread_mutex_lock(&LOCK_open); ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE); pthread_mutex_unlock(&LOCK_open); if (!apply_status_share) { ndbcluster_find_all_files(thd); done_find_all_files= 1; } ndbcluster_create_apply_status_table(thd); if (!apply_status_share) return 1; } } if (!ndbcluster_find_all_files(thd)) { pthread_mutex_lock(&LOCK_open); ndb_binlog_tables_inited= TRUE; if (ndb_binlog_running) { if (ndb_extra_logging) sql_print_information("NDB Binlog: ndb tables writable"); close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); } pthread_mutex_unlock(&LOCK_open); /* Signal injector thread that all is setup */ pthread_cond_signal(&injector_cond); } return 0; } /* Defines and struct for schema table. Loading Loading @@ -936,6 +955,31 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf, /* log query in schema table */ static void ndb_report_waiting(const char *key, int the_time, const char *op, const char *obj) { ulonglong ndb_latest_epoch= 0; const char *proc_info= "<no info>"; pthread_mutex_lock(&injector_mutex); if (injector_ndb) ndb_latest_epoch= injector_ndb->getLatestGCI(); if (injector_thd) proc_info= injector_thd->proc_info; pthread_mutex_unlock(&injector_mutex); sql_print_information("NDB %s:" " waiting max %u sec for %s %s." " epochs: (%u,%u,%u)" " injector proc_info: %s" ,key, the_time, op, obj ,(uint)ndb_latest_handled_binlog_epoch ,(uint)ndb_latest_received_binlog_epoch ,(uint)ndb_latest_epoch ,proc_info ); } int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, const char *query, int query_length, const char *db, const char *table_name, Loading Loading @@ -965,6 +1009,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } char tmp_buf2[FN_REFLEN]; const char *type_str; switch (type) { case SOT_DROP_TABLE: Loading @@ -975,6 +1020,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "drop table `", table_name, "`", NullS) - tmp_buf2); type_str= "drop table"; break; case SOT_RENAME_TABLE: /* redo the rename table query as is may contain several tables */ Loading @@ -982,20 +1028,28 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query_length= (uint) (strxmov(tmp_buf2, "rename table `", old_db, ".", old_table_name, "` to `", db, ".", table_name, "`", NullS) - tmp_buf2); type_str= "rename table"; break; case SOT_CREATE_TABLE: // fall through type_str= "create table"; break; case SOT_ALTER_TABLE: type_str= "create table"; break; case SOT_DROP_DB: type_str= "drop db"; break; case SOT_CREATE_DB: type_str= "create db"; break; case SOT_ALTER_DB: type_str= "alter db"; break; case SOT_TABLESPACE: type_str= "tablespace"; break; case SOT_LOGFILE_GROUP: type_str= "logfile group"; break; default: abort(); /* should not happen, programming error */ Loading Loading @@ -1201,13 +1255,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, max_timeout--; if (max_timeout == 0) { sql_print_error("NDB create table: timed out. Ignoring..."); sql_print_error("NDB %s: distibuting %s timed out. Ignoring...", type_str, ndb_schema_object->key); break; } if (ndb_extra_logging) sql_print_information("NDB create table: " "waiting max %u sec for create table %s.", max_timeout, ndb_schema_object->key); ndb_report_waiting(type_str, max_timeout, "distributing", ndb_schema_object->key); } (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } Loading Loading @@ -1689,9 +1743,15 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, // skip break; case NDBEVENT::TE_CLUSTER_FAILURE: // fall through case NDBEVENT::TE_DROP: if (ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); free_share(&schema_share); schema_share= 0; ndb_binlog_tables_inited= FALSE; // fall through case NDBEVENT::TE_ALTER: ndb_handle_schema_change(thd, ndb, pOp, tmp_share); Loading Loading @@ -2494,9 +2554,15 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, get_share(share); if (do_apply_status_share) { apply_status_share= get_share(share); (void) pthread_cond_signal(&injector_cond); } else if (do_schema_share) { schema_share= get_share(share); (void) pthread_cond_signal(&injector_cond); } DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u", share->key, share->op, share->use_count)); Loading @@ -2513,7 +2579,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE *share) NDB_SHARE *share, const char *type_str) { DBUG_ENTER("ndbcluster_handle_drop_table"); Loading Loading @@ -2581,9 +2647,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, break; } if (ndb_extra_logging) sql_print_information("NDB delete table: " "waiting max %u sec for drop table %s.", max_timeout, share->key); ndb_report_waiting(type_str, max_timeout, type_str, share->key); } (void) pthread_mutex_unlock(&share->mutex); #else Loading Loading @@ -2660,13 +2725,18 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, switch (type) { case NDBEVENT::TE_CLUSTER_FAILURE: if (ndb_extra_logging) sql_print_information("NDB Binlog: cluster failure for %s.", share->key); if (apply_status_share == share) { if (ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); free_share(&apply_status_share); apply_status_share= 0; ndb_binlog_tables_inited= FALSE; } if (ndb_extra_logging) sql_print_information("NDB Binlog: cluster failure for %s.", share->key); DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: " "%s received share: 0x%lx op: %lx share op: %lx " "op_old: %lx", Loading @@ -2675,8 +2745,13 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, case NDBEVENT::TE_DROP: if (apply_status_share == share) { if (ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); free_share(&apply_status_share); apply_status_share= 0; ndb_binlog_tables_inited= FALSE; } /* ToDo: remove printout */ if (ndb_extra_logging) Loading Loading @@ -3087,7 +3162,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->proc_info= "Waiting for ndbcluster to start"; pthread_mutex_lock(&injector_mutex); while (!ndbcluster_util_inited) while (!schema_share || !apply_status_share) { /* ndb not connected yet */ struct timespec abstime; Loading Loading @@ -3119,10 +3194,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->db= db; if (ndb_binlog_running) open_binlog_index(thd, &binlog_tables, &binlog_index); if (!apply_status_share) { sql_print_error("NDB: Could not get apply status share"); } thd->db= db; } Loading Loading @@ -3184,6 +3255,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (unlikely(schema_res > 0)) { thd->proc_info= "Processing events from schema table"; schema_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); schema_ndb-> Loading Loading @@ -3379,6 +3451,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (trans.good()) { //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes); thd->proc_info= "Committing events to binlog"; injector::transaction::binlog_pos start= trans.start_pos(); if (int r= trans.commit()) { Loading Loading @@ -3418,6 +3491,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } err: DBUG_PRINT("info",("Shutting down cluster binlog thread")); thd->proc_info= "Shutting down"; close_thread_tables(thd); pthread_mutex_lock(&injector_mutex); /* don't mess with the injector_ndb anymore from other threads */ Loading sql/ha_ndbcluster_binlog.h +4 −2 Original line number Diff line number Diff line Loading @@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, const char *old_db= 0, const char *old_table_name= 0); int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE *share); NDB_SHARE *share, const char *type_str); void ndb_rep_event_name(String *event_name, const char *db, const char *tbl); int ndb_create_table_from_engine(THD *thd, const char *db, Loading @@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg); /* table cluster_replication.apply_status */ void ndbcluster_setup_binlog_table_shares(THD *thd); int ndbcluster_setup_binlog_table_shares(THD *thd); extern NDB_SHARE *apply_status_share; extern NDB_SHARE *schema_share; extern THD *injector_thd; extern my_bool ndb_binlog_running; extern my_bool ndb_binlog_tables_inited; bool ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, Loading Loading
mysql-test/extra/rpl_tests/rpl_row_blob.test +6 −0 Original line number Diff line number Diff line Loading @@ -183,3 +183,9 @@ connection master; DROP TABLE IF EXISTS test.t1; DROP TABLE IF EXISTS test.t2; # ensure cleanup on slave as well: # ndb blob tables consist of several tables # if cluster is shutdown while not all tables are # properly dropped, the table becomes inconsistent # and wrecks later test cases --sync_slave_with_master
sql/ha_ndbcluster.cc +73 −20 Original line number Diff line number Diff line Loading @@ -4480,6 +4480,21 @@ int ha_ndbcluster::create(const char *name, DBUG_RETURN(my_errno); } #ifdef HAVE_NDB_BINLOG /* Don't allow table creation unless schema distribution table is setup ( unless it is a creation of the schema dist table itself ) */ if (!schema_share && !(strcmp(m_dbname, NDB_REP_DB) == 0 && strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0)) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_RETURN(HA_ERR_NO_CONNECTION); } #endif /* HAVE_NDB_BINLOG */ DBUG_PRINT("table", ("name: %s", m_tabname)); tab.setName(m_tabname); tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE)); Loading Loading @@ -5004,7 +5019,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to) is_old_table_tmpfile= 0; String event_name(INJECTOR_EVENT_LEN); ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0); ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share); ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share, "rename table"); } if (!result && !IS_TMP_PREFIX(new_tabname)) Loading Loading @@ -5088,6 +5104,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); NDBDICT *dict= ndb->getDictionary(); #ifdef HAVE_NDB_BINLOG /* Don't allow drop table unless schema distribution table is setup */ if (!schema_share) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_RETURN(HA_ERR_NO_CONNECTION); } NDB_SHARE *share= get_share(path, 0, false); #endif Loading Loading @@ -5156,7 +5181,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb, ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0); ndbcluster_handle_drop_table(ndb, table_dropped ? event_name.c_ptr() : 0, share); share, "delete table"); } if (share) Loading Loading @@ -5185,6 +5210,18 @@ int ha_ndbcluster::delete_table(const char *name) set_dbname(name); set_tabname(name); #ifdef HAVE_NDB_BINLOG /* Don't allow drop table unless schema distribution table is setup */ if (!schema_share) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_RETURN(HA_ERR_NO_CONNECTION); } #endif if (check_ndb_connection()) DBUG_RETURN(HA_ERR_NO_CONNECTION); Loading Loading @@ -5406,6 +5443,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) if (!res) info(HA_STATUS_VARIABLE | HA_STATUS_CONST); #ifdef HAVE_NDB_BINLOG if (!ndb_binlog_tables_inited && ndb_binlog_running) table->db_stat|= HA_READ_ONLY; #endif DBUG_RETURN(res); } Loading Loading @@ -5704,6 +5746,19 @@ int ndbcluster_drop_database_impl(const char *path) static void ndbcluster_drop_database(char *path) { DBUG_ENTER("ndbcluster_drop_database"); #ifdef HAVE_NDB_BINLOG /* Don't allow drop database unless schema distribution table is setup */ if (!schema_share) { DBUG_PRINT("info", ("Schema distribution table not setup")); DBUG_VOID_RETURN; //DBUG_RETURN(HA_ERR_NO_CONNECTION); } #endif ndbcluster_drop_database_impl(path); #ifdef HAVE_NDB_BINLOG char db[FN_REFLEN]; Loading @@ -5712,6 +5767,7 @@ static void ndbcluster_drop_database(char *path) current_thd->query, current_thd->query_length, db, "", 0, 0, SOT_DROP_DB); #endif DBUG_VOID_RETURN; } /* find all tables in ndb and discover those needed Loading Loading @@ -5740,29 +5796,30 @@ int ndbcluster_find_all_files(THD *thd) NDBDICT *dict= ndb->getDictionary(); int unhandled, retries= 5; int unhandled, retries= 5, skipped; do { if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0) ERR_RETURN(dict->getNdbError()); unhandled= 0; skipped= 0; retries--; for (uint i= 0 ; i < list.count ; i++) { NDBDICT::List::Element& elmt= list.elements[i]; int do_handle_table= 0; if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name)) { DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name)); continue; } DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name)); if (elmt.state == NDBOBJ::StateOnline || elmt.state == NDBOBJ::StateBackup) do_handle_table= 1; else if (!(elmt.state == NDBOBJ::StateBuilding)) if (elmt.state != NDBOBJ::StateOnline && elmt.state != NDBOBJ::StateBackup && elmt.state != NDBOBJ::StateBuilding) { sql_print_information("NDB: skipping setup table %s.%s, in state %d", elmt.database, elmt.name, elmt.state); skipped++; continue; } Loading @@ -5771,7 +5828,7 @@ int ndbcluster_find_all_files(THD *thd) if (!(ndbtab= dict->getTable(elmt.name))) { if (do_handle_table) if (retries == 0) sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s", elmt.database, elmt.name, dict->getNdbError().code, Loading Loading @@ -5840,9 +5897,9 @@ int ndbcluster_find_all_files(THD *thd) pthread_mutex_unlock(&LOCK_open); } } while (unhandled && retries--); while (unhandled && retries); DBUG_RETURN(0); DBUG_RETURN(-(skipped + unhandled)); } int ndbcluster_find_files(THD *thd,const char *db,const char *path, Loading Loading @@ -7706,6 +7763,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started); ndbcluster_util_inited= 1; /* Wait for cluster to start */ Loading Loading @@ -7737,6 +7796,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) } #ifdef HAVE_NDB_BINLOG if (ndb_extra_logging && ndb_binlog_running) sql_print_information("NDB Binlog: Ndb tables initially read only."); /* create tables needed by the replication */ ndbcluster_setup_binlog_table_shares(thd); #else Loading @@ -7746,17 +7807,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) ndbcluster_find_all_files(thd); #endif ndbcluster_util_inited= 1; #ifdef HAVE_NDB_BINLOG /* Signal injector thread that all is setup */ pthread_cond_signal(&injector_cond); #endif set_timespec(abstime, 0); for (;!abort_loop;) { pthread_mutex_lock(&LOCK_ndb_util_thread); pthread_cond_timedwait(&COND_ndb_util_thread, &LOCK_ndb_util_thread, Loading @@ -7774,7 +7827,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) Check that the apply_status_share and schema_share has been created. If not try to create it */ if (!apply_status_share || !schema_share) if (!ndb_binlog_tables_inited) ndbcluster_setup_binlog_table_shares(thd); #endif Loading
sql/ha_ndbcluster_binlog.cc +103 −29 Original line number Diff line number Diff line Loading @@ -48,6 +48,7 @@ int ndb_binlog_thread_running= 0; FALSE if not */ my_bool ndb_binlog_running= FALSE; my_bool ndb_binlog_tables_inited= FALSE; /* Global reference to the ndb injector thread THD oject Loading Loading @@ -775,33 +776,51 @@ static int ndbcluster_create_schema_table(THD *thd) DBUG_RETURN(0); } void ndbcluster_setup_binlog_table_shares(THD *thd) int ndbcluster_setup_binlog_table_shares(THD *thd) { int done_find_all_files= 0; if (!schema_share && ndbcluster_check_schema_share() == 0) { if (!done_find_all_files) pthread_mutex_lock(&LOCK_open); ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE); pthread_mutex_unlock(&LOCK_open); if (!schema_share) { ndbcluster_find_all_files(thd); done_find_all_files= 1; } ndbcluster_create_schema_table(thd); // always make sure we create the 'schema' first if (!schema_share) return; return 1; } } if (!apply_status_share && ndbcluster_check_apply_status_share() == 0) { if (!done_find_all_files) pthread_mutex_lock(&LOCK_open); ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE); pthread_mutex_unlock(&LOCK_open); if (!apply_status_share) { ndbcluster_find_all_files(thd); done_find_all_files= 1; } ndbcluster_create_apply_status_table(thd); if (!apply_status_share) return 1; } } if (!ndbcluster_find_all_files(thd)) { pthread_mutex_lock(&LOCK_open); ndb_binlog_tables_inited= TRUE; if (ndb_binlog_running) { if (ndb_extra_logging) sql_print_information("NDB Binlog: ndb tables writable"); close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); } pthread_mutex_unlock(&LOCK_open); /* Signal injector thread that all is setup */ pthread_cond_signal(&injector_cond); } return 0; } /* Defines and struct for schema table. Loading Loading @@ -936,6 +955,31 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf, /* log query in schema table */ static void ndb_report_waiting(const char *key, int the_time, const char *op, const char *obj) { ulonglong ndb_latest_epoch= 0; const char *proc_info= "<no info>"; pthread_mutex_lock(&injector_mutex); if (injector_ndb) ndb_latest_epoch= injector_ndb->getLatestGCI(); if (injector_thd) proc_info= injector_thd->proc_info; pthread_mutex_unlock(&injector_mutex); sql_print_information("NDB %s:" " waiting max %u sec for %s %s." " epochs: (%u,%u,%u)" " injector proc_info: %s" ,key, the_time, op, obj ,(uint)ndb_latest_handled_binlog_epoch ,(uint)ndb_latest_received_binlog_epoch ,(uint)ndb_latest_epoch ,proc_info ); } int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, const char *query, int query_length, const char *db, const char *table_name, Loading Loading @@ -965,6 +1009,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } char tmp_buf2[FN_REFLEN]; const char *type_str; switch (type) { case SOT_DROP_TABLE: Loading @@ -975,6 +1020,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "drop table `", table_name, "`", NullS) - tmp_buf2); type_str= "drop table"; break; case SOT_RENAME_TABLE: /* redo the rename table query as is may contain several tables */ Loading @@ -982,20 +1028,28 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query_length= (uint) (strxmov(tmp_buf2, "rename table `", old_db, ".", old_table_name, "` to `", db, ".", table_name, "`", NullS) - tmp_buf2); type_str= "rename table"; break; case SOT_CREATE_TABLE: // fall through type_str= "create table"; break; case SOT_ALTER_TABLE: type_str= "create table"; break; case SOT_DROP_DB: type_str= "drop db"; break; case SOT_CREATE_DB: type_str= "create db"; break; case SOT_ALTER_DB: type_str= "alter db"; break; case SOT_TABLESPACE: type_str= "tablespace"; break; case SOT_LOGFILE_GROUP: type_str= "logfile group"; break; default: abort(); /* should not happen, programming error */ Loading Loading @@ -1201,13 +1255,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, max_timeout--; if (max_timeout == 0) { sql_print_error("NDB create table: timed out. Ignoring..."); sql_print_error("NDB %s: distibuting %s timed out. Ignoring...", type_str, ndb_schema_object->key); break; } if (ndb_extra_logging) sql_print_information("NDB create table: " "waiting max %u sec for create table %s.", max_timeout, ndb_schema_object->key); ndb_report_waiting(type_str, max_timeout, "distributing", ndb_schema_object->key); } (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } Loading Loading @@ -1689,9 +1743,15 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, // skip break; case NDBEVENT::TE_CLUSTER_FAILURE: // fall through case NDBEVENT::TE_DROP: if (ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); free_share(&schema_share); schema_share= 0; ndb_binlog_tables_inited= FALSE; // fall through case NDBEVENT::TE_ALTER: ndb_handle_schema_change(thd, ndb, pOp, tmp_share); Loading Loading @@ -2494,9 +2554,15 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, get_share(share); if (do_apply_status_share) { apply_status_share= get_share(share); (void) pthread_cond_signal(&injector_cond); } else if (do_schema_share) { schema_share= get_share(share); (void) pthread_cond_signal(&injector_cond); } DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u", share->key, share->op, share->use_count)); Loading @@ -2513,7 +2579,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE *share) NDB_SHARE *share, const char *type_str) { DBUG_ENTER("ndbcluster_handle_drop_table"); Loading Loading @@ -2581,9 +2647,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, break; } if (ndb_extra_logging) sql_print_information("NDB delete table: " "waiting max %u sec for drop table %s.", max_timeout, share->key); ndb_report_waiting(type_str, max_timeout, type_str, share->key); } (void) pthread_mutex_unlock(&share->mutex); #else Loading Loading @@ -2660,13 +2725,18 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, switch (type) { case NDBEVENT::TE_CLUSTER_FAILURE: if (ndb_extra_logging) sql_print_information("NDB Binlog: cluster failure for %s.", share->key); if (apply_status_share == share) { if (ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); free_share(&apply_status_share); apply_status_share= 0; ndb_binlog_tables_inited= FALSE; } if (ndb_extra_logging) sql_print_information("NDB Binlog: cluster failure for %s.", share->key); DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: " "%s received share: 0x%lx op: %lx share op: %lx " "op_old: %lx", Loading @@ -2675,8 +2745,13 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, case NDBEVENT::TE_DROP: if (apply_status_share == share) { if (ndb_extra_logging && ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); free_share(&apply_status_share); apply_status_share= 0; ndb_binlog_tables_inited= FALSE; } /* ToDo: remove printout */ if (ndb_extra_logging) Loading Loading @@ -3087,7 +3162,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->proc_info= "Waiting for ndbcluster to start"; pthread_mutex_lock(&injector_mutex); while (!ndbcluster_util_inited) while (!schema_share || !apply_status_share) { /* ndb not connected yet */ struct timespec abstime; Loading Loading @@ -3119,10 +3194,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) thd->db= db; if (ndb_binlog_running) open_binlog_index(thd, &binlog_tables, &binlog_index); if (!apply_status_share) { sql_print_error("NDB: Could not get apply status share"); } thd->db= db; } Loading Loading @@ -3184,6 +3255,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (unlikely(schema_res > 0)) { thd->proc_info= "Processing events from schema table"; schema_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); schema_ndb-> Loading Loading @@ -3379,6 +3451,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (trans.good()) { //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes); thd->proc_info= "Committing events to binlog"; injector::transaction::binlog_pos start= trans.start_pos(); if (int r= trans.commit()) { Loading Loading @@ -3418,6 +3491,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } err: DBUG_PRINT("info",("Shutting down cluster binlog thread")); thd->proc_info= "Shutting down"; close_thread_tables(thd); pthread_mutex_lock(&injector_mutex); /* don't mess with the injector_ndb anymore from other threads */ Loading
sql/ha_ndbcluster_binlog.h +4 −2 Original line number Diff line number Diff line Loading @@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, const char *old_db= 0, const char *old_table_name= 0); int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE *share); NDB_SHARE *share, const char *type_str); void ndb_rep_event_name(String *event_name, const char *db, const char *tbl); int ndb_create_table_from_engine(THD *thd, const char *db, Loading @@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg); /* table cluster_replication.apply_status */ void ndbcluster_setup_binlog_table_shares(THD *thd); int ndbcluster_setup_binlog_table_shares(THD *thd); extern NDB_SHARE *apply_status_share; extern NDB_SHARE *schema_share; extern THD *injector_thd; extern my_bool ndb_binlog_running; extern my_bool ndb_binlog_tables_inited; bool ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print, Loading