Commit 6e73098f authored by unknown's avatar unknown
Browse files

Fix for Bug#17431 INSERT IGNORE INTO returns failed: 1296: err 4350 'Transaction already aborted'

parent aa3411f5
Loading
Loading
Loading
Loading
+27 −1
Original line number Diff line number Diff line
@@ -577,6 +577,25 @@ pk1 b c
2	2	17
4	4	3
6	6	3
DELETE FROM t1;
CREATE UNIQUE INDEX bi ON t1(b);
INSERT INTO t1 VALUES 
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
INSERT INTO t1 VALUES(0,1,0),(21,21,21) ON DUPLICATE KEY UPDATE pk1=b+10,b=b+10;
select * from t1 order by pk1;
pk1	b	c
2	2	2
3	3	3
4	4	4
5	5	5
6	6	6
7	7	7
8	8	8
9	9	9
10	10	10
11	11	1
21	21	21
DROP TABLE t1;
CREATE TABLE t1(a INT) ENGINE=ndb;
INSERT IGNORE INTO t1 VALUES (1);
@@ -586,7 +605,7 @@ INSERT IGNORE INTO t1 SELECT a FROM t1;
INSERT IGNORE INTO t1 SELECT a FROM t1;
INSERT IGNORE INTO t1 VALUES (1);
INSERT IGNORE INTO t1 VALUES (1);
SELECT * FROM t1;
SELECT * FROM t1 ORDER BY a;
a
1
1
@@ -606,4 +625,11 @@ a
1
1
1
DELETE FROM t1;
CREATE UNIQUE INDEX ai ON t1(a);
INSERT IGNORE INTO t1 VALUES (1);
INSERT IGNORE INTO t1 VALUES (1);
SELECT * FROM t1 ORDER BY a;
a
1
DROP TABLE t1;
+12 −0
Original line number Diff line number Diff line
@@ -19,3 +19,15 @@ gesuchnr benutzer_id
2	1
3	2
drop table t1;
CREATE TABLE t1(i INT PRIMARY KEY AUTO_INCREMENT, 
j INT, 
k INT, 
UNIQUE INDEX(j)
) ENGINE = ndb;
INSERT  INTO t1 VALUES (1,1,23),(2,2,24);
REPLACE INTO t1 (j,k) VALUES (1,42);
REPLACE INTO t1 (i,j) VALUES (17,2);
SELECT * from t1 ORDER BY i;
i	j	k
3	1	42
17	2	24
+14 −9
Original line number Diff line number Diff line
@@ -591,14 +591,14 @@ DELETE FROM t1 WHERE pk1 = 2 OR pk1 = 4 OR pk1 = 6;
INSERT INTO t1 VALUES(1,1,1),(2,2,17),(3,4,5) ON DUPLICATE KEY UPDATE pk1=b;
select * from t1 where pk1 = b and b != c order by pk1;

# The following test case currently does not work
#DELETE FROM t1;
#CREATE UNIQUE INDEX bi ON t1(b);
#INSERT INTO t1 VALUES 
#(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
#(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
#INSERT INTO t1 VALUES(0,1,0),(21,21,21) ON DUPLICATE KEY UPDATE pk1=b+10,c=b+10;
#select * from t1 order by pk1;
# Test handling of duplicate unique
DELETE FROM t1;
CREATE UNIQUE INDEX bi ON t1(b);
INSERT INTO t1 VALUES 
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
INSERT INTO t1 VALUES(0,1,0),(21,21,21) ON DUPLICATE KEY UPDATE pk1=b+10,b=b+10;
select * from t1 order by pk1;

DROP TABLE t1;

@@ -614,7 +614,12 @@ INSERT IGNORE INTO t1 SELECT a FROM t1;
INSERT IGNORE INTO t1 SELECT a FROM t1;
INSERT IGNORE INTO t1 VALUES (1);
INSERT IGNORE INTO t1 VALUES (1);
SELECT * FROM t1;
SELECT * FROM t1 ORDER BY a;
DELETE FROM t1;
CREATE UNIQUE INDEX ai ON t1(a);
INSERT IGNORE INTO t1 VALUES (1);
INSERT IGNORE INTO t1 VALUES (1);
SELECT * FROM t1 ORDER BY a;
DROP TABLE t1;

# End of 4.1 tests
+11 −0
Original line number Diff line number Diff line
@@ -27,4 +27,15 @@ replace into t1 (gesuchnr,benutzer_id) values (1,1);
select * from t1 order by gesuchnr;
drop table t1;

# bug#17431
CREATE TABLE t1(i INT PRIMARY KEY AUTO_INCREMENT, 
                j INT, 
                k INT, 
                UNIQUE INDEX(j)
               ) ENGINE = ndb;
INSERT  INTO t1 VALUES (1,1,23),(2,2,24);
REPLACE INTO t1 (j,k) VALUES (1,42);
REPLACE INTO t1 (i,j) VALUES (17,2);
SELECT * from t1 ORDER BY i;

# End of 4.1 tests
+163 −28
Original line number Diff line number Diff line
@@ -1031,6 +1031,7 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
  NDBDICT *dict= ndb->getDictionary();
  DBUG_ENTER("ha_ndbcluster::build_index_list");
  
  m_has_unique_index= FALSE;
  // Save information about all known indexes
  for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
  {
@@ -1039,6 +1040,7 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
    m_index[i].type= idx_type;
    if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
    {
      m_has_unique_index= TRUE;
      strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
      DBUG_PRINT("info", ("Created unique index name \'%s\' for index %d",
                          unique_index_name, i));
@@ -1290,6 +1292,24 @@ int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *rec
  DBUG_RETURN(0);
}

int ha_ndbcluster::set_index_key_from_record(NdbOperation *op, const byte *record, uint keyno)
{
  KEY* key_info= table->key_info + keyno;
  KEY_PART_INFO* key_part= key_info->key_part;
  KEY_PART_INFO* end= key_part+key_info->key_parts;
  uint i;
  DBUG_ENTER("set_index_key_from_record");
                                                                                
  for (i= 0; key_part != end; key_part++, i++)
  {
    Field* field= key_part->field;
    if (set_ndb_key(op, field, m_index[keyno].unique_index_attrid_map[i],
                    record+key_part->offset))
      ERR_RETURN(m_active_trans->getNdbError());
  }
  DBUG_RETURN(0);
}

int 
ha_ndbcluster::set_index_key(NdbOperation *op, 
                             const KEY *key_info, 
@@ -1443,7 +1463,6 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
        ERR_RETURN(trans->getNdbError());
    }
  }
  
  if (execute_no_commit(this,trans) != 0) 
  {
    table->status= STATUS_NOT_FOUND;
@@ -1471,30 +1490,137 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
}

/*
  Peek to check if a particular row already exists
 * Check that all operations between first and last all
 * have gotten the errcode
 * If checking for HA_ERR_KEY_NOT_FOUND then update m_dupkey
 * for all succeeding operations
 */
bool ha_ndbcluster::check_all_operations_for_error(NdbTransaction *trans,
                                                   const NdbOperation *first,
                                                   const NdbOperation *last,
                                                   uint errcode)
{
  const NdbOperation *op= first;
  DBUG_ENTER("ha_ndbcluster::check_all_operations_for_error");

int ha_ndbcluster::peek_row(const byte *record)
  while(op)
  {
    NdbError err= op->getNdbError();
    if (err.status != NdbError::Success)
    {
      if (ndb_to_mysql_error(&err) != (int) errcode)
        DBUG_RETURN(false);
      if (op == last) break;
      op= trans->getNextCompletedOperation(op);
    }
    else
    {
      // We found a duplicate
      if (op->getType() == NdbOperation::UniqueIndexAccess)
      {
        if (errcode == HA_ERR_KEY_NOT_FOUND)
        {
          NdbIndexOperation *iop= (NdbIndexOperation *) op;
          const NDBINDEX *index= iop->getIndex();
          // Find the key_no of the index
          for(uint i= 0; i<table->s->keys; i++)
          {
            if (m_index[i].unique_index == index)
            {
              m_dupkey= i;
              break;
            }
          }
        }
      }
      else
      {
        // Must have been primary key access
        DBUG_ASSERT(op->getType() == NdbOperation::PrimaryKeyAccess);
        if (errcode == HA_ERR_KEY_NOT_FOUND)
          m_dupkey= table->s->primary_key;
      }
      DBUG_RETURN(false);      
    }
  }
  DBUG_RETURN(true);
}

/*
 * Peek to check if any rows already exist with conflicting
 * primary key or unique index values
*/

int ha_ndbcluster::peek_indexed_rows(const byte *record)
{
  NdbTransaction *trans= m_active_trans;
  NdbOperation *op;
  DBUG_ENTER("peek_row");
  const NdbOperation *first, *last;
  uint i;
  int res;
  DBUG_ENTER("peek_indexed_rows");

  NdbOperation::LockMode lm=
    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  first= NULL;
  if (table->s->primary_key != MAX_KEY)
  {
    /*
     * Fetch any row with colliding primary key
     */
    if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
        op->readTuple(lm) != 0)
      ERR_RETURN(trans->getNdbError());
    
  int res;
    first= op;
    if ((res= set_primary_key_from_record(op, record)))
      ERR_RETURN(trans->getNdbError());
  }
  /*
   * Fetch any rows with colliding unique indexes
   */
  KEY* key_info;
  KEY_PART_INFO *key_part, *end;
  for (i= 0, key_info= table->key_info; i < table->s->keys; i++, key_info++)
  {
    if (i != table->s->primary_key &&
        key_info->flags & HA_NOSAME)
    {
      // A unique index is defined on table
      NdbIndexOperation *iop;
      NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
      key_part= key_info->key_part;
      end= key_part + key_info->key_parts;
      if (!(iop= trans->getNdbIndexOperation(unique_index,
                                             (const NDBTAB *) m_table)) ||
          iop->readTuple(lm) != 0)
        ERR_RETURN(trans->getNdbError());

  if (execute_no_commit_ie(this,trans) != 0)
      if (!first)
        first= iop;
      if ((res= set_index_key_from_record(iop, record, i)))
        ERR_RETURN(trans->getNdbError());
    }
  }
  last= trans->getLastDefinedOperation();
  if (first)
    res= execute_no_commit_ie(this,trans);
  else
  {
    // Table has no keys
    table->status= STATUS_NOT_FOUND;
    DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
  }
  if (check_all_operations_for_error(trans, first, last, 
                                     HA_ERR_KEY_NOT_FOUND))
  {
    table->status= STATUS_NOT_FOUND;
    DBUG_RETURN(ndb_err(trans));
  } 
  else
  {
    DBUG_PRINT("info", ("m_dupkey %d", m_dupkey));
  }
  DBUG_RETURN(0);
}

@@ -1930,13 +2056,33 @@ int ha_ndbcluster::write_row(byte *record)

  DBUG_ENTER("write_row");

  if (m_ignore_dup_key && table->s->primary_key != MAX_KEY)
  has_auto_increment= (table->next_number_field && record == table->record[0]);
  if (table->s->primary_key != MAX_KEY)
  {
    /*
     * Increase any auto_incremented primary key
     */
    if (has_auto_increment) 
    {
    int peek_res= peek_row(record);
      THD *thd= table->in_use;
      
      m_skip_auto_increment= FALSE;
      update_auto_increment();
      /* Ensure that handler is always called for auto_increment values */
      thd->next_insert_id= 0;
      m_skip_auto_increment= !auto_increment_column_changed;
    }
  }
  
  /*
   * If IGNORE the ignore constraint violations on primary and unique keys
   */
  if (m_ignore_dup_key)
  {
    int peek_res= peek_indexed_rows(record);
    
    if (!peek_res) 
    {
      m_dupkey= table->s->primary_key;
      DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
    }
    if (peek_res != HA_ERR_KEY_NOT_FOUND)
@@ -1946,7 +2092,6 @@ int ha_ndbcluster::write_row(byte *record)
  statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  has_auto_increment= (table->next_number_field && record == table->record[0]);

  if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
    ERR_RETURN(trans->getNdbError());
@@ -1975,17 +2120,6 @@ int ha_ndbcluster::write_row(byte *record)
  {
    int res;

    if (has_auto_increment) 
    {
      THD *thd= table->in_use;

      m_skip_auto_increment= FALSE;
      update_auto_increment();
      /* Ensure that handler is always called for auto_increment values */
      thd->next_insert_id= 0;
      m_skip_auto_increment= !auto_increment_column_changed;
    }

    if ((res= set_primary_key_from_record(op, record)))
      return res;  
  }
@@ -2996,7 +3130,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
    break;
  case HA_EXTRA_IGNORE_DUP_KEY:       /* Dup keys don't rollback everything*/
    DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
    if (current_thd->lex->sql_command == SQLCOM_REPLACE)
    if (current_thd->lex->sql_command == SQLCOM_REPLACE && !m_has_unique_index)
    {
      DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
      m_use_write= TRUE;
@@ -4260,6 +4394,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
  m_share(0),
  m_use_write(FALSE),
  m_ignore_dup_key(FALSE),
  m_has_unique_index(FALSE),
  m_primary_key_update(FALSE),
  m_retrieve_all_fields(FALSE),
  m_retrieve_primary_key(FALSE),
Loading