Loading sql/ha_ndbcluster_binlog.cc +87 −54 Original line number Diff line number Diff line Loading @@ -1228,7 +1228,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, struct timespec abstime; int i; set_timespec(abstime, 1); (void) pthread_cond_timedwait(&injector_cond, int ret= pthread_cond_timedwait(&injector_cond, &ndb_schema_object->mutex, &abstime); Loading @@ -1252,6 +1252,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) break; if (ret) { max_timeout--; if (max_timeout == 0) { Loading @@ -1263,6 +1265,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ndb_report_waiting(type_str, max_timeout, "distributing", ndb_schema_object->key); } } (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } Loading Loading @@ -2445,7 +2448,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, } if (!op) { pthread_mutex_unlock(&injector_mutex); sql_print_error("NDB Binlog: Creating NdbEventOperation failed for" " %s",event_name); push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, Loading @@ -2453,6 +2455,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ndb->getNdbError().code, ndb->getNdbError().message, "NDB"); pthread_mutex_unlock(&injector_mutex); DBUG_RETURN(-1); } Loading Loading @@ -2635,21 +2638,24 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, { struct timespec abstime; set_timespec(abstime, 1); (void) pthread_cond_timedwait(&injector_cond, int ret= pthread_cond_timedwait(&injector_cond, &share->mutex, &abstime); max_timeout--; if (share->op == 0) break; if (ret) { max_timeout--; if (max_timeout == 0) { sql_print_error("NDB delete table: timed out. Ignoring..."); sql_print_error("NDB %s: timed out. Ignoring...", type_str); break; } if (ndb_extra_logging) ndb_report_waiting(type_str, max_timeout, type_str, share->key); } } (void) pthread_mutex_unlock(&share->mutex); #else (void) pthread_mutex_lock(&share->mutex); Loading Loading @@ -2711,7 +2717,8 @@ static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp, } static int ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb, NdbEventOperation *pOp, Binlog_index_row &row) { NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); Loading @@ -2720,7 +2727,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, /* make sure to flush any pending events as they can be dependent on one of the tables being changed below */ injector_thd->binlog_flush_pending_rows_event(true); thd->binlog_flush_pending_rows_event(true); switch (type) { Loading Loading @@ -2777,7 +2784,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, return 0; } ndb_handle_schema_change(injector_thd, ndb, pOp, share); ndb_handle_schema_change(thd, ndb, pOp, share); return 0; } Loading Loading @@ -3057,7 +3064,8 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, pthread_handler_t ndb_binlog_thread_func(void *arg) { THD *thd; /* needs to be first for thread_stack */ Ndb *ndb= 0; Ndb *i_ndb= 0; Ndb *s_ndb= 0; Thd_ndb *thd_ndb=0; int ndb_update_binlog_index= 1; injector *inj= injector::instance(); Loading Loading @@ -3109,16 +3117,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_unlock(&LOCK_thread_count); thd->lex->start_transaction_opt= 0; if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) || schema_ndb->init()) if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) || s_ndb->init()) { sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); goto err; } // empty database if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) || ndb->init()) if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) || i_ndb->init()) { sql_print_error("NDB Binlog: Getting Ndb object failed"); ndb_binlog_thread_running= -1; Loading @@ -3139,7 +3147,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_lock(&injector_mutex); */ injector_thd= thd; injector_ndb= ndb; injector_ndb= i_ndb; schema_ndb= s_ndb; ndb_binlog_thread_running= 1; if (opt_bin_log) { Loading Loading @@ -3221,14 +3230,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) int res= 0, tot_poll_wait= 1000; if (ndb_binlog_running) { res= ndb->pollEvents(tot_poll_wait, &gci); res= i_ndb->pollEvents(tot_poll_wait, &gci); tot_poll_wait= 0; } int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci); int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci); ndb_latest_received_binlog_epoch= gci; while (gci > schema_gci && schema_res >= 0) schema_res= schema_ndb->pollEvents(10, &schema_gci); schema_res= s_ndb->pollEvents(10, &schema_gci); if ((abort_loop || do_ndbcluster_binlog_close_connection) && (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || Loading Loading @@ -3256,11 +3265,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (unlikely(schema_res > 0)) { thd->proc_info= "Processing events from schema table"; schema_ndb-> s_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); schema_ndb-> s_ndb-> setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); NdbEventOperation *pOp= schema_ndb->nextEvent(); NdbEventOperation *pOp= s_ndb->nextEvent(); while (pOp != NULL) { if (!pOp->hasError()) Loading @@ -3273,7 +3282,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) "binlog schema event", (ulong) pOp->getNdbError().code, pOp->getNdbError().message); pOp= schema_ndb->nextEvent(); pOp= s_ndb->nextEvent(); } } Loading @@ -3285,7 +3294,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) int event_count= 0; #endif thd->proc_info= "Processing events"; NdbEventOperation *pOp= ndb->nextEvent(); NdbEventOperation *pOp= i_ndb->nextEvent(); Binlog_index_row row; while (pOp != NULL) { Loading @@ -3296,9 +3305,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName())); DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch); ndb-> i_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); bzero((char*) &row, sizeof(row)); injector::transaction trans; Loading @@ -3307,7 +3316,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Uint32 iter= 0; const NdbEventOperation *gci_op; Uint32 event_types; while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types)) != NULL) { NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData(); Loading Loading @@ -3393,7 +3402,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) event_count++; #endif if (pOp->hasError() && ndb_binlog_thread_handle_error(ndb, pOp, row) < 0) ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0) goto err; #ifndef DBUG_OFF Loading @@ -3413,7 +3422,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Uint32 iter= 0; const NdbEventOperation *gci_op; Uint32 event_types; while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types)) != NULL) { if (gci_op == pOp) Loading @@ -3425,19 +3434,19 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) #endif if ((unsigned) pOp->getEventType() < (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT) ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans); ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans); else { // set injector_ndb database/schema from table internal name int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); DBUG_ASSERT(ret == 0); ndb_binlog_thread_handle_non_data_event(ndb, pOp, row); ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row); // reset to catch errors ndb->setDatabaseName(""); i_ndb->setDatabaseName(""); } pOp= ndb->nextEvent(); pOp= i_ndb->nextEvent(); } while (pOp && pOp->getGCI() == gci); /* Loading Loading @@ -3495,7 +3504,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) close_thread_tables(thd); pthread_mutex_lock(&injector_mutex); /* don't mess with the injector_ndb anymore from other threads */ injector_thd= 0; injector_ndb= 0; schema_ndb= 0; pthread_mutex_unlock(&injector_mutex); thd->db= 0; // as not to try to free memory sql_print_information("Stopping Cluster Binlog"); Loading @@ -3512,21 +3523,43 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } /* remove all event operations */ if (ndb) if (s_ndb) { NdbEventOperation *op; DBUG_PRINT("info",("removing all event operations")); while ((op= ndb->getEventOperation())) while ((op= s_ndb->getEventOperation())) { DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); DBUG_ASSERT(share->op == op || share->op_old == op); share->op= share->op_old= 0; free_share(&share); ndb->dropEventOperation(op); s_ndb->dropEventOperation(op); } delete s_ndb; s_ndb= 0; } if (i_ndb) { NdbEventOperation *op; DBUG_PRINT("info",("removing all event operations")); while ((op= i_ndb->getEventOperation())) { DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); DBUG_ASSERT(share->op == op || share->op_old == op); share->op= share->op_old= 0; free_share(&share); i_ndb->dropEventOperation(op); } delete ndb; ndb= 0; delete i_ndb; i_ndb= 0; } hash_free(&ndb_schema_objects); Loading Loading
sql/ha_ndbcluster_binlog.cc +87 −54 Original line number Diff line number Diff line Loading @@ -1228,7 +1228,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, struct timespec abstime; int i; set_timespec(abstime, 1); (void) pthread_cond_timedwait(&injector_cond, int ret= pthread_cond_timedwait(&injector_cond, &ndb_schema_object->mutex, &abstime); Loading @@ -1252,6 +1252,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) break; if (ret) { max_timeout--; if (max_timeout == 0) { Loading @@ -1263,6 +1265,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, ndb_report_waiting(type_str, max_timeout, "distributing", ndb_schema_object->key); } } (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } Loading Loading @@ -2445,7 +2448,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, } if (!op) { pthread_mutex_unlock(&injector_mutex); sql_print_error("NDB Binlog: Creating NdbEventOperation failed for" " %s",event_name); push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR, Loading @@ -2453,6 +2455,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, ndb->getNdbError().code, ndb->getNdbError().message, "NDB"); pthread_mutex_unlock(&injector_mutex); DBUG_RETURN(-1); } Loading Loading @@ -2635,21 +2638,24 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, { struct timespec abstime; set_timespec(abstime, 1); (void) pthread_cond_timedwait(&injector_cond, int ret= pthread_cond_timedwait(&injector_cond, &share->mutex, &abstime); max_timeout--; if (share->op == 0) break; if (ret) { max_timeout--; if (max_timeout == 0) { sql_print_error("NDB delete table: timed out. Ignoring..."); sql_print_error("NDB %s: timed out. Ignoring...", type_str); break; } if (ndb_extra_logging) ndb_report_waiting(type_str, max_timeout, type_str, share->key); } } (void) pthread_mutex_unlock(&share->mutex); #else (void) pthread_mutex_lock(&share->mutex); Loading Loading @@ -2711,7 +2717,8 @@ static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp, } static int ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb, NdbEventOperation *pOp, Binlog_index_row &row) { NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); Loading @@ -2720,7 +2727,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, /* make sure to flush any pending events as they can be dependent on one of the tables being changed below */ injector_thd->binlog_flush_pending_rows_event(true); thd->binlog_flush_pending_rows_event(true); switch (type) { Loading Loading @@ -2777,7 +2784,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp, return 0; } ndb_handle_schema_change(injector_thd, ndb, pOp, share); ndb_handle_schema_change(thd, ndb, pOp, share); return 0; } Loading Loading @@ -3057,7 +3064,8 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, pthread_handler_t ndb_binlog_thread_func(void *arg) { THD *thd; /* needs to be first for thread_stack */ Ndb *ndb= 0; Ndb *i_ndb= 0; Ndb *s_ndb= 0; Thd_ndb *thd_ndb=0; int ndb_update_binlog_index= 1; injector *inj= injector::instance(); Loading Loading @@ -3109,16 +3117,16 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_unlock(&LOCK_thread_count); thd->lex->start_transaction_opt= 0; if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) || schema_ndb->init()) if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) || s_ndb->init()) { sql_print_error("NDB Binlog: Getting Schema Ndb object failed"); goto err; } // empty database if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) || ndb->init()) if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) || i_ndb->init()) { sql_print_error("NDB Binlog: Getting Ndb object failed"); ndb_binlog_thread_running= -1; Loading @@ -3139,7 +3147,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_lock(&injector_mutex); */ injector_thd= thd; injector_ndb= ndb; injector_ndb= i_ndb; schema_ndb= s_ndb; ndb_binlog_thread_running= 1; if (opt_bin_log) { Loading Loading @@ -3221,14 +3230,14 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) int res= 0, tot_poll_wait= 1000; if (ndb_binlog_running) { res= ndb->pollEvents(tot_poll_wait, &gci); res= i_ndb->pollEvents(tot_poll_wait, &gci); tot_poll_wait= 0; } int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci); int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci); ndb_latest_received_binlog_epoch= gci; while (gci > schema_gci && schema_res >= 0) schema_res= schema_ndb->pollEvents(10, &schema_gci); schema_res= s_ndb->pollEvents(10, &schema_gci); if ((abort_loop || do_ndbcluster_binlog_close_connection) && (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci || Loading Loading @@ -3256,11 +3265,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) if (unlikely(schema_res > 0)) { thd->proc_info= "Processing events from schema table"; schema_ndb-> s_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); schema_ndb-> s_ndb-> setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); NdbEventOperation *pOp= schema_ndb->nextEvent(); NdbEventOperation *pOp= s_ndb->nextEvent(); while (pOp != NULL) { if (!pOp->hasError()) Loading @@ -3273,7 +3282,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) "binlog schema event", (ulong) pOp->getNdbError().code, pOp->getNdbError().message); pOp= schema_ndb->nextEvent(); pOp= s_ndb->nextEvent(); } } Loading @@ -3285,7 +3294,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) int event_count= 0; #endif thd->proc_info= "Processing events"; NdbEventOperation *pOp= ndb->nextEvent(); NdbEventOperation *pOp= i_ndb->nextEvent(); Binlog_index_row row; while (pOp != NULL) { Loading @@ -3296,9 +3305,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName())); DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch); ndb-> i_ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); bzero((char*) &row, sizeof(row)); injector::transaction trans; Loading @@ -3307,7 +3316,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Uint32 iter= 0; const NdbEventOperation *gci_op; Uint32 event_types; while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types)) != NULL) { NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData(); Loading Loading @@ -3393,7 +3402,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) event_count++; #endif if (pOp->hasError() && ndb_binlog_thread_handle_error(ndb, pOp, row) < 0) ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0) goto err; #ifndef DBUG_OFF Loading @@ -3413,7 +3422,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Uint32 iter= 0; const NdbEventOperation *gci_op; Uint32 event_types; while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types)) while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types)) != NULL) { if (gci_op == pOp) Loading @@ -3425,19 +3434,19 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) #endif if ((unsigned) pOp->getEventType() < (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT) ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans); ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans); else { // set injector_ndb database/schema from table internal name int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable()); DBUG_ASSERT(ret == 0); ndb_binlog_thread_handle_non_data_event(ndb, pOp, row); ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row); // reset to catch errors ndb->setDatabaseName(""); i_ndb->setDatabaseName(""); } pOp= ndb->nextEvent(); pOp= i_ndb->nextEvent(); } while (pOp && pOp->getGCI() == gci); /* Loading Loading @@ -3495,7 +3504,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) close_thread_tables(thd); pthread_mutex_lock(&injector_mutex); /* don't mess with the injector_ndb anymore from other threads */ injector_thd= 0; injector_ndb= 0; schema_ndb= 0; pthread_mutex_unlock(&injector_mutex); thd->db= 0; // as not to try to free memory sql_print_information("Stopping Cluster Binlog"); Loading @@ -3512,21 +3523,43 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } /* remove all event operations */ if (ndb) if (s_ndb) { NdbEventOperation *op; DBUG_PRINT("info",("removing all event operations")); while ((op= ndb->getEventOperation())) while ((op= s_ndb->getEventOperation())) { DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); DBUG_ASSERT(share->op == op || share->op_old == op); share->op= share->op_old= 0; free_share(&share); ndb->dropEventOperation(op); s_ndb->dropEventOperation(op); } delete s_ndb; s_ndb= 0; } if (i_ndb) { NdbEventOperation *op; DBUG_PRINT("info",("removing all event operations")); while ((op= i_ndb->getEventOperation())) { DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); DBUG_ASSERT(share->op == op || share->op_old == op); share->op= share->op_old= 0; free_share(&share); i_ndb->dropEventOperation(op); } delete ndb; ndb= 0; delete i_ndb; i_ndb= 0; } hash_free(&ndb_schema_objects); Loading