Loading sql/ha_ndbcluster_binlog.cc +36 −0 Original line number Diff line number Diff line Loading @@ -97,6 +97,7 @@ static ulonglong ndb_latest_received_binlog_epoch= 0; NDB_SHARE *ndb_apply_status_share= 0; NDB_SHARE *ndb_schema_share= 0; pthread_mutex_t ndb_schema_share_mutex; /* Schema object distribution handling */ HASH ndb_schema_objects; Loading Loading @@ -621,6 +622,7 @@ static int ndbcluster_binlog_end(THD *thd) pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); pthread_mutex_destroy(&ndb_schema_share_mutex); #endif DBUG_RETURN(0); } Loading Loading @@ -1271,6 +1273,16 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE); bitmap_set_all(&schema_subscribers); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); if (ndb_schema_object) ndb_free_schema_object(&ndb_schema_object, FALSE); DBUG_RETURN(0); } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1283,6 +1295,9 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ if (updated) { bitmap_clear_bit(&schema_subscribers, node_id); Loading Loading @@ -1478,6 +1493,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, &abstime); if (thd->killed) break; /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); break; } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1487,6 +1510,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, bitmap_intersect(&schema_subscribers, tmp); } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ /* remove any unsubscribed from ndb_schema_object->slock */ bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); Loading Loading @@ -1910,8 +1935,14 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE); // fall through case NDBEVENT::TE_ALTER: Loading Loading @@ -2278,6 +2309,7 @@ int ndbcluster_binlog_start() pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST); pthread_cond_init(&injector_cond, NULL); pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST); /* Create injector thread */ if (pthread_create(&ndb_binlog_thread, &connection_attrib, Loading Loading @@ -3966,8 +3998,12 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } if (ndb_schema_share) { /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ } /* remove all event operations */ Loading Loading
sql/ha_ndbcluster_binlog.cc +36 −0 Original line number Diff line number Diff line Loading @@ -97,6 +97,7 @@ static ulonglong ndb_latest_received_binlog_epoch= 0; NDB_SHARE *ndb_apply_status_share= 0; NDB_SHARE *ndb_schema_share= 0; pthread_mutex_t ndb_schema_share_mutex; /* Schema object distribution handling */ HASH ndb_schema_objects; Loading Loading @@ -621,6 +622,7 @@ static int ndbcluster_binlog_end(THD *thd) pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); pthread_mutex_destroy(&ndb_schema_share_mutex); #endif DBUG_RETURN(0); } Loading Loading @@ -1271,6 +1273,16 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE); bitmap_set_all(&schema_subscribers); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); if (ndb_schema_object) ndb_free_schema_object(&ndb_schema_object, FALSE); DBUG_RETURN(0); } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1283,6 +1295,9 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ if (updated) { bitmap_clear_bit(&schema_subscribers, node_id); Loading Loading @@ -1478,6 +1493,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, &abstime); if (thd->killed) break; /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); break; } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1487,6 +1510,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, bitmap_intersect(&schema_subscribers, tmp); } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ /* remove any unsubscribed from ndb_schema_object->slock */ bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); Loading Loading @@ -1910,8 +1935,14 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE); // fall through case NDBEVENT::TE_ALTER: Loading Loading @@ -2278,6 +2309,7 @@ int ndbcluster_binlog_start() pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST); pthread_cond_init(&injector_cond, NULL); pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST); /* Create injector thread */ if (pthread_create(&ndb_binlog_thread, &connection_attrib, Loading Loading @@ -3966,8 +3998,12 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } if (ndb_schema_share) { /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ } /* remove all event operations */ Loading