Loading mysql-test/r/ndb_multi_row.result +2 −0 Original line number Diff line number Diff line Loading @@ -62,4 +62,6 @@ t4 drop table t1, t2, t3, t4; drop table if exists t1, t3, t4; Warnings: Error 155 Table 'test.t1' doesn't exist Error 155 Table 'test.t3' doesn't exist Error 155 Table 'test.t4' doesn't exist sql/ha_ndbcluster.cc +353 −462 File changed.Preview size limit exceeded, changes collapsed. Show changes sql/ha_ndbcluster.h +4 −10 Original line number Diff line number Diff line Loading @@ -70,8 +70,8 @@ typedef enum ndb_index_status { typedef struct ndb_index_data { NDB_INDEX_TYPE type; NDB_INDEX_STATUS status; void *index; void *unique_index; const NdbDictionary::Index *index; const NdbDictionary::Index *unique_index; unsigned char *unique_index_attrid_map; // In this version stats are not shared between threads NdbIndexStat* index_stat; Loading Loading @@ -560,6 +560,7 @@ class ha_ndbcluster: public handler ha_ndbcluster(TABLE_SHARE *table); ~ha_ndbcluster(); int ha_initialise(); int open(const char *name, int mode, uint test_if_locked); int close(void); Loading Loading @@ -708,19 +709,15 @@ static void set_tabname(const char *pathname, char *tabname); Ndb *ndb, NdbEventOperation *pOp, NDB_SHARE *share); int alter_table_name(const char *to); static int delete_table(ha_ndbcluster *h, Ndb *ndb, const char *path, const char *db, const char *table_name); int drop_ndb_table(); int create_ndb_index(const char *name, KEY *key_info, bool unique); int create_ordered_index(const char *name, KEY *key_info); int create_unique_index(const char *name, KEY *key_info); int create_index(const char *name, KEY *key_info, NDB_INDEX_TYPE idx_type, uint idx_no); int drop_ndb_index(const char *name); int table_changed(const void *pack_frm_data, uint pack_frm_len); // Index list management int create_indexes(Ndb *ndb, TABLE *tab); void clear_index(int i); Loading @@ -732,7 +729,7 @@ static void set_tabname(const char *pathname, char *tabname); KEY *key_info, const char *index_name, uint index_no); int initialize_autoincrement(const void *table); int get_metadata(const char* path); void release_metadata(); void release_metadata(THD *thd, Ndb *ndb); NDB_INDEX_TYPE get_index_type(uint idx_no) const; NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info, Loading Loading @@ -795,8 +792,6 @@ static void set_tabname(const char *pathname, char *tabname); void print_results(); ulonglong get_auto_increment(); int invalidate_dictionary_cache(bool global, const NdbDictionary::Table *ndbtab); int ndb_err(NdbTransaction*); bool uses_blob_value(); Loading Loading @@ -834,7 +829,6 @@ static void set_tabname(const char *pathname, char *tabname); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; const NdbDictionary::Table *m_table; int m_table_version; struct Ndb_local_table_statistics *m_table_info; char m_dbname[FN_HEADLEN]; //char m_schemaname[FN_HEADLEN]; Loading sql/ha_ndbcluster_binlog.cc +162 −115 Original line number Diff line number Diff line Loading @@ -986,7 +986,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, uint32 ndb_table_id, uint32 ndb_table_version, enum SCHEMA_OP_TYPE type, const char *old_db, const char *old_table_name) const char *new_db, const char *new_table_name) { DBUG_ENTER("ndbcluster_log_schema_op"); Thd_ndb *thd_ndb= get_thd_ndb(thd); Loading Loading @@ -1026,8 +1026,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, /* redo the rename table query as is may contain several tables */ query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "rename table `", old_db, ".", old_table_name, "` to `", db, ".", table_name, "`", NullS) - tmp_buf2); db, ".", table_name, "` to `", new_db, ".", new_table_name, "`", NullS) - tmp_buf2); type_str= "rename table"; break; case SOT_CREATE_TABLE: Loading Loading @@ -1067,6 +1067,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, Uint64 epoch= 0; MY_BITMAP schema_subscribers; uint32 bitbuf[sizeof(ndb_schema_object->slock)/4]; uint32 bitbuf_e[sizeof(bitbuf)]; bzero((char *)bitbuf_e, sizeof(bitbuf_e)); { int i, updated= 0; int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); Loading Loading @@ -1110,7 +1112,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE); const NDBTAB *ndbtab= ndbtab_g.get_table(); NdbTransaction *trans= 0; int retries= 100; const NDBCOL *col[SCHEMA_SIZE]; Loading Loading @@ -1141,8 +1144,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, while (1) { const char *log_db= db; const char *log_tab= table_name; const char *log_subscribers= (char*)schema_subscribers.bitmap; uint32 log_type= (uint32)type; if ((trans= ndb->startTransaction()) == 0) goto err; while (1) { NdbOperation *op= 0; int r= 0; Loading @@ -1152,17 +1160,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, DBUG_ASSERT(r == 0); /* db */ ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db)); ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db)); r|= op->equal(SCHEMA_DB_I, tmp_buf); DBUG_ASSERT(r == 0); /* name */ ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name, strlen(table_name)); ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab, strlen(log_tab)); r|= op->equal(SCHEMA_NAME_I, tmp_buf); DBUG_ASSERT(r == 0); /* slock */ DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf)); r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap); r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers); DBUG_ASSERT(r == 0); /* query */ { Loading @@ -1186,8 +1194,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version); DBUG_ASSERT(r == 0); /* type */ r|= op->setValue(SCHEMA_TYPE_I, (uint32)type); r|= op->setValue(SCHEMA_TYPE_I, log_type); DBUG_ASSERT(r == 0); if (log_db != new_db && new_db && new_table_name) { log_db= new_db; log_tab= new_table_name; log_subscribers= (const char *)bitbuf_e; // no ack expected on this log_type= (uint32)SOT_RENAME_TABLE_NEW; continue; } break; } if (trans->execute(NdbTransaction::Commit) == 0) { Loading Loading @@ -1306,7 +1323,8 @@ ndbcluster_update_slock(THD *thd, char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE); const NDBTAB *ndbtab= ndbtab_g.get_table(); NdbTransaction *trans= 0; int retries= 100; const NDBCOL *col[SCHEMA_SIZE]; Loading Loading @@ -1452,30 +1470,27 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, { if (pOp->tableFrmChanged()) { DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed")); is_online_alter_table= TRUE; } else { DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed")); DBUG_ASSERT(pOp->tableNameChanged()); is_rename_table= TRUE; } } /* Refresh local dictionary cache by invalidating table and all it's indexes */ { ndb->setDatabaseName(dbname); Thd_ndb *thd_ndb= get_thd_ndb(thd); DBUG_ASSERT(thd_ndb != NULL); Ndb* old_ndb= thd_ndb->ndb; thd_ndb->ndb= ndb; ha_ndbcluster table_handler(table_share); (void)strxmov(table_handler.m_dbname, dbname, NullS); (void)strxmov(table_handler.m_tabname, tabname, NullS); table_handler.open_indexes(ndb, table, TRUE); table_handler.invalidate_dictionary_cache(TRUE, 0); thd_ndb->ndb= old_ndb; Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname); const NDBTAB *ev_tab= pOp->getTable(); const NDBTAB *cache_tab= ndbtab_g.get_table(); if (cache_tab && cache_tab->getObjectId() == ev_tab->getObjectId() && cache_tab->getObjectVersion() <= ev_tab->getObjectVersion()) ndbtab_g.invalidate(); } /* Refresh local frm file and dictionary cache if Loading Loading @@ -1505,7 +1520,8 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, DBUG_DUMP("frm", (char*)altered_table->getFrmData(), altered_table->getFrmLength()); pthread_mutex_lock(&LOCK_open); const NDBTAB *old= dict->getTable(tabname); Ndb_table_guard ndbtab_g(dict, tabname); const NDBTAB *old= ndbtab_g.get_table(); if (!old && old->getObjectVersion() != altered_table->getObjectVersion()) dict->putTable(altered_table); Loading @@ -1517,7 +1533,13 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, dbname, tabname, error); } ndbcluster_binlog_close_table(thd, share); close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); TABLE_LIST table_list; bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; close_cached_tables(thd, 0, &table_list, TRUE); if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table))) sql_print_information("NDB: Failed to re-open table %s.%s", Loading Loading @@ -1545,26 +1567,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, share_prefix, share->table->s->db.str, share->table->s->table_name.str, share->key); { ndb->setDatabaseName(share->table->s->db.str); Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table->s->table_name.str); const NDBTAB *ev_tab= pOp->getTable(); const NDBTAB *cache_tab= ndbtab_g.get_table(); if (cache_tab && cache_tab->getObjectId() == ev_tab->getObjectId() && cache_tab->getObjectVersion() <= ev_tab->getObjectVersion()) ndbtab_g.invalidate(); } /* do the rename of the table in the share */ share->table->s->db.str= share->db; share->table->s->db.length= strlen(share->db); share->table->s->table_name.str= share->table_name; share->table->s->table_name.length= strlen(share->table_name); /* Refresh local dictionary cache by invalidating any old table with same name and all it's indexes */ ndb->setDatabaseName(dbname); Thd_ndb *thd_ndb= get_thd_ndb(thd); DBUG_ASSERT(thd_ndb != NULL); Ndb* old_ndb= thd_ndb->ndb; thd_ndb->ndb= ndb; ha_ndbcluster table_handler(table_share); table_handler.set_dbname(share->key); table_handler.set_tabname(share->key); table_handler.open_indexes(ndb, table, TRUE); table_handler.invalidate_dictionary_cache(TRUE, 0); thd_ndb->ndb= old_ndb; } DBUG_ASSERT(share->op == pOp || share->op_old == pOp); if (share->op_old == pOp) Loading @@ -1582,14 +1600,19 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, if (is_remote_change && share && share->state != NSS_DROPPED) { DBUG_PRINT("info", ("remote change")); share->state= NSS_DROPPED; if (share->use_count != 1) do_close_cached_tables= TRUE; share->state= NSS_DROPPED; else { free_share(&share, TRUE); share= 0; } } else share= 0; pthread_mutex_unlock(&ndbcluster_mutex); share= 0; pOp->setCustomData(0); pthread_mutex_lock(&injector_mutex); Loading @@ -1598,7 +1621,14 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, pthread_mutex_unlock(&injector_mutex); if (do_close_cached_tables) close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0); { TABLE_LIST table_list; bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; close_cached_tables(thd, 0, &table_list); free_share(&share); } DBUG_RETURN(0); } Loading Loading @@ -1630,53 +1660,27 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, if (schema->node_id != node_id) { int log_query= 0, post_epoch_unlock= 0; DBUG_PRINT("info", ("log query_length: %d query: '%s'", schema->query_length, schema->query)); DBUG_PRINT("info", ("%s.%s: log query_length: %d query: '%s' type: %d", schema->db, schema->name, schema->query_length, schema->query, schema->type)); char key[FN_REFLEN]; build_table_filename(key, sizeof(key), schema->db, schema->name, ""); NDB_SHARE *share= get_share(key, 0, false, false); switch ((enum SCHEMA_OP_TYPE)schema->type) { case SOT_DROP_TABLE: /* binlog dropping table after any table operations */ if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } /* table is either ignored or logging is postponed to later */ log_query= 0; break; // fall through case SOT_RENAME_TABLE: if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; // fall through case SOT_RENAME_TABLE_NEW: // fall through case SOT_ALTER_TABLE: if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; break; case SOT_CREATE_TABLE: sot_create_table: /* we need to free any share here as command below may need to call handle_trailing_share */ if (share) { free_share(&share); share= 0; } pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, schema->db, schema->name)) { Loading @@ -1694,12 +1698,9 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, TRUE, /* print error */ TRUE); /* don't binlog the query */ /* binlog dropping database after any table operations */ if (ndb_binlog_running) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } break; case SOT_CREATE_DB: /* fall through */ Loading @@ -1726,8 +1727,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, pthread_mutex_unlock(&ndb_schema_object->mutex); pthread_cond_signal(&injector_cond); } if (share) free_share(&share, TRUE); pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); } Loading @@ -1736,11 +1735,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, log_query= 1; break; } if (share) { free_share(&share); share= 0; } if (log_query && ndb_binlog_running) { char *thd_db_save= thd->db; Loading Loading @@ -1864,26 +1858,69 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, List<Cluster_schema> *post_epoch_unlock_list) { if (post_epoch_log_list->elements == 0) return; DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch"); Cluster_schema *schema; while ((schema= post_epoch_log_list->pop())) { DBUG_PRINT("info", ("log query_length: %d query: '%s'", schema->query_length, schema->query)); DBUG_PRINT("info", ("%s.%s: log query_length: %d query: '%s' type: %d", schema->db, schema->name, schema->query_length, schema->query, schema->type)); int log_query= 0; { char key[FN_REFLEN]; build_table_filename(key, sizeof(key), schema->db, schema->name, ""); NDB_SHARE *share= get_share(key, 0, false, false); switch ((enum SCHEMA_OP_TYPE)schema->type) enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type; switch (schema_type) { case SOT_DROP_DB: log_query= 1; break; case SOT_DROP_TABLE: // invalidation already handled by binlog thread if (share && share->op) { log_query= 1; break; } // fall through case SOT_RENAME_TABLE: // fall through case SOT_ALTER_TABLE: if (share && share->op) // invalidation already handled by binlog thread if (!share || !share->op) { { injector_ndb->setDatabaseName(schema->db); Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(), schema->name); ndbtab_g.invalidate(); } TABLE_LIST table_list; bzero((char*) &table_list,sizeof(table_list)); table_list.db= schema->db; table_list.alias= table_list.table_name= schema->name; close_cached_tables(thd, 0, &table_list, FALSE); } if (schema_type != SOT_ALTER_TABLE) break; // fall through case SOT_RENAME_TABLE_NEW: log_query= 1; if (ndb_binlog_running) { /* we need to free any share here as command below may need to call handle_trailing_share */ if (share) { break; /* discovery handled by binlog */ free_share(&share); share= 0; } pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, schema->db, schema->name)) Loading @@ -1894,6 +1931,8 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, schema->node_id); } pthread_mutex_unlock(&LOCK_open); } break; default: DBUG_ASSERT(false); } Loading @@ -1903,6 +1942,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, share= 0; } } if (ndb_binlog_running && log_query) { char *thd_db_save= thd->db; thd->db= schema->db; Loading Loading @@ -2186,7 +2226,8 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, ndb->setDatabaseName(db); NDBDICT *dict= ndb->getDictionary(); const NDBTAB *ndbtab= dict->getTable(table_name); Ndb_table_guard ndbtab_g(dict, table_name); const NDBTAB *ndbtab= ndbtab_g.get_table(); if (ndbtab == 0) { if (ndb_extra_logging) Loading @@ -2201,7 +2242,8 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, event should have been created by someone else, but let's make sure, and create if it doesn't exist */ if (!dict->getEvent(event_name.c_ptr())) const NDBEVENT *ev= dict->getEvent(event_name.c_ptr()); if (!ev) { if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share)) { Loading @@ -2216,9 +2258,12 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, event_name.c_ptr()); } else { delete ev; if (ndb_extra_logging) sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s", event_name.c_ptr()); } /* create the event operations for receiving logging events Loading Loading @@ -2328,8 +2373,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, try retrieving the event, if table version/id matches, we will get a valid event. Otherwise we have a trailing event from before */ if (dict->getEvent(event_name)) const NDBEVENT *ev; if ((ev= dict->getEvent(event_name))) { delete ev; DBUG_RETURN(0); } Loading sql/ha_ndbcluster_binlog.h +44 −4 Original line number Diff line number Diff line Loading @@ -41,14 +41,15 @@ enum SCHEMA_OP_TYPE { SOT_DROP_TABLE= 0, SOT_CREATE_TABLE= 1, SOT_RENAME_TABLE= 2, SOT_RENAME_TABLE_NEW= 2, SOT_ALTER_TABLE= 3, SOT_DROP_DB= 4, SOT_CREATE_DB= 5, SOT_ALTER_DB= 6, SOT_CLEAR_SLOCK= 7, SOT_TABLESPACE= 8, SOT_LOGFILE_GROUP= 9 SOT_LOGFILE_GROUP= 9, SOT_RENAME_TABLE= 10 }; const uint max_ndb_nodes= 64; /* multiple of 32 */ Loading @@ -56,6 +57,45 @@ const uint max_ndb_nodes= 64; /* multiple of 32 */ static const char *ha_ndb_ext=".ndb"; static const char share_prefix[]= "./"; class Ndb_table_guard { public: Ndb_table_guard(NDBDICT *dict, const char *tabname) : m_dict(dict) { DBUG_ENTER("Ndb_table_guard"); m_ndbtab= m_dict->getTableGlobal(tabname); m_invalidate= 0; DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab)); DBUG_VOID_RETURN; } ~Ndb_table_guard() { DBUG_ENTER("~Ndb_table_guard"); if (m_ndbtab) { DBUG_PRINT("info", ("m_ndbtab: %p m_invalidate: %d", m_ndbtab, m_invalidate)); m_dict->removeTableGlobal(*m_ndbtab, m_invalidate); } DBUG_VOID_RETURN; } const NDBTAB *get_table() { return m_ndbtab; } void invalidate() { m_invalidate= 1; } const NDBTAB *release() { DBUG_ENTER("Ndb_table_guard::release"); const NDBTAB *tmp= m_ndbtab; DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab)); m_ndbtab = 0; DBUG_RETURN(tmp); } private: const NDBTAB *m_ndbtab; NDBDICT *m_dict; int m_invalidate; }; #ifdef HAVE_NDB_BINLOG extern pthread_t ndb_binlog_thread; extern pthread_mutex_t injector_mutex; Loading Loading @@ -98,8 +138,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, uint32 ndb_table_id, uint32 ndb_table_version, enum SCHEMA_OP_TYPE type, const char *old_db= 0, const char *old_table_name= 0); const char *new_db= 0, const char *new_table_name= 0); int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE *share, const char *type_str); Loading Loading
mysql-test/r/ndb_multi_row.result +2 −0 Original line number Diff line number Diff line Loading @@ -62,4 +62,6 @@ t4 drop table t1, t2, t3, t4; drop table if exists t1, t3, t4; Warnings: Error 155 Table 'test.t1' doesn't exist Error 155 Table 'test.t3' doesn't exist Error 155 Table 'test.t4' doesn't exist
sql/ha_ndbcluster.cc +353 −462 File changed.Preview size limit exceeded, changes collapsed. Show changes
sql/ha_ndbcluster.h +4 −10 Original line number Diff line number Diff line Loading @@ -70,8 +70,8 @@ typedef enum ndb_index_status { typedef struct ndb_index_data { NDB_INDEX_TYPE type; NDB_INDEX_STATUS status; void *index; void *unique_index; const NdbDictionary::Index *index; const NdbDictionary::Index *unique_index; unsigned char *unique_index_attrid_map; // In this version stats are not shared between threads NdbIndexStat* index_stat; Loading Loading @@ -560,6 +560,7 @@ class ha_ndbcluster: public handler ha_ndbcluster(TABLE_SHARE *table); ~ha_ndbcluster(); int ha_initialise(); int open(const char *name, int mode, uint test_if_locked); int close(void); Loading Loading @@ -708,19 +709,15 @@ static void set_tabname(const char *pathname, char *tabname); Ndb *ndb, NdbEventOperation *pOp, NDB_SHARE *share); int alter_table_name(const char *to); static int delete_table(ha_ndbcluster *h, Ndb *ndb, const char *path, const char *db, const char *table_name); int drop_ndb_table(); int create_ndb_index(const char *name, KEY *key_info, bool unique); int create_ordered_index(const char *name, KEY *key_info); int create_unique_index(const char *name, KEY *key_info); int create_index(const char *name, KEY *key_info, NDB_INDEX_TYPE idx_type, uint idx_no); int drop_ndb_index(const char *name); int table_changed(const void *pack_frm_data, uint pack_frm_len); // Index list management int create_indexes(Ndb *ndb, TABLE *tab); void clear_index(int i); Loading @@ -732,7 +729,7 @@ static void set_tabname(const char *pathname, char *tabname); KEY *key_info, const char *index_name, uint index_no); int initialize_autoincrement(const void *table); int get_metadata(const char* path); void release_metadata(); void release_metadata(THD *thd, Ndb *ndb); NDB_INDEX_TYPE get_index_type(uint idx_no) const; NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const; NDB_INDEX_TYPE get_index_type_from_key(uint index_no, KEY *key_info, Loading Loading @@ -795,8 +792,6 @@ static void set_tabname(const char *pathname, char *tabname); void print_results(); ulonglong get_auto_increment(); int invalidate_dictionary_cache(bool global, const NdbDictionary::Table *ndbtab); int ndb_err(NdbTransaction*); bool uses_blob_value(); Loading Loading @@ -834,7 +829,6 @@ static void set_tabname(const char *pathname, char *tabname); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; const NdbDictionary::Table *m_table; int m_table_version; struct Ndb_local_table_statistics *m_table_info; char m_dbname[FN_HEADLEN]; //char m_schemaname[FN_HEADLEN]; Loading
sql/ha_ndbcluster_binlog.cc +162 −115 Original line number Diff line number Diff line Loading @@ -986,7 +986,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, uint32 ndb_table_id, uint32 ndb_table_version, enum SCHEMA_OP_TYPE type, const char *old_db, const char *old_table_name) const char *new_db, const char *new_table_name) { DBUG_ENTER("ndbcluster_log_schema_op"); Thd_ndb *thd_ndb= get_thd_ndb(thd); Loading Loading @@ -1026,8 +1026,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, /* redo the rename table query as is may contain several tables */ query= tmp_buf2; query_length= (uint) (strxmov(tmp_buf2, "rename table `", old_db, ".", old_table_name, "` to `", db, ".", table_name, "`", NullS) - tmp_buf2); db, ".", table_name, "` to `", new_db, ".", new_table_name, "`", NullS) - tmp_buf2); type_str= "rename table"; break; case SOT_CREATE_TABLE: Loading Loading @@ -1067,6 +1067,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, Uint64 epoch= 0; MY_BITMAP schema_subscribers; uint32 bitbuf[sizeof(ndb_schema_object->slock)/4]; uint32 bitbuf_e[sizeof(bitbuf)]; bzero((char *)bitbuf_e, sizeof(bitbuf_e)); { int i, updated= 0; int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); Loading Loading @@ -1110,7 +1112,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE); const NDBTAB *ndbtab= ndbtab_g.get_table(); NdbTransaction *trans= 0; int retries= 100; const NDBCOL *col[SCHEMA_SIZE]; Loading Loading @@ -1141,8 +1144,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, while (1) { const char *log_db= db; const char *log_tab= table_name; const char *log_subscribers= (char*)schema_subscribers.bitmap; uint32 log_type= (uint32)type; if ((trans= ndb->startTransaction()) == 0) goto err; while (1) { NdbOperation *op= 0; int r= 0; Loading @@ -1152,17 +1160,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, DBUG_ASSERT(r == 0); /* db */ ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db)); ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db)); r|= op->equal(SCHEMA_DB_I, tmp_buf); DBUG_ASSERT(r == 0); /* name */ ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name, strlen(table_name)); ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab, strlen(log_tab)); r|= op->equal(SCHEMA_NAME_I, tmp_buf); DBUG_ASSERT(r == 0); /* slock */ DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf)); r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap); r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers); DBUG_ASSERT(r == 0); /* query */ { Loading @@ -1186,8 +1194,17 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version); DBUG_ASSERT(r == 0); /* type */ r|= op->setValue(SCHEMA_TYPE_I, (uint32)type); r|= op->setValue(SCHEMA_TYPE_I, log_type); DBUG_ASSERT(r == 0); if (log_db != new_db && new_db && new_table_name) { log_db= new_db; log_tab= new_table_name; log_subscribers= (const char *)bitbuf_e; // no ack expected on this log_type= (uint32)SOT_RENAME_TABLE_NEW; continue; } break; } if (trans->execute(NdbTransaction::Commit) == 0) { Loading Loading @@ -1306,7 +1323,8 @@ ndbcluster_update_slock(THD *thd, char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE); const NDBTAB *ndbtab= ndbtab_g.get_table(); NdbTransaction *trans= 0; int retries= 100; const NDBCOL *col[SCHEMA_SIZE]; Loading Loading @@ -1452,30 +1470,27 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, { if (pOp->tableFrmChanged()) { DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed")); is_online_alter_table= TRUE; } else { DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed")); DBUG_ASSERT(pOp->tableNameChanged()); is_rename_table= TRUE; } } /* Refresh local dictionary cache by invalidating table and all it's indexes */ { ndb->setDatabaseName(dbname); Thd_ndb *thd_ndb= get_thd_ndb(thd); DBUG_ASSERT(thd_ndb != NULL); Ndb* old_ndb= thd_ndb->ndb; thd_ndb->ndb= ndb; ha_ndbcluster table_handler(table_share); (void)strxmov(table_handler.m_dbname, dbname, NullS); (void)strxmov(table_handler.m_tabname, tabname, NullS); table_handler.open_indexes(ndb, table, TRUE); table_handler.invalidate_dictionary_cache(TRUE, 0); thd_ndb->ndb= old_ndb; Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname); const NDBTAB *ev_tab= pOp->getTable(); const NDBTAB *cache_tab= ndbtab_g.get_table(); if (cache_tab && cache_tab->getObjectId() == ev_tab->getObjectId() && cache_tab->getObjectVersion() <= ev_tab->getObjectVersion()) ndbtab_g.invalidate(); } /* Refresh local frm file and dictionary cache if Loading Loading @@ -1505,7 +1520,8 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, DBUG_DUMP("frm", (char*)altered_table->getFrmData(), altered_table->getFrmLength()); pthread_mutex_lock(&LOCK_open); const NDBTAB *old= dict->getTable(tabname); Ndb_table_guard ndbtab_g(dict, tabname); const NDBTAB *old= ndbtab_g.get_table(); if (!old && old->getObjectVersion() != altered_table->getObjectVersion()) dict->putTable(altered_table); Loading @@ -1517,7 +1533,13 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, dbname, tabname, error); } ndbcluster_binlog_close_table(thd, share); close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE); TABLE_LIST table_list; bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; close_cached_tables(thd, 0, &table_list, TRUE); if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table))) sql_print_information("NDB: Failed to re-open table %s.%s", Loading Loading @@ -1545,26 +1567,22 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, share_prefix, share->table->s->db.str, share->table->s->table_name.str, share->key); { ndb->setDatabaseName(share->table->s->db.str); Ndb_table_guard ndbtab_g(ndb->getDictionary(), share->table->s->table_name.str); const NDBTAB *ev_tab= pOp->getTable(); const NDBTAB *cache_tab= ndbtab_g.get_table(); if (cache_tab && cache_tab->getObjectId() == ev_tab->getObjectId() && cache_tab->getObjectVersion() <= ev_tab->getObjectVersion()) ndbtab_g.invalidate(); } /* do the rename of the table in the share */ share->table->s->db.str= share->db; share->table->s->db.length= strlen(share->db); share->table->s->table_name.str= share->table_name; share->table->s->table_name.length= strlen(share->table_name); /* Refresh local dictionary cache by invalidating any old table with same name and all it's indexes */ ndb->setDatabaseName(dbname); Thd_ndb *thd_ndb= get_thd_ndb(thd); DBUG_ASSERT(thd_ndb != NULL); Ndb* old_ndb= thd_ndb->ndb; thd_ndb->ndb= ndb; ha_ndbcluster table_handler(table_share); table_handler.set_dbname(share->key); table_handler.set_tabname(share->key); table_handler.open_indexes(ndb, table, TRUE); table_handler.invalidate_dictionary_cache(TRUE, 0); thd_ndb->ndb= old_ndb; } DBUG_ASSERT(share->op == pOp || share->op_old == pOp); if (share->op_old == pOp) Loading @@ -1582,14 +1600,19 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, if (is_remote_change && share && share->state != NSS_DROPPED) { DBUG_PRINT("info", ("remote change")); share->state= NSS_DROPPED; if (share->use_count != 1) do_close_cached_tables= TRUE; share->state= NSS_DROPPED; else { free_share(&share, TRUE); share= 0; } } else share= 0; pthread_mutex_unlock(&ndbcluster_mutex); share= 0; pOp->setCustomData(0); pthread_mutex_lock(&injector_mutex); Loading @@ -1598,7 +1621,14 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp, pthread_mutex_unlock(&injector_mutex); if (do_close_cached_tables) close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0); { TABLE_LIST table_list; bzero((char*) &table_list,sizeof(table_list)); table_list.db= (char *)dbname; table_list.alias= table_list.table_name= (char *)tabname; close_cached_tables(thd, 0, &table_list); free_share(&share); } DBUG_RETURN(0); } Loading Loading @@ -1630,53 +1660,27 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, if (schema->node_id != node_id) { int log_query= 0, post_epoch_unlock= 0; DBUG_PRINT("info", ("log query_length: %d query: '%s'", schema->query_length, schema->query)); DBUG_PRINT("info", ("%s.%s: log query_length: %d query: '%s' type: %d", schema->db, schema->name, schema->query_length, schema->query, schema->type)); char key[FN_REFLEN]; build_table_filename(key, sizeof(key), schema->db, schema->name, ""); NDB_SHARE *share= get_share(key, 0, false, false); switch ((enum SCHEMA_OP_TYPE)schema->type) { case SOT_DROP_TABLE: /* binlog dropping table after any table operations */ if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } /* table is either ignored or logging is postponed to later */ log_query= 0; break; // fall through case SOT_RENAME_TABLE: if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; // fall through case SOT_RENAME_TABLE_NEW: // fall through case SOT_ALTER_TABLE: if (share && share->op) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; break; /* discovery will be handled by binlog */ } goto sot_create_table; break; case SOT_CREATE_TABLE: sot_create_table: /* we need to free any share here as command below may need to call handle_trailing_share */ if (share) { free_share(&share); share= 0; } pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, schema->db, schema->name)) { Loading @@ -1694,12 +1698,9 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, TRUE, /* print error */ TRUE); /* don't binlog the query */ /* binlog dropping database after any table operations */ if (ndb_binlog_running) { post_epoch_log_list->push_back(schema, mem_root); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } break; case SOT_CREATE_DB: /* fall through */ Loading @@ -1726,8 +1727,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, pthread_mutex_unlock(&ndb_schema_object->mutex); pthread_cond_signal(&injector_cond); } if (share) free_share(&share, TRUE); pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(0); } Loading @@ -1736,11 +1735,6 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, log_query= 1; break; } if (share) { free_share(&share); share= 0; } if (log_query && ndb_binlog_running) { char *thd_db_save= thd->db; Loading Loading @@ -1864,26 +1858,69 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, List<Cluster_schema> *post_epoch_unlock_list) { if (post_epoch_log_list->elements == 0) return; DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch"); Cluster_schema *schema; while ((schema= post_epoch_log_list->pop())) { DBUG_PRINT("info", ("log query_length: %d query: '%s'", schema->query_length, schema->query)); DBUG_PRINT("info", ("%s.%s: log query_length: %d query: '%s' type: %d", schema->db, schema->name, schema->query_length, schema->query, schema->type)); int log_query= 0; { char key[FN_REFLEN]; build_table_filename(key, sizeof(key), schema->db, schema->name, ""); NDB_SHARE *share= get_share(key, 0, false, false); switch ((enum SCHEMA_OP_TYPE)schema->type) enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type; switch (schema_type) { case SOT_DROP_DB: log_query= 1; break; case SOT_DROP_TABLE: // invalidation already handled by binlog thread if (share && share->op) { log_query= 1; break; } // fall through case SOT_RENAME_TABLE: // fall through case SOT_ALTER_TABLE: if (share && share->op) // invalidation already handled by binlog thread if (!share || !share->op) { { injector_ndb->setDatabaseName(schema->db); Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(), schema->name); ndbtab_g.invalidate(); } TABLE_LIST table_list; bzero((char*) &table_list,sizeof(table_list)); table_list.db= schema->db; table_list.alias= table_list.table_name= schema->name; close_cached_tables(thd, 0, &table_list, FALSE); } if (schema_type != SOT_ALTER_TABLE) break; // fall through case SOT_RENAME_TABLE_NEW: log_query= 1; if (ndb_binlog_running) { /* we need to free any share here as command below may need to call handle_trailing_share */ if (share) { break; /* discovery handled by binlog */ free_share(&share); share= 0; } pthread_mutex_lock(&LOCK_open); if (ndb_create_table_from_engine(thd, schema->db, schema->name)) Loading @@ -1894,6 +1931,8 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, schema->node_id); } pthread_mutex_unlock(&LOCK_open); } break; default: DBUG_ASSERT(false); } Loading @@ -1903,6 +1942,7 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd, share= 0; } } if (ndb_binlog_running && log_query) { char *thd_db_save= thd->db; thd->db= schema->db; Loading Loading @@ -2186,7 +2226,8 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, ndb->setDatabaseName(db); NDBDICT *dict= ndb->getDictionary(); const NDBTAB *ndbtab= dict->getTable(table_name); Ndb_table_guard ndbtab_g(dict, table_name); const NDBTAB *ndbtab= ndbtab_g.get_table(); if (ndbtab == 0) { if (ndb_extra_logging) Loading @@ -2201,7 +2242,8 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, event should have been created by someone else, but let's make sure, and create if it doesn't exist */ if (!dict->getEvent(event_name.c_ptr())) const NDBEVENT *ev= dict->getEvent(event_name.c_ptr()); if (!ev) { if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share)) { Loading @@ -2216,9 +2258,12 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, event_name.c_ptr()); } else { delete ev; if (ndb_extra_logging) sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s", event_name.c_ptr()); } /* create the event operations for receiving logging events Loading Loading @@ -2328,8 +2373,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, try retrieving the event, if table version/id matches, we will get a valid event. Otherwise we have a trailing event from before */ if (dict->getEvent(event_name)) const NDBEVENT *ev; if ((ev= dict->getEvent(event_name))) { delete ev; DBUG_RETURN(0); } Loading
sql/ha_ndbcluster_binlog.h +44 −4 Original line number Diff line number Diff line Loading @@ -41,14 +41,15 @@ enum SCHEMA_OP_TYPE { SOT_DROP_TABLE= 0, SOT_CREATE_TABLE= 1, SOT_RENAME_TABLE= 2, SOT_RENAME_TABLE_NEW= 2, SOT_ALTER_TABLE= 3, SOT_DROP_DB= 4, SOT_CREATE_DB= 5, SOT_ALTER_DB= 6, SOT_CLEAR_SLOCK= 7, SOT_TABLESPACE= 8, SOT_LOGFILE_GROUP= 9 SOT_LOGFILE_GROUP= 9, SOT_RENAME_TABLE= 10 }; const uint max_ndb_nodes= 64; /* multiple of 32 */ Loading @@ -56,6 +57,45 @@ const uint max_ndb_nodes= 64; /* multiple of 32 */ static const char *ha_ndb_ext=".ndb"; static const char share_prefix[]= "./"; class Ndb_table_guard { public: Ndb_table_guard(NDBDICT *dict, const char *tabname) : m_dict(dict) { DBUG_ENTER("Ndb_table_guard"); m_ndbtab= m_dict->getTableGlobal(tabname); m_invalidate= 0; DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab)); DBUG_VOID_RETURN; } ~Ndb_table_guard() { DBUG_ENTER("~Ndb_table_guard"); if (m_ndbtab) { DBUG_PRINT("info", ("m_ndbtab: %p m_invalidate: %d", m_ndbtab, m_invalidate)); m_dict->removeTableGlobal(*m_ndbtab, m_invalidate); } DBUG_VOID_RETURN; } const NDBTAB *get_table() { return m_ndbtab; } void invalidate() { m_invalidate= 1; } const NDBTAB *release() { DBUG_ENTER("Ndb_table_guard::release"); const NDBTAB *tmp= m_ndbtab; DBUG_PRINT("info", ("m_ndbtab: %p", m_ndbtab)); m_ndbtab = 0; DBUG_RETURN(tmp); } private: const NDBTAB *m_ndbtab; NDBDICT *m_dict; int m_invalidate; }; #ifdef HAVE_NDB_BINLOG extern pthread_t ndb_binlog_thread; extern pthread_mutex_t injector_mutex; Loading Loading @@ -98,8 +138,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, uint32 ndb_table_id, uint32 ndb_table_version, enum SCHEMA_OP_TYPE type, const char *old_db= 0, const char *old_table_name= 0); const char *new_db= 0, const char *new_table_name= 0); int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name, NDB_SHARE *share, const char *type_str); Loading