Loading sql/ha_ndbcluster.h +0 −2 Original line number Diff line number Diff line Loading @@ -115,8 +115,6 @@ typedef struct st_ndbcluster_share { TABLE *table; NdbValue *ndb_value[2]; MY_BITMAP *subscriber_bitmap; MY_BITMAP slock_bitmap; uint32 slock[256/32]; // 256 bits for lock status of table #endif } NDB_SHARE; Loading sql/ha_ndbcluster_binlog.cc +162 −65 Original line number Diff line number Diff line Loading @@ -86,6 +86,22 @@ static ulonglong ndb_latest_received_binlog_epoch= 0; NDB_SHARE *apply_status_share= 0; NDB_SHARE *schema_share= 0; /* Schema object distribution handling */ HASH ndb_schema_objects; typedef struct st_ndb_schema_object { pthread_mutex_t mutex; char *key; uint key_length; uint use_count; MY_BITMAP slock_bitmap; uint32 slock[256/32]; // 256 bits for lock status of table } NDB_SCHEMA_OBJECT; static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, my_bool create_if_not_exists, my_bool have_lock); static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, bool have_lock); /* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */ extern Uint64 g_latest_trans_gci; Loading Loading @@ -328,9 +344,6 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) max_ndb_nodes, false); bitmap_clear_all(&share->subscriber_bitmap[i]); } bitmap_init(&share->slock_bitmap, share->slock, sizeof(share->slock)*8, false); bitmap_clear_all(&share->slock_bitmap); } if (!do_event_op) Loading Loading @@ -952,7 +965,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } char tmp_buf2[FN_REFLEN]; int get_a_share= 0; switch (type) { case SOT_DROP_TABLE: Loading @@ -963,8 +975,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "drop table `", table_name, "`", NullS) - tmp_buf2); if (!share) get_a_share= 1; break; case SOT_RENAME_TABLE: /* redo the rename table query as is may contain several tables */ Loading @@ -972,14 +982,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query_length= (uint) (strxmov(tmp_buf2, "rename table `", old_db, ".", old_table_name, "` to `", db, ".", table_name, "`", NullS) - tmp_buf2); if (!share) get_a_share= 1; break; case SOT_CREATE_TABLE: // fall through case SOT_ALTER_TABLE: if (!share) get_a_share= 1; break; case SOT_DROP_DB: break; Loading @@ -995,18 +1001,18 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, abort(); /* should not happen, programming error */ } if (get_a_share) NDB_SCHEMA_OBJECT *ndb_schema_object; { char key[FN_REFLEN]; build_table_filename(key, sizeof(key), db, table_name, ""); share= get_share(key, 0, false, false); ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE); } const NdbError *ndb_error= 0; uint32 node_id= g_ndb_cluster_connection->node_id(); Uint64 epoch= 0; MY_BITMAP schema_subscribers; uint32 bitbuf[sizeof(schema_share->slock)/4]; uint32 bitbuf[sizeof(ndb_schema_object->slock)/4]; { int i; bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false); Loading @@ -1022,11 +1028,12 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, (void) pthread_mutex_unlock(&schema_share->mutex); bitmap_clear_bit(&schema_subscribers, node_id); if (share) if (ndb_schema_object) { (void) pthread_mutex_lock(&share->mutex); memcpy(share->slock, schema_subscribers.bitmap, sizeof(share->slock)); (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_lock(&ndb_schema_object->mutex); memcpy(ndb_schema_object->slock, schema_subscribers.bitmap, sizeof(ndb_schema_object->slock)); (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap, Loading Loading @@ -1158,20 +1165,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, Wait for other mysqld's to acknowledge the table operation */ if (ndb_error == 0 && (type == SOT_CREATE_TABLE || type == SOT_RENAME_TABLE || type == SOT_ALTER_TABLE) && !bitmap_is_clear_all(&schema_subscribers)) { int max_timeout= 10; (void) pthread_mutex_lock(&share->mutex); (void) pthread_mutex_lock(&ndb_schema_object->mutex); while (1) { struct timespec abstime; int i; set_timespec(abstime, 1); (void) pthread_cond_timedwait(&injector_cond, &share->mutex, &ndb_schema_object->mutex, &abstime); (void) pthread_mutex_lock(&schema_share->mutex); Loading @@ -1184,14 +1188,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } (void) pthread_mutex_unlock(&schema_share->mutex); /* remove any unsubscribed from share->slock */ bitmap_intersect(&share->slock_bitmap, &schema_subscribers); /* remove any unsubscribed from ndb_schema_object->slock */ bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, no_bytes_in_map(&share->slock_bitmap)); DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", (char*)ndb_schema_object->slock_bitmap.bitmap, no_bytes_in_map(&ndb_schema_object->slock_bitmap)); if (bitmap_is_clear_all(&share->slock_bitmap)) if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) break; max_timeout--; Loading @@ -1203,16 +1207,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, if (ndb_extra_logging) sql_print_information("NDB create table: " "waiting max %u sec for create table %s.", max_timeout, share->key); max_timeout, ndb_schema_object->key); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } if (get_a_share && share) { free_share(&share); share= 0; } if (ndb_schema_object) ndb_free_schema_object(&ndb_schema_object, FALSE); DBUG_RETURN(0); } Loading Loading @@ -1529,12 +1530,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, MEM_ROOT *mem_root) { DBUG_ENTER("ndb_binlog_thread_handle_schema_event"); NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); if (share && schema_share == share) NDB_SHARE *tmp_share= (NDB_SHARE *)pOp->getCustomData(); if (tmp_share && schema_share == tmp_share) { NDBEVENT::TableEvent ev_type= pOp->getEventType(); DBUG_PRINT("enter", ("%s.%s ev_type: %d", share->db, share->table_name, ev_type)); tmp_share->db, tmp_share->table_name, ev_type)); if (ev_type == NDBEVENT::TE_UPDATE || ev_type == NDBEVENT::TE_INSERT) { Loading @@ -1543,7 +1544,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, MY_BITMAP slock; bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); uint node_id= g_ndb_cluster_connection->node_id(); ndbcluster_get_schema(share, schema); ndbcluster_get_schema(tmp_share, schema); if (schema->node_id != node_id) { int log_query= 0, post_epoch_unlock= 0; Loading Loading @@ -1628,18 +1629,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, break; case SOT_CLEAR_SLOCK: { if (share) { pthread_mutex_lock(&share->mutex); memcpy(share->slock, schema->slock, sizeof(share->slock)); DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, no_bytes_in_map(&share->slock_bitmap)); pthread_mutex_unlock(&share->mutex); pthread_mutex_lock(&ndbcluster_mutex); NDB_SCHEMA_OBJECT *ndb_schema_object= (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects, (byte*) key, strlen(key)); if (ndb_schema_object) { pthread_mutex_lock(&ndb_schema_object->mutex); memcpy(ndb_schema_object->slock, schema->slock, sizeof(ndb_schema_object->slock)); DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", (char*)ndb_schema_object->slock_bitmap.bitmap, no_bytes_in_map(&ndb_schema_object->slock_bitmap)); pthread_mutex_unlock(&ndb_schema_object->mutex); pthread_cond_signal(&injector_cond); free_share(&share); share= 0; } if (share) free_share(&share, TRUE); pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); } case SOT_TABLESPACE: Loading Loading @@ -1687,24 +1694,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema_share= 0; // fall through case NDBEVENT::TE_ALTER: ndb_handle_schema_change(thd, ndb, pOp, share); ndb_handle_schema_change(thd, ndb, pOp, tmp_share); break; case NDBEVENT::TE_NODE_FAILURE: { uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; DBUG_ASSERT(node_id != 0xFF); (void) pthread_mutex_lock(&share->mutex); bitmap_clear_all(&share->subscriber_bitmap[node_id]); (void) pthread_mutex_lock(&tmp_share->mutex); bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]); DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id)); if (ndb_extra_logging) { sql_print_information("NDB Binlog: Node: %d, down," " Subscriber bitmask %x%x", pOp->getNdbdNodeId(), share->subscriber_bitmap[node_id].bitmap[1], share->subscriber_bitmap[node_id].bitmap[0]); tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&tmp_share->mutex); (void) pthread_cond_signal(&injector_cond); break; } Loading @@ -1713,8 +1720,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; uint8 req_id= pOp->getReqNodeId(); DBUG_ASSERT(req_id != 0 && node_id != 0xFF); (void) pthread_mutex_lock(&share->mutex); bitmap_set_bit(&share->subscriber_bitmap[node_id], req_id); (void) pthread_mutex_lock(&tmp_share->mutex); bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id); DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id)); if (ndb_extra_logging) { Loading @@ -1722,10 +1729,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, " Subscriber bitmask %x%x", pOp->getNdbdNodeId(), req_id, share->subscriber_bitmap[node_id].bitmap[1], share->subscriber_bitmap[node_id].bitmap[0]); tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&tmp_share->mutex); (void) pthread_cond_signal(&injector_cond); break; } Loading @@ -1734,8 +1741,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; uint8 req_id= pOp->getReqNodeId(); DBUG_ASSERT(req_id != 0 && node_id != 0xFF); (void) pthread_mutex_lock(&share->mutex); bitmap_clear_bit(&share->subscriber_bitmap[node_id], req_id); (void) pthread_mutex_lock(&tmp_share->mutex); bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id); DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id)); if (ndb_extra_logging) { Loading @@ -1743,16 +1750,16 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, " Subscriber bitmask %x%x", pOp->getNdbdNodeId(), req_id, share->subscriber_bitmap[node_id].bitmap[1], share->subscriber_bitmap[node_id].bitmap[0]); tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&tmp_share->mutex); (void) pthread_cond_signal(&injector_cond); break; } default: sql_print_error("NDB Binlog: unknown non data event %d for %s. " "Ignoring...", (unsigned) ev_type, share->key); "Ignoring...", (unsigned) ev_type, tmp_share->key); } } DBUG_RETURN(0); Loading Loading @@ -2888,6 +2895,90 @@ class Timer Injector thread main loop ****************************************************************/ static byte *ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object, uint *length, my_bool not_used __attribute__((unused))) { *length= schema_object->key_length; return (byte*) schema_object->key; } static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, my_bool create_if_not_exists, my_bool have_lock) { NDB_SCHEMA_OBJECT *ndb_schema_object; uint length= (uint) strlen(key); DBUG_ENTER("ndb_get_schema_object"); DBUG_PRINT("enter", ("key: '%s'", key)); if (!have_lock) pthread_mutex_lock(&ndbcluster_mutex); while (!(ndb_schema_object= (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects, (byte*) key, length))) { if (!create_if_not_exists) { DBUG_PRINT("info", ("does not exist")); break; } if (!(ndb_schema_object= (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1, MYF(MY_WME | MY_ZEROFILL)))) { DBUG_PRINT("info", ("malloc error")); break; } ndb_schema_object->key= (char *)(ndb_schema_object+1); memcpy(ndb_schema_object->key, key, length + 1); ndb_schema_object->key_length= length; if (my_hash_insert(&ndb_schema_objects, (byte*) ndb_schema_object)) { my_free((gptr) ndb_schema_object, 0); break; } pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST); bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock, sizeof(ndb_schema_object->slock)*8, false); bitmap_clear_all(&ndb_schema_object->slock_bitmap); break; } if (ndb_schema_object) { ndb_schema_object->use_count++; DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count)); } if (!have_lock) pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(ndb_schema_object); } static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, bool have_lock) { DBUG_ENTER("ndb_free_schema_object"); DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key)); if (!have_lock) pthread_mutex_lock(&ndbcluster_mutex); if (!--(*ndb_schema_object)->use_count) { DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count)); hash_delete(&ndb_schema_objects, (byte*) *ndb_schema_object); pthread_mutex_destroy(&(*ndb_schema_object)->mutex); my_free((gptr) *ndb_schema_object, MYF(0)); *ndb_schema_object= 0; } else { DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count)); } if (!have_lock) pthread_mutex_unlock(&ndbcluster_mutex); DBUG_VOID_RETURN; } pthread_handler_t ndb_binlog_thread_func(void *arg) { THD *thd; /* needs to be first for thread_stack */ Loading Loading @@ -2961,6 +3052,10 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) goto err; } /* init hash for schema object distribution */ (void) hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0, (hash_get_key)ndb_schema_objects_get_key, 0, 0); /* Expose global reference to our ndb object. Loading Loading @@ -3360,6 +3455,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ndb= 0; } hash_free(&ndb_schema_objects); // Placed here to avoid a memory leak; TODO: check if needed net_end(&thd->net); delete thd; Loading Loading
sql/ha_ndbcluster.h +0 −2 Original line number Diff line number Diff line Loading @@ -115,8 +115,6 @@ typedef struct st_ndbcluster_share { TABLE *table; NdbValue *ndb_value[2]; MY_BITMAP *subscriber_bitmap; MY_BITMAP slock_bitmap; uint32 slock[256/32]; // 256 bits for lock status of table #endif } NDB_SHARE; Loading
sql/ha_ndbcluster_binlog.cc +162 −65 Original line number Diff line number Diff line Loading @@ -86,6 +86,22 @@ static ulonglong ndb_latest_received_binlog_epoch= 0; NDB_SHARE *apply_status_share= 0; NDB_SHARE *schema_share= 0; /* Schema object distribution handling */ HASH ndb_schema_objects; typedef struct st_ndb_schema_object { pthread_mutex_t mutex; char *key; uint key_length; uint use_count; MY_BITMAP slock_bitmap; uint32 slock[256/32]; // 256 bits for lock status of table } NDB_SCHEMA_OBJECT; static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, my_bool create_if_not_exists, my_bool have_lock); static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, bool have_lock); /* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */ extern Uint64 g_latest_trans_gci; Loading Loading @@ -328,9 +344,6 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) max_ndb_nodes, false); bitmap_clear_all(&share->subscriber_bitmap[i]); } bitmap_init(&share->slock_bitmap, share->slock, sizeof(share->slock)*8, false); bitmap_clear_all(&share->slock_bitmap); } if (!do_event_op) Loading Loading @@ -952,7 +965,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } char tmp_buf2[FN_REFLEN]; int get_a_share= 0; switch (type) { case SOT_DROP_TABLE: Loading @@ -963,8 +975,6 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "drop table `", table_name, "`", NullS) - tmp_buf2); if (!share) get_a_share= 1; break; case SOT_RENAME_TABLE: /* redo the rename table query as is may contain several tables */ Loading @@ -972,14 +982,10 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, query_length= (uint) (strxmov(tmp_buf2, "rename table `", old_db, ".", old_table_name, "` to `", db, ".", table_name, "`", NullS) - tmp_buf2); if (!share) get_a_share= 1; break; case SOT_CREATE_TABLE: // fall through case SOT_ALTER_TABLE: if (!share) get_a_share= 1; break; case SOT_DROP_DB: break; Loading @@ -995,18 +1001,18 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, abort(); /* should not happen, programming error */ } if (get_a_share) NDB_SCHEMA_OBJECT *ndb_schema_object; { char key[FN_REFLEN]; build_table_filename(key, sizeof(key), db, table_name, ""); share= get_share(key, 0, false, false); ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE); } const NdbError *ndb_error= 0; uint32 node_id= g_ndb_cluster_connection->node_id(); Uint64 epoch= 0; MY_BITMAP schema_subscribers; uint32 bitbuf[sizeof(schema_share->slock)/4]; uint32 bitbuf[sizeof(ndb_schema_object->slock)/4]; { int i; bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, false); Loading @@ -1022,11 +1028,12 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, (void) pthread_mutex_unlock(&schema_share->mutex); bitmap_clear_bit(&schema_subscribers, node_id); if (share) if (ndb_schema_object) { (void) pthread_mutex_lock(&share->mutex); memcpy(share->slock, schema_subscribers.bitmap, sizeof(share->slock)); (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_lock(&ndb_schema_object->mutex); memcpy(ndb_schema_object->slock, schema_subscribers.bitmap, sizeof(ndb_schema_object->slock)); (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } DBUG_DUMP("schema_subscribers", (char*)schema_subscribers.bitmap, Loading Loading @@ -1158,20 +1165,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, Wait for other mysqld's to acknowledge the table operation */ if (ndb_error == 0 && (type == SOT_CREATE_TABLE || type == SOT_RENAME_TABLE || type == SOT_ALTER_TABLE) && !bitmap_is_clear_all(&schema_subscribers)) { int max_timeout= 10; (void) pthread_mutex_lock(&share->mutex); (void) pthread_mutex_lock(&ndb_schema_object->mutex); while (1) { struct timespec abstime; int i; set_timespec(abstime, 1); (void) pthread_cond_timedwait(&injector_cond, &share->mutex, &ndb_schema_object->mutex, &abstime); (void) pthread_mutex_lock(&schema_share->mutex); Loading @@ -1184,14 +1188,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } (void) pthread_mutex_unlock(&schema_share->mutex); /* remove any unsubscribed from share->slock */ bitmap_intersect(&share->slock_bitmap, &schema_subscribers); /* remove any unsubscribed from ndb_schema_object->slock */ bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, no_bytes_in_map(&share->slock_bitmap)); DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", (char*)ndb_schema_object->slock_bitmap.bitmap, no_bytes_in_map(&ndb_schema_object->slock_bitmap)); if (bitmap_is_clear_all(&share->slock_bitmap)) if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap)) break; max_timeout--; Loading @@ -1203,16 +1207,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, if (ndb_extra_logging) sql_print_information("NDB create table: " "waiting max %u sec for create table %s.", max_timeout, share->key); max_timeout, ndb_schema_object->key); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&ndb_schema_object->mutex); } if (get_a_share && share) { free_share(&share); share= 0; } if (ndb_schema_object) ndb_free_schema_object(&ndb_schema_object, FALSE); DBUG_RETURN(0); } Loading Loading @@ -1529,12 +1530,12 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, MEM_ROOT *mem_root) { DBUG_ENTER("ndb_binlog_thread_handle_schema_event"); NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData(); if (share && schema_share == share) NDB_SHARE *tmp_share= (NDB_SHARE *)pOp->getCustomData(); if (tmp_share && schema_share == tmp_share) { NDBEVENT::TableEvent ev_type= pOp->getEventType(); DBUG_PRINT("enter", ("%s.%s ev_type: %d", share->db, share->table_name, ev_type)); tmp_share->db, tmp_share->table_name, ev_type)); if (ev_type == NDBEVENT::TE_UPDATE || ev_type == NDBEVENT::TE_INSERT) { Loading @@ -1543,7 +1544,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, MY_BITMAP slock; bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); uint node_id= g_ndb_cluster_connection->node_id(); ndbcluster_get_schema(share, schema); ndbcluster_get_schema(tmp_share, schema); if (schema->node_id != node_id) { int log_query= 0, post_epoch_unlock= 0; Loading Loading @@ -1628,18 +1629,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, break; case SOT_CLEAR_SLOCK: { if (share) { pthread_mutex_lock(&share->mutex); memcpy(share->slock, schema->slock, sizeof(share->slock)); DBUG_DUMP("share->slock_bitmap.bitmap", (char*)share->slock_bitmap.bitmap, no_bytes_in_map(&share->slock_bitmap)); pthread_mutex_unlock(&share->mutex); pthread_mutex_lock(&ndbcluster_mutex); NDB_SCHEMA_OBJECT *ndb_schema_object= (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects, (byte*) key, strlen(key)); if (ndb_schema_object) { pthread_mutex_lock(&ndb_schema_object->mutex); memcpy(ndb_schema_object->slock, schema->slock, sizeof(ndb_schema_object->slock)); DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", (char*)ndb_schema_object->slock_bitmap.bitmap, no_bytes_in_map(&ndb_schema_object->slock_bitmap)); pthread_mutex_unlock(&ndb_schema_object->mutex); pthread_cond_signal(&injector_cond); free_share(&share); share= 0; } if (share) free_share(&share, TRUE); pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); } case SOT_TABLESPACE: Loading Loading @@ -1687,24 +1694,24 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, schema_share= 0; // fall through case NDBEVENT::TE_ALTER: ndb_handle_schema_change(thd, ndb, pOp, share); ndb_handle_schema_change(thd, ndb, pOp, tmp_share); break; case NDBEVENT::TE_NODE_FAILURE: { uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; DBUG_ASSERT(node_id != 0xFF); (void) pthread_mutex_lock(&share->mutex); bitmap_clear_all(&share->subscriber_bitmap[node_id]); (void) pthread_mutex_lock(&tmp_share->mutex); bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]); DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id)); if (ndb_extra_logging) { sql_print_information("NDB Binlog: Node: %d, down," " Subscriber bitmask %x%x", pOp->getNdbdNodeId(), share->subscriber_bitmap[node_id].bitmap[1], share->subscriber_bitmap[node_id].bitmap[0]); tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&tmp_share->mutex); (void) pthread_cond_signal(&injector_cond); break; } Loading @@ -1713,8 +1720,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; uint8 req_id= pOp->getReqNodeId(); DBUG_ASSERT(req_id != 0 && node_id != 0xFF); (void) pthread_mutex_lock(&share->mutex); bitmap_set_bit(&share->subscriber_bitmap[node_id], req_id); (void) pthread_mutex_lock(&tmp_share->mutex); bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id); DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id)); if (ndb_extra_logging) { Loading @@ -1722,10 +1729,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, " Subscriber bitmask %x%x", pOp->getNdbdNodeId(), req_id, share->subscriber_bitmap[node_id].bitmap[1], share->subscriber_bitmap[node_id].bitmap[0]); tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&tmp_share->mutex); (void) pthread_cond_signal(&injector_cond); break; } Loading @@ -1734,8 +1741,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; uint8 req_id= pOp->getReqNodeId(); DBUG_ASSERT(req_id != 0 && node_id != 0xFF); (void) pthread_mutex_lock(&share->mutex); bitmap_clear_bit(&share->subscriber_bitmap[node_id], req_id); (void) pthread_mutex_lock(&tmp_share->mutex); bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id); DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id)); if (ndb_extra_logging) { Loading @@ -1743,16 +1750,16 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, " Subscriber bitmask %x%x", pOp->getNdbdNodeId(), req_id, share->subscriber_bitmap[node_id].bitmap[1], share->subscriber_bitmap[node_id].bitmap[0]); tmp_share->subscriber_bitmap[node_id].bitmap[1], tmp_share->subscriber_bitmap[node_id].bitmap[0]); } (void) pthread_mutex_unlock(&share->mutex); (void) pthread_mutex_unlock(&tmp_share->mutex); (void) pthread_cond_signal(&injector_cond); break; } default: sql_print_error("NDB Binlog: unknown non data event %d for %s. " "Ignoring...", (unsigned) ev_type, share->key); "Ignoring...", (unsigned) ev_type, tmp_share->key); } } DBUG_RETURN(0); Loading Loading @@ -2888,6 +2895,90 @@ class Timer Injector thread main loop ****************************************************************/ static byte *ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object, uint *length, my_bool not_used __attribute__((unused))) { *length= schema_object->key_length; return (byte*) schema_object->key; } static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key, my_bool create_if_not_exists, my_bool have_lock) { NDB_SCHEMA_OBJECT *ndb_schema_object; uint length= (uint) strlen(key); DBUG_ENTER("ndb_get_schema_object"); DBUG_PRINT("enter", ("key: '%s'", key)); if (!have_lock) pthread_mutex_lock(&ndbcluster_mutex); while (!(ndb_schema_object= (NDB_SCHEMA_OBJECT*) hash_search(&ndb_schema_objects, (byte*) key, length))) { if (!create_if_not_exists) { DBUG_PRINT("info", ("does not exist")); break; } if (!(ndb_schema_object= (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1, MYF(MY_WME | MY_ZEROFILL)))) { DBUG_PRINT("info", ("malloc error")); break; } ndb_schema_object->key= (char *)(ndb_schema_object+1); memcpy(ndb_schema_object->key, key, length + 1); ndb_schema_object->key_length= length; if (my_hash_insert(&ndb_schema_objects, (byte*) ndb_schema_object)) { my_free((gptr) ndb_schema_object, 0); break; } pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST); bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock, sizeof(ndb_schema_object->slock)*8, false); bitmap_clear_all(&ndb_schema_object->slock_bitmap); break; } if (ndb_schema_object) { ndb_schema_object->use_count++; DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count)); } if (!have_lock) pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(ndb_schema_object); } static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object, bool have_lock) { DBUG_ENTER("ndb_free_schema_object"); DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key)); if (!have_lock) pthread_mutex_lock(&ndbcluster_mutex); if (!--(*ndb_schema_object)->use_count) { DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count)); hash_delete(&ndb_schema_objects, (byte*) *ndb_schema_object); pthread_mutex_destroy(&(*ndb_schema_object)->mutex); my_free((gptr) *ndb_schema_object, MYF(0)); *ndb_schema_object= 0; } else { DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count)); } if (!have_lock) pthread_mutex_unlock(&ndbcluster_mutex); DBUG_VOID_RETURN; } pthread_handler_t ndb_binlog_thread_func(void *arg) { THD *thd; /* needs to be first for thread_stack */ Loading Loading @@ -2961,6 +3052,10 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) goto err; } /* init hash for schema object distribution */ (void) hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0, (hash_get_key)ndb_schema_objects_get_key, 0, 0); /* Expose global reference to our ndb object. Loading Loading @@ -3360,6 +3455,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) ndb= 0; } hash_free(&ndb_schema_objects); // Placed here to avoid a memory leak; TODO: check if needed net_end(&thd->net); delete thd; Loading