Loading mysql-test/r/ndb_read_multi_range.result +17 −0 Original line number Diff line number Diff line Loading @@ -442,3 +442,20 @@ SELECT id, tag, doc, type FROM t1 WHERE id IN ('flipper','sakila'); id tag doc type sakila 1 Some text goes here text DROP TABLE t1; CREATE TABLE t1 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=ndbcluster DEFAULT CHARSET=ascii CHECKSUM=1; CREATE TABLE t2 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=MyISAM DEFAULT CHARSET=ascii CHECKSUM=1; CREATE TRIGGER testtrigger AFTER UPDATE ON t1 FOR EACH ROW BEGIN REPLACE INTO t2 SELECT * FROM t1 WHERE t1.var1 = NEW.var1;END| INSERT INTO t1 VALUES (1,1),(2,2),(3,3); UPDATE t1 SET var2 = 9 WHERE var1 IN(1,2,3); DROP TRIGGER testtrigger; DROP TABLE t1, t2; mysql-test/t/ndb_read_multi_range.test +29 −0 Original line number Diff line number Diff line Loading @@ -272,3 +272,32 @@ SELECT id, tag, doc, type FROM t1 WHERE id IN ('flipper','orka'); SELECT id, tag, doc, type FROM t1 WHERE id IN ('flipper','sakila'); DROP TABLE t1; #bug#25522 CREATE TABLE t1 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=ndbcluster DEFAULT CHARSET=ascii CHECKSUM=1; CREATE TABLE t2 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=MyISAM DEFAULT CHARSET=ascii CHECKSUM=1; DELIMITER |; CREATE TRIGGER testtrigger AFTER UPDATE ON t1 FOR EACH ROW BEGIN REPLACE INTO t2 SELECT * FROM t1 WHERE t1.var1 = NEW.var1;END| DELIMITER ;| INSERT INTO t1 VALUES (1,1),(2,2),(3,3); UPDATE t1 SET var2 = 9 WHERE var1 IN(1,2,3); DROP TRIGGER testtrigger; DROP TABLE t1, t2; sql/ha_ndbcluster.cc +37 −25 Original line number Diff line number Diff line Loading @@ -260,13 +260,14 @@ static int ndb_to_mysql_error(const NdbError *ndberr) int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans) { int res= trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, NdbOperation::AO_IgnoreError, h->m_force_send); if (res == 0) return 0; if (res == -1) return -1; const NdbError &err= trans->getNdbError(); if (err.classification != NdbError::ConstraintViolation && if (err.classification != NdbError::NoError && err.classification != NdbError::ConstraintViolation && err.classification != NdbError::NoDataFound) return res; Loading @@ -286,7 +287,7 @@ int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, return h->m_ignore_no_key ? execute_no_commit_ignore_no_key(h,trans) : trans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, h->m_force_send); } Loading @@ -299,7 +300,7 @@ int execute_commit(ha_ndbcluster *h, NdbTransaction *trans) return 0; #endif return trans->execute(NdbTransaction::Commit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, h->m_force_send); } Loading @@ -312,7 +313,7 @@ int execute_commit(THD *thd, NdbTransaction *trans) return 0; #endif return trans->execute(NdbTransaction::Commit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, thd->variables.ndb_force_send); } Loading @@ -327,7 +328,7 @@ int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, NdbOperation::AO_IgnoreError, h->m_force_send); } Loading Loading @@ -1726,7 +1727,8 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf, ERR_RETURN(trans->getNdbError()); } if (execute_no_commit_ie(this,trans,FALSE) != 0) if ((res = execute_no_commit_ie(this,trans,FALSE)) != 0 || op->getNdbError().code) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1998,7 +2000,8 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans,FALSE) != 0) if (execute_no_commit_ie(this,trans,FALSE) != 0 || op->getNdbError().code) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -4337,11 +4340,10 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement m_ops_pending= 0; thd->set_current_stmt_binlog_row_based_if_mixed(); Loading Loading @@ -6829,7 +6831,7 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) fprintf(stderr, "NDB: table share %s with use_count %d not freed\n", share->key, share->use_count); #endif real_free_share(&share); ndbcluster_real_free_share(&share); } pthread_mutex_unlock(&ndbcluster_mutex); } Loading Loading @@ -7441,14 +7443,20 @@ int handle_trailing_share(NDB_SHARE *share) bzero((char*) &table_list,sizeof(table_list)); table_list.db= share->db; table_list.alias= table_list.table_name= share->table_name; safe_mutex_assert_owner(&LOCK_open); close_cached_tables(thd, 0, &table_list, TRUE); pthread_mutex_lock(&ndbcluster_mutex); if (!--share->use_count) { DBUG_PRINT("info", ("NDB_SHARE: close_cashed_tables %s freed share.", share->key)); real_free_share(&share); if (ndb_extra_logging) sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) " "released by close_cached_tables at " "connect_count: %u", share->key, share->connect_count, g_ndb_cluster_connection->get_connect_count()); ndbcluster_real_free_share(&share); DBUG_RETURN(0); } Loading @@ -7458,10 +7466,14 @@ int handle_trailing_share(NDB_SHARE *share) */ if (share->state != NSS_DROPPED && !--share->use_count) { DBUG_PRINT("info", ("NDB_SHARE: %s already exists, " "use_count=%d state != NSS_DROPPED.", share->key, share->use_count)); real_free_share(&share); if (ndb_extra_logging) sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) " "released after NSS_DROPPED check " "at connect_count: %u", share->key, share->connect_count, g_ndb_cluster_connection->get_connect_count()); ndbcluster_real_free_share(&share); DBUG_RETURN(0); } DBUG_PRINT("error", ("NDB_SHARE: %s already exists use_count=%d.", Loading Loading @@ -7727,7 +7739,7 @@ void ndbcluster_free_share(NDB_SHARE **share, bool have_lock) (*share)->util_lock= 0; if (!--(*share)->use_count) { real_free_share(share); ndbcluster_real_free_share(share); } else { Loading Loading @@ -7800,7 +7812,7 @@ ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb, const (char*)&var_mem); if (pTrans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, TRUE) == -1) { error= pTrans->getNdbError(); Loading Loading @@ -8057,7 +8069,6 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, !op->readTuple(lm) && !set_primary_key(op, multi_range_curr->start_key.key) && !define_read_attrs(curr, op) && (op->setAbortOption(AO_IgnoreError), TRUE) && (!m_use_partition_function || (op->setPartitionId(part_spec.start_part), TRUE))) curr += reclength; Loading @@ -8079,8 +8090,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) && !op->readTuple(lm) && !set_index_key(op, key_info, multi_range_curr->start_key.key) && !define_read_attrs(curr, op) && (op->setAbortOption(AO_IgnoreError), TRUE)) !define_read_attrs(curr, op)) curr += reclength; else ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError()); Loading Loading @@ -8280,6 +8290,8 @@ ha_ndbcluster::read_multi_range_next(KEY_MULTI_RANGE ** multi_range_found_p) if (multi_range_curr == multi_range_end) { DBUG_MULTI_RANGE(16); Thd_ndb *thd_ndb= get_thd_ndb(current_thd); thd_ndb->query_state&= NDB_QUERY_NORMAL; DBUG_RETURN(HA_ERR_END_OF_FILE); } Loading sql/ha_ndbcluster.h +1 −0 Original line number Diff line number Diff line Loading @@ -108,6 +108,7 @@ typedef struct st_ndbcluster_share { char *table_name; Ndb::TupleIdRange tuple_id_range; #ifdef HAVE_NDB_BINLOG uint32 connect_count; uint32 flags; NdbEventOperation *op; NdbEventOperation *op_old; // for rename table Loading sql/ha_ndbcluster_binlog.cc +52 −5 Original line number Diff line number Diff line Loading @@ -97,6 +97,7 @@ static ulonglong ndb_latest_received_binlog_epoch= 0; NDB_SHARE *ndb_apply_status_share= 0; NDB_SHARE *ndb_schema_share= 0; pthread_mutex_t ndb_schema_share_mutex; /* Schema object distribution handling */ HASH ndb_schema_objects; Loading Loading @@ -361,6 +362,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) int do_event_op= ndb_binlog_running; DBUG_ENTER("ndbcluster_binlog_init_share"); share->connect_count= g_ndb_cluster_connection->get_connect_count(); share->op= 0; share->table= 0; Loading Loading @@ -604,7 +607,7 @@ static int ndbcluster_binlog_end(THD *thd) ("table->s->db.table_name: %s.%s", share->table->s->db.str, share->table->s->table_name.str)); if (share->state != NSS_DROPPED && !--share->use_count) real_free_share(&share); ndbcluster_real_free_share(&share); else { DBUG_PRINT("share", Loading @@ -621,6 +624,7 @@ static int ndbcluster_binlog_end(THD *thd) pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); pthread_mutex_destroy(&ndb_schema_share_mutex); #endif DBUG_RETURN(0); } Loading Loading @@ -1271,6 +1275,16 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE); bitmap_set_all(&schema_subscribers); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); if (ndb_schema_object) ndb_free_schema_object(&ndb_schema_object, FALSE); DBUG_RETURN(0); } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1283,6 +1297,9 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ if (updated) { bitmap_clear_bit(&schema_subscribers, node_id); Loading Loading @@ -1478,6 +1495,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, &abstime); if (thd->killed) break; /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); break; } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1487,6 +1512,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, bitmap_intersect(&schema_subscribers, tmp); } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ /* remove any unsubscribed from ndb_schema_object->slock */ bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); Loading Loading @@ -1910,8 +1937,14 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE); // fall through case NDBEVENT::TE_ALTER: Loading Loading @@ -2278,6 +2311,7 @@ int ndbcluster_binlog_start() pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST); pthread_cond_init(&injector_cond, NULL); pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST); /* Create injector thread */ if (pthread_create(&ndb_binlog_thread, &connection_attrib, Loading Loading @@ -2411,11 +2445,20 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(1); } if (share->connect_count != g_ndb_cluster_connection->get_connect_count()) { handle_trailing_share(share); share= NULL; } } /* Create share which is needed to hold replication information */ if (!(share= get_share(key, 0, TRUE, TRUE))) if (share) { ++share->use_count; } else if (!(share= get_share(key, 0, TRUE, TRUE))) { sql_print_error("NDB Binlog: " "allocating table share for %s failed", key); Loading Loading @@ -3924,9 +3967,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) "%ld(%d e/s), total time %ld(%d e/s)", (ulong)gci, event_count, write_timer.elapsed_ms(), event_count / write_timer.elapsed_ms(), (1000*event_count) / write_timer.elapsed_ms(), gci_timer.elapsed_ms(), event_count / gci_timer.elapsed_ms()); (1000*event_count) / gci_timer.elapsed_ms()); #endif } } Loading Loading @@ -3966,8 +4009,12 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } if (ndb_schema_share) { /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ } /* remove all event operations */ Loading Loading
mysql-test/r/ndb_read_multi_range.result +17 −0 Original line number Diff line number Diff line Loading @@ -442,3 +442,20 @@ SELECT id, tag, doc, type FROM t1 WHERE id IN ('flipper','sakila'); id tag doc type sakila 1 Some text goes here text DROP TABLE t1; CREATE TABLE t1 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=ndbcluster DEFAULT CHARSET=ascii CHECKSUM=1; CREATE TABLE t2 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=MyISAM DEFAULT CHARSET=ascii CHECKSUM=1; CREATE TRIGGER testtrigger AFTER UPDATE ON t1 FOR EACH ROW BEGIN REPLACE INTO t2 SELECT * FROM t1 WHERE t1.var1 = NEW.var1;END| INSERT INTO t1 VALUES (1,1),(2,2),(3,3); UPDATE t1 SET var2 = 9 WHERE var1 IN(1,2,3); DROP TRIGGER testtrigger; DROP TABLE t1, t2;
mysql-test/t/ndb_read_multi_range.test +29 −0 Original line number Diff line number Diff line Loading @@ -272,3 +272,32 @@ SELECT id, tag, doc, type FROM t1 WHERE id IN ('flipper','orka'); SELECT id, tag, doc, type FROM t1 WHERE id IN ('flipper','sakila'); DROP TABLE t1; #bug#25522 CREATE TABLE t1 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=ndbcluster DEFAULT CHARSET=ascii CHECKSUM=1; CREATE TABLE t2 ( var1 int(2) NOT NULL, var2 int(2) NOT NULL, PRIMARY KEY (var1) ) ENGINE=MyISAM DEFAULT CHARSET=ascii CHECKSUM=1; DELIMITER |; CREATE TRIGGER testtrigger AFTER UPDATE ON t1 FOR EACH ROW BEGIN REPLACE INTO t2 SELECT * FROM t1 WHERE t1.var1 = NEW.var1;END| DELIMITER ;| INSERT INTO t1 VALUES (1,1),(2,2),(3,3); UPDATE t1 SET var2 = 9 WHERE var1 IN(1,2,3); DROP TRIGGER testtrigger; DROP TABLE t1, t2;
sql/ha_ndbcluster.cc +37 −25 Original line number Diff line number Diff line Loading @@ -260,13 +260,14 @@ static int ndb_to_mysql_error(const NdbError *ndberr) int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans) { int res= trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, NdbOperation::AO_IgnoreError, h->m_force_send); if (res == 0) return 0; if (res == -1) return -1; const NdbError &err= trans->getNdbError(); if (err.classification != NdbError::ConstraintViolation && if (err.classification != NdbError::NoError && err.classification != NdbError::ConstraintViolation && err.classification != NdbError::NoDataFound) return res; Loading @@ -286,7 +287,7 @@ int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, return h->m_ignore_no_key ? execute_no_commit_ignore_no_key(h,trans) : trans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, h->m_force_send); } Loading @@ -299,7 +300,7 @@ int execute_commit(ha_ndbcluster *h, NdbTransaction *trans) return 0; #endif return trans->execute(NdbTransaction::Commit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, h->m_force_send); } Loading @@ -312,7 +313,7 @@ int execute_commit(THD *thd, NdbTransaction *trans) return 0; #endif return trans->execute(NdbTransaction::Commit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, thd->variables.ndb_force_send); } Loading @@ -327,7 +328,7 @@ int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, NdbOperation::AO_IgnoreError, h->m_force_send); } Loading Loading @@ -1726,7 +1727,8 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf, ERR_RETURN(trans->getNdbError()); } if (execute_no_commit_ie(this,trans,FALSE) != 0) if ((res = execute_no_commit_ie(this,trans,FALSE)) != 0 || op->getNdbError().code) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1998,7 +2000,8 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans,FALSE) != 0) if (execute_no_commit_ie(this,trans,FALSE) != 0 || op->getNdbError().code) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -4337,11 +4340,10 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement m_ops_pending= 0; thd->set_current_stmt_binlog_row_based_if_mixed(); Loading Loading @@ -6829,7 +6831,7 @@ static int ndbcluster_end(handlerton *hton, ha_panic_function type) fprintf(stderr, "NDB: table share %s with use_count %d not freed\n", share->key, share->use_count); #endif real_free_share(&share); ndbcluster_real_free_share(&share); } pthread_mutex_unlock(&ndbcluster_mutex); } Loading Loading @@ -7441,14 +7443,20 @@ int handle_trailing_share(NDB_SHARE *share) bzero((char*) &table_list,sizeof(table_list)); table_list.db= share->db; table_list.alias= table_list.table_name= share->table_name; safe_mutex_assert_owner(&LOCK_open); close_cached_tables(thd, 0, &table_list, TRUE); pthread_mutex_lock(&ndbcluster_mutex); if (!--share->use_count) { DBUG_PRINT("info", ("NDB_SHARE: close_cashed_tables %s freed share.", share->key)); real_free_share(&share); if (ndb_extra_logging) sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) " "released by close_cached_tables at " "connect_count: %u", share->key, share->connect_count, g_ndb_cluster_connection->get_connect_count()); ndbcluster_real_free_share(&share); DBUG_RETURN(0); } Loading @@ -7458,10 +7466,14 @@ int handle_trailing_share(NDB_SHARE *share) */ if (share->state != NSS_DROPPED && !--share->use_count) { DBUG_PRINT("info", ("NDB_SHARE: %s already exists, " "use_count=%d state != NSS_DROPPED.", share->key, share->use_count)); real_free_share(&share); if (ndb_extra_logging) sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) " "released after NSS_DROPPED check " "at connect_count: %u", share->key, share->connect_count, g_ndb_cluster_connection->get_connect_count()); ndbcluster_real_free_share(&share); DBUG_RETURN(0); } DBUG_PRINT("error", ("NDB_SHARE: %s already exists use_count=%d.", Loading Loading @@ -7727,7 +7739,7 @@ void ndbcluster_free_share(NDB_SHARE **share, bool have_lock) (*share)->util_lock= 0; if (!--(*share)->use_count) { real_free_share(share); ndbcluster_real_free_share(share); } else { Loading Loading @@ -7800,7 +7812,7 @@ ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb, const (char*)&var_mem); if (pTrans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, NdbOperation::AbortOnError, TRUE) == -1) { error= pTrans->getNdbError(); Loading Loading @@ -8057,7 +8069,6 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, !op->readTuple(lm) && !set_primary_key(op, multi_range_curr->start_key.key) && !define_read_attrs(curr, op) && (op->setAbortOption(AO_IgnoreError), TRUE) && (!m_use_partition_function || (op->setPartitionId(part_spec.start_part), TRUE))) curr += reclength; Loading @@ -8079,8 +8090,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) && !op->readTuple(lm) && !set_index_key(op, key_info, multi_range_curr->start_key.key) && !define_read_attrs(curr, op) && (op->setAbortOption(AO_IgnoreError), TRUE)) !define_read_attrs(curr, op)) curr += reclength; else ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError()); Loading Loading @@ -8280,6 +8290,8 @@ ha_ndbcluster::read_multi_range_next(KEY_MULTI_RANGE ** multi_range_found_p) if (multi_range_curr == multi_range_end) { DBUG_MULTI_RANGE(16); Thd_ndb *thd_ndb= get_thd_ndb(current_thd); thd_ndb->query_state&= NDB_QUERY_NORMAL; DBUG_RETURN(HA_ERR_END_OF_FILE); } Loading
sql/ha_ndbcluster.h +1 −0 Original line number Diff line number Diff line Loading @@ -108,6 +108,7 @@ typedef struct st_ndbcluster_share { char *table_name; Ndb::TupleIdRange tuple_id_range; #ifdef HAVE_NDB_BINLOG uint32 connect_count; uint32 flags; NdbEventOperation *op; NdbEventOperation *op_old; // for rename table Loading
sql/ha_ndbcluster_binlog.cc +52 −5 Original line number Diff line number Diff line Loading @@ -97,6 +97,7 @@ static ulonglong ndb_latest_received_binlog_epoch= 0; NDB_SHARE *ndb_apply_status_share= 0; NDB_SHARE *ndb_schema_share= 0; pthread_mutex_t ndb_schema_share_mutex; /* Schema object distribution handling */ HASH ndb_schema_objects; Loading Loading @@ -361,6 +362,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) int do_event_op= ndb_binlog_running; DBUG_ENTER("ndbcluster_binlog_init_share"); share->connect_count= g_ndb_cluster_connection->get_connect_count(); share->op= 0; share->table= 0; Loading Loading @@ -604,7 +607,7 @@ static int ndbcluster_binlog_end(THD *thd) ("table->s->db.table_name: %s.%s", share->table->s->db.str, share->table->s->table_name.str)); if (share->state != NSS_DROPPED && !--share->use_count) real_free_share(&share); ndbcluster_real_free_share(&share); else { DBUG_PRINT("share", Loading @@ -621,6 +624,7 @@ static int ndbcluster_binlog_end(THD *thd) pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); pthread_mutex_destroy(&ndb_schema_share_mutex); #endif DBUG_RETURN(0); } Loading Loading @@ -1271,6 +1275,16 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes(); bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE); bitmap_set_all(&schema_subscribers); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); if (ndb_schema_object) ndb_free_schema_object(&ndb_schema_object, FALSE); DBUG_RETURN(0); } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1283,6 +1297,9 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, } } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ if (updated) { bitmap_clear_bit(&schema_subscribers, node_id); Loading Loading @@ -1478,6 +1495,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, &abstime); if (thd->killed) break; /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); if (ndb_schema_share == 0) { pthread_mutex_unlock(&ndb_schema_share_mutex); break; } (void) pthread_mutex_lock(&ndb_schema_share->mutex); for (i= 0; i < no_storage_nodes; i++) { Loading @@ -1487,6 +1512,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, bitmap_intersect(&schema_subscribers, tmp); } (void) pthread_mutex_unlock(&ndb_schema_share->mutex); pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ /* remove any unsubscribed from ndb_schema_object->slock */ bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers); Loading Loading @@ -1910,8 +1937,14 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, ndb_binlog_tables_inited && ndb_binlog_running) sql_print_information("NDB Binlog: ndb tables initially " "read only on reconnect."); /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, FALSE); // fall through case NDBEVENT::TE_ALTER: Loading Loading @@ -2278,6 +2311,7 @@ int ndbcluster_binlog_start() pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST); pthread_cond_init(&injector_cond, NULL); pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST); /* Create injector thread */ if (pthread_create(&ndb_binlog_thread, &connection_attrib, Loading Loading @@ -2411,11 +2445,20 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, pthread_mutex_unlock(&ndbcluster_mutex); DBUG_RETURN(1); } if (share->connect_count != g_ndb_cluster_connection->get_connect_count()) { handle_trailing_share(share); share= NULL; } } /* Create share which is needed to hold replication information */ if (!(share= get_share(key, 0, TRUE, TRUE))) if (share) { ++share->use_count; } else if (!(share= get_share(key, 0, TRUE, TRUE))) { sql_print_error("NDB Binlog: " "allocating table share for %s failed", key); Loading Loading @@ -3924,9 +3967,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) "%ld(%d e/s), total time %ld(%d e/s)", (ulong)gci, event_count, write_timer.elapsed_ms(), event_count / write_timer.elapsed_ms(), (1000*event_count) / write_timer.elapsed_ms(), gci_timer.elapsed_ms(), event_count / gci_timer.elapsed_ms()); (1000*event_count) / gci_timer.elapsed_ms()); #endif } } Loading Loading @@ -3966,8 +4009,12 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) } if (ndb_schema_share) { /* begin protect ndb_schema_share */ pthread_mutex_lock(&ndb_schema_share_mutex); free_share(&ndb_schema_share); ndb_schema_share= 0; pthread_mutex_unlock(&ndb_schema_share_mutex); /* end protect ndb_schema_share */ } /* remove all event operations */ Loading