Loading sql/ha_ndbcluster.cc +175 −38 Original line number Diff line number Diff line Loading @@ -1242,6 +1242,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info, { char unique_index_name[FN_LEN]; static const char* unique_suffix= "$unique"; m_has_unique_index= TRUE; strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS); DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name)); const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname); Loading @@ -1268,7 +1269,7 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error) KEY* key_info= tab->key_info; const char **key_name= tab->s->keynames.type_names; DBUG_ENTER("ha_ndbcluster::open_indexes"); m_has_unique_index= FALSE; for (i= 0; i < tab->s->keys; i++, key_info++, key_name++) { if ((error= add_index_handle(thd, dict, key_info, *key_name, i))) Loading Loading @@ -1570,6 +1571,25 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec DBUG_RETURN(0); } int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno) { KEY* key_info= table->key_info + keyno; KEY_PART_INFO* key_part= key_info->key_part; KEY_PART_INFO* end= key_part+key_info->key_parts; uint i; DBUG_ENTER("set_index_key_from_record"); for (i= 0; key_part != end; key_part++, i++) { Field* field= key_part->field; if (set_ndb_key(op, field, m_index[keyno].unique_index_attrid_map[i], record+key_part->offset)) ERR_RETURN(m_active_trans->getNdbError()); } DBUG_RETURN(0); } int ha_ndbcluster::set_index_key(NdbOperation *op, const KEY *key_info, Loading Loading @@ -1778,22 +1798,89 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data, } /* Peek to check if a particular row already exists * Check that all operations between first and last all * have gotten the errcode * If checking for HA_ERR_KEY_NOT_FOUND then update m_dupkey * for all succeeding operations */ bool ha_ndbcluster::check_all_operations_for_error(NdbTransaction *trans, const NdbOperation *first, const NdbOperation *last, uint errcode) { const NdbOperation *op= first; DBUG_ENTER("ha_ndbcluster::check_all_operations_for_error"); while(op) { NdbError err= op->getNdbError(); if (err.status != NdbError::Success) { if (ndb_to_mysql_error(&err) != (int) errcode) DBUG_RETURN(false); if (op == last) break; op= trans->getNextCompletedOperation(op); } else { // We found a duplicate if (op->getType() == NdbOperation::UniqueIndexAccess) { if (errcode == HA_ERR_KEY_NOT_FOUND) { NdbIndexOperation *iop= (NdbIndexOperation *) op; const NDBINDEX *index= iop->getIndex(); // Find the key_no of the index for(uint i= 0; i<table->s->keys; i++) { if (m_index[i].unique_index == index) { m_dupkey= i; break; } } } } else { // Must have been primary key access DBUG_ASSERT(op->getType() == NdbOperation::PrimaryKeyAccess); if (errcode == HA_ERR_KEY_NOT_FOUND) m_dupkey= table->s->primary_key; } DBUG_RETURN(false); } } DBUG_RETURN(true); } /* * Peek to check if any rows already exist with conflicting * primary key or unique index values */ int ha_ndbcluster::peek_row(const byte *record) int ha_ndbcluster::peek_indexed_rows(const byte *record) { NdbTransaction *trans= m_active_trans; NdbOperation *op; DBUG_ENTER("peek_row"); const NdbOperation *first, *last; uint i; int res; DBUG_ENTER("peek_indexed_rows"); NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); NdbOperation::LockMode lm= NdbOperation::LM_Read; first= NULL; if (table->s->primary_key != MAX_KEY) { /* * Fetch any row with colliding primary key */ if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || op->readTuple(lm) != 0) ERR_RETURN(trans->getNdbError()); int res; first= op; if ((res= set_primary_key_from_record(op, record))) ERR_RETURN(trans->getNdbError()); Loading @@ -1809,15 +1896,56 @@ int ha_ndbcluster::peek_row(const byte *record) } op->setPartitionId(part_id); } } /* * Fetch any rows with colliding unique indexes */ KEY* key_info; KEY_PART_INFO *key_part, *end; for (i= 0, key_info= table->key_info; i < table->s->keys; i++, key_info++) { if (i != table->s->primary_key && key_info->flags & HA_NOSAME) { // A unique index is defined on table NdbIndexOperation *iop; NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index; key_part= key_info->key_part; end= key_part + key_info->key_parts; if (!(iop= trans->getNdbIndexOperation(unique_index, (const NDBTAB *) m_table)) || iop->readTuple(lm) != 0) ERR_RETURN(trans->getNdbError()); if (execute_no_commit_ie(this,trans) != 0) if (!first) first= iop; if ((res= set_index_key_from_record(iop, record, i))) ERR_RETURN(trans->getNdbError()); } } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); else { // Table has no keys table->status= STATUS_NOT_FOUND; DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); } if (check_all_operations_for_error(trans, first, last, HA_ERR_KEY_NOT_FOUND)) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); } else { DBUG_PRINT("info", ("m_dupkey %d", m_dupkey)); } DBUG_RETURN(0); } /* Read one record from NDB using unique secondary index */ Loading Loading @@ -2312,13 +2440,33 @@ int ha_ndbcluster::write_row(byte *record) DBUG_ENTER("ha_ndbcluster::write_row"); m_write_op= TRUE; if (!m_use_write && m_ignore_dup_key && table_share->primary_key != MAX_KEY) has_auto_increment= (table->next_number_field && record == table->record[0]); if (table_share->primary_key != MAX_KEY) { /* * Increase any auto_incremented primary key */ if (has_auto_increment) { int peek_res= peek_row(record); THD *thd= table->in_use; m_skip_auto_increment= FALSE; update_auto_increment(); /* Ensure that handler is always called for auto_increment values */ thd->next_insert_id= 0; m_skip_auto_increment= !auto_increment_column_changed; } } /* * If IGNORE the ignore constraint violations on primary and unique keys */ if (!m_use_write && m_ignore_dup_key) { int peek_res= peek_indexed_rows(record); if (!peek_res) { m_dupkey= table_share->primary_key; DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); } if (peek_res != HA_ERR_KEY_NOT_FOUND) Loading @@ -2328,7 +2476,6 @@ int ha_ndbcluster::write_row(byte *record) statistic_increment(thd->status_var.ha_write_count, &LOCK_status); if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) table->timestamp_field->set_time(); has_auto_increment= (table->next_number_field && record == table->record[0]); if (!(op= trans->getNdbOperation((const NDBTAB *) m_table))) ERR_RETURN(trans->getNdbError()); Loading Loading @@ -2369,17 +2516,6 @@ int ha_ndbcluster::write_row(byte *record) { int res; if (has_auto_increment) { THD *thd= table->in_use; m_skip_auto_increment= FALSE; update_auto_increment(); /* Ensure that handler is always called for auto_increment values */ thd->next_insert_id= 0; m_skip_auto_increment= !auto_increment_column_changed; } if ((res= set_primary_key_from_record(op, record))) return res; } Loading Loading @@ -3465,7 +3601,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) break; case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/ DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY")); if (current_thd->lex->sql_command == SQLCOM_REPLACE) if (current_thd->lex->sql_command == SQLCOM_REPLACE && !m_has_unique_index) { DBUG_PRINT("info", ("Turning ON use of write instead of insert")); m_use_write= TRUE; Loading Loading @@ -5139,6 +5275,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg): m_sorted(FALSE), m_use_write(FALSE), m_ignore_dup_key(FALSE), m_has_unique_index(FALSE), m_primary_key_update(FALSE), m_ignore_no_key(FALSE), m_rows_to_insert((ha_rows) 1), Loading sql/ha_ndbcluster.h +8 −1 Original line number Diff line number Diff line Loading @@ -738,7 +738,11 @@ static void set_tabname(const char *pathname, char *tabname); part_id_range *part_spec); int full_table_scan(byte * buf); int peek_row(const byte *record); bool check_all_operations_for_error(NdbTransaction *trans, const NdbOperation *first, const NdbOperation *last, uint errcode); int peek_indexed_rows(const byte *record); int unique_index_read(const byte *key, uint key_len, byte *buf); int fetch_next(NdbScanOperation* op); Loading Loading @@ -766,6 +770,8 @@ static void set_tabname(const char *pathname, char *tabname); int get_ndb_blobs_value(NdbBlob *last_ndb_blob); int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key_from_record(NdbOperation *op, const byte *record); int set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno); int set_bounds(NdbIndexScanOperation*, uint inx, bool rir, const key_range *keys[2], uint= 0); int key_cmp(uint keynr, const byte * old_row, const byte * new_row); Loading Loading @@ -832,6 +838,7 @@ static void set_tabname(const char *pathname, char *tabname); bool m_sorted; bool m_use_write; bool m_ignore_dup_key; bool m_has_unique_index; bool m_primary_key_update; bool m_write_op; bool m_ignore_no_key; Loading Loading
sql/ha_ndbcluster.cc +175 −38 Original line number Diff line number Diff line Loading @@ -1242,6 +1242,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info, { char unique_index_name[FN_LEN]; static const char* unique_suffix= "$unique"; m_has_unique_index= TRUE; strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS); DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name)); const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname); Loading @@ -1268,7 +1269,7 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error) KEY* key_info= tab->key_info; const char **key_name= tab->s->keynames.type_names; DBUG_ENTER("ha_ndbcluster::open_indexes"); m_has_unique_index= FALSE; for (i= 0; i < tab->s->keys; i++, key_info++, key_name++) { if ((error= add_index_handle(thd, dict, key_info, *key_name, i))) Loading Loading @@ -1570,6 +1571,25 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec DBUG_RETURN(0); } int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno) { KEY* key_info= table->key_info + keyno; KEY_PART_INFO* key_part= key_info->key_part; KEY_PART_INFO* end= key_part+key_info->key_parts; uint i; DBUG_ENTER("set_index_key_from_record"); for (i= 0; key_part != end; key_part++, i++) { Field* field= key_part->field; if (set_ndb_key(op, field, m_index[keyno].unique_index_attrid_map[i], record+key_part->offset)) ERR_RETURN(m_active_trans->getNdbError()); } DBUG_RETURN(0); } int ha_ndbcluster::set_index_key(NdbOperation *op, const KEY *key_info, Loading Loading @@ -1778,22 +1798,89 @@ int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data, } /* Peek to check if a particular row already exists * Check that all operations between first and last all * have gotten the errcode * If checking for HA_ERR_KEY_NOT_FOUND then update m_dupkey * for all succeeding operations */ bool ha_ndbcluster::check_all_operations_for_error(NdbTransaction *trans, const NdbOperation *first, const NdbOperation *last, uint errcode) { const NdbOperation *op= first; DBUG_ENTER("ha_ndbcluster::check_all_operations_for_error"); while(op) { NdbError err= op->getNdbError(); if (err.status != NdbError::Success) { if (ndb_to_mysql_error(&err) != (int) errcode) DBUG_RETURN(false); if (op == last) break; op= trans->getNextCompletedOperation(op); } else { // We found a duplicate if (op->getType() == NdbOperation::UniqueIndexAccess) { if (errcode == HA_ERR_KEY_NOT_FOUND) { NdbIndexOperation *iop= (NdbIndexOperation *) op; const NDBINDEX *index= iop->getIndex(); // Find the key_no of the index for(uint i= 0; i<table->s->keys; i++) { if (m_index[i].unique_index == index) { m_dupkey= i; break; } } } } else { // Must have been primary key access DBUG_ASSERT(op->getType() == NdbOperation::PrimaryKeyAccess); if (errcode == HA_ERR_KEY_NOT_FOUND) m_dupkey= table->s->primary_key; } DBUG_RETURN(false); } } DBUG_RETURN(true); } /* * Peek to check if any rows already exist with conflicting * primary key or unique index values */ int ha_ndbcluster::peek_row(const byte *record) int ha_ndbcluster::peek_indexed_rows(const byte *record) { NdbTransaction *trans= m_active_trans; NdbOperation *op; DBUG_ENTER("peek_row"); const NdbOperation *first, *last; uint i; int res; DBUG_ENTER("peek_indexed_rows"); NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type); NdbOperation::LockMode lm= NdbOperation::LM_Read; first= NULL; if (table->s->primary_key != MAX_KEY) { /* * Fetch any row with colliding primary key */ if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || op->readTuple(lm) != 0) ERR_RETURN(trans->getNdbError()); int res; first= op; if ((res= set_primary_key_from_record(op, record))) ERR_RETURN(trans->getNdbError()); Loading @@ -1809,15 +1896,56 @@ int ha_ndbcluster::peek_row(const byte *record) } op->setPartitionId(part_id); } } /* * Fetch any rows with colliding unique indexes */ KEY* key_info; KEY_PART_INFO *key_part, *end; for (i= 0, key_info= table->key_info; i < table->s->keys; i++, key_info++) { if (i != table->s->primary_key && key_info->flags & HA_NOSAME) { // A unique index is defined on table NdbIndexOperation *iop; NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index; key_part= key_info->key_part; end= key_part + key_info->key_parts; if (!(iop= trans->getNdbIndexOperation(unique_index, (const NDBTAB *) m_table)) || iop->readTuple(lm) != 0) ERR_RETURN(trans->getNdbError()); if (execute_no_commit_ie(this,trans) != 0) if (!first) first= iop; if ((res= set_index_key_from_record(iop, record, i))) ERR_RETURN(trans->getNdbError()); } } last= trans->getLastDefinedOperation(); if (first) res= execute_no_commit_ie(this,trans); else { // Table has no keys table->status= STATUS_NOT_FOUND; DBUG_RETURN(HA_ERR_KEY_NOT_FOUND); } if (check_all_operations_for_error(trans, first, last, HA_ERR_KEY_NOT_FOUND)) { table->status= STATUS_NOT_FOUND; DBUG_RETURN(ndb_err(trans)); } else { DBUG_PRINT("info", ("m_dupkey %d", m_dupkey)); } DBUG_RETURN(0); } /* Read one record from NDB using unique secondary index */ Loading Loading @@ -2312,13 +2440,33 @@ int ha_ndbcluster::write_row(byte *record) DBUG_ENTER("ha_ndbcluster::write_row"); m_write_op= TRUE; if (!m_use_write && m_ignore_dup_key && table_share->primary_key != MAX_KEY) has_auto_increment= (table->next_number_field && record == table->record[0]); if (table_share->primary_key != MAX_KEY) { /* * Increase any auto_incremented primary key */ if (has_auto_increment) { int peek_res= peek_row(record); THD *thd= table->in_use; m_skip_auto_increment= FALSE; update_auto_increment(); /* Ensure that handler is always called for auto_increment values */ thd->next_insert_id= 0; m_skip_auto_increment= !auto_increment_column_changed; } } /* * If IGNORE the ignore constraint violations on primary and unique keys */ if (!m_use_write && m_ignore_dup_key) { int peek_res= peek_indexed_rows(record); if (!peek_res) { m_dupkey= table_share->primary_key; DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); } if (peek_res != HA_ERR_KEY_NOT_FOUND) Loading @@ -2328,7 +2476,6 @@ int ha_ndbcluster::write_row(byte *record) statistic_increment(thd->status_var.ha_write_count, &LOCK_status); if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) table->timestamp_field->set_time(); has_auto_increment= (table->next_number_field && record == table->record[0]); if (!(op= trans->getNdbOperation((const NDBTAB *) m_table))) ERR_RETURN(trans->getNdbError()); Loading Loading @@ -2369,17 +2516,6 @@ int ha_ndbcluster::write_row(byte *record) { int res; if (has_auto_increment) { THD *thd= table->in_use; m_skip_auto_increment= FALSE; update_auto_increment(); /* Ensure that handler is always called for auto_increment values */ thd->next_insert_id= 0; m_skip_auto_increment= !auto_increment_column_changed; } if ((res= set_primary_key_from_record(op, record))) return res; } Loading Loading @@ -3465,7 +3601,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation) break; case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/ DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY")); if (current_thd->lex->sql_command == SQLCOM_REPLACE) if (current_thd->lex->sql_command == SQLCOM_REPLACE && !m_has_unique_index) { DBUG_PRINT("info", ("Turning ON use of write instead of insert")); m_use_write= TRUE; Loading Loading @@ -5139,6 +5275,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg): m_sorted(FALSE), m_use_write(FALSE), m_ignore_dup_key(FALSE), m_has_unique_index(FALSE), m_primary_key_update(FALSE), m_ignore_no_key(FALSE), m_rows_to_insert((ha_rows) 1), Loading
sql/ha_ndbcluster.h +8 −1 Original line number Diff line number Diff line Loading @@ -738,7 +738,11 @@ static void set_tabname(const char *pathname, char *tabname); part_id_range *part_spec); int full_table_scan(byte * buf); int peek_row(const byte *record); bool check_all_operations_for_error(NdbTransaction *trans, const NdbOperation *first, const NdbOperation *last, uint errcode); int peek_indexed_rows(const byte *record); int unique_index_read(const byte *key, uint key_len, byte *buf); int fetch_next(NdbScanOperation* op); Loading Loading @@ -766,6 +770,8 @@ static void set_tabname(const char *pathname, char *tabname); int get_ndb_blobs_value(NdbBlob *last_ndb_blob); int set_primary_key(NdbOperation *op, const byte *key); int set_primary_key_from_record(NdbOperation *op, const byte *record); int set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno); int set_bounds(NdbIndexScanOperation*, uint inx, bool rir, const key_range *keys[2], uint= 0); int key_cmp(uint keynr, const byte * old_row, const byte * new_row); Loading Loading @@ -832,6 +838,7 @@ static void set_tabname(const char *pathname, char *tabname); bool m_sorted; bool m_use_write; bool m_ignore_dup_key; bool m_has_unique_index; bool m_primary_key_update; bool m_write_op; bool m_ignore_no_key; Loading