Loading sql/ha_ndbcluster.cc +41 −21 Original line number Diff line number Diff line Loading @@ -228,14 +228,15 @@ static int ndb_to_mysql_error(const NdbError *err) inline int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans); h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, h->m_force_send); Loading Loading @@ -268,14 +269,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) } inline int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans); h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, h->m_force_send); Loading @@ -292,6 +294,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; query_state&= NDB_QUERY_NORMAL; } Thd_ndb::~Thd_ndb() Loading Loading @@ -1445,7 +1448,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1492,7 +1495,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ERR_RETURN(trans->getNdbError()); } } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1632,7 +1635,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false); else { // Table has no keys Loading Loading @@ -1681,7 +1684,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1729,7 +1732,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) */ if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); m_ops_pending= 0; m_blobs_pending= FALSE; Loading Loading @@ -1761,7 +1764,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) { if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(-1); } else Loading Loading @@ -2065,7 +2068,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_RETURN(res); } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2098,7 +2101,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2230,7 +2233,7 @@ int ha_ndbcluster::write_row(byte *record) m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); Loading Loading @@ -2430,7 +2433,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) } // Execute update operation if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2501,7 +2504,7 @@ int ha_ndbcluster::delete_row(const byte *record) } // Execute delete operation if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2942,7 +2945,7 @@ int ha_ndbcluster::close_scan() // Lock row DBUG_PRINT("info", ("Keeping lock on scanned row")); if (!(op= cursor->lockTuple())) if (!(op= cursor->lockCurrentTuple())) { m_lock_tuple= false; ERR_RETURN(trans->getNdbError()); Loading @@ -2957,7 +2960,7 @@ int ha_ndbcluster::close_scan() deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3367,7 +3370,7 @@ int ha_ndbcluster::end_bulk_insert() m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); Loading Loading @@ -3540,6 +3543,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, &ndbcluster_hton); } else Loading @@ -3555,6 +3559,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, TRUE, &ndbcluster_hton); /* Loading Loading @@ -3761,6 +3766,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement Loading Loading @@ -6009,7 +6015,8 @@ int ha_ndbcluster::write_ndb_file() } void ha_ndbcluster::release_completed_operations(NdbConnection *trans) ha_ndbcluster::release_completed_operations(NdbTransaction *trans, bool force_release) { if (trans->hasBlobOperation()) { Loading @@ -6018,7 +6025,19 @@ ha_ndbcluster::release_completed_operations(NdbConnection *trans) */ return; } if (!force_release) { if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE) { /* We are batching reads and have not consumed all fetched rows yet, releasing operation records is unsafe */ return; } } trans->releaseCompletedOperations(); } int ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, KEY_MULTI_RANGE *ranges, Loading @@ -6033,6 +6052,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, NDB_INDEX_TYPE index_type= get_index_type(active_index); ulong reclength= table->s->reclength; NdbOperation* op; Thd_ndb *thd_ndb= get_thd_ndb(current_thd); if (uses_blob_value(m_retrieve_all_fields)) { Loading @@ -6046,7 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, sorted, buffer)); } thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; m_disable_multi_read= FALSE; /** Loading Loading @@ -6193,7 +6213,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, */ m_current_multi_operation= lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans, true))) { m_multi_range_defined= multi_range_curr; multi_range_curr= ranges; Loading sql/ha_ndbcluster.h +9 −3 Original line number Diff line number Diff line Loading @@ -435,6 +435,11 @@ class Ndb_cond_traverse_context Ndb_rewrite_context *rewrite_stack; }; typedef enum ndb_query_state_bits { NDB_QUERY_NORMAL = 0, NDB_QUERY_MULTI_READ_RANGE = 1 } NDB_QUERY_STATE_BITS; /* Place holder for ha_ndbcluster thread specific data */ Loading @@ -451,6 +456,7 @@ class Thd_ndb NdbTransaction *stmt; int error; List<NDB_SHARE> changed_tables; uint query_state; }; class ha_ndbcluster: public handler Loading Loading @@ -672,8 +678,8 @@ static void set_tabname(const char *pathname, char *tabname); NdbScanOperation* op); friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; Loading Loading @@ -716,7 +722,7 @@ static void set_tabname(const char *pathname, char *tabname); bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; void release_completed_operations(NdbConnection*); void release_completed_operations(NdbTransaction*, bool); Ndb_cond_stack *m_cond_stack; bool m_disable_multi_read; Loading Loading
sql/ha_ndbcluster.cc +41 −21 Original line number Diff line number Diff line Loading @@ -228,14 +228,15 @@ static int ndb_to_mysql_error(const NdbError *err) inline int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans); h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AbortOnError, h->m_force_send); Loading Loading @@ -268,14 +269,15 @@ int execute_commit(THD *thd, NdbTransaction *trans) } inline int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans) int execute_no_commit_ie(ha_ndbcluster *h, NdbTransaction *trans, bool force_release) { #ifdef NOT_USED int m_batch_execute= 0; if (m_batch_execute) return 0; #endif h->release_completed_operations(trans); h->release_completed_operations(trans, force_release); return trans->execute(NdbTransaction::NoCommit, NdbTransaction::AO_IgnoreError, h->m_force_send); Loading @@ -292,6 +294,7 @@ Thd_ndb::Thd_ndb() all= NULL; stmt= NULL; error= 0; query_state&= NDB_QUERY_NORMAL; } Thd_ndb::~Thd_ndb() Loading Loading @@ -1445,7 +1448,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1492,7 +1495,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) ERR_RETURN(trans->getNdbError()); } } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1632,7 +1635,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record) } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); res= execute_no_commit_ie(this,trans,false); else { // Table has no keys Loading Loading @@ -1681,7 +1684,7 @@ int ha_ndbcluster::unique_index_read(const byte *key, if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit_ie(this,trans) != 0) if (execute_no_commit_ie(this,trans,false) != 0) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); Loading Loading @@ -1729,7 +1732,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) */ if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); m_ops_pending= 0; m_blobs_pending= FALSE; Loading Loading @@ -1761,7 +1764,7 @@ inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor) { if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(-1); } else Loading Loading @@ -2065,7 +2068,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, DBUG_RETURN(res); } if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2098,7 +2101,7 @@ int ha_ndbcluster::full_table_scan(byte *buf) if ((res= define_read_attrs(buf, op))) DBUG_RETURN(res); if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) DBUG_RETURN(ndb_err(trans)); DBUG_PRINT("exit", ("Scan started successfully")); DBUG_RETURN(next_result(buf)); Loading Loading @@ -2230,7 +2233,7 @@ int ha_ndbcluster::write_row(byte *record) m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this,trans) != 0) if (execute_no_commit(this,trans,false) != 0) { m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); Loading Loading @@ -2430,7 +2433,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) } // Execute update operation if (!cursor && execute_no_commit(this,trans) != 0) { if (!cursor && execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2501,7 +2504,7 @@ int ha_ndbcluster::delete_row(const byte *record) } // Execute delete operation if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -2942,7 +2945,7 @@ int ha_ndbcluster::close_scan() // Lock row DBUG_PRINT("info", ("Keeping lock on scanned row")); if (!(op= cursor->lockTuple())) if (!(op= cursor->lockCurrentTuple())) { m_lock_tuple= false; ERR_RETURN(trans->getNdbError()); Loading @@ -2957,7 +2960,7 @@ int ha_ndbcluster::close_scan() deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { if (execute_no_commit(this,trans,false) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading Loading @@ -3367,7 +3370,7 @@ int ha_ndbcluster::end_bulk_insert() m_bulk_insert_not_flushed= FALSE; if (m_transaction_on) { if (execute_no_commit(this, trans) != 0) if (execute_no_commit(this, trans,false) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); Loading Loading @@ -3540,6 +3543,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->stmt= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, FALSE, &ndbcluster_hton); } else Loading @@ -3555,6 +3559,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ERR_RETURN(ndb->getNdbError()); no_uncommitted_rows_reset(thd); thd_ndb->all= trans; thd_ndb->query_state&= NDB_QUERY_NORMAL; trans_register_ha(thd, TRUE, &ndbcluster_hton); /* Loading Loading @@ -3761,6 +3766,7 @@ int ha_ndbcluster::start_stmt(THD *thd, thr_lock_type lock_type) thd_ndb->stmt= trans; trans_register_ha(thd, FALSE, &ndbcluster_hton); } thd_ndb->query_state&= NDB_QUERY_NORMAL; m_active_trans= trans; // Start of statement Loading Loading @@ -6009,7 +6015,8 @@ int ha_ndbcluster::write_ndb_file() } void ha_ndbcluster::release_completed_operations(NdbConnection *trans) ha_ndbcluster::release_completed_operations(NdbTransaction *trans, bool force_release) { if (trans->hasBlobOperation()) { Loading @@ -6018,7 +6025,19 @@ ha_ndbcluster::release_completed_operations(NdbConnection *trans) */ return; } if (!force_release) { if (get_thd_ndb(current_thd)->query_state & NDB_QUERY_MULTI_READ_RANGE) { /* We are batching reads and have not consumed all fetched rows yet, releasing operation records is unsafe */ return; } } trans->releaseCompletedOperations(); } int ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, KEY_MULTI_RANGE *ranges, Loading @@ -6033,6 +6052,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, NDB_INDEX_TYPE index_type= get_index_type(active_index); ulong reclength= table->s->reclength; NdbOperation* op; Thd_ndb *thd_ndb= get_thd_ndb(current_thd); if (uses_blob_value(m_retrieve_all_fields)) { Loading @@ -6046,7 +6066,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, sorted, buffer)); } thd_ndb->query_state|= NDB_QUERY_MULTI_READ_RANGE; m_disable_multi_read= FALSE; /** Loading Loading @@ -6193,7 +6213,7 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p, */ m_current_multi_operation= lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation(); if (!(res= execute_no_commit_ie(this, m_active_trans))) if (!(res= execute_no_commit_ie(this, m_active_trans, true))) { m_multi_range_defined= multi_range_curr; multi_range_curr= ranges; Loading
sql/ha_ndbcluster.h +9 −3 Original line number Diff line number Diff line Loading @@ -435,6 +435,11 @@ class Ndb_cond_traverse_context Ndb_rewrite_context *rewrite_stack; }; typedef enum ndb_query_state_bits { NDB_QUERY_NORMAL = 0, NDB_QUERY_MULTI_READ_RANGE = 1 } NDB_QUERY_STATE_BITS; /* Place holder for ha_ndbcluster thread specific data */ Loading @@ -451,6 +456,7 @@ class Thd_ndb NdbTransaction *stmt; int error; List<NDB_SHARE> changed_tables; uint query_state; }; class ha_ndbcluster: public handler Loading Loading @@ -672,8 +678,8 @@ static void set_tabname(const char *pathname, char *tabname); NdbScanOperation* op); friend int execute_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*); friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool); friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*, bool); NdbTransaction *m_active_trans; NdbScanOperation *m_active_cursor; Loading Loading @@ -716,7 +722,7 @@ static void set_tabname(const char *pathname, char *tabname); bool m_force_send; ha_rows m_autoincrement_prefetch; bool m_transaction_on; void release_completed_operations(NdbConnection*); void release_completed_operations(NdbTransaction*, bool); Ndb_cond_stack *m_cond_stack; bool m_disable_multi_read; Loading