Loading sql/ha_ndbcluster.cc +46 −24 Original line number Diff line number Diff line Loading @@ -315,6 +315,14 @@ int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) /* Place holder for ha_ndbcluster thread specific data */ static byte *thd_ndb_share_get_key(THD_NDB_SHARE *thd_ndb_share, uint *length, my_bool not_used __attribute__((unused))) { *length= sizeof(thd_ndb_share->key); return (byte*) thd_ndb_share->key; } Thd_ndb::Thd_ndb() { ndb= new Ndb(g_ndb_cluster_connection, ""); Loading @@ -324,6 +332,8 @@ Thd_ndb::Thd_ndb() stmt= NULL; error= 0; options= 0; (void) hash_init(&open_tables, system_charset_info, 32, 0, 0, (hash_get_key)thd_ndb_share_get_key, 0, 0); } Thd_ndb::~Thd_ndb() Loading @@ -347,6 +357,30 @@ Thd_ndb::~Thd_ndb() ndb= NULL; } changed_tables.empty(); hash_free(&open_tables); } void Thd_ndb::init_open_tables() { my_hash_reset(&open_tables); } THD_NDB_SHARE * Thd_ndb::get_open_table(THD *thd, const void *key) { DBUG_ENTER("Thd_ndb::get_open_table"); THD_NDB_SHARE *thd_ndb_share= (THD_NDB_SHARE*)hash_search(&open_tables, (byte *)key, sizeof(key)); if (thd_ndb_share == 0) { thd_ndb_share= (THD_NDB_SHARE *) alloc_root(&thd->transaction.mem_root, sizeof(THD_NDB_SHARE)); thd_ndb_share->key= key; my_hash_insert(&open_tables, (byte *)thd_ndb_share); } DBUG_PRINT("exit", ("thd_ndb_share: 0x%x", thd_ndb_share)); DBUG_RETURN(thd_ndb_share); } inline Loading @@ -359,12 +393,6 @@ Ndb *ha_ndbcluster::get_ndb() * manage uncommitted insert/deletes during transactio to get records correct */ struct Ndb_local_table_statistics { int no_uncommitted_rows_count; ulong last_count; ha_rows records; }; void ha_ndbcluster::set_rec_per_key() { DBUG_ENTER("ha_ndbcluster::get_status_const"); Loading @@ -380,14 +408,14 @@ void ha_ndbcluster::records_update() if (m_ha_not_exact_count) return; DBUG_ENTER("ha_ndbcluster::records_update"); struct Ndb_local_table_statistics *info= (struct Ndb_local_table_statistics *)m_table_info; struct Ndb_local_table_statistics *info= m_table_info; DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d", ((const NDBTAB *)m_table)->getTableId(), info->no_uncommitted_rows_count)); // if (info->records == ~(ha_rows)0) { Ndb *ndb= get_ndb(); ndb->setDatabaseName(m_dbname); struct Ndb_statistics stat; if (ndb_get_table_statistics(ndb, m_tabname, &stat) == 0){ mean_rec_length= stat.row_size; Loading Loading @@ -418,8 +446,7 @@ void ha_ndbcluster::no_uncommitted_rows_init(THD *thd) if (m_ha_not_exact_count) return; DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init"); struct Ndb_local_table_statistics *info= (struct Ndb_local_table_statistics *)m_table_info; struct Ndb_local_table_statistics *info= m_table_info; Thd_ndb *thd_ndb= get_thd_ndb(thd); if (info->last_count != thd_ndb->count) { Loading @@ -438,8 +465,7 @@ void ha_ndbcluster::no_uncommitted_rows_update(int c) if (m_ha_not_exact_count) return; DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update"); struct Ndb_local_table_statistics *info= (struct Ndb_local_table_statistics *)m_table_info; struct Ndb_local_table_statistics *info= m_table_info; info->no_uncommitted_rows_count+= c; DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d", ((const NDBTAB *)m_table)->getTableId(), Loading Loading @@ -3576,6 +3602,7 @@ void ha_ndbcluster::info(uint flag) if ((my_errno= check_ndb_connection())) DBUG_VOID_RETURN; Ndb *ndb= get_ndb(); ndb->setDatabaseName(m_dbname); struct Ndb_statistics stat; if (current_thd->variables.ndb_use_exact_count && ndb_get_table_statistics(ndb, m_tabname, &stat) == 0) Loading Loading @@ -3915,6 +3942,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) trans= ndb->startTransaction(); if (trans == NULL) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); Loading @@ -3930,6 +3958,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) trans= ndb->startTransaction(); if (trans == NULL) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); no_uncommitted_rows_reset(thd); thd_ndb->all= trans; trans_register_ha(thd, TRUE, &ndbcluster_hton); Loading Loading @@ -3975,8 +4004,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) { NDBDICT *dict= ndb->getDictionary(); const NDBTAB *tab; void *tab_info; if (!(tab= dict->getTable(m_tabname, &tab_info))) if (!(tab= dict->getTable(m_tabname))) ERR_RETURN(dict->getNdbError()); DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion())); Loading @@ -3987,7 +4015,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) || tab->getObjectStatus() == NdbDictionary::Object::Invalid) { invalidate_dictionary_cache(FALSE, tab); if (!(tab= dict->getTable(m_tabname, &tab_info))) if (!(tab= dict->getTable(m_tabname))) ERR_RETURN(dict->getNdbError()); DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion())); Loading @@ -4007,8 +4035,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) if (!(my_errno= open_indexes(ndb, table, FALSE))) DBUG_RETURN(my_errno); } m_table_info= tab_info; } m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table); m_table_info= &m_thd_ndb_share->stat; no_uncommitted_rows_init(thd); } else Loading Loading @@ -5550,9 +5579,6 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb() DBUG_ENTER("seize_thd_ndb"); thd_ndb= new Thd_ndb(); thd_ndb->ndb->getDictionary()->set_local_table_data_size( sizeof(Ndb_local_table_statistics) ); if (thd_ndb->ndb->init(max_transactions) != 0) { ERR_PRINT(thd_ndb->ndb->getNdbError()); Loading Loading @@ -5642,7 +5668,6 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name, DBUG_RETURN(HA_ERR_NO_CONNECTION); ndb->setDatabaseName(db); NDBDICT* dict= ndb->getDictionary(); dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics)); dict->invalidateTable(name); build_table_filename(key, sizeof(key), db, name, ""); NDB_SHARE *share= get_share(key, 0, false); Loading Loading @@ -5714,7 +5739,6 @@ int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name ndb->setDatabaseName(db); NDBDICT* dict= ndb->getDictionary(); dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics)); dict->invalidateTable(name); if (!(tab= dict->getTable(name))) { Loading Loading @@ -6212,7 +6236,6 @@ static bool ndbcluster_init() DBUG_PRINT("error", ("failed to create global ndb object")); goto ndbcluster_init_error; } g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_local_table_statistics)); if (g_ndb->init() != 0) { ERR_PRINT (g_ndb->getNdbError()); Loading Loading @@ -6507,8 +6530,7 @@ ha_ndbcluster::records_in_range(uint inx, key_range *min_key, { // We must provide approx table rows Uint64 table_rows=0; Ndb_local_table_statistics *info= (Ndb_local_table_statistics *)m_table_info; Ndb_local_table_statistics *info= m_table_info; if (info->records != ~(ha_rows)0 && info->records != 0) { table_rows = info->records; Loading sql/ha_ndbcluster.h +18 −1 Original line number Diff line number Diff line Loading @@ -523,11 +523,26 @@ enum THD_NDB_OPTIONS TNO_NO_LOG_SCHEMA_OP= 1 << 0 }; struct Ndb_local_table_statistics { int no_uncommitted_rows_count; ulong last_count; ha_rows records; }; typedef struct st_thd_ndb_share { const void *key; struct Ndb_local_table_statistics stat; } THD_NDB_SHARE; class Thd_ndb { public: Thd_ndb(); ~Thd_ndb(); void init_open_tables(); THD_NDB_SHARE *get_open_table(THD *thd, const void *key); Ndb *ndb; ulong count; uint lock_count; Loading @@ -536,6 +551,7 @@ class Thd_ndb int error; uint32 options; List<NDB_SHARE> changed_tables; HASH open_tables; }; class ha_ndbcluster: public handler Loading Loading @@ -819,7 +835,7 @@ static void set_tabname(const char *pathname, char *tabname); NdbScanOperation *m_active_cursor; const NdbDictionary::Table *m_table; int m_table_version; void *m_table_info; struct Ndb_local_table_statistics *m_table_info; char m_dbname[FN_HEADLEN]; //char m_schemaname[FN_HEADLEN]; char m_tabname[FN_HEADLEN]; Loading @@ -827,6 +843,7 @@ static void set_tabname(const char *pathname, char *tabname); THR_LOCK_DATA m_lock; NDB_SHARE *m_share; NDB_INDEX_DATA m_index[MAX_KEY]; THD_NDB_SHARE *m_thd_ndb_share; // NdbRecAttr has no reference to blob NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; byte m_ref[NDB_HIDDEN_PRIMARY_KEY_LENGTH]; Loading Loading
sql/ha_ndbcluster.cc +46 −24 Original line number Diff line number Diff line Loading @@ -315,6 +315,14 @@ int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) /* Place holder for ha_ndbcluster thread specific data */ static byte *thd_ndb_share_get_key(THD_NDB_SHARE *thd_ndb_share, uint *length, my_bool not_used __attribute__((unused))) { *length= sizeof(thd_ndb_share->key); return (byte*) thd_ndb_share->key; } Thd_ndb::Thd_ndb() { ndb= new Ndb(g_ndb_cluster_connection, ""); Loading @@ -324,6 +332,8 @@ Thd_ndb::Thd_ndb() stmt= NULL; error= 0; options= 0; (void) hash_init(&open_tables, system_charset_info, 32, 0, 0, (hash_get_key)thd_ndb_share_get_key, 0, 0); } Thd_ndb::~Thd_ndb() Loading @@ -347,6 +357,30 @@ Thd_ndb::~Thd_ndb() ndb= NULL; } changed_tables.empty(); hash_free(&open_tables); } void Thd_ndb::init_open_tables() { my_hash_reset(&open_tables); } THD_NDB_SHARE * Thd_ndb::get_open_table(THD *thd, const void *key) { DBUG_ENTER("Thd_ndb::get_open_table"); THD_NDB_SHARE *thd_ndb_share= (THD_NDB_SHARE*)hash_search(&open_tables, (byte *)key, sizeof(key)); if (thd_ndb_share == 0) { thd_ndb_share= (THD_NDB_SHARE *) alloc_root(&thd->transaction.mem_root, sizeof(THD_NDB_SHARE)); thd_ndb_share->key= key; my_hash_insert(&open_tables, (byte *)thd_ndb_share); } DBUG_PRINT("exit", ("thd_ndb_share: 0x%x", thd_ndb_share)); DBUG_RETURN(thd_ndb_share); } inline Loading @@ -359,12 +393,6 @@ Ndb *ha_ndbcluster::get_ndb() * manage uncommitted insert/deletes during transactio to get records correct */ struct Ndb_local_table_statistics { int no_uncommitted_rows_count; ulong last_count; ha_rows records; }; void ha_ndbcluster::set_rec_per_key() { DBUG_ENTER("ha_ndbcluster::get_status_const"); Loading @@ -380,14 +408,14 @@ void ha_ndbcluster::records_update() if (m_ha_not_exact_count) return; DBUG_ENTER("ha_ndbcluster::records_update"); struct Ndb_local_table_statistics *info= (struct Ndb_local_table_statistics *)m_table_info; struct Ndb_local_table_statistics *info= m_table_info; DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d", ((const NDBTAB *)m_table)->getTableId(), info->no_uncommitted_rows_count)); // if (info->records == ~(ha_rows)0) { Ndb *ndb= get_ndb(); ndb->setDatabaseName(m_dbname); struct Ndb_statistics stat; if (ndb_get_table_statistics(ndb, m_tabname, &stat) == 0){ mean_rec_length= stat.row_size; Loading Loading @@ -418,8 +446,7 @@ void ha_ndbcluster::no_uncommitted_rows_init(THD *thd) if (m_ha_not_exact_count) return; DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init"); struct Ndb_local_table_statistics *info= (struct Ndb_local_table_statistics *)m_table_info; struct Ndb_local_table_statistics *info= m_table_info; Thd_ndb *thd_ndb= get_thd_ndb(thd); if (info->last_count != thd_ndb->count) { Loading @@ -438,8 +465,7 @@ void ha_ndbcluster::no_uncommitted_rows_update(int c) if (m_ha_not_exact_count) return; DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update"); struct Ndb_local_table_statistics *info= (struct Ndb_local_table_statistics *)m_table_info; struct Ndb_local_table_statistics *info= m_table_info; info->no_uncommitted_rows_count+= c; DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d", ((const NDBTAB *)m_table)->getTableId(), Loading Loading @@ -3576,6 +3602,7 @@ void ha_ndbcluster::info(uint flag) if ((my_errno= check_ndb_connection())) DBUG_VOID_RETURN; Ndb *ndb= get_ndb(); ndb->setDatabaseName(m_dbname); struct Ndb_statistics stat; if (current_thd->variables.ndb_use_exact_count && ndb_get_table_statistics(ndb, m_tabname, &stat) == 0) Loading Loading @@ -3915,6 +3942,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) trans= ndb->startTransaction(); if (trans == NULL) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); Loading @@ -3930,6 +3958,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) trans= ndb->startTransaction(); if (trans == NULL) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); no_uncommitted_rows_reset(thd); thd_ndb->all= trans; trans_register_ha(thd, TRUE, &ndbcluster_hton); Loading Loading @@ -3975,8 +4004,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) { NDBDICT *dict= ndb->getDictionary(); const NDBTAB *tab; void *tab_info; if (!(tab= dict->getTable(m_tabname, &tab_info))) if (!(tab= dict->getTable(m_tabname))) ERR_RETURN(dict->getNdbError()); DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion())); Loading @@ -3987,7 +4015,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) || tab->getObjectStatus() == NdbDictionary::Object::Invalid) { invalidate_dictionary_cache(FALSE, tab); if (!(tab= dict->getTable(m_tabname, &tab_info))) if (!(tab= dict->getTable(m_tabname))) ERR_RETURN(dict->getNdbError()); DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion())); Loading @@ -4007,8 +4035,9 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) if (!(my_errno= open_indexes(ndb, table, FALSE))) DBUG_RETURN(my_errno); } m_table_info= tab_info; } m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table); m_table_info= &m_thd_ndb_share->stat; no_uncommitted_rows_init(thd); } else Loading Loading @@ -5550,9 +5579,6 @@ Thd_ndb* ha_ndbcluster::seize_thd_ndb() DBUG_ENTER("seize_thd_ndb"); thd_ndb= new Thd_ndb(); thd_ndb->ndb->getDictionary()->set_local_table_data_size( sizeof(Ndb_local_table_statistics) ); if (thd_ndb->ndb->init(max_transactions) != 0) { ERR_PRINT(thd_ndb->ndb->getNdbError()); Loading Loading @@ -5642,7 +5668,6 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name, DBUG_RETURN(HA_ERR_NO_CONNECTION); ndb->setDatabaseName(db); NDBDICT* dict= ndb->getDictionary(); dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics)); dict->invalidateTable(name); build_table_filename(key, sizeof(key), db, name, ""); NDB_SHARE *share= get_share(key, 0, false); Loading Loading @@ -5714,7 +5739,6 @@ int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name ndb->setDatabaseName(db); NDBDICT* dict= ndb->getDictionary(); dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics)); dict->invalidateTable(name); if (!(tab= dict->getTable(name))) { Loading Loading @@ -6212,7 +6236,6 @@ static bool ndbcluster_init() DBUG_PRINT("error", ("failed to create global ndb object")); goto ndbcluster_init_error; } g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_local_table_statistics)); if (g_ndb->init() != 0) { ERR_PRINT (g_ndb->getNdbError()); Loading Loading @@ -6507,8 +6530,7 @@ ha_ndbcluster::records_in_range(uint inx, key_range *min_key, { // We must provide approx table rows Uint64 table_rows=0; Ndb_local_table_statistics *info= (Ndb_local_table_statistics *)m_table_info; Ndb_local_table_statistics *info= m_table_info; if (info->records != ~(ha_rows)0 && info->records != 0) { table_rows = info->records; Loading
sql/ha_ndbcluster.h +18 −1 Original line number Diff line number Diff line Loading @@ -523,11 +523,26 @@ enum THD_NDB_OPTIONS TNO_NO_LOG_SCHEMA_OP= 1 << 0 }; struct Ndb_local_table_statistics { int no_uncommitted_rows_count; ulong last_count; ha_rows records; }; typedef struct st_thd_ndb_share { const void *key; struct Ndb_local_table_statistics stat; } THD_NDB_SHARE; class Thd_ndb { public: Thd_ndb(); ~Thd_ndb(); void init_open_tables(); THD_NDB_SHARE *get_open_table(THD *thd, const void *key); Ndb *ndb; ulong count; uint lock_count; Loading @@ -536,6 +551,7 @@ class Thd_ndb int error; uint32 options; List<NDB_SHARE> changed_tables; HASH open_tables; }; class ha_ndbcluster: public handler Loading Loading @@ -819,7 +835,7 @@ static void set_tabname(const char *pathname, char *tabname); NdbScanOperation *m_active_cursor; const NdbDictionary::Table *m_table; int m_table_version; void *m_table_info; struct Ndb_local_table_statistics *m_table_info; char m_dbname[FN_HEADLEN]; //char m_schemaname[FN_HEADLEN]; char m_tabname[FN_HEADLEN]; Loading @@ -827,6 +843,7 @@ static void set_tabname(const char *pathname, char *tabname); THR_LOCK_DATA m_lock; NDB_SHARE *m_share; NDB_INDEX_DATA m_index[MAX_KEY]; THD_NDB_SHARE *m_thd_ndb_share; // NdbRecAttr has no reference to blob NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; byte m_ref[NDB_HIDDEN_PRIMARY_KEY_LENGTH]; Loading