Loading mysql-test/r/ndb_lock.result +12 −3 Original line number Diff line number Diff line Loading @@ -64,17 +64,26 @@ pk u o insert into t1 values (1,1,1); drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); begin; select * from t1 where x = 1 for update; x y z 1 one 1 begin; select * from t1 where x = 2 for update; select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction rollback; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; x y z 2 two 2 1 one 1 select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction select * from t1 where x = 2 for update; x y z 2 two 2 rollback; commit; begin; Loading mysql-test/t/ndb_lock.test +13 −2 Original line number Diff line number Diff line Loading @@ -73,7 +73,7 @@ drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); # PK access connection con1; Loading @@ -82,11 +82,22 @@ select * from t1 where x = 1 for update; connection con2; begin; select * from t1 where x = 2 for update; --error 1205 select * from t1 where x = 1 for update; rollback; connection con1; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; connection con2; --error 1205 select * from t1 where x = 1 for update; select * from t1 where x = 2 for update; rollback; connection con1; commit; Loading sql/ha_ndbcluster.cc +70 −17 Original line number Diff line number Diff line Loading @@ -256,13 +256,15 @@ int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans) } inline int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return h->m_ignore_no_key ? execute_no_commit_ignore_no_key(h,trans) : trans->execute(NdbTransaction::NoCommit, Loading Loading @@ -297,13 +299,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) } inline int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, h->m_force_send); Loading @@ -328,6 +332,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; query_state&= NDB_QUERY_NORMAL; options= 0; (void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0, (hash_get_key)thd_ndb_share_get_key, 0, 0); Loading Loading @@ -1696,7 +1701,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf, ERR_RETURN(trans->getNdbError()); } if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1761,7 +1766,7 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data, } } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1914,7 +1919,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false); else { // Table has no keys Loading Loading @@ -1963,7 +1968,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -2011,7 +2016,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) */ if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); m_ops_pending= 0; m_blobs_pending= FALSE; Loading Loading @@ -2043,7 +2048,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) { if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(-1); } else Loading Loading @@ -2370,7 +2375,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ERR_RETURN(trans->getNdbError()); } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2440,7 +2445,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2603,7 +2608,7 @@ int ha_ndbcluster::write_row(byte *record) m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); Loading Loading @@ -2840,7 +2845,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) op->setValue(no_fields, part_func_value); } // Execute update operation if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2926,7 +2931,7 @@ int ha_ndbcluster::delete_row(const byte *record) } // Execute delete operation if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3392,6 +3397,26 @@ int ha_ndbcluster::close_scan() NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor; if (m_lock_tuple) { /* Lock level m_lock.type either TL_WRITE_ALLOW_WRITE (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT LOCK WITH SHARE MODE) and row was not explictly unlocked with unlock_row() call */ NdbOperation *op; // Lock row DBUG_PRINT("info", ("Keeping lock on scanned row")); if (!(op= cursor->lockCurrentTuple())) { m_lock_tuple= false; ERR_RETURN(trans->getNdbError()); } m_ops_pending++; } m_lock_tuple= false; if (m_ops_pending) { /* Loading @@ -3399,7 +3424,7 @@ int ha_ndbcluster::close_scan() deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3793,7 +3818,7 @@ int ha_ndbcluster::end_bulk_insert() m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); Loading Loading @@ -3968,6 +3993,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, &ndbcluster_hton); } else Loading @@ -3983,6 +4009,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, TRUE, &ndbcluster_hton); /* Loading Loading @@ -4139,6 +4166,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement Loading Loading @@ -7557,6 +7585,30 @@ int ha_ndbcluster::write_ndb_file(const char *name) DBUG_RETURN(error); } void ha_ndbcluster::release_completed_operations(NdbTransaction *trans, bool force_release) { if (trans->hasBlobOperation()) { /* We are reading/writing BLOB fields, releasing operation records is unsafe */ return; } if (!force_release) { if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE) { /* We are batching reads and have not consumed all fetched rows yet, releasing operation records is unsafe */ return; } } trans->releaseCompletedOperations(); } int ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, KEY_MULTI_RANGE *ranges, Loading @@ -7572,6 +7624,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, NDB_INDEX_TYPE index_type= get_index_type(active_index); ulong reclength= table_share->reclength; NdbOperation* op; Thd_ndb *thd_ndb= get_thd_ndb(current_thd); if (uses_blob_value()) { Loading @@ -7585,7 +7638,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, sorted, buffer)); } thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; m_disable_multi_read= FALSE; /** Loading Loading @@ -7757,7 +7810,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, */ m_current_multi_operation= lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans, true))) { m_multi_range_defined= multi_range_curr; multi_range_curr= ranges; Loading sql/ha_ndbcluster.h +10 −2 Original line number Diff line number Diff line Loading @@ -534,6 +534,11 @@ class Ndb_cond_traverse_context Ndb_rewrite_context *rewrite_stack; }; typedef enum ndb_query_state_bits { NDB_QUERY_NORMAL = 0, NDB_QUERY_MULTI_READ_RANGE = 1 } NDB_QUERY_STATE_BITS; /* Place holder for ha_ndbcluster thread specific data */ Loading Loading @@ -571,6 +576,7 @@ class Thd_ndb int error; uint32 options; List<NDB_SHARE> changed_tables; uint query_state; HASH open_tables; }; Loading Loading @@ -849,8 +855,8 @@ static void set_tabname(const char *pathname, char *tabname); friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; Loading Loading @@ -898,6 +904,8 @@ static void set_tabname(const char *pathname, char *tabname); bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; void release_completed_operations(NdbTransaction*, bool); Ndb_cond_stack *m_cond_stack; bool m_disable_multi_read; byte *m_multi_range_result_ptr; Loading storage/ndb/include/ndbapi/NdbTransaction.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -140,6 +140,7 @@ class NdbTransaction friend class NdbIndexOperation; friend class NdbIndexScanOperation; friend class NdbBlob; friend class ha_ndbcluster; #endif public: Loading Loading @@ -791,6 +792,7 @@ private: // optim: any blobs bool theBlobFlag; Uint8 thePendingBlobOps; inline bool hasBlobOperation() { return theBlobFlag; } static void sendTC_COMMIT_ACK(class TransporterFacade *, NdbApiSignal *, Uint32 transId1, Uint32 transId2, Loading Loading
mysql-test/r/ndb_lock.result +12 −3 Original line number Diff line number Diff line Loading @@ -64,17 +64,26 @@ pk u o insert into t1 values (1,1,1); drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); begin; select * from t1 where x = 1 for update; x y z 1 one 1 begin; select * from t1 where x = 2 for update; select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction rollback; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; x y z 2 two 2 1 one 1 select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction select * from t1 where x = 2 for update; x y z 2 two 2 rollback; commit; begin; Loading
mysql-test/t/ndb_lock.test +13 −2 Original line number Diff line number Diff line Loading @@ -73,7 +73,7 @@ drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); # PK access connection con1; Loading @@ -82,11 +82,22 @@ select * from t1 where x = 1 for update; connection con2; begin; select * from t1 where x = 2 for update; --error 1205 select * from t1 where x = 1 for update; rollback; connection con1; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; connection con2; --error 1205 select * from t1 where x = 1 for update; select * from t1 where x = 2 for update; rollback; connection con1; commit; Loading
sql/ha_ndbcluster.cc +70 −17 Original line number Diff line number Diff line Loading @@ -256,13 +256,15 @@ int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans) } inline int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return h->m_ignore_no_key ? execute_no_commit_ignore_no_key(h,trans) : trans->execute(NdbTransaction::NoCommit, Loading Loading @@ -297,13 +299,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) } inline int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, h->m_force_send); Loading @@ -328,6 +332,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; query_state&= NDB_QUERY_NORMAL; options= 0; (void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0, (hash_get_key)thd_ndb_share_get_key, 0, 0); Loading Loading @@ -1696,7 +1701,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf, ERR_RETURN(trans->getNdbError()); } if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1761,7 +1766,7 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data, } } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1914,7 +1919,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false); else { // Table has no keys Loading Loading @@ -1963,7 +1968,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -2011,7 +2016,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) */ if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); m_ops_pending= 0; m_blobs_pending= FALSE; Loading Loading @@ -2043,7 +2048,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) { if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(-1); } else Loading Loading @@ -2370,7 +2375,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ERR_RETURN(trans->getNdbError()); } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2440,7 +2445,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2603,7 +2608,7 @@ int ha_ndbcluster::write_row(byte *record) m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); Loading Loading @@ -2840,7 +2845,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) op->setValue(no_fields, part_func_value); } // Execute update operation if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2926,7 +2931,7 @@ int ha_ndbcluster::delete_row(const byte *record) } // Execute delete operation if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3392,6 +3397,26 @@ int ha_ndbcluster::close_scan() NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor; if (m_lock_tuple) { /* Lock level m_lock.type either TL_WRITE_ALLOW_WRITE (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT LOCK WITH SHARE MODE) and row was not explictly unlocked with unlock_row() call */ NdbOperation *op; // Lock row DBUG_PRINT("info", ("Keeping lock on scanned row")); if (!(op= cursor->lockCurrentTuple())) { m_lock_tuple= false; ERR_RETURN(trans->getNdbError()); } m_ops_pending++; } m_lock_tuple= false; if (m_ops_pending) { /* Loading @@ -3399,7 +3424,7 @@ int ha_ndbcluster::close_scan() deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3793,7 +3818,7 @@ int ha_ndbcluster::end_bulk_insert() m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); Loading Loading @@ -3968,6 +3993,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, &ndbcluster_hton); } else Loading @@ -3983,6 +4009,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); thd_ndb->init_open_tables(); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, TRUE, &ndbcluster_hton); /* Loading Loading @@ -4139,6 +4166,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement Loading Loading @@ -7557,6 +7585,30 @@ int ha_ndbcluster::write_ndb_file(const char *name) DBUG_RETURN(error); } void ha_ndbcluster::release_completed_operations(NdbTransaction *trans, bool force_release) { if (trans->hasBlobOperation()) { /* We are reading/writing BLOB fields, releasing operation records is unsafe */ return; } if (!force_release) { if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE) { /* We are batching reads and have not consumed all fetched rows yet, releasing operation records is unsafe */ return; } } trans->releaseCompletedOperations(); } int ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, KEY_MULTI_RANGE *ranges, Loading @@ -7572,6 +7624,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, NDB_INDEX_TYPE index_type= get_index_type(active_index); ulong reclength= table_share->reclength; NdbOperation* op; Thd_ndb *thd_ndb= get_thd_ndb(current_thd); if (uses_blob_value()) { Loading @@ -7585,7 +7638,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, sorted, buffer)); } thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; m_disable_multi_read= FALSE; /** Loading Loading @@ -7757,7 +7810,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, */ m_current_multi_operation= lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans, true))) { m_multi_range_defined= multi_range_curr; multi_range_curr= ranges; Loading
sql/ha_ndbcluster.h +10 −2 Original line number Diff line number Diff line Loading @@ -534,6 +534,11 @@ class Ndb_cond_traverse_context Ndb_rewrite_context *rewrite_stack; }; typedef enum ndb_query_state_bits { NDB_QUERY_NORMAL = 0, NDB_QUERY_MULTI_READ_RANGE = 1 } NDB_QUERY_STATE_BITS; /* Place holder for ha_ndbcluster thread specific data */ Loading Loading @@ -571,6 +576,7 @@ class Thd_ndb int error; uint32 options; List<NDB_SHARE> changed_tables; uint query_state; HASH open_tables; }; Loading Loading @@ -849,8 +855,8 @@ static void set_tabname(const char *pathname, char *tabname); friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; Loading Loading @@ -898,6 +904,8 @@ static void set_tabname(const char *pathname, char *tabname); bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; void release_completed_operations(NdbTransaction*, bool); Ndb_cond_stack *m_cond_stack; bool m_disable_multi_read; byte *m_multi_range_result_ptr; Loading
storage/ndb/include/ndbapi/NdbTransaction.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -140,6 +140,7 @@ class NdbTransaction friend class NdbIndexOperation; friend class NdbIndexScanOperation; friend class NdbBlob; friend class ha_ndbcluster; #endif public: Loading Loading @@ -791,6 +792,7 @@ private: // optim: any blobs bool theBlobFlag; Uint8 thePendingBlobOps; inline bool hasBlobOperation() { return theBlobFlag; } static void sendTC_COMMIT_ACK(class TransporterFacade *, NdbApiSignal *, Uint32 transId1, Uint32 transId2, Loading