Loading mysql-test/r/ndb_basic.result +5 −0 Original line number Diff line number Diff line Loading @@ -40,6 +40,11 @@ SELECT * FROM t1 ORDER BY pk1; pk1 attr1 attr2 attr3 3 1 NULL 9412 9412 9413 17 9413 UPDATE t1 SET pk1=4 WHERE pk1 = 3; SELECT * FROM t1 ORDER BY pk1; pk1 attr1 attr2 attr3 4 1 NULL 9412 9412 9413 17 9413 DELETE FROM t1; SELECT * FROM t1; pk1 attr1 attr2 attr3 Loading mysql-test/t/ndb_basic.test +2 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,8 @@ UPDATE t1 SET pk1=2 WHERE attr1=1; SELECT * FROM t1 ORDER BY pk1; UPDATE t1 SET pk1=pk1 + 1; SELECT * FROM t1 ORDER BY pk1; UPDATE t1 SET pk1=4 WHERE pk1 = 3; SELECT * FROM t1 ORDER BY pk1; # Delete the record DELETE FROM t1; Loading sql/ha_ndbcluster.cc +80 −75 Original line number Diff line number Diff line Loading @@ -324,7 +324,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans) DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d", err.code, res)); if (res == HA_ERR_FOUND_DUPP_KEY) dupkey= table->primary_key; m_dupkey= table->primary_key; DBUG_RETURN(res); } Loading Loading @@ -551,7 +551,7 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) blob_size+= 8 - blob_size % 8; if (loop == 1) { char *buf= blobs_buffer + offset; char *buf= m_blobs_buffer + offset; uint32 len= 0xffffffff; // Max uint32 DBUG_PRINT("value", ("read blob ptr=%x len=%u", (uint)buf, (uint)blob_len)); Loading @@ -563,15 +563,15 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) offset+= blob_size; } } if (loop == 0 && offset > blobs_buffer_size) if (loop == 0 && offset > m_blobs_buffer_size) { my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); blobs_buffer_size= 0; my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer_size= 0; DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); blobs_buffer= my_malloc(offset, MYF(MY_WME)); if (blobs_buffer == NULL) m_blobs_buffer= my_malloc(offset, MYF(MY_WME)); if (m_blobs_buffer == NULL) DBUG_RETURN(-1); blobs_buffer_size= offset; m_blobs_buffer_size= offset; } } DBUG_RETURN(0); Loading Loading @@ -854,7 +854,7 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) { if (type >= TL_WRITE_ALLOW_WRITE) return NdbOperation::LM_Exclusive; else if (uses_blob_value(retrieve_all_fields)) else if (uses_blob_value(m_retrieve_all_fields)) return NdbOperation::LM_Read; else return NdbOperation::LM_CommittedRead; Loading Loading @@ -1018,7 +1018,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) { Field *field= table->field[i]; if ((thd->query_id == field->query_id) || retrieve_all_fields) m_retrieve_all_fields) { if (get_ndb_value(op, field, i, buf)) ERR_RETURN(trans->getNdbError()); Loading Loading @@ -1055,7 +1055,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) THD *thd= current_thd; DBUG_ENTER("complemented_pk_read"); if (retrieve_all_fields) if (m_retrieve_all_fields) // We have allready retrieved all fields, nothing to complement DBUG_RETURN(0); Loading Loading @@ -1192,12 +1192,12 @@ inline int ha_ndbcluster::next_result(byte *buf) /* We can only handle one tuple with blobs at a time. */ if (ops_pending && blobs_pending) if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) DBUG_RETURN(ndb_err(trans)); ops_pending= 0; blobs_pending= FALSE; m_ops_pending= 0; m_blobs_pending= FALSE; } check= cursor->nextResult(contact_ndb); if (check == 0) Loading @@ -1219,8 +1219,8 @@ inline int ha_ndbcluster::next_result(byte *buf) all pending update or delete operations should be sent to NDB */ DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); if (ops_pending) DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (m_ops_pending) { if (current_thd->transaction.on) { Loading @@ -1234,7 +1234,7 @@ inline int ha_ndbcluster::next_result(byte *buf) int res= trans->restart(); DBUG_ASSERT(res == 0); } ops_pending= 0; m_ops_pending= 0; } contact_ndb= (check == 2); Loading Loading @@ -1423,7 +1423,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) Field *field= table->field[i]; if ((thd->query_id == field->query_id) || (field->flags & PRI_KEY_FLAG) || retrieve_all_fields) m_retrieve_all_fields) { if (get_ndb_value(op, field, i, buf)) ERR_RETURN(op->getNdbError()); Loading Loading @@ -1666,9 +1666,9 @@ int ha_ndbcluster::write_row(byte *record) if (has_auto_increment) { skip_auto_increment= FALSE; m_skip_auto_increment= FALSE; update_auto_increment(); skip_auto_increment= !auto_increment_column_changed; m_skip_auto_increment= !auto_increment_column_changed; } if ((res= set_primary_key(op))) Loading @@ -1683,7 +1683,7 @@ int ha_ndbcluster::write_row(byte *record) if (!(field->flags & PRI_KEY_FLAG) && set_ndb_value(op, field, i, &set_blob_value)) { skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; ERR_RETURN(op->getNdbError()); } } Loading @@ -1695,25 +1695,25 @@ int ha_ndbcluster::write_row(byte *record) to NoCommit the transaction between each row. Find out how this is detected! */ rows_inserted++; m_rows_inserted++; no_uncommitted_rows_update(1); bulk_insert_not_flushed= TRUE; if ((rows_to_insert == 1) || ((rows_inserted % bulk_insert_rows) == 0) || m_bulk_insert_not_flushed= TRUE; if ((m_rows_to_insert == 1) || ((m_rows_inserted % m_bulk_insert_rows) == 0) || set_blob_value) { THD *thd= current_thd; // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", (int)rows_inserted, (int)bulk_insert_rows)); (int)m_rows_inserted, (int)m_bulk_insert_rows)); bulk_insert_not_flushed= FALSE; m_bulk_insert_not_flushed= FALSE; if (thd->transaction.on) { if (execute_no_commit(this,trans) != 0) { skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading @@ -1722,7 +1722,7 @@ int ha_ndbcluster::write_row(byte *record) { if (execute_commit(this,trans) != 0) { skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading @@ -1730,7 +1730,7 @@ int ha_ndbcluster::write_row(byte *record) DBUG_ASSERT(res == 0); } } if ((has_auto_increment) && (skip_auto_increment)) if ((has_auto_increment) && (m_skip_auto_increment)) { Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1; DBUG_PRINT("info", Loading @@ -1740,7 +1740,7 @@ int ha_ndbcluster::write_row(byte *record) DBUG_PRINT("info", ("Setting next auto increment value to %u", next_val)); } skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; DBUG_RETURN(0); } Loading Loading @@ -1820,7 +1820,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) } // Delete old row DBUG_PRINT("info", ("insert succeded")); m_primary_key_update= TRUE; delete_res= delete_row(old_data); m_primary_key_update= FALSE; if (delete_res) { DBUG_PRINT("info", ("delete failed")); Loading @@ -1843,9 +1845,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) DBUG_PRINT("info", ("Calling updateTuple on cursor")); if (!(op= cursor->updateTuple())) ERR_RETURN(trans->getNdbError()); ops_pending++; m_ops_pending++; if (uses_blob_value(FALSE)) blobs_pending= TRUE; m_blobs_pending= TRUE; } else { Loading Loading @@ -1921,7 +1923,7 @@ int ha_ndbcluster::delete_row(const byte *record) DBUG_PRINT("info", ("Calling deleteTuple on cursor")); if (cursor->deleteTuple() != 0) ERR_RETURN(trans->getNdbError()); ops_pending++; m_ops_pending++; no_uncommitted_rows_update(-1); Loading Loading @@ -1951,7 +1953,9 @@ int ha_ndbcluster::delete_row(const byte *record) else { int res; if ((res= set_primary_key(op))) if ((res= (m_primary_key_update ? set_primary_key_from_old_data(op, record) : set_primary_key(op)))) return res; } } Loading Loading @@ -2411,18 +2415,18 @@ int ha_ndbcluster::close_scan() DBUG_RETURN(1); if (ops_pending) if (m_ops_pending) { /* Take over any pending transactions to the deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } ops_pending= 0; m_ops_pending= 0; } cursor->close(); Loading Loading @@ -2555,7 +2559,7 @@ void ha_ndbcluster::info(uint flag) if (flag & HA_STATUS_ERRKEY) { DBUG_PRINT("info", ("HA_STATUS_ERRKEY")); errkey= dupkey; errkey= m_dupkey; } if (flag & HA_STATUS_AUTO) DBUG_PRINT("info", ("HA_STATUS_AUTO")); Loading Loading @@ -2664,7 +2668,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) where field->query_id is the same as the current query id */ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS")); retrieve_all_fields= TRUE; m_retrieve_all_fields= TRUE; break; case HA_EXTRA_PREPARE_FOR_DELETE: DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE")); Loading Loading @@ -2708,8 +2712,8 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) DBUG_ENTER("start_bulk_insert"); DBUG_PRINT("enter", ("rows: %d", (int)rows)); rows_inserted= 0; rows_to_insert= rows; m_rows_inserted= 0; m_rows_to_insert= rows; /* Calculate how many rows that should be inserted Loading @@ -2723,7 +2727,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) batch= bytesperbatch/bytes; batch= batch == 0 ? 1 : batch; DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes)); bulk_insert_rows= batch; m_bulk_insert_rows= batch; DBUG_VOID_RETURN; } Loading @@ -2737,22 +2741,22 @@ int ha_ndbcluster::end_bulk_insert() DBUG_ENTER("end_bulk_insert"); // Check if last inserts need to be flushed if (bulk_insert_not_flushed) if (m_bulk_insert_not_flushed) { NdbConnection *trans= m_active_trans; // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", rows_inserted, bulk_insert_rows)); bulk_insert_not_flushed= FALSE; m_rows_inserted, m_bulk_insert_rows)); m_bulk_insert_not_flushed= FALSE; if (execute_no_commit(this,trans) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); } } rows_inserted= 0; rows_to_insert= 1; m_rows_inserted= 0; m_rows_to_insert= 1; DBUG_RETURN(error); } Loading Loading @@ -2938,8 +2942,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) (NdbConnection*)thd->transaction.stmt.ndb_tid; DBUG_ASSERT(m_active_trans); // Start of transaction retrieve_all_fields= FALSE; ops_pending= 0; m_retrieve_all_fields= FALSE; m_ops_pending= 0; { NDBDICT *dict= m_ndb->getDictionary(); const NDBTAB *tab; Loading Loading @@ -2987,13 +2991,13 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) DBUG_PRINT("warning", ("m_active_cursor != NULL")); m_active_cursor= NULL; if (blobs_pending) if (m_blobs_pending) DBUG_PRINT("warning", ("blobs_pending != 0")); blobs_pending= 0; m_blobs_pending= 0; if (ops_pending) if (m_ops_pending) DBUG_PRINT("warning", ("ops_pending != 0L")); ops_pending= 0; m_ops_pending= 0; } DBUG_RETURN(error); } Loading Loading @@ -3030,8 +3034,8 @@ int ha_ndbcluster::start_stmt(THD *thd) m_active_trans= trans; // Start of statement retrieve_all_fields= FALSE; ops_pending= 0; m_retrieve_all_fields= FALSE; m_ops_pending= 0; DBUG_RETURN(error); } Loading Loading @@ -3572,13 +3576,13 @@ longlong ha_ndbcluster::get_auto_increment() DBUG_ENTER("get_auto_increment"); DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); int cache_size= (rows_to_insert - rows_inserted < autoincrement_prefetch) ? rows_to_insert - rows_inserted : (rows_to_insert > autoincrement_prefetch) ? rows_to_insert (m_rows_to_insert - m_rows_inserted < autoincrement_prefetch) ? m_rows_to_insert - m_rows_inserted : (m_rows_to_insert > autoincrement_prefetch) ? m_rows_to_insert : autoincrement_prefetch; Uint64 auto_value= (skip_auto_increment) ? (m_skip_auto_increment) ? m_ndb->readAutoIncrementValue((const NDBTAB *) m_table) : m_ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size); DBUG_RETURN((longlong)auto_value); Loading @@ -3603,17 +3607,18 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_share(0), m_use_write(FALSE), m_ignore_dup_key_not_supported(FALSE), retrieve_all_fields(FALSE), rows_to_insert(1), rows_inserted(0), bulk_insert_rows(1024), bulk_insert_not_flushed(FALSE), ops_pending(0), skip_auto_increment(TRUE), blobs_pending(0), blobs_buffer(0), blobs_buffer_size(0), dupkey((uint) -1) m_primary_key_update(FALSE), m_retrieve_all_fields(FALSE), m_rows_to_insert(1), m_rows_inserted(0), m_bulk_insert_rows(1024), m_bulk_insert_not_flushed(FALSE), m_ops_pending(0), m_skip_auto_increment(TRUE), m_blobs_pending(0), m_blobs_buffer(0), m_blobs_buffer_size(0), m_dupkey((uint) -1) { int i; Loading Loading @@ -3647,8 +3652,8 @@ ha_ndbcluster::~ha_ndbcluster() if (m_share) free_share(m_share); release_metadata(); my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); blobs_buffer= 0; my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer= 0; // Check for open cursor/transaction if (m_active_cursor) { Loading sql/ha_ndbcluster.h +12 −11 Original line number Diff line number Diff line Loading @@ -243,18 +243,19 @@ class ha_ndbcluster: public handler NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; bool m_use_write; bool m_ignore_dup_key_not_supported; bool retrieve_all_fields; ha_rows rows_to_insert; ha_rows rows_inserted; ha_rows bulk_insert_rows; bool bulk_insert_not_flushed; ha_rows ops_pending; bool skip_auto_increment; bool blobs_pending; bool m_primary_key_update; bool m_retrieve_all_fields; ha_rows m_rows_to_insert; ha_rows m_rows_inserted; ha_rows m_bulk_insert_rows; bool m_bulk_insert_not_flushed; ha_rows m_ops_pending; bool m_skip_auto_increment; bool m_blobs_pending; // memory for blobs in one tuple char *blobs_buffer; uint32 blobs_buffer_size; uint dupkey; char *m_blobs_buffer; uint32 m_blobs_buffer_size; uint m_dupkey; void set_rec_per_key(); void records_update(); Loading Loading
mysql-test/r/ndb_basic.result +5 −0 Original line number Diff line number Diff line Loading @@ -40,6 +40,11 @@ SELECT * FROM t1 ORDER BY pk1; pk1 attr1 attr2 attr3 3 1 NULL 9412 9412 9413 17 9413 UPDATE t1 SET pk1=4 WHERE pk1 = 3; SELECT * FROM t1 ORDER BY pk1; pk1 attr1 attr2 attr3 4 1 NULL 9412 9412 9413 17 9413 DELETE FROM t1; SELECT * FROM t1; pk1 attr1 attr2 attr3 Loading
mysql-test/t/ndb_basic.test +2 −0 Original line number Diff line number Diff line Loading @@ -36,6 +36,8 @@ UPDATE t1 SET pk1=2 WHERE attr1=1; SELECT * FROM t1 ORDER BY pk1; UPDATE t1 SET pk1=pk1 + 1; SELECT * FROM t1 ORDER BY pk1; UPDATE t1 SET pk1=4 WHERE pk1 = 3; SELECT * FROM t1 ORDER BY pk1; # Delete the record DELETE FROM t1; Loading
sql/ha_ndbcluster.cc +80 −75 Original line number Diff line number Diff line Loading @@ -324,7 +324,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans) DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d", err.code, res)); if (res == HA_ERR_FOUND_DUPP_KEY) dupkey= table->primary_key; m_dupkey= table->primary_key; DBUG_RETURN(res); } Loading Loading @@ -551,7 +551,7 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) blob_size+= 8 - blob_size % 8; if (loop == 1) { char *buf= blobs_buffer + offset; char *buf= m_blobs_buffer + offset; uint32 len= 0xffffffff; // Max uint32 DBUG_PRINT("value", ("read blob ptr=%x len=%u", (uint)buf, (uint)blob_len)); Loading @@ -563,15 +563,15 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) offset+= blob_size; } } if (loop == 0 && offset > blobs_buffer_size) if (loop == 0 && offset > m_blobs_buffer_size) { my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); blobs_buffer_size= 0; my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer_size= 0; DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); blobs_buffer= my_malloc(offset, MYF(MY_WME)); if (blobs_buffer == NULL) m_blobs_buffer= my_malloc(offset, MYF(MY_WME)); if (m_blobs_buffer == NULL) DBUG_RETURN(-1); blobs_buffer_size= offset; m_blobs_buffer_size= offset; } } DBUG_RETURN(0); Loading Loading @@ -854,7 +854,7 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type) { if (type >= TL_WRITE_ALLOW_WRITE) return NdbOperation::LM_Exclusive; else if (uses_blob_value(retrieve_all_fields)) else if (uses_blob_value(m_retrieve_all_fields)) return NdbOperation::LM_Read; else return NdbOperation::LM_CommittedRead; Loading Loading @@ -1018,7 +1018,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) { Field *field= table->field[i]; if ((thd->query_id == field->query_id) || retrieve_all_fields) m_retrieve_all_fields) { if (get_ndb_value(op, field, i, buf)) ERR_RETURN(trans->getNdbError()); Loading Loading @@ -1055,7 +1055,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data) THD *thd= current_thd; DBUG_ENTER("complemented_pk_read"); if (retrieve_all_fields) if (m_retrieve_all_fields) // We have allready retrieved all fields, nothing to complement DBUG_RETURN(0); Loading Loading @@ -1192,12 +1192,12 @@ inline int ha_ndbcluster::next_result(byte *buf) /* We can only handle one tuple with blobs at a time. */ if (ops_pending && blobs_pending) if (m_ops_pending && m_blobs_pending) { if (execute_no_commit(this,trans) != 0) DBUG_RETURN(ndb_err(trans)); ops_pending= 0; blobs_pending= FALSE; m_ops_pending= 0; m_blobs_pending= FALSE; } check= cursor->nextResult(contact_ndb); if (check == 0) Loading @@ -1219,8 +1219,8 @@ inline int ha_ndbcluster::next_result(byte *buf) all pending update or delete operations should be sent to NDB */ DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); if (ops_pending) DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (m_ops_pending) { if (current_thd->transaction.on) { Loading @@ -1234,7 +1234,7 @@ inline int ha_ndbcluster::next_result(byte *buf) int res= trans->restart(); DBUG_ASSERT(res == 0); } ops_pending= 0; m_ops_pending= 0; } contact_ndb= (check == 2); Loading Loading @@ -1423,7 +1423,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) Field *field= table->field[i]; if ((thd->query_id == field->query_id) || (field->flags & PRI_KEY_FLAG) || retrieve_all_fields) m_retrieve_all_fields) { if (get_ndb_value(op, field, i, buf)) ERR_RETURN(op->getNdbError()); Loading Loading @@ -1666,9 +1666,9 @@ int ha_ndbcluster::write_row(byte *record) if (has_auto_increment) { skip_auto_increment= FALSE; m_skip_auto_increment= FALSE; update_auto_increment(); skip_auto_increment= !auto_increment_column_changed; m_skip_auto_increment= !auto_increment_column_changed; } if ((res= set_primary_key(op))) Loading @@ -1683,7 +1683,7 @@ int ha_ndbcluster::write_row(byte *record) if (!(field->flags & PRI_KEY_FLAG) && set_ndb_value(op, field, i, &set_blob_value)) { skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; ERR_RETURN(op->getNdbError()); } } Loading @@ -1695,25 +1695,25 @@ int ha_ndbcluster::write_row(byte *record) to NoCommit the transaction between each row. Find out how this is detected! */ rows_inserted++; m_rows_inserted++; no_uncommitted_rows_update(1); bulk_insert_not_flushed= TRUE; if ((rows_to_insert == 1) || ((rows_inserted % bulk_insert_rows) == 0) || m_bulk_insert_not_flushed= TRUE; if ((m_rows_to_insert == 1) || ((m_rows_inserted % m_bulk_insert_rows) == 0) || set_blob_value) { THD *thd= current_thd; // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", (int)rows_inserted, (int)bulk_insert_rows)); (int)m_rows_inserted, (int)m_bulk_insert_rows)); bulk_insert_not_flushed= FALSE; m_bulk_insert_not_flushed= FALSE; if (thd->transaction.on) { if (execute_no_commit(this,trans) != 0) { skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading @@ -1722,7 +1722,7 @@ int ha_ndbcluster::write_row(byte *record) { if (execute_commit(this,trans) != 0) { skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } Loading @@ -1730,7 +1730,7 @@ int ha_ndbcluster::write_row(byte *record) DBUG_ASSERT(res == 0); } } if ((has_auto_increment) && (skip_auto_increment)) if ((has_auto_increment) && (m_skip_auto_increment)) { Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1; DBUG_PRINT("info", Loading @@ -1740,7 +1740,7 @@ int ha_ndbcluster::write_row(byte *record) DBUG_PRINT("info", ("Setting next auto increment value to %u", next_val)); } skip_auto_increment= TRUE; m_skip_auto_increment= TRUE; DBUG_RETURN(0); } Loading Loading @@ -1820,7 +1820,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) } // Delete old row DBUG_PRINT("info", ("insert succeded")); m_primary_key_update= TRUE; delete_res= delete_row(old_data); m_primary_key_update= FALSE; if (delete_res) { DBUG_PRINT("info", ("delete failed")); Loading @@ -1843,9 +1845,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) DBUG_PRINT("info", ("Calling updateTuple on cursor")); if (!(op= cursor->updateTuple())) ERR_RETURN(trans->getNdbError()); ops_pending++; m_ops_pending++; if (uses_blob_value(FALSE)) blobs_pending= TRUE; m_blobs_pending= TRUE; } else { Loading Loading @@ -1921,7 +1923,7 @@ int ha_ndbcluster::delete_row(const byte *record) DBUG_PRINT("info", ("Calling deleteTuple on cursor")); if (cursor->deleteTuple() != 0) ERR_RETURN(trans->getNdbError()); ops_pending++; m_ops_pending++; no_uncommitted_rows_update(-1); Loading Loading @@ -1951,7 +1953,9 @@ int ha_ndbcluster::delete_row(const byte *record) else { int res; if ((res= set_primary_key(op))) if ((res= (m_primary_key_update ? set_primary_key_from_old_data(op, record) : set_primary_key(op)))) return res; } } Loading Loading @@ -2411,18 +2415,18 @@ int ha_ndbcluster::close_scan() DBUG_RETURN(1); if (ops_pending) if (m_ops_pending) { /* Take over any pending transactions to the deleteing/updating transaction before closing the scan */ DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending)); if (execute_no_commit(this,trans) != 0) { no_uncommitted_rows_execute_failure(); DBUG_RETURN(ndb_err(trans)); } ops_pending= 0; m_ops_pending= 0; } cursor->close(); Loading Loading @@ -2555,7 +2559,7 @@ void ha_ndbcluster::info(uint flag) if (flag & HA_STATUS_ERRKEY) { DBUG_PRINT("info", ("HA_STATUS_ERRKEY")); errkey= dupkey; errkey= m_dupkey; } if (flag & HA_STATUS_AUTO) DBUG_PRINT("info", ("HA_STATUS_AUTO")); Loading Loading @@ -2664,7 +2668,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) where field->query_id is the same as the current query id */ DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS")); retrieve_all_fields= TRUE; m_retrieve_all_fields= TRUE; break; case HA_EXTRA_PREPARE_FOR_DELETE: DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE")); Loading Loading @@ -2708,8 +2712,8 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) DBUG_ENTER("start_bulk_insert"); DBUG_PRINT("enter", ("rows: %d", (int)rows)); rows_inserted= 0; rows_to_insert= rows; m_rows_inserted= 0; m_rows_to_insert= rows; /* Calculate how many rows that should be inserted Loading @@ -2723,7 +2727,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) batch= bytesperbatch/bytes; batch= batch == 0 ? 1 : batch; DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes)); bulk_insert_rows= batch; m_bulk_insert_rows= batch; DBUG_VOID_RETURN; } Loading @@ -2737,22 +2741,22 @@ int ha_ndbcluster::end_bulk_insert() DBUG_ENTER("end_bulk_insert"); // Check if last inserts need to be flushed if (bulk_insert_not_flushed) if (m_bulk_insert_not_flushed) { NdbConnection *trans= m_active_trans; // Send rows to NDB DBUG_PRINT("info", ("Sending inserts to NDB, "\ "rows_inserted:%d, bulk_insert_rows: %d", rows_inserted, bulk_insert_rows)); bulk_insert_not_flushed= FALSE; m_rows_inserted, m_bulk_insert_rows)); m_bulk_insert_not_flushed= FALSE; if (execute_no_commit(this,trans) != 0) { no_uncommitted_rows_execute_failure(); my_errno= error= ndb_err(trans); } } rows_inserted= 0; rows_to_insert= 1; m_rows_inserted= 0; m_rows_to_insert= 1; DBUG_RETURN(error); } Loading Loading @@ -2938,8 +2942,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) (NdbConnection*)thd->transaction.stmt.ndb_tid; DBUG_ASSERT(m_active_trans); // Start of transaction retrieve_all_fields= FALSE; ops_pending= 0; m_retrieve_all_fields= FALSE; m_ops_pending= 0; { NDBDICT *dict= m_ndb->getDictionary(); const NDBTAB *tab; Loading Loading @@ -2987,13 +2991,13 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) DBUG_PRINT("warning", ("m_active_cursor != NULL")); m_active_cursor= NULL; if (blobs_pending) if (m_blobs_pending) DBUG_PRINT("warning", ("blobs_pending != 0")); blobs_pending= 0; m_blobs_pending= 0; if (ops_pending) if (m_ops_pending) DBUG_PRINT("warning", ("ops_pending != 0L")); ops_pending= 0; m_ops_pending= 0; } DBUG_RETURN(error); } Loading Loading @@ -3030,8 +3034,8 @@ int ha_ndbcluster::start_stmt(THD *thd) m_active_trans= trans; // Start of statement retrieve_all_fields= FALSE; ops_pending= 0; m_retrieve_all_fields= FALSE; m_ops_pending= 0; DBUG_RETURN(error); } Loading Loading @@ -3572,13 +3576,13 @@ longlong ha_ndbcluster::get_auto_increment() DBUG_ENTER("get_auto_increment"); DBUG_PRINT("enter", ("m_tabname: %s", m_tabname)); int cache_size= (rows_to_insert - rows_inserted < autoincrement_prefetch) ? rows_to_insert - rows_inserted : (rows_to_insert > autoincrement_prefetch) ? rows_to_insert (m_rows_to_insert - m_rows_inserted < autoincrement_prefetch) ? m_rows_to_insert - m_rows_inserted : (m_rows_to_insert > autoincrement_prefetch) ? m_rows_to_insert : autoincrement_prefetch; Uint64 auto_value= (skip_auto_increment) ? (m_skip_auto_increment) ? m_ndb->readAutoIncrementValue((const NDBTAB *) m_table) : m_ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size); DBUG_RETURN((longlong)auto_value); Loading @@ -3603,17 +3607,18 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): m_share(0), m_use_write(FALSE), m_ignore_dup_key_not_supported(FALSE), retrieve_all_fields(FALSE), rows_to_insert(1), rows_inserted(0), bulk_insert_rows(1024), bulk_insert_not_flushed(FALSE), ops_pending(0), skip_auto_increment(TRUE), blobs_pending(0), blobs_buffer(0), blobs_buffer_size(0), dupkey((uint) -1) m_primary_key_update(FALSE), m_retrieve_all_fields(FALSE), m_rows_to_insert(1), m_rows_inserted(0), m_bulk_insert_rows(1024), m_bulk_insert_not_flushed(FALSE), m_ops_pending(0), m_skip_auto_increment(TRUE), m_blobs_pending(0), m_blobs_buffer(0), m_blobs_buffer_size(0), m_dupkey((uint) -1) { int i; Loading Loading @@ -3647,8 +3652,8 @@ ha_ndbcluster::~ha_ndbcluster() if (m_share) free_share(m_share); release_metadata(); my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); blobs_buffer= 0; my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer= 0; // Check for open cursor/transaction if (m_active_cursor) { Loading
sql/ha_ndbcluster.h +12 −11 Original line number Diff line number Diff line Loading @@ -243,18 +243,19 @@ class ha_ndbcluster: public handler NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; bool m_use_write; bool m_ignore_dup_key_not_supported; bool retrieve_all_fields; ha_rows rows_to_insert; ha_rows rows_inserted; ha_rows bulk_insert_rows; bool bulk_insert_not_flushed; ha_rows ops_pending; bool skip_auto_increment; bool blobs_pending; bool m_primary_key_update; bool m_retrieve_all_fields; ha_rows m_rows_to_insert; ha_rows m_rows_inserted; ha_rows m_bulk_insert_rows; bool m_bulk_insert_not_flushed; ha_rows m_ops_pending; bool m_skip_auto_increment; bool m_blobs_pending; // memory for blobs in one tuple char *blobs_buffer; uint32 blobs_buffer_size; uint dupkey; char *m_blobs_buffer; uint32 m_blobs_buffer_size; uint m_dupkey; void set_rec_per_key(); void records_update(); Loading