Loading sql/ha_ndbcluster.cc +16 −48 Original line number Diff line number Diff line Loading @@ -4189,8 +4189,8 @@ int ha_ndbcluster::create(const char *name, if ((my_errno= write_ndb_file(name))) DBUG_RETURN(my_errno); #ifdef HAVE_NDB_BINLOG if (ndb_binlog_thread_running > 0) ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname, 0); ndbcluster_create_binlog_setup(get_ndb(), name2, strlen(name2), m_dbname, m_tabname, FALSE); #endif /* HAVE_NDB_BINLOG */ DBUG_RETURN(my_errno); } Loading Loading @@ -4386,6 +4386,8 @@ int ha_ndbcluster::create(const char *name, " Event: %s", name2); /* a warning has been issued to the client */ } if (share && ndb_binlog_thread_running <= 0) share->flags|= NSF_NO_BINLOG; ndbcluster_log_schema_op(current_thd, share, current_thd->query, current_thd->query_length, share->db, share->table_name, Loading Loading @@ -5460,7 +5462,7 @@ int ndbcluster_find_all_files(THD *thd) /* no such database defined, skip table */ continue; } strxnmov(end, FN_LEN-1-(key-end), "/", elmt.name, NullS); end= strxnmov(end, FN_LEN-1-(end-key), "/", elmt.name, NullS); const void *data= 0, *pack_data= 0; uint length, pack_length; int discover= 0; Loading @@ -5486,41 +5488,25 @@ int ndbcluster_find_all_files(THD *thd) my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR)); pthread_mutex_lock(&LOCK_open); if (discover) { /* ToDo 4.1 database needs to be created if missing */ pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, elmt.database, elmt.name)) { /* ToDo 4.1 handle error */ } pthread_mutex_unlock(&LOCK_open); } #ifdef HAVE_NDB_BINLOG else if (ndb_binlog_thread_running > 0) else { /* set up replication for this table */ NDB_SHARE *share; pthread_mutex_lock(&ndbcluster_mutex); if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, (byte*) key, strlen(key))) && share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG)) || share == 0) { /* there is no binlog creation setup for this table attempt to do it */ pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_lock(&LOCK_open); ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name, share); pthread_mutex_unlock(&LOCK_open); } else pthread_mutex_unlock(&ndbcluster_mutex); ndbcluster_create_binlog_setup(ndb, key, end-key, elmt.database, elmt.name, TRUE); } #endif pthread_mutex_unlock(&LOCK_open); } } while (unhandled && retries--); Loading Loading @@ -5635,36 +5621,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, #ifdef HAVE_NDB_BINLOG /* setup logging to binlog for all discovered tables */ if (ndb_binlog_thread_running > 0) { char *end; char *end1= char *end, *end1= strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS); NDB_SHARE *share; pthread_mutex_lock(&ndbcluster_mutex); for (i= 0; i < ok_tables.records; i++) { file_name= (char*)hash_element(&ok_tables, i); end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS); if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, (byte*)name, end - name)) && share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG)) { /* there is no binlog creation setup for this table attempt to do it */ pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_lock(&LOCK_open); ndbcluster_create_binlog_setup(ndb, name, db, file_name, share); ndbcluster_create_binlog_setup(ndb, name, end-name, db, file_name, TRUE); pthread_mutex_unlock(&LOCK_open); pthread_mutex_lock(&ndbcluster_mutex); } /* Table existed in the mysqld so there should be a share */ DBUG_ASSERT(share != NULL); } pthread_mutex_unlock(&ndbcluster_mutex); } #endif Loading sql/ha_ndbcluster_binlog.cc +39 −13 Original line number Diff line number Diff line Loading @@ -239,11 +239,17 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) share->table= 0; if (ndb_binlog_thread_running <= 0) { DBUG_ASSERT(_table != 0); if (_table) { if (_table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; if (_table->s->blob_fields != 0) share->flags|= NSF_BLOB_FLAG; } else { share->flags|= NSF_NO_BINLOG; } return; } while (1) Loading Loading @@ -1626,25 +1632,37 @@ ndb_rep_event_name(String *event_name,const char *db, const char *tbl) create/discover. */ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, uint key_len, const char *db, const char *table_name, NDB_SHARE *share) my_bool share_may_exist) { DBUG_ENTER("ndbcluster_create_binlog_setup"); DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d", key, key_len, db, table_name, share_may_exist)); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name)); DBUG_ASSERT(strlen(key) == key_len); pthread_mutex_lock(&ndbcluster_mutex); /* Handle any trailing share */ if (share == 0) NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, (byte*) key, key_len); if (share && share_may_exist) { if (share->flags & NSF_NO_BINLOG || share->op != 0 || share->op_old != 0) { share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, (byte*) key, strlen(key)); pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); // replication already setup, or should not } } if (share) { handle_trailing_share(share); } else handle_trailing_share(share); /* Create share which is needed to hold replication information */ if (!(share= get_share(key, 0, true, true))) Loading @@ -1652,6 +1670,13 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, sql_print_error("NDB Binlog: " "allocating table share for %s failed", key); } if (ndb_binlog_thread_running <= 0) { share->flags|= NSF_NO_BINLOG; pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); } pthread_mutex_unlock(&ndbcluster_mutex); while (share && !IS_TMP_PREFIX(table_name)) Loading Loading @@ -1749,6 +1774,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, sql_print_error("NDB Binlog: logging of table %s " "with no PK and blob attributes is not supported", share->key); share->flags|= NSF_NO_BINLOG; DBUG_RETURN(-1); } /* No primary key, subscribe for all attributes */ Loading sql/ha_ndbcluster_binlog.h +2 −1 Original line number Diff line number Diff line Loading @@ -72,9 +72,10 @@ void ndbcluster_binlog_init_handlerton(); void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table); int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, uint key_len, const char *db, const char *table_name, NDB_SHARE *share); my_bool share_may_exist); int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table, const char *event_name, NDB_SHARE *share); int ndbcluster_create_event_ops(NDB_SHARE *share, Loading Loading
sql/ha_ndbcluster.cc +16 −48 Original line number Diff line number Diff line Loading @@ -4189,8 +4189,8 @@ int ha_ndbcluster::create(const char *name, if ((my_errno= write_ndb_file(name))) DBUG_RETURN(my_errno); #ifdef HAVE_NDB_BINLOG if (ndb_binlog_thread_running > 0) ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname, 0); ndbcluster_create_binlog_setup(get_ndb(), name2, strlen(name2), m_dbname, m_tabname, FALSE); #endif /* HAVE_NDB_BINLOG */ DBUG_RETURN(my_errno); } Loading Loading @@ -4386,6 +4386,8 @@ int ha_ndbcluster::create(const char *name, " Event: %s", name2); /* a warning has been issued to the client */ } if (share && ndb_binlog_thread_running <= 0) share->flags|= NSF_NO_BINLOG; ndbcluster_log_schema_op(current_thd, share, current_thd->query, current_thd->query_length, share->db, share->table_name, Loading Loading @@ -5460,7 +5462,7 @@ int ndbcluster_find_all_files(THD *thd) /* no such database defined, skip table */ continue; } strxnmov(end, FN_LEN-1-(key-end), "/", elmt.name, NullS); end= strxnmov(end, FN_LEN-1-(end-key), "/", elmt.name, NullS); const void *data= 0, *pack_data= 0; uint length, pack_length; int discover= 0; Loading @@ -5486,41 +5488,25 @@ int ndbcluster_find_all_files(THD *thd) my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR)); my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR)); pthread_mutex_lock(&LOCK_open); if (discover) { /* ToDo 4.1 database needs to be created if missing */ pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, elmt.database, elmt.name)) { /* ToDo 4.1 handle error */ } pthread_mutex_unlock(&LOCK_open); } #ifdef HAVE_NDB_BINLOG else if (ndb_binlog_thread_running > 0) else { /* set up replication for this table */ NDB_SHARE *share; pthread_mutex_lock(&ndbcluster_mutex); if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, (byte*) key, strlen(key))) && share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG)) || share == 0) { /* there is no binlog creation setup for this table attempt to do it */ pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_lock(&LOCK_open); ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name, share); pthread_mutex_unlock(&LOCK_open); } else pthread_mutex_unlock(&ndbcluster_mutex); ndbcluster_create_binlog_setup(ndb, key, end-key, elmt.database, elmt.name, TRUE); } #endif pthread_mutex_unlock(&LOCK_open); } } while (unhandled && retries--); Loading Loading @@ -5635,36 +5621,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path, #ifdef HAVE_NDB_BINLOG /* setup logging to binlog for all discovered tables */ if (ndb_binlog_thread_running > 0) { char *end; char *end1= char *end, *end1= strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS); NDB_SHARE *share; pthread_mutex_lock(&ndbcluster_mutex); for (i= 0; i < ok_tables.records; i++) { file_name= (char*)hash_element(&ok_tables, i); end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS); if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables, (byte*)name, end - name)) && share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG)) { /* there is no binlog creation setup for this table attempt to do it */ pthread_mutex_unlock(&ndbcluster_mutex); pthread_mutex_lock(&LOCK_open); ndbcluster_create_binlog_setup(ndb, name, db, file_name, share); ndbcluster_create_binlog_setup(ndb, name, end-name, db, file_name, TRUE); pthread_mutex_unlock(&LOCK_open); pthread_mutex_lock(&ndbcluster_mutex); } /* Table existed in the mysqld so there should be a share */ DBUG_ASSERT(share != NULL); } pthread_mutex_unlock(&ndbcluster_mutex); } #endif Loading
sql/ha_ndbcluster_binlog.cc +39 −13 Original line number Diff line number Diff line Loading @@ -239,11 +239,17 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) share->table= 0; if (ndb_binlog_thread_running <= 0) { DBUG_ASSERT(_table != 0); if (_table) { if (_table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; if (_table->s->blob_fields != 0) share->flags|= NSF_BLOB_FLAG; } else { share->flags|= NSF_NO_BINLOG; } return; } while (1) Loading Loading @@ -1626,25 +1632,37 @@ ndb_rep_event_name(String *event_name,const char *db, const char *tbl) create/discover. */ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, uint key_len, const char *db, const char *table_name, NDB_SHARE *share) my_bool share_may_exist) { DBUG_ENTER("ndbcluster_create_binlog_setup"); DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d", key, key_len, db, table_name, share_may_exist)); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name)); DBUG_ASSERT(strlen(key) == key_len); pthread_mutex_lock(&ndbcluster_mutex); /* Handle any trailing share */ if (share == 0) NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, (byte*) key, key_len); if (share && share_may_exist) { if (share->flags & NSF_NO_BINLOG || share->op != 0 || share->op_old != 0) { share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables, (byte*) key, strlen(key)); pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); // replication already setup, or should not } } if (share) { handle_trailing_share(share); } else handle_trailing_share(share); /* Create share which is needed to hold replication information */ if (!(share= get_share(key, 0, true, true))) Loading @@ -1652,6 +1670,13 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, sql_print_error("NDB Binlog: " "allocating table share for %s failed", key); } if (ndb_binlog_thread_running <= 0) { share->flags|= NSF_NO_BINLOG; pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); } pthread_mutex_unlock(&ndbcluster_mutex); while (share && !IS_TMP_PREFIX(table_name)) Loading Loading @@ -1749,6 +1774,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, sql_print_error("NDB Binlog: logging of table %s " "with no PK and blob attributes is not supported", share->key); share->flags|= NSF_NO_BINLOG; DBUG_RETURN(-1); } /* No primary key, subscribe for all attributes */ Loading
sql/ha_ndbcluster_binlog.h +2 −1 Original line number Diff line number Diff line Loading @@ -72,9 +72,10 @@ void ndbcluster_binlog_init_handlerton(); void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table); int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, uint key_len, const char *db, const char *table_name, NDB_SHARE *share); my_bool share_may_exist); int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table, const char *event_name, NDB_SHARE *share); int ndbcluster_create_event_ops(NDB_SHARE *share, Loading