Commit 6f83ed91 authored by unknown's avatar unknown
Browse files

WL #2747: Fix such that backup and restore works for user defined

partitioned tables in NDB


include/my_sys.h:
  Move packfrm and unpackfrm to mysys
mysql-test/r/ndb_restore.result:
  New test cases
mysql-test/t/ndb_restore.test:
  New test cases
mysys/my_compress.c:
  Moved packfrm and unpackfrm to mysys
sql/ha_ndbcluster.cc:
  Set value of partition function in hidden field for user defined
  partitioning in NDB to handle restore and later on-line reorganize
  of partitions
  To save space value of those functions are limited to 32 bits
sql/ha_partition.cc:
  Use new get_partition_id interface
sql/handler.h:
  Use new get_partition_id interface
sql/mysql_priv.h:
  Moved to mysys
sql/mysqld.cc:
  Minor
sql/opt_range.cc:
  New get_partition_id interface
sql/sql_partition.cc:
  New get_partition_id interface
  Fix error checks of specification of engines in ALTER TABLE
  Moved packfrm and unpackfrm to mysys
sql/sql_table.cc:
  Fixed debug printouts
storage/ndb/include/kernel/ndb_limits.h:
  New constant
storage/ndb/include/kernel/signaldata/DictTabInfo.hpp:
  New table description item
storage/ndb/include/ndb_version.h.in:
  New version specific constant
storage/ndb/include/ndbapi/NdbDictionary.hpp:
  New item in table descriptions
storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp:
  New item in table descriptions
storage/ndb/src/kernel/blocks/backup/Backup.cpp:
  Write fragment id in backup's log entry
storage/ndb/src/kernel/blocks/backup/BackupFormat.hpp:
  Write fragment id in backup's log entry
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp:
  New item in table description
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp:
  New item in table description
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp:
  Moved constant
storage/ndb/src/ndbapi/NdbDictionary.cpp:
  New item in table description
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  New item in table description
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  New item in table description
storage/ndb/tools/Makefile.am:
  Compress library needed for ndb_restore
storage/ndb/tools/restore/Restore.cpp:
  Handle fragment id and also handle backups from older versions
storage/ndb/tools/restore/Restore.hpp:
  Use fragment id
storage/ndb/tools/restore/consumer.hpp:
  Use fragment id
storage/ndb/tools/restore/consumer_printer.cpp:
  Use fragment id
storage/ndb/tools/restore/consumer_printer.hpp:
  Use fragment id
storage/ndb/tools/restore/consumer_restore.cpp:
  Code to map node groups if new cluster has different set of
  node groups from original cluster
  Very simple search and replace parser of partition syntax in frm file
  Fix settings of partition id properly using fragment id and hidden
  field in tables
storage/ndb/tools/restore/consumer_restore.hpp:
  Changed function headers and new one for mapping node groups
storage/ndb/tools/restore/consumer_restorem.cpp:
  Use fragment id
storage/ndb/tools/restore/restore_main.cpp:
  New parameter to set node group map, parser for this parameter
parent 19bbb7cc
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -806,6 +806,9 @@ extern void print_defaults(const char *conf_file, const char **groups);
extern my_bool my_compress(byte *, ulong *, ulong *);
extern my_bool my_uncompress(byte *, ulong *, ulong *);
extern byte *my_compress_alloc(const byte *packet, ulong *len, ulong *complen);
extern int packfrm(const void *, uint, const void **, uint *);
extern int unpackfrm(const void **, uint *, const void *);

extern ha_checksum my_checksum(ha_checksum crc, const byte *mem, uint count);
extern uint my_bit_log2(ulong value);
extern uint my_count_bits(ulonglong v);
+218 −1
Original line number Diff line number Diff line
@@ -225,6 +225,223 @@ from (select * from t9 union
select * from t9_c) a;
count(*)
3
ALTER TABLE t1_c
PARTITION BY RANGE (`capgoaledatta`)
(PARTITION p0 VALUES LESS THAN MAXVALUE);
ALTER TABLE t2_c
PARTITION BY LIST(`capgotod`)
(PARTITION p0 VALUES IN (0,1,2,3,4,5,6));
ALTER TABLE t3_c
PARTITION BY HASH (`CapGoaledatta`);
ALTER TABLE t5_c
PARTITION BY HASH (`capfa`)
PARTITIONS 4;
ALTER TABLE t6_c
PARTITION BY LINEAR HASH (`relatta`)
PARTITIONS 4;
ALTER TABLE t7_c
PARTITION BY LINEAR KEY (`dardtestard`);
drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c;
select count(*) from t1;
count(*)
5
select count(*) from t1_c;
count(*)
5
select count(*)
from (select * from t1 union 
select * from t1_c) a;
count(*)
5
select count(*) from t2;
count(*)
6
select count(*) from t2_c;
count(*)
6
select count(*)
from (select * from t2 union 
select * from t2_c) a;
count(*)
6
select count(*) from t3;
count(*)
4
select count(*) from t3_c;
count(*)
4
select count(*)
from (select * from t3 union 
select * from t3_c) a;
count(*)
4
select count(*) from t4;
count(*)
22
select count(*) from t4_c;
count(*)
22
select count(*)
from (select * from t4 union 
select * from t4_c) a;
count(*)
22
select count(*) from t5;
count(*)
3
select count(*) from t5_c;
count(*)
3
select count(*)
from (select * from t5 union 
select * from t5_c) a;
count(*)
3
select count(*) from t6;
count(*)
8
select count(*) from t6_c;
count(*)
8
select count(*)
from (select * from t6 union 
select * from t6_c) a;
count(*)
8
select count(*) from t7;
count(*)
5
select count(*) from t7_c;
count(*)
5
select count(*)
from (select * from t7 union 
select * from t7_c) a;
count(*)
5
select count(*) from t8;
count(*)
3
select count(*) from t8_c;
count(*)
3
select count(*)
from (select * from t8 union 
select * from t8_c) a;
count(*)
3
select count(*) from t9;
count(*)
3
select count(*) from t9_c;
count(*)
3
select count(*)
from (select * from t9 union 
select * from t9_c) a;
count(*)
3
drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c;
select count(*) from t1;
count(*)
5
select count(*) from t1_c;
count(*)
5
select count(*)
from (select * from t1 union 
select * from t1_c) a;
count(*)
5
select count(*) from t2;
count(*)
6
select count(*) from t2_c;
count(*)
6
select count(*)
from (select * from t2 union 
select * from t2_c) a;
count(*)
6
select count(*) from t3;
count(*)
4
select count(*) from t3_c;
count(*)
4
select count(*)
from (select * from t3 union 
select * from t3_c) a;
count(*)
4
select count(*) from t4;
count(*)
22
select count(*) from t4_c;
count(*)
22
select count(*)
from (select * from t4 union 
select * from t4_c) a;
count(*)
22
select count(*) from t5;
count(*)
3
select count(*) from t5_c;
count(*)
3
select count(*)
from (select * from t5 union 
select * from t5_c) a;
count(*)
3
select count(*) from t6;
count(*)
8
select count(*) from t6_c;
count(*)
8
select count(*)
from (select * from t6 union 
select * from t6_c) a;
count(*)
8
select count(*) from t7;
count(*)
5
select count(*) from t7_c;
count(*)
5
select count(*)
from (select * from t7 union 
select * from t7_c) a;
count(*)
5
select count(*) from t8;
count(*)
3
select count(*) from t8_c;
count(*)
3
select count(*)
from (select * from t8 union 
select * from t8_c) a;
count(*)
3
select count(*) from t9;
count(*)
3
select count(*) from t9_c;
count(*)
3
select count(*)
from (select * from t9 union 
select * from t9_c) a;
count(*)
3
drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
drop table if exists t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c;
520093696,1
520093696,2
+146 −0
Original line number Diff line number Diff line
@@ -205,6 +205,152 @@ select count(*)
  from (select * from t9 union 
        select * from t9_c) a;

#
# Try Partitioned tables as well
#
ALTER TABLE t1_c
PARTITION BY RANGE (`capgoaledatta`)
(PARTITION p0 VALUES LESS THAN MAXVALUE);

ALTER TABLE t2_c
PARTITION BY LIST(`capgotod`)
(PARTITION p0 VALUES IN (0,1,2,3,4,5,6));

ALTER TABLE t3_c
PARTITION BY HASH (`CapGoaledatta`);

ALTER TABLE t5_c
PARTITION BY HASH (`capfa`)
PARTITIONS 4;

ALTER TABLE t6_c
PARTITION BY LINEAR HASH (`relatta`)
PARTITIONS 4;

ALTER TABLE t7_c
PARTITION BY LINEAR KEY (`dardtestard`);

--exec $NDB_MGM --no-defaults -e "start backup" >> $NDB_TOOLS_OUTPUT
drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c;
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 2 -n 1 -m -r --print --print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-2 >> $NDB_TOOLS_OUTPUT
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 2 -n 2 -r --print --print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-2 >> $NDB_TOOLS_OUTPUT

select count(*) from t1;
select count(*) from t1_c;
select count(*)
  from (select * from t1 union 
        select * from t1_c) a;

select count(*) from t2;
select count(*) from t2_c;
select count(*)
  from (select * from t2 union 
        select * from t2_c) a;

select count(*) from t3;
select count(*) from t3_c;
select count(*)
  from (select * from t3 union 
        select * from t3_c) a;

select count(*) from t4;
select count(*) from t4_c;
select count(*)
  from (select * from t4 union 
        select * from t4_c) a;

select count(*) from t5;
select count(*) from t5_c;
select count(*)
  from (select * from t5 union 
        select * from t5_c) a;

select count(*) from t6;
select count(*) from t6_c;
select count(*)
  from (select * from t6 union 
        select * from t6_c) a;

select count(*) from t7;
select count(*) from t7_c;
select count(*)
  from (select * from t7 union 
        select * from t7_c) a;

select count(*) from t8;
select count(*) from t8_c;
select count(*)
  from (select * from t8 union 
        select * from t8_c) a;

select count(*) from t9;
select count(*) from t9_c;
select count(*)
  from (select * from t9 union 
        select * from t9_c) a;

drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c;
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 2 -n 1 -m -r --ndb-nodegroup_map '(0,0)' --print --print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-2 >> $NDB_TOOLS_OUTPUT
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 2 -n 2 -r --print --print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-2 >> $NDB_TOOLS_OUTPUT

select count(*) from t1;
select count(*) from t1_c;
select count(*)
  from (select * from t1 union 
        select * from t1_c) a;

select count(*) from t2;
select count(*) from t2_c;
select count(*)
  from (select * from t2 union 
        select * from t2_c) a;

select count(*) from t3;
select count(*) from t3_c;
select count(*)
  from (select * from t3 union 
        select * from t3_c) a;

select count(*) from t4;
select count(*) from t4_c;
select count(*)
  from (select * from t4 union 
        select * from t4_c) a;

select count(*) from t5;
select count(*) from t5_c;
select count(*)
  from (select * from t5 union 
        select * from t5_c) a;

select count(*) from t6;
select count(*) from t6_c;
select count(*)
  from (select * from t6 union 
        select * from t6_c) a;

select count(*) from t7;
select count(*) from t7_c;
select count(*)
  from (select * from t7 union 
        select * from t7_c) a;

select count(*) from t8;
select count(*) from t8_c;
select count(*)
  from (select * from t8 union 
        select * from t8_c) a;

select count(*) from t9;
select count(*) from t9_c;
select count(*)
  from (select * from t9 union 
        select * from t9_c) a;

drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c;
--error 134
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 2 -n 1 -m -r --ndb-nodegroup_map '(0,1)' --print --print_meta $NDB_BACKUP_DIR/BACKUP/BACKUP-2 >> $NDB_TOOLS_OUTPUT

#
# Cleanup
#
+128 −0
Original line number Diff line number Diff line
@@ -95,4 +95,132 @@ my_bool my_uncompress (byte *packet, ulong *len, ulong *complen)
  }
  DBUG_RETURN(0);
}

/*
  Internal representation of the frm blob
*/

struct frm_blob_header
{
  uint ver;      /* Version of header                         */
  uint orglen;   /* Original length of compressed data        */
  uint complen;  /* Compressed length of data, 0=uncompressed */
};

struct frm_blob_struct
{
  struct frm_blob_header head;
  char data[1];
};

/*
  packfrm is a method used to compress the frm file for storage in a
  handler. This method was developed for the NDB handler and has been moved
  here to serve also other uses.

  SYNOPSIS
    packfrm()
    data                    Data reference to frm file data
    len                     Length of frm file data
    out:pack_data           Reference to the pointer to the packed frm data
    out:pack_len            Length of packed frm file data

  RETURN VALUES
    0                       Success
    >0                      Failure
*/

int packfrm(const void *data, uint len,
            const void **pack_data, uint *pack_len)
{
  int error;
  ulong org_len, comp_len;
  uint blob_len;
  struct frm_blob_struct *blob;
  DBUG_ENTER("packfrm");
  DBUG_PRINT("enter", ("data: %x, len: %d", data, len));

  error= 1;
  org_len= len;
  if (my_compress((byte*)data, &org_len, &comp_len))
    goto err;

  DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
  DBUG_DUMP("compressed", (char*)data, org_len);

  error= 2;
  blob_len= sizeof(struct frm_blob_header)+org_len;
  if (!(blob= (struct frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
    goto err;

  // Store compressed blob in machine independent format
  int4store((char*)(&blob->head.ver), 1);
  int4store((char*)(&blob->head.orglen), comp_len);
  int4store((char*)(&blob->head.complen), org_len);

  // Copy frm data into blob, already in machine independent format
  memcpy(blob->data, data, org_len);

  *pack_data= blob;
  *pack_len= blob_len;
  error= 0;

  DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
err:
  DBUG_RETURN(error);

}

/*
  unpackfrm is a method used to decompress the frm file received from a
  handler. This method was developed for the NDB handler and has been moved
  here to serve also other uses for other clustered storage engines.

  SYNOPSIS
    unpackfrm()
    pack_data               Data reference to packed frm file data
    out:unpack_data         Reference to the pointer to the unpacked frm data
    out:unpack_len          Length of unpacked frm file data

  RETURN VALUES¨
    0                       Success
    >0                      Failure
*/

int unpackfrm(const void **unpack_data, uint *unpack_len,
              const void *pack_data)
{
   const struct frm_blob_struct *blob= (struct frm_blob_struct*)pack_data;
   byte *data;
   ulong complen, orglen, ver;
   DBUG_ENTER("unpackfrm");
   DBUG_PRINT("enter", ("pack_data: %x", pack_data));

   complen=     uint4korr((char*)&blob->head.complen);
   orglen=      uint4korr((char*)&blob->head.orglen);
   ver=         uint4korr((char*)&blob->head.ver);

   DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d",
                     ver,complen,orglen));
   DBUG_DUMP("blob->data", (char*) blob->data, complen);

   if (ver != 1)
     DBUG_RETURN(1);
   if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
     DBUG_RETURN(2);
   memcpy(data, blob->data, complen);


   if (my_uncompress(data, &complen, &orglen))
   {
     my_free((char*)data, MYF(0));
     DBUG_RETURN(3);
   }

   *unpack_data= data;
   *unpack_len= complen;

   DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
   DBUG_RETURN(0);
}
#endif /* HAVE_COMPRESS */
+75 −19
Original line number Diff line number Diff line
@@ -1688,7 +1688,9 @@ int ha_ndbcluster::peek_row(const byte *record)
  {
    uint32 part_id;
    int error;
    if ((error= m_part_info->get_partition_id(m_part_info, &part_id)))
    longlong func_value;
    if ((error= m_part_info->get_partition_id(m_part_info, &part_id,
                                              &func_value)))
    {
      DBUG_RETURN(error);
    }
@@ -2146,10 +2148,10 @@ int ha_ndbcluster::write_row(byte *record)
  NdbOperation *op;
  int res;
  THD *thd= current_thd;
  m_write_op= TRUE;

  DBUG_ENTER("write_row");
  longlong func_value= 0;
  DBUG_ENTER("ha_ndbcluster::write_row");

  m_write_op= TRUE;
  if (!m_use_write && m_ignore_dup_key && table_share->primary_key != MAX_KEY)
  {
    int peek_res= peek_row(record);
@@ -2179,7 +2181,8 @@ int ha_ndbcluster::write_row(byte *record)
  {
    uint32 part_id;
    int error;
    if ((error= m_part_info->get_partition_id(m_part_info, &part_id)))
    if ((error= m_part_info->get_partition_id(m_part_info, &part_id,
                                              &func_value)))
    {
      DBUG_RETURN(error);
    }
@@ -2235,6 +2238,22 @@ int ha_ndbcluster::write_row(byte *record)
    }
  }

  if (m_use_partition_function)
  {
    /*
      We need to set the value of the partition function value in
      NDB since the NDB kernel doesn't have easy access to the function
      to calculate the value.
    */
    if (func_value >= INT_MAX32)
      func_value= INT_MAX32;
    uint32 part_func_value= (uint32)func_value;
    uint no_fields= table_share->fields;
    if (table_share->primary_key == MAX_KEY)
      no_fields++;
    op->setValue(no_fields, part_func_value);
  }

  m_rows_changed++;

  /*
@@ -2346,6 +2365,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
  uint i;
  uint32 old_part_id= 0, new_part_id= 0;
  int error;
  longlong func_value;
  DBUG_ENTER("update_row");
  m_write_op= TRUE;
  
@@ -2358,7 +2378,8 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)

  if (m_use_partition_function &&
      (error= get_parts_for_update(old_data, new_data, table->record[0],
                                   m_part_info, &old_part_id, &new_part_id)))
                                   m_part_info, &old_part_id, &new_part_id,
                                   &func_value)))
  {
    DBUG_RETURN(error);
  }
@@ -2474,6 +2495,16 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
      ERR_RETURN(op->getNdbError());
  }

  if (m_use_partition_function)
  {
    if (func_value >= INT_MAX32)
      func_value= INT_MAX32;
    uint32 part_func_value= (uint32)func_value;
    uint no_fields= table_share->fields;
    if (table_share->primary_key == MAX_KEY)
      no_fields++;
    op->setValue(no_fields, part_func_value);
  }
  // Execute update operation
  if (!cursor && execute_no_commit(this,trans) != 0) {
    no_uncommitted_rows_execute_failure();
@@ -8871,12 +8902,17 @@ int ha_ndbcluster::set_range_data(void *tab_ref, partition_info *part_info)
  for (i= 0; i < part_info->no_parts; i++)
  {
    longlong range_val= part_info->range_int_array[i];
    if (range_val < INT_MIN32 || range_val > INT_MAX32)
    if (range_val < INT_MIN32 || range_val >= INT_MAX32)
    {
      if ((i != part_info->no_parts - 1) ||
          (range_val != LONGLONG_MAX))
      {
        my_error(ER_LIMITED_PART_RANGE, MYF(0), "NDB");
        error= 1;
        goto error;
      }
      range_val= INT_MAX32;
    }
    range_data[i]= (int32)range_val;
  }
  tab->setRangeListData(range_data, sizeof(int32)*part_info->no_parts);
@@ -8966,7 +9002,25 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
      col->setPartitionKey(TRUE);
    }
  }
  else if (part_info->part_type == RANGE_PARTITION)
  else 
  {
    /*
      Create a shadow field for those tables that have user defined
      partitioning. This field stores the value of the partition
      function such that NDB can handle reorganisations of the data
      even when the MySQL Server isn't available to assist with
      calculation of the partition function value.
    */
    NDBCOL col;
    DBUG_PRINT("info", ("Generating partition func value field"));
    col.setName("$PART_FUNC_VALUE");
    col.setType(NdbDictionary::Column::Int);
    col.setLength(1);
    col.setNullable(FALSE);
    col.setPrimaryKey(FALSE);
    col.setAutoIncrement(FALSE);
    tab->addColumn(col);
    if (part_info->part_type == RANGE_PARTITION)
    {
      if ((error= set_range_data((void*)tab, part_info)))
      {
@@ -8980,6 +9034,7 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
        DBUG_RETURN(error);
      }
    }
  }
  tab->setFragmentType(ftype);
  i= 0;
  tot_ts_name_len= 0;
@@ -9012,6 +9067,7 @@ uint ha_ndbcluster::set_up_partition_info(partition_info *part_info,
    first= FALSE;
  } while (++i < part_info->no_parts);
  tab->setDefaultNoPartitionsFlag(part_info->use_default_no_partitions);
  tab->setLinearFlag(part_info->linear_hash_ind);
  tab->setMaxRows(table->s->max_rows);
  tab->setTablespaceNames(ts_names, fd_index*sizeof(char*));
  tab->setFragmentCount(fd_index);
Loading