Loading mysql-test/r/ndb_lock.result +12 −3 Original line number Diff line number Diff line Loading @@ -64,17 +64,26 @@ pk u o insert into t1 values (1,1,1); drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); begin; select * from t1 where x = 1 for update; x y z 1 one 1 begin; select * from t1 where x = 2 for update; select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction rollback; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; x y z 2 two 2 1 one 1 select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction select * from t1 where x = 2 for update; x y z 2 two 2 rollback; commit; begin; Loading mysql-test/t/ndb_lock.test +13 −2 Original line number Diff line number Diff line Loading @@ -73,7 +73,7 @@ drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); # PK access connection con1; Loading @@ -82,11 +82,22 @@ select * from t1 where x = 1 for update; connection con2; begin; select * from t1 where x = 2 for update; --error 1205 select * from t1 where x = 1 for update; rollback; connection con1; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; connection con2; --error 1205 select * from t1 where x = 1 for update; select * from t1 where x = 2 for update; rollback; connection con1; commit; Loading ndb/include/ndbapi/NdbTransaction.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -140,6 +140,7 @@ class NdbTransaction friend class NdbIndexOperation; friend class NdbIndexScanOperation; friend class NdbBlob; friend class ha_ndbcluster; #endif public: Loading Loading @@ -791,6 +792,7 @@ private: // optim: any blobs bool theBlobFlag; Uint8 thePendingBlobOps; inline bool hasBlobOperation() { return theBlobFlag; } static void sendTC_COMMIT_ACK(NdbApiSignal *, Uint32 transId1, Uint32 transId2, Loading sql/ha_ndbcluster.cc +70 −17 Original line number Diff line number Diff line Loading @@ -228,13 +228,15 @@ static int ndb_to_mysql_error(const NdbError *err) inline int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, h->m_force_send); Loading Loading @@ -267,13 +269,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) } inline int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, h->m_force_send); Loading @@ -290,6 +294,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; query_state&= NDB_QUERY_NORMAL; } Thd_ndb::~Thd_ndb() Loading Loading @@ -1443,7 +1448,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1490,7 +1495,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ERR_RETURN(trans->getNdbError()); } } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1630,7 +1635,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false); else { // Table has no keys Loading Loading @@ -1679,7 +1684,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1727,7 +1732,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) */ if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); m_ops_pending= 0; m_blobs_pending= FALSE; Loading Loading @@ -1759,7 +1764,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) { if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(-1); } else Loading Loading @@ -2063,7 +2068,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_RETURN(res); } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2096,7 +2101,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2228,7 +2233,7 @@ int ha_ndbcluster::write_row(byte *record) m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); Loading Loading @@ -2428,7 +2433,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) } // Execute update operation if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2499,7 +2504,7 @@ int ha_ndbcluster::delete_row(const byte *record) } // Execute delete operation if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2928,6 +2933,26 @@ int ha_ndbcluster::close_scan() NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor; if (m_lock_tuple) { /* Lock level m_lock.type either TL_WRITE_ALLOW_WRITE (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT LOCK WITH SHARE MODE) and row was not explictly unlocked with unlock_row() call */ NdbOperation *op; // Lock row DBUG_PRINT("info", ("Keeping lock on scanned row")); if (!(op= cursor->lockCurrentTuple())) { m_lock_tuple= false; ERR_RETURN(trans->getNdbError()); } m_ops_pending++; } m_lock_tuple= false; if (m_ops_pending) { /* Loading @@ -2935,7 +2960,7 @@ int ha_ndbcluster::close_scan() deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3345,7 +3370,7 @@ int ha_ndbcluster::end_bulk_insert() m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); Loading Loading @@ -3518,6 +3543,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, &ndbcluster_hton); } else Loading @@ -3533,6 +3559,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, TRUE, &ndbcluster_hton); /* Loading Loading @@ -3739,6 +3766,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement Loading Loading @@ -5986,6 +6014,30 @@ int ha_ndbcluster::write_ndb_file() DBUG_RETURN(error); } void ha_ndbcluster::release_completed_operations(NdbTransaction *trans, bool force_release) { if (trans->hasBlobOperation()) { /* We are reading/writing BLOB fields, releasing operation records is unsafe */ return; } if (!force_release) { if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE) { /* We are batching reads and have not consumed all fetched rows yet, releasing operation records is unsafe */ return; } } trans->releaseCompletedOperations(); } int ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, KEY_MULTI_RANGE *ranges, Loading @@ -6000,6 +6052,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, NDB_INDEX_TYPE index_type= get_index_type(active_index); ulong reclength= table->s->reclength; NdbOperation* op; Thd_ndb *thd_ndb= get_thd_ndb(current_thd); if (uses_blob_value(m_retrieve_all_fields)) { Loading @@ -6013,7 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, sorted, buffer)); } thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; m_disable_multi_read= FALSE; /** Loading Loading @@ -6160,7 +6213,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, */ m_current_multi_operation= lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans, true))) { m_multi_range_defined= multi_range_curr; multi_range_curr= ranges; Loading sql/ha_ndbcluster.h +10 −2 Original line number Diff line number Diff line Loading @@ -435,6 +435,11 @@ class Ndb_cond_traverse_context Ndb_rewrite_context *rewrite_stack; }; typedef enum ndb_query_state_bits { NDB_QUERY_NORMAL = 0, NDB_QUERY_MULTI_READ_RANGE = 1 } NDB_QUERY_STATE_BITS; /* Place holder for ha_ndbcluster thread specific data */ Loading @@ -451,6 +456,7 @@ class Thd_ndb NdbTransaction *stmt; int error; List<NDB_SHARE> changed_tables; uint query_state; }; class ha_ndbcluster: public handler Loading Loading @@ -672,8 +678,8 @@ static void set_tabname(const char *pathname, char *tabname); NdbScanOperation* op); friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; Loading Loading @@ -716,6 +722,8 @@ static void set_tabname(const char *pathname, char *tabname); bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; void release_completed_operations(NdbTransaction*, bool); Ndb_cond_stack *m_cond_stack; bool m_disable_multi_read; byte *m_multi_range_result_ptr; Loading Loading
mysql-test/r/ndb_lock.result +12 −3 Original line number Diff line number Diff line Loading @@ -64,17 +64,26 @@ pk u o insert into t1 values (1,1,1); drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); begin; select * from t1 where x = 1 for update; x y z 1 one 1 begin; select * from t1 where x = 2 for update; select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction rollback; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; x y z 2 two 2 1 one 1 select * from t1 where x = 1 for update; ERROR HY000: Lock wait timeout exceeded; try restarting transaction select * from t1 where x = 2 for update; x y z 2 two 2 rollback; commit; begin; Loading
mysql-test/t/ndb_lock.test +13 −2 Original line number Diff line number Diff line Loading @@ -73,7 +73,7 @@ drop table t1; create table t1 (x integer not null primary key, y varchar(32), z integer, key(z)) engine = ndb; insert into t1 values (1,'one',1), (2,'two',2),(3,"three",3); insert into t1 values (1,'one',1); # PK access connection con1; Loading @@ -82,11 +82,22 @@ select * from t1 where x = 1 for update; connection con2; begin; select * from t1 where x = 2 for update; --error 1205 select * from t1 where x = 1 for update; rollback; connection con1; rollback; insert into t1 values (2,'two',2),(3,"three",3); begin; select * from t1 where x = 1 for update; connection con2; --error 1205 select * from t1 where x = 1 for update; select * from t1 where x = 2 for update; rollback; connection con1; commit; Loading
ndb/include/ndbapi/NdbTransaction.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -140,6 +140,7 @@ class NdbTransaction friend class NdbIndexOperation; friend class NdbIndexScanOperation; friend class NdbBlob; friend class ha_ndbcluster; #endif public: Loading Loading @@ -791,6 +792,7 @@ private: // optim: any blobs bool theBlobFlag; Uint8 thePendingBlobOps; inline bool hasBlobOperation() { return theBlobFlag; } static void sendTC_COMMIT_ACK(NdbApiSignal *, Uint32 transId1, Uint32 transId2, Loading
sql/ha_ndbcluster.cc +70 −17 Original line number Diff line number Diff line Loading @@ -228,13 +228,15 @@ static int ndb_to_mysql_error(const NdbError *err) inline int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, h->m_force_send); Loading Loading @@ -267,13 +269,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) } inline int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, h->m_force_send); Loading @@ -290,6 +294,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; query_state&= NDB_QUERY_NORMAL; } Thd_ndb::~Thd_ndb() Loading Loading @@ -1443,7 +1448,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1490,7 +1495,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ERR_RETURN(trans->getNdbError()); } } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1630,7 +1635,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false); else { // Table has no keys Loading Loading @@ -1679,7 +1684,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1727,7 +1732,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) */ if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); m_ops_pending= 0; m_blobs_pending= FALSE; Loading Loading @@ -1759,7 +1764,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) { if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(-1); } else Loading Loading @@ -2063,7 +2068,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_RETURN(res); } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2096,7 +2101,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2228,7 +2233,7 @@ int ha_ndbcluster::write_row(byte *record) m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); Loading Loading @@ -2428,7 +2433,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) } // Execute update operation if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2499,7 +2504,7 @@ int ha_ndbcluster::delete_row(const byte *record) } // Execute delete operation if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2928,6 +2933,26 @@ int ha_ndbcluster::close_scan() NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor; if (m_lock_tuple) { /* Lock level m_lock.type either TL_WRITE_ALLOW_WRITE (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT LOCK WITH SHARE MODE) and row was not explictly unlocked with unlock_row() call */ NdbOperation *op; // Lock row DBUG_PRINT("info", ("Keeping lock on scanned row")); if (!(op= cursor->lockCurrentTuple())) { m_lock_tuple= false; ERR_RETURN(trans->getNdbError()); } m_ops_pending++; } m_lock_tuple= false; if (m_ops_pending) { /* Loading @@ -2935,7 +2960,7 @@ int ha_ndbcluster::close_scan() deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3345,7 +3370,7 @@ int ha_ndbcluster::end_bulk_insert() m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); Loading Loading @@ -3518,6 +3543,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, &ndbcluster_hton); } else Loading @@ -3533,6 +3559,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, TRUE, &ndbcluster_hton); /* Loading Loading @@ -3739,6 +3766,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement Loading Loading @@ -5986,6 +6014,30 @@ int ha_ndbcluster::write_ndb_file() DBUG_RETURN(error); } void ha_ndbcluster::release_completed_operations(NdbTransaction *trans, bool force_release) { if (trans->hasBlobOperation()) { /* We are reading/writing BLOB fields, releasing operation records is unsafe */ return; } if (!force_release) { if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE) { /* We are batching reads and have not consumed all fetched rows yet, releasing operation records is unsafe */ return; } } trans->releaseCompletedOperations(); } int ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, KEY_MULTI_RANGE *ranges, Loading @@ -6000,6 +6052,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, NDB_INDEX_TYPE index_type= get_index_type(active_index); ulong reclength= table->s->reclength; NdbOperation* op; Thd_ndb *thd_ndb= get_thd_ndb(current_thd); if (uses_blob_value(m_retrieve_all_fields)) { Loading @@ -6013,7 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, sorted, buffer)); } thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; m_disable_multi_read= FALSE; /** Loading Loading @@ -6160,7 +6213,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, */ m_current_multi_operation= lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans, true))) { m_multi_range_defined= multi_range_curr; multi_range_curr= ranges; Loading
sql/ha_ndbcluster.h +10 −2 Original line number Diff line number Diff line Loading @@ -435,6 +435,11 @@ class Ndb_cond_traverse_context Ndb_rewrite_context *rewrite_stack; }; typedef enum ndb_query_state_bits { NDB_QUERY_NORMAL = 0, NDB_QUERY_MULTI_READ_RANGE = 1 } NDB_QUERY_STATE_BITS; /* Place holder for ha_ndbcluster thread specific data */ Loading @@ -451,6 +456,7 @@ class Thd_ndb NdbTransaction *stmt; int error; List<NDB_SHARE> changed_tables; uint query_state; }; class ha_ndbcluster: public handler Loading Loading @@ -672,8 +678,8 @@ static void set_tabname(const char *pathname, char *tabname); NdbScanOperation* op); friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; Loading Loading @@ -716,6 +722,8 @@ static void set_tabname(const char *pathname, char *tabname); bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; void release_completed_operations(NdbTransaction*, bool); Ndb_cond_stack *m_cond_stack; bool m_disable_multi_read; byte *m_multi_range_result_ptr; Loading