Commit 9caa9f9c authored by unknown's avatar unknown
Browse files

Fix for bug#6398 update of primary key fails

parent 78c4faa2
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -40,6 +40,11 @@ SELECT * FROM t1 ORDER BY pk1;
pk1	attr1	attr2	attr3
3	1	NULL	9412
9412	9413	17	9413
UPDATE t1 SET pk1=4 WHERE pk1 = 3;
SELECT * FROM t1 ORDER BY pk1;
pk1	attr1	attr2	attr3
4	1	NULL	9412
9412	9413	17	9413
DELETE FROM t1;
SELECT * FROM t1;
pk1	attr1	attr2	attr3
+2 −0
Original line number Diff line number Diff line
@@ -36,6 +36,8 @@ UPDATE t1 SET pk1=2 WHERE attr1=1;
SELECT * FROM t1 ORDER BY pk1;
UPDATE t1 SET pk1=pk1 + 1;
SELECT * FROM t1 ORDER BY pk1;
UPDATE t1 SET pk1=4 WHERE pk1 = 3;
SELECT * FROM t1 ORDER BY pk1;

# Delete the record
DELETE FROM t1;
+80 −75
Original line number Diff line number Diff line
@@ -324,7 +324,7 @@ int ha_ndbcluster::ndb_err(NdbConnection *trans)
  DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d", 
		      err.code, res));
  if (res == HA_ERR_FOUND_DUPP_KEY)
    dupkey= table->primary_key;
    m_dupkey= table->primary_key;
  
  DBUG_RETURN(res);
}
@@ -551,7 +551,7 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
          blob_size+= 8 - blob_size % 8;
        if (loop == 1)
        {
          char *buf= blobs_buffer + offset;
          char *buf= m_blobs_buffer + offset;
          uint32 len= 0xffffffff;  // Max uint32
          DBUG_PRINT("value", ("read blob ptr=%x len=%u",
                               (uint)buf, (uint)blob_len));
@@ -563,15 +563,15 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
        offset+= blob_size;
      }
    }
    if (loop == 0 && offset > blobs_buffer_size)
    if (loop == 0 && offset > m_blobs_buffer_size)
    {
      my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
      blobs_buffer_size= 0;
      my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
      m_blobs_buffer_size= 0;
      DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
      blobs_buffer= my_malloc(offset, MYF(MY_WME));
      if (blobs_buffer == NULL)
      m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
      if (m_blobs_buffer == NULL)
        DBUG_RETURN(-1);
      blobs_buffer_size= offset;
      m_blobs_buffer_size= offset;
    }
  }
  DBUG_RETURN(0);
@@ -854,7 +854,7 @@ int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
{
  if (type >= TL_WRITE_ALLOW_WRITE)
    return NdbOperation::LM_Exclusive;
  else if (uses_blob_value(retrieve_all_fields))
  else if (uses_blob_value(m_retrieve_all_fields))
    return NdbOperation::LM_Read;
  else
    return NdbOperation::LM_CommittedRead;
@@ -1018,7 +1018,7 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
  {
    Field *field= table->field[i];
    if ((thd->query_id == field->query_id) ||
	retrieve_all_fields)
	m_retrieve_all_fields)
    {
      if (get_ndb_value(op, field, i, buf))
	ERR_RETURN(trans->getNdbError());
@@ -1055,7 +1055,7 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
  THD *thd= current_thd;
  DBUG_ENTER("complemented_pk_read");

  if (retrieve_all_fields)
  if (m_retrieve_all_fields)
    // We have allready retrieved all fields, nothing to complement
    DBUG_RETURN(0);

@@ -1192,12 +1192,12 @@ inline int ha_ndbcluster::next_result(byte *buf)
    /*
      We can only handle one tuple with blobs at a time.
    */
    if (ops_pending && blobs_pending)
    if (m_ops_pending && m_blobs_pending)
    {
      if (execute_no_commit(this,trans) != 0)
	DBUG_RETURN(ndb_err(trans));
      ops_pending= 0;
      blobs_pending= FALSE;
      m_ops_pending= 0;
      m_blobs_pending= FALSE;
    }
    check= cursor->nextResult(contact_ndb);
    if (check == 0)
@@ -1219,8 +1219,8 @@ inline int ha_ndbcluster::next_result(byte *buf)
	all pending update or delete operations should 
	be sent to NDB
      */
      DBUG_PRINT("info", ("ops_pending: %d", ops_pending));    
      if (ops_pending)
      DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
      if (m_ops_pending)
      {
	if (current_thd->transaction.on)
	{
@@ -1234,7 +1234,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
	  int res= trans->restart();
	  DBUG_ASSERT(res == 0);
	}
	ops_pending= 0;
	m_ops_pending= 0;
      }
      
      contact_ndb= (check == 2);
@@ -1423,7 +1423,7 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
    Field *field= table->field[i];
    if ((thd->query_id == field->query_id) ||
	(field->flags & PRI_KEY_FLAG) || 
	retrieve_all_fields)
	m_retrieve_all_fields)
    {      
      if (get_ndb_value(op, field, i, buf))
	ERR_RETURN(op->getNdbError());
@@ -1666,9 +1666,9 @@ int ha_ndbcluster::write_row(byte *record)

    if (has_auto_increment) 
    {
      skip_auto_increment= FALSE;
      m_skip_auto_increment= FALSE;
      update_auto_increment();
      skip_auto_increment= !auto_increment_column_changed;
      m_skip_auto_increment= !auto_increment_column_changed;
    }

    if ((res= set_primary_key(op)))
@@ -1683,7 +1683,7 @@ int ha_ndbcluster::write_row(byte *record)
    if (!(field->flags & PRI_KEY_FLAG) &&
	set_ndb_value(op, field, i, &set_blob_value))
    {
      skip_auto_increment= TRUE;
      m_skip_auto_increment= TRUE;
      ERR_RETURN(op->getNdbError());
    }
  }
@@ -1695,25 +1695,25 @@ int ha_ndbcluster::write_row(byte *record)
    to NoCommit the transaction between each row.
    Find out how this is detected!
  */
  rows_inserted++;
  m_rows_inserted++;
  no_uncommitted_rows_update(1);
  bulk_insert_not_flushed= TRUE;
  if ((rows_to_insert == 1) || 
      ((rows_inserted % bulk_insert_rows) == 0) ||
  m_bulk_insert_not_flushed= TRUE;
  if ((m_rows_to_insert == 1) || 
      ((m_rows_inserted % m_bulk_insert_rows) == 0) ||
      set_blob_value)
  {
    THD *thd= current_thd;
    // Send rows to NDB
    DBUG_PRINT("info", ("Sending inserts to NDB, "\
			"rows_inserted:%d, bulk_insert_rows: %d", 
			(int)rows_inserted, (int)bulk_insert_rows));
			(int)m_rows_inserted, (int)m_bulk_insert_rows));

    bulk_insert_not_flushed= FALSE;
    m_bulk_insert_not_flushed= FALSE;
    if (thd->transaction.on)
    {
      if (execute_no_commit(this,trans) != 0)
      {
	skip_auto_increment= TRUE;
	m_skip_auto_increment= TRUE;
	no_uncommitted_rows_execute_failure();
	DBUG_RETURN(ndb_err(trans));
      }
@@ -1722,7 +1722,7 @@ int ha_ndbcluster::write_row(byte *record)
    {
      if (execute_commit(this,trans) != 0)
      {
	skip_auto_increment= TRUE;
	m_skip_auto_increment= TRUE;
	no_uncommitted_rows_execute_failure();
	DBUG_RETURN(ndb_err(trans));
      }
@@ -1730,7 +1730,7 @@ int ha_ndbcluster::write_row(byte *record)
      DBUG_ASSERT(res == 0);
    }
  }
  if ((has_auto_increment) && (skip_auto_increment))
  if ((has_auto_increment) && (m_skip_auto_increment))
  {
    Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
    DBUG_PRINT("info", 
@@ -1740,7 +1740,7 @@ int ha_ndbcluster::write_row(byte *record)
      DBUG_PRINT("info", 
		 ("Setting next auto increment value to %u", next_val));  
  }
  skip_auto_increment= TRUE;
  m_skip_auto_increment= TRUE;

  DBUG_RETURN(0);
}
@@ -1820,7 +1820,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
    }
    // Delete old row
    DBUG_PRINT("info", ("insert succeded"));
    m_primary_key_update= TRUE;
    delete_res= delete_row(old_data);
    m_primary_key_update= FALSE;
    if (delete_res)
    {
      DBUG_PRINT("info", ("delete failed"));
@@ -1843,9 +1845,9 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
    DBUG_PRINT("info", ("Calling updateTuple on cursor"));
    if (!(op= cursor->updateTuple()))
      ERR_RETURN(trans->getNdbError());
    ops_pending++;
    m_ops_pending++;
    if (uses_blob_value(FALSE))
      blobs_pending= TRUE;
      m_blobs_pending= TRUE;
  }
  else
  {  
@@ -1921,7 +1923,7 @@ int ha_ndbcluster::delete_row(const byte *record)
    DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
    if (cursor->deleteTuple() != 0)
      ERR_RETURN(trans->getNdbError());     
    ops_pending++;
    m_ops_pending++;

    no_uncommitted_rows_update(-1);

@@ -1951,7 +1953,9 @@ int ha_ndbcluster::delete_row(const byte *record)
    else 
    {
      int res;
      if ((res= set_primary_key(op)))
      if ((res= (m_primary_key_update ?
		 set_primary_key_from_old_data(op, record)
		 : set_primary_key(op))))
	  return res;  
    }
  }
@@ -2411,18 +2415,18 @@ int ha_ndbcluster::close_scan()
    DBUG_RETURN(1);

  
  if (ops_pending)
  if (m_ops_pending)
  {
    /*
      Take over any pending transactions to the 
      deleteing/updating transaction before closing the scan    
    */
    DBUG_PRINT("info", ("ops_pending: %d", ops_pending));    
    DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
    if (execute_no_commit(this,trans) != 0) {
      no_uncommitted_rows_execute_failure();
      DBUG_RETURN(ndb_err(trans));
    }
    ops_pending= 0;
    m_ops_pending= 0;
  }
  
  cursor->close();
@@ -2555,7 +2559,7 @@ void ha_ndbcluster::info(uint flag)
  if (flag & HA_STATUS_ERRKEY)
  {
    DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
    errkey= dupkey;
    errkey= m_dupkey;
  }
  if (flag & HA_STATUS_AUTO)
    DBUG_PRINT("info", ("HA_STATUS_AUTO"));
@@ -2663,7 +2667,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
					 where field->query_id is the same as
					 the current query id */
    DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
    retrieve_all_fields= TRUE;
    m_retrieve_all_fields= TRUE;
    break;
  case HA_EXTRA_PREPARE_FOR_DELETE:
    DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
@@ -2707,8 +2711,8 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
  DBUG_ENTER("start_bulk_insert");
  DBUG_PRINT("enter", ("rows: %d", (int)rows));
  
  rows_inserted= 0;
  rows_to_insert= rows; 
  m_rows_inserted= 0;
  m_rows_to_insert= rows; 

  /* 
    Calculate how many rows that should be inserted
@@ -2722,7 +2726,7 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
  batch= bytesperbatch/bytes;
  batch= batch == 0 ? 1 : batch;
  DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
  bulk_insert_rows= batch;
  m_bulk_insert_rows= batch;

  DBUG_VOID_RETURN;
}
@@ -2736,22 +2740,22 @@ int ha_ndbcluster::end_bulk_insert()

  DBUG_ENTER("end_bulk_insert");
  // Check if last inserts need to be flushed
  if (bulk_insert_not_flushed)
  if (m_bulk_insert_not_flushed)
  {
    NdbConnection *trans= m_active_trans;
    // Send rows to NDB
    DBUG_PRINT("info", ("Sending inserts to NDB, "\
                        "rows_inserted:%d, bulk_insert_rows: %d", 
                        rows_inserted, bulk_insert_rows)); 
    bulk_insert_not_flushed= FALSE;
                        m_rows_inserted, m_bulk_insert_rows)); 
    m_bulk_insert_not_flushed= FALSE;
    if (execute_no_commit(this,trans) != 0) {
      no_uncommitted_rows_execute_failure();
      my_errno= error= ndb_err(trans);
    }
  }

  rows_inserted= 0;
  rows_to_insert= 1;
  m_rows_inserted= 0;
  m_rows_to_insert= 1;
  DBUG_RETURN(error);
}

@@ -2937,8 +2941,8 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
      (NdbConnection*)thd->transaction.stmt.ndb_tid;
    DBUG_ASSERT(m_active_trans);
    // Start of transaction
    retrieve_all_fields= FALSE;
    ops_pending= 0;    
    m_retrieve_all_fields= FALSE;
    m_ops_pending= 0;    
    {
      NDBDICT *dict= m_ndb->getDictionary();
      const NDBTAB *tab;
@@ -2986,13 +2990,13 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
      DBUG_PRINT("warning", ("m_active_cursor != NULL"));
    m_active_cursor= NULL;

    if (blobs_pending)
    if (m_blobs_pending)
      DBUG_PRINT("warning", ("blobs_pending != 0"));
    blobs_pending= 0;
    m_blobs_pending= 0;
    
    if (ops_pending)
    if (m_ops_pending)
      DBUG_PRINT("warning", ("ops_pending != 0L"));
    ops_pending= 0;
    m_ops_pending= 0;
  }
  DBUG_RETURN(error);
}
@@ -3029,8 +3033,8 @@ int ha_ndbcluster::start_stmt(THD *thd)
  m_active_trans= trans;

  // Start of statement
  retrieve_all_fields= FALSE;
  ops_pending= 0;    
  m_retrieve_all_fields= FALSE;
  m_ops_pending= 0;    
  
  DBUG_RETURN(error);
}
@@ -3571,13 +3575,13 @@ longlong ha_ndbcluster::get_auto_increment()
  DBUG_ENTER("get_auto_increment");
  DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
  int cache_size= 
    (rows_to_insert - rows_inserted < autoincrement_prefetch) ?
    rows_to_insert - rows_inserted 
    : (rows_to_insert > autoincrement_prefetch) ? 
    rows_to_insert 
    (m_rows_to_insert - m_rows_inserted < autoincrement_prefetch) ?
    m_rows_to_insert - m_rows_inserted 
    : (m_rows_to_insert > autoincrement_prefetch) ? 
    m_rows_to_insert 
    : autoincrement_prefetch;
  Uint64 auto_value= 
    (skip_auto_increment) ? 
    (m_skip_auto_increment) ? 
    m_ndb->readAutoIncrementValue((const NDBTAB *) m_table)
    : m_ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size);
  DBUG_RETURN((longlong)auto_value);
@@ -3602,17 +3606,18 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
  m_share(0),
  m_use_write(FALSE),
  m_ignore_dup_key_not_supported(FALSE),
  retrieve_all_fields(FALSE),
  rows_to_insert(1),
  rows_inserted(0),
  bulk_insert_rows(1024),
  bulk_insert_not_flushed(FALSE),
  ops_pending(0),
  skip_auto_increment(TRUE),
  blobs_pending(0),
  blobs_buffer(0),
  blobs_buffer_size(0),
  dupkey((uint) -1)
  m_primary_key_update(FALSE),
  m_retrieve_all_fields(FALSE),
  m_rows_to_insert(1),
  m_rows_inserted(0),
  m_bulk_insert_rows(1024),
  m_bulk_insert_not_flushed(FALSE),
  m_ops_pending(0),
  m_skip_auto_increment(TRUE),
  m_blobs_pending(0),
  m_blobs_buffer(0),
  m_blobs_buffer_size(0),
  m_dupkey((uint) -1)
{ 
  int i;
  
@@ -3646,8 +3651,8 @@ ha_ndbcluster::~ha_ndbcluster()
  if (m_share)
    free_share(m_share);
  release_metadata();
  my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
  blobs_buffer= 0;
  my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
  m_blobs_buffer= 0;

  // Check for open cursor/transaction
  if (m_active_cursor) {
+12 −11
Original line number Diff line number Diff line
@@ -243,18 +243,19 @@ class ha_ndbcluster: public handler
  NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
  bool m_use_write;
  bool m_ignore_dup_key_not_supported;
  bool retrieve_all_fields;
  ha_rows rows_to_insert;
  ha_rows rows_inserted;
  ha_rows bulk_insert_rows;
  bool bulk_insert_not_flushed;
  ha_rows ops_pending;
  bool skip_auto_increment;
  bool blobs_pending;
  bool m_primary_key_update;
  bool m_retrieve_all_fields;
  ha_rows m_rows_to_insert;
  ha_rows m_rows_inserted;
  ha_rows m_bulk_insert_rows;
  bool m_bulk_insert_not_flushed;
  ha_rows m_ops_pending;
  bool m_skip_auto_increment;
  bool m_blobs_pending;
  // memory for blobs in one tuple
  char *blobs_buffer;
  uint32 blobs_buffer_size;
  uint dupkey;
  char *m_blobs_buffer;
  uint32 m_blobs_buffer_size;
  uint m_dupkey;

  void set_rec_per_key();
  void records_update();