Loading sql/ha_ndbcluster.cc +35 −4 Original line number Diff line number Diff line Loading @@ -134,7 +134,6 @@ static uint ndbcluster_alter_table_flags(uint flags) } static int ndbcluster_inited= 0; int ndbcluster_util_inited= 0; static Ndb* g_ndb= NULL; Ndb_cluster_connection* g_ndb_cluster_connection= NULL; Loading @@ -158,6 +157,7 @@ static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *, // Util thread variables pthread_t ndb_util_thread; int ndb_util_thread_running= 0; pthread_mutex_t LOCK_ndb_util_thread; pthread_cond_t COND_ndb_util_thread; pthread_handler_t ndb_util_thread_func(void *arg); Loading Loading @@ -6720,6 +6720,12 @@ static int ndbcluster_init(void *p) goto ndbcluster_init_error; } /* Wait for the util thread to start */ pthread_mutex_lock(&LOCK_ndb_util_thread); while (!ndb_util_thread_running) pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread); pthread_mutex_unlock(&LOCK_ndb_util_thread); ndbcluster_inited= 1; DBUG_RETURN(FALSE); Loading @@ -6742,6 +6748,27 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) if (!ndbcluster_inited) DBUG_RETURN(0); ndbcluster_inited= 0; /* wait for util thread to finish */ pthread_mutex_lock(&LOCK_ndb_util_thread); if (ndb_util_thread_running > 0) { pthread_cond_signal(&COND_ndb_util_thread); pthread_mutex_unlock(&LOCK_ndb_util_thread); pthread_mutex_lock(&LOCK_ndb_util_thread); while (ndb_util_thread_running > 0) { struct timespec abstime; set_timespec(abstime, 1); pthread_cond_timedwait(&COND_ndb_util_thread, &LOCK_ndb_util_thread, &abstime); } } pthread_mutex_unlock(&LOCK_ndb_util_thread); #ifdef HAVE_NDB_BINLOG { Loading Loading @@ -6788,7 +6815,6 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) pthread_mutex_destroy(&ndbcluster_mutex); pthread_mutex_destroy(&LOCK_ndb_util_thread); pthread_cond_destroy(&COND_ndb_util_thread); ndbcluster_inited= 0; DBUG_RETURN(0); } Loading Loading @@ -8334,6 +8360,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) { thd->cleanup(); delete thd; ndb_util_thread_running= 0; DBUG_RETURN(NULL); } thd->init_for_queries(); Loading @@ -8346,6 +8373,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) thd->main_security_ctx.priv_user = 0; thd->current_stmt_binlog_row_based= TRUE; // If in mixed mode ndb_util_thread_running= 1; pthread_cond_signal(&COND_ndb_util_thread); /* wait for mysql server to start */ Loading @@ -8354,8 +8384,6 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started); ndbcluster_util_inited= 1; /* Wait for cluster to start */ Loading Loading @@ -8537,6 +8565,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) net_end(&thd->net); thd->cleanup(); delete thd; pthread_mutex_lock(&LOCK_ndb_util_thread); ndb_util_thread_running= 0; pthread_mutex_unlock(&LOCK_ndb_util_thread); DBUG_PRINT("exit", ("ndb_util_thread")); my_thread_end(); pthread_exit(0); Loading sql/ha_ndbcluster_binlog.cc +23 −30 Original line number Diff line number Diff line Loading @@ -81,6 +81,8 @@ THD *injector_thd= 0; static Ndb *injector_ndb= 0; static Ndb *schema_ndb= 0; static int ndbcluster_binlog_inited= 0; /* Mutex and condition used for interacting between client sql thread and injector thread Loading Loading @@ -558,29 +560,28 @@ ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binl DBUG_VOID_RETURN; } /* End use of the NDB Cluster table handler - free all global variables allocated by ndbcluster_init() End use of the NDB Cluster binlog - wait for binlog thread to shutdown */ static int ndbcluster_binlog_end(THD *thd) { DBUG_ENTER("ndb_binlog_end"); DBUG_ENTER("ndbcluster_binlog_end"); if (!ndbcluster_util_inited) if (!ndbcluster_binlog_inited) DBUG_RETURN(0); // Kill ndb utility thread (void) pthread_mutex_lock(&LOCK_ndb_util_thread); DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread)); (void) pthread_cond_signal(&COND_ndb_util_thread); (void) pthread_mutex_unlock(&LOCK_ndb_util_thread); ndbcluster_binlog_inited= 0; #ifdef HAVE_NDB_BINLOG /* wait for injector thread to finish */ pthread_mutex_lock(&injector_mutex); if (ndb_binlog_thread_running > 0) { pthread_cond_signal(&injector_cond); pthread_mutex_unlock(&injector_mutex); pthread_mutex_lock(&injector_mutex); while (ndb_binlog_thread_running > 0) { Loading @@ -588,8 +589,9 @@ static int ndbcluster_binlog_end(THD *thd) set_timespec(abstime, 1); pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); } pthread_mutex_unlock(&injector_mutex); } pthread_mutex_unlock(&injector_mutex); /* remove all shares */ { Loading Loading @@ -617,8 +619,10 @@ static int ndbcluster_binlog_end(THD *thd) } pthread_mutex_unlock(&ndbcluster_mutex); } pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); #endif ndbcluster_util_inited= 0; DBUG_RETURN(0); } Loading Loading @@ -2286,9 +2290,9 @@ int ndbcluster_binlog_start() DBUG_RETURN(-1); } /* Wait for the ndb injector thread to finish starting up. */ ndbcluster_binlog_inited= 1; /* Wait for the injector thread to start */ pthread_mutex_lock(&injector_mutex); while (!ndb_binlog_thread_running) pthread_cond_wait(&injector_cond, &injector_mutex); Loading @@ -2297,20 +2301,10 @@ int ndbcluster_binlog_start() if (ndb_binlog_thread_running < 0) DBUG_RETURN(-1); DBUG_RETURN(0); } static void ndbcluster_binlog_close_connection(THD *thd) { DBUG_ENTER("ndbcluster_binlog_close_connection"); const char *save_info= thd->proc_info; thd->proc_info= "ndbcluster_binlog_close_connection"; do_ndbcluster_binlog_close_connection= BCCC_exit; while (ndb_binlog_thread_running > 0) sleep(1); thd->proc_info= save_info; DBUG_VOID_RETURN; } /************************************************************** Internal helper functions for creating/dropping ndb events Loading Loading @@ -3953,6 +3947,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) goto restart; } err: sql_print_information("Stopping Cluster Binlog"); DBUG_PRINT("info",("Shutting down cluster binlog thread")); thd->proc_info= "Shutting down"; close_thread_tables(thd); Loading @@ -3965,8 +3960,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_unlock(&injector_mutex); thd->db= 0; // as not to try to free memory sql_print_information("Stopping Cluster Binlog"); if (ndb_apply_status_share) { free_share(&ndb_apply_status_share); Loading Loading
sql/ha_ndbcluster.cc +35 −4 Original line number Diff line number Diff line Loading @@ -134,7 +134,6 @@ static uint ndbcluster_alter_table_flags(uint flags) } static int ndbcluster_inited= 0; int ndbcluster_util_inited= 0; static Ndb* g_ndb= NULL; Ndb_cluster_connection* g_ndb_cluster_connection= NULL; Loading @@ -158,6 +157,7 @@ static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *, // Util thread variables pthread_t ndb_util_thread; int ndb_util_thread_running= 0; pthread_mutex_t LOCK_ndb_util_thread; pthread_cond_t COND_ndb_util_thread; pthread_handler_t ndb_util_thread_func(void *arg); Loading Loading @@ -6720,6 +6720,12 @@ static int ndbcluster_init(void *p) goto ndbcluster_init_error; } /* Wait for the util thread to start */ pthread_mutex_lock(&LOCK_ndb_util_thread); while (!ndb_util_thread_running) pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread); pthread_mutex_unlock(&LOCK_ndb_util_thread); ndbcluster_inited= 1; DBUG_RETURN(FALSE); Loading @@ -6742,6 +6748,27 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) if (!ndbcluster_inited) DBUG_RETURN(0); ndbcluster_inited= 0; /* wait for util thread to finish */ pthread_mutex_lock(&LOCK_ndb_util_thread); if (ndb_util_thread_running > 0) { pthread_cond_signal(&COND_ndb_util_thread); pthread_mutex_unlock(&LOCK_ndb_util_thread); pthread_mutex_lock(&LOCK_ndb_util_thread); while (ndb_util_thread_running > 0) { struct timespec abstime; set_timespec(abstime, 1); pthread_cond_timedwait(&COND_ndb_util_thread, &LOCK_ndb_util_thread, &abstime); } } pthread_mutex_unlock(&LOCK_ndb_util_thread); #ifdef HAVE_NDB_BINLOG { Loading Loading @@ -6788,7 +6815,6 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) pthread_mutex_destroy(&ndbcluster_mutex); pthread_mutex_destroy(&LOCK_ndb_util_thread); pthread_cond_destroy(&COND_ndb_util_thread); ndbcluster_inited= 0; DBUG_RETURN(0); } Loading Loading @@ -8334,6 +8360,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) { thd->cleanup(); delete thd; ndb_util_thread_running= 0; DBUG_RETURN(NULL); } thd->init_for_queries(); Loading @@ -8346,6 +8373,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) thd->main_security_ctx.priv_user = 0; thd->current_stmt_binlog_row_based= TRUE; // If in mixed mode ndb_util_thread_running= 1; pthread_cond_signal(&COND_ndb_util_thread); /* wait for mysql server to start */ Loading @@ -8354,8 +8384,6 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) pthread_cond_wait(&COND_server_started, &LOCK_server_started); pthread_mutex_unlock(&LOCK_server_started); ndbcluster_util_inited= 1; /* Wait for cluster to start */ Loading Loading @@ -8537,6 +8565,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused))) net_end(&thd->net); thd->cleanup(); delete thd; pthread_mutex_lock(&LOCK_ndb_util_thread); ndb_util_thread_running= 0; pthread_mutex_unlock(&LOCK_ndb_util_thread); DBUG_PRINT("exit", ("ndb_util_thread")); my_thread_end(); pthread_exit(0); Loading
sql/ha_ndbcluster_binlog.cc +23 −30 Original line number Diff line number Diff line Loading @@ -81,6 +81,8 @@ THD *injector_thd= 0; static Ndb *injector_ndb= 0; static Ndb *schema_ndb= 0; static int ndbcluster_binlog_inited= 0; /* Mutex and condition used for interacting between client sql thread and injector thread Loading Loading @@ -558,29 +560,28 @@ ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binl DBUG_VOID_RETURN; } /* End use of the NDB Cluster table handler - free all global variables allocated by ndbcluster_init() End use of the NDB Cluster binlog - wait for binlog thread to shutdown */ static int ndbcluster_binlog_end(THD *thd) { DBUG_ENTER("ndb_binlog_end"); DBUG_ENTER("ndbcluster_binlog_end"); if (!ndbcluster_util_inited) if (!ndbcluster_binlog_inited) DBUG_RETURN(0); // Kill ndb utility thread (void) pthread_mutex_lock(&LOCK_ndb_util_thread); DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread)); (void) pthread_cond_signal(&COND_ndb_util_thread); (void) pthread_mutex_unlock(&LOCK_ndb_util_thread); ndbcluster_binlog_inited= 0; #ifdef HAVE_NDB_BINLOG /* wait for injector thread to finish */ pthread_mutex_lock(&injector_mutex); if (ndb_binlog_thread_running > 0) { pthread_cond_signal(&injector_cond); pthread_mutex_unlock(&injector_mutex); pthread_mutex_lock(&injector_mutex); while (ndb_binlog_thread_running > 0) { Loading @@ -588,8 +589,9 @@ static int ndbcluster_binlog_end(THD *thd) set_timespec(abstime, 1); pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); } pthread_mutex_unlock(&injector_mutex); } pthread_mutex_unlock(&injector_mutex); /* remove all shares */ { Loading Loading @@ -617,8 +619,10 @@ static int ndbcluster_binlog_end(THD *thd) } pthread_mutex_unlock(&ndbcluster_mutex); } pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); #endif ndbcluster_util_inited= 0; DBUG_RETURN(0); } Loading Loading @@ -2286,9 +2290,9 @@ int ndbcluster_binlog_start() DBUG_RETURN(-1); } /* Wait for the ndb injector thread to finish starting up. */ ndbcluster_binlog_inited= 1; /* Wait for the injector thread to start */ pthread_mutex_lock(&injector_mutex); while (!ndb_binlog_thread_running) pthread_cond_wait(&injector_cond, &injector_mutex); Loading @@ -2297,20 +2301,10 @@ int ndbcluster_binlog_start() if (ndb_binlog_thread_running < 0) DBUG_RETURN(-1); DBUG_RETURN(0); } static void ndbcluster_binlog_close_connection(THD *thd) { DBUG_ENTER("ndbcluster_binlog_close_connection"); const char *save_info= thd->proc_info; thd->proc_info= "ndbcluster_binlog_close_connection"; do_ndbcluster_binlog_close_connection= BCCC_exit; while (ndb_binlog_thread_running > 0) sleep(1); thd->proc_info= save_info; DBUG_VOID_RETURN; } /************************************************************** Internal helper functions for creating/dropping ndb events Loading Loading @@ -3953,6 +3947,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) goto restart; } err: sql_print_information("Stopping Cluster Binlog"); DBUG_PRINT("info",("Shutting down cluster binlog thread")); thd->proc_info= "Shutting down"; close_thread_tables(thd); Loading @@ -3965,8 +3960,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) pthread_mutex_unlock(&injector_mutex); thd->db= 0; // as not to try to free memory sql_print_information("Stopping Cluster Binlog"); if (ndb_apply_status_share) { free_share(&ndb_apply_status_share); Loading