Commit 836912d1 authored by unknown's avatar unknown
Browse files

WL 2826: Error handling for ALTER TABLE for partitioning

Step 14: First version of table log for add/Drop partition


sql/ha_partition.cc:
  Moved create partition name code to sql_partition.cc
sql/handler.h:
  Added entries in partition_info to keep track of table log entries
sql/mysql_priv.h:
  Moved create partition name code to sql_partition.cc
sql/sql_partition.cc:
  Moved create partition name code to sql_partition.cc
  First version of table log for add/drop partition
sql/sql_table.cc:
  Add IO_SIZE to table log header
parent e5200bc1
Loading
Loading
Loading
Loading
+0 −82
Original line number Diff line number Diff line
@@ -391,88 +391,6 @@ int ha_partition::ha_initialise()
/****************************************************************************
                MODULE meta data changes
****************************************************************************/
/*
  Create partition names

  SYNOPSIS
    create_partition_name()
    out:out                   Created partition name string
    in1                       First part
    in2                       Second part
    name_variant              Normal, temporary or renamed partition name

  RETURN VALUE
    NONE

  DESCRIPTION
    This method is used to calculate the partition name, service routine to
    the del_ren_cre_table method.
*/

#define NORMAL_PART_NAME 0
#define TEMP_PART_NAME 1
#define RENAMED_PART_NAME 2
static void create_partition_name(char *out, const char *in1,
                                  const char *in2, uint name_variant,
                                  bool translate)
{
  char transl_part_name[FN_REFLEN];
  const char *transl_part;

  if (translate)
  {
    tablename_to_filename(in2, transl_part_name, FN_REFLEN);
    transl_part= transl_part_name;
  }
  else
    transl_part= in2;
  if (name_variant == NORMAL_PART_NAME)
    strxmov(out, in1, "#P#", transl_part, NullS);
  else if (name_variant == TEMP_PART_NAME)
    strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
  else if (name_variant == RENAMED_PART_NAME)
    strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
}

/*
  Create subpartition name

  SYNOPSIS
    create_subpartition_name()
    out:out                   Created partition name string
    in1                       First part
    in2                       Second part
    in3                       Third part
    name_variant              Normal, temporary or renamed partition name

  RETURN VALUE
    NONE

  DESCRIPTION
  This method is used to calculate the subpartition name, service routine to
  the del_ren_cre_table method.
*/

static void create_subpartition_name(char *out, const char *in1,
                                     const char *in2, const char *in3,
                                     uint name_variant)
{
  char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];

  tablename_to_filename(in2, transl_part_name, FN_REFLEN);
  tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
  if (name_variant == NORMAL_PART_NAME)
    strxmov(out, in1, "#P#", transl_part_name,
            "#SP#", transl_subpart_name, NullS);
  else if (name_variant == TEMP_PART_NAME)
    strxmov(out, in1, "#P#", transl_part_name,
            "#SP#", transl_subpart_name, "#TMP#", NullS);
  else if (name_variant == RENAMED_PART_NAME)
    strxmov(out, in1, "#P#", transl_part_name,
            "#SP#", transl_subpart_name, "#REN#", NullS);
}


/*
  Delete a table

+5 −1
Original line number Diff line number Diff line
@@ -799,6 +799,8 @@ typedef int (*get_partitions_in_range_iter)(partition_info *part_info,
                                            PARTITION_ITERATOR *part_iter);


struct TABLE_LOG_MEMORY_ENTRY;

class partition_info : public Sql_alloc
{
public:
@@ -846,6 +848,8 @@ class partition_info : public Sql_alloc

  Item *item_free_list;

  TABLE_LOG_MEMORY_ENTRY *first_log_entry;
  TABLE_LOG_MEMORY_ENTRY *exec_log_entry;
  /* 
    A bitmap of partitions used by the current query. 
    Usage pattern:
+10 −0
Original line number Diff line number Diff line
@@ -1135,6 +1135,16 @@ uint prep_alter_part_table(THD *thd, TABLE *table, ALTER_INFO *alter_info,
bool remove_table_from_cache(THD *thd, const char *db, const char *table,
                             uint flags);

#define NORMAL_PART_NAME 0
#define TEMP_PART_NAME 1
#define RENAMED_PART_NAME 2
void create_partition_name(char *out, const char *in1,
                           const char *in2, uint name_variant,
                           bool translate);
void create_subpartition_name(char *out, const char *in1,
                              const char *in2, const char *in3,
                              uint name_variant);

typedef struct st_lock_param_type
{
  ulonglong copied;
+296 −2
Original line number Diff line number Diff line
@@ -5063,6 +5063,50 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
}


/*
  Insert log entry into list
  SYNOPSIS
    insert_part_info_log_entry_list()
    log_entry
  RETURN VALUES
    NONE
*/

static
void
insert_part_info_log_entry_list(partition_info *part_info,
                                TABLE_LOG_MEMORY_ENTRY *log_entry)
{
  log_entry->next_active_log_entry= part_info->first_log_entry;
  part_info->first_log_entry= log_entry;
}


/*
  Release all log entries for this partition info struct
  SYNOPSIS
    release_part_info_log_entries()
    first_log_entry                 First log entry in list to release
  RETURN VALUES
    NONE
*/

static
void
release_part_info_log_entries(TABLE_LOG_MEMORY_ENTRY *log_entry)
{
  DBUG_ENTER("release_part_info_log_entries");

  while (log_entry)
  {
    release_table_log_memory_entry(log_entry);
    log_entry= log_entry->next_log_entry;
  }
  part_info->first_log_entry= NULL;
  DBUG_VOID_RETURN;
}


/*
  Write the log entry to ensure that the shadow frm file is removed at
  crash.
@@ -5082,11 +5126,111 @@ static bool mysql_drop_partitions(ALTER_PARTITION_PARAM_TYPE *lpt)
bool
write_log_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt, bool install_frm)
{
  TABLE_LOG_ENTRY table_log_entry;
  partition_info *part_info= lpt->part_info;
  TABLE_LOG_MEMORY_ENTRY *log_entry;
  char shadow_path[FN_LEN];
  DBUG_ENTER("write_log_shadow_frm");

  lock_global_table_log();
  do
  {
    build_table_filename(shadow_path, sizeof(shadow_path), lpt->db,
                         lpt->table_name, "#");
    table_log_entry.action_type= 'd';
    table_log_entry.next_entry= 0;
    table_log_entry.handler_type= "frm";
    table_log_entry.name= shadow_path;
    
    if (write_table_log_entry(&table_log_entry, &log_entry))
      break;
    insert_part_info_log_entry_list(part_info, log_entry);
    if (write_execute_table_log_entry(log_entry->entry_pos, &log_entry))
      break;
    part_info->exec_log_entry= log_entry;
    unlock_global_table_log();
    DBUG_RETURN(FALSE);
  } while (TRUE);
  release_part_info_log_entries(part_info->first_log_entry);
  unlock_global_table_log();
  DBUG_RETURN(TRUE);
}


/*
  Log dropped partitions
  SYNOPSIS
    write_log_dropped_partitions()
    lpt                      Struct containing parameters
  RETURN VALUES
    TRUE                     Error
    FALSE                    Success
*/

static
bool
write_log_dropped_partitions(ALTER_PARTITION_PARAM_TYPE *lpt,
                             uint *next_entry,
                             const char *path)
{
  TABLE_LOG_ENTRY table_log_entry;
  partition_info *part_info= lpt->part_info;
  TABLE_LOG_MEMORY_ENTRY *log_entry;
  char tmp_path[FN_LEN];
  List_iterator<partition_element> part_it(part_info->partitions);
  uint no_elements= part_info->partitions.elements;
  DBUG_ENTER("write_log_dropped_partitions");

  table_log_entry.action_type= 'd';
  do
  {
    partition_element part_elem= part_it++;
    if (part_elem->part_state == PART_TO_BE_DROPPED ||
        part_elem->part_state == PART_TO_BE_ADDED)
    {
      if (is_sub_partitioned(part_info))
      {
        List_iterator<partition_element> sub_it(part_elem->subpartitions);
        uint no_subparts= part_info->no_subparts;
        do
        {
          partition_element *sub_elem= sub_it++;
          table_log_entry.next_entry= *next_entry;
          table_log_entry.handler_type=
                 ha_resolve_storage_engine_name(sub_elem->engine_type);
          create_subpartition_name(tmp_path, path,
                                   part_elem->partition_name,
                                   sub_elem->partition_name,
                                   NORMAL_PART_NAME);
          table_log_entry.name= tmp_path;
          if (write_table_log_entry(&table_log_entry, &log_entry))
          {
            DBUG_RETURN(TRUE);
          }
          *next_entry= log_entry->entry_pos;
          insert_part_info_log_entry_list(part_info, log_entry);
        } while (++i < no_subparts);
      }
      else
      {
        table_log_entry.next_entry= *next_entry;
        table_log_entry.handler_type=
               ha_resolve_storage_engine_name(part_elem->engine_type);
        create_partition_name(tmp_path, path,
                              part_elem->partition_name,
                              NORMAL_PART_NAME, TRUE);
        table_log_entry.name= tmp_path;
        if (write_table_log_entry(&table_log_entry, &log_entry))
        {
          DBUG_RETURN(TRUE);
        }
        *next_entry= log_entry->entry_pos;
        insert_part_info_log_entry_list(part_info, log_entry);
      }
    }
  } while (++i < no_elements);
  DBUG_RETURN(FALSE);
error:
}


@@ -5108,11 +5252,47 @@ write_log_shadow_frm(ALTER_PARTITION_PARAM_TYPE *lpt, bool install_frm)
bool
write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
  TABLE_LOG_ENTRY table_log_entry;
  partition_info *part_info= lpt->part_info;
  TABLE_LOG_MEMORY_ENTRY *log_entry;
  char tmp_path[FN_LEN];
  char path[FN_LEN];
  uint next_entry= 0;
  TABLE_LOG_MEMORY_ENTRY *old_log_entry= part_info->first_log_entry;
  DBUG_ENTER("write_log_drop_partition");

  part_info->first_log_entry= NULL;
  build_table_filename(path, sizeof(path), lpt->db,
                       lpt->table_name, "");
  lock_global_table_log();
  do
  {
    if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path))
      break;
    /*
      At first we write an entry that installs the new frm file
    */
    build_table_filename(tmp_path, sizeof(tmp_path), lpt->db,
                         lpt->table_name, "#");
    table_log_entry.action_type= 'r';
    table_log_entry.next_entry= next_entry;
    table_log_entry.handler_type= "frm";
    table_log_entry.name= path;
    table_log_entry.from_name= tmp_path;
    if (write_table_log_entry(&table_log_entry, &log_entry))
      break;
    insert_part_info_log_entry_list(part_info, log_entry);
    log_entry= part_info->exec_log_entry;
    if (write_execute_table_log_entry(log_entry->entry_pos, NULL))
      break;
    release_part_info_log_entries(old_first_log_entry);
    unlock_global_table_log();
    DBUG_RETURN(FALSE); 
  } while (TRUE);
  release_part_info_log_entries(part_info->first_log_entry);
  part_info->first_log_entry= old_log_entry;
  unlock_global_table_log();
  DBUG_RETURN(TRUE);
}


@@ -5129,16 +5309,48 @@ write_log_drop_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
  DESCRIPTION
    Prepare entries to the table log indicating all partitions to drop and to
    remove the shadow frm file.
    The removal of the shadow frm file is already in the log file so we only
    need to link the new entries to the existing and carefully ensure that
    the new linked list has first the dropped partitions and then the
    drop of the shadow frm file.
    We always inject entries backwards in the list in the table log since we
    don't know the entry position until we have written it.
*/

bool
write_log_add_partition(ALTER_PARTITION_PARAM_TYPE *lpt)
{
  DBUG_ENTER("write_log_add_partition");
  TABLE_LOG_ENTRY table_log_entry;
  partition_info *part_info= lpt->part_info;
  TABLE_LOG_MEMORY_ENTRY *log_entry;
  char tmp_path[FN_LEN];
  char path[FN_LEN];
  TABLE_LOG_MEMORY_ENTRY *old_log_entry= part_info->first_log_entry;
  uint next_entry= old_log_entry->entry_pos;
  /* Ensure we linked the existing entries at the back */
  DBUG_ENTER("write_log_add_partition");

  part_info->first_log_entry= NULL;
  build_table_filename(path, sizeof(path), lpt->db,
                       lpt->table_name, "");
  lock_global_table_log();
  do
  {
    if (write_log_dropped_partitions(lpt, &next_entry, (const char*)path))
      break;
    log_entry= part_info->first_log_entry;
    /* Ensure first entry is the last dropped partition */
    if (write_execute_table_log_entry(log_entry->entry_pos, NULL))
      break;
    release_part_info_log_entries(old_first_log_entry);
    unlock_global_table_log();
    DBUG_RETURN(FALSE); 
  } while (TRUE);
  release_part_info_log_entries(part_info->first_log_entry);
  part_info->first_log_entry= old_log_entry;
  unlock_global_table_log();
  DBUG_RETURN(TRUE);
}


@@ -6182,5 +6394,87 @@ static uint32 get_next_subpartition_via_walking(PARTITION_ITERATOR *part_iter)
  part_iter->field_vals.start++;
  return part_iter->part_info->get_subpartition_id(part_iter->part_info);
}


/*
  Create partition names

  SYNOPSIS
    create_partition_name()
    out:out                   Created partition name string
    in1                       First part
    in2                       Second part
    name_variant              Normal, temporary or renamed partition name

  RETURN VALUE
    NONE

  DESCRIPTION
    This method is used to calculate the partition name, service routine to
    the del_ren_cre_table method.
*/

void
create_partition_name(char *out, const char *in1,
                      const char *in2, uint name_variant,
                      bool translate)
{
  char transl_part_name[FN_REFLEN];
  const char *transl_part;

  if (translate)
  {
    tablename_to_filename(in2, transl_part_name, FN_REFLEN);
    transl_part= transl_part_name;
  }
  else
    transl_part= in2;
  if (name_variant == NORMAL_PART_NAME)
    strxmov(out, in1, "#P#", transl_part, NullS);
  else if (name_variant == TEMP_PART_NAME)
    strxmov(out, in1, "#P#", transl_part, "#TMP#", NullS);
  else if (name_variant == RENAMED_PART_NAME)
    strxmov(out, in1, "#P#", transl_part, "#REN#", NullS);
}


/*
  Create subpartition name

  SYNOPSIS
    create_subpartition_name()
    out:out                   Created partition name string
    in1                       First part
    in2                       Second part
    in3                       Third part
    name_variant              Normal, temporary or renamed partition name

  RETURN VALUE
    NONE

  DESCRIPTION
  This method is used to calculate the subpartition name, service routine to
  the del_ren_cre_table method.
*/

void
create_subpartition_name(char *out, const char *in1,
                         const char *in2, const char *in3,
                         uint name_variant)
{
  char transl_part_name[FN_REFLEN], transl_subpart_name[FN_REFLEN];

  tablename_to_filename(in2, transl_part_name, FN_REFLEN);
  tablename_to_filename(in3, transl_subpart_name, FN_REFLEN);
  if (name_variant == NORMAL_PART_NAME)
    strxmov(out, in1, "#P#", transl_part_name,
            "#SP#", transl_subpart_name, NullS);
  else if (name_variant == TEMP_PART_NAME)
    strxmov(out, in1, "#P#", transl_part_name,
            "#SP#", transl_subpart_name, "#TMP#", NullS);
  else if (name_variant == RENAMED_PART_NAME)
    strxmov(out, in1, "#P#", transl_part_name,
            "#SP#", transl_subpart_name, "#REN#", NullS);
}
#endif
+10 −1
Original line number Diff line number Diff line
@@ -282,6 +282,7 @@ typedef struct st_global_table_log
  File file_id;
  uint name_len;
  uint handler_type_len;
  uint io_size;
} GLOBAL_TABLE_LOG;

GLOBAL_TABLE_LOG global_table_log;
@@ -361,6 +362,8 @@ write_table_log_header()
  int2store(&global_table_log.file_entry[4], const_var);
  const_var= 32;
  int2store(&global_table_log.file_entry[6], const_var);
  const_var= IO_SIZE;
  int4store(&global_table_log.file_entry[8], const_var);
  if (write_table_log_file_entry(0UL))
    error= TRUE;
  DBUG_RETURN(error);
@@ -384,9 +387,10 @@ read_table_log_file_entry(uint entry_no)
  bool error= FALSE;
  File file_id= global_table_log.file_id;
  char *file_entry= (char*)global_table_log.file_entry;
  uint io_size= global_table_log.io_size;
  DBUG_ENTER("read_table_log_file_entry");

  if (my_pread(file_id, file_entry, IO_SIZE, IO_SIZE * entry_no, MYF(0)))
  if (my_pread(file_id, file_entry, io_size, io_size * entry_no, MYF(0)))
    error= TRUE;
  DBUG_RETURN(error);
}
@@ -429,6 +433,7 @@ read_table_log_header()
  char *file_entry= (char*)global_table_log.file_entry;
  char file_name[FN_REFLEN];
  uint entry_no;
  bool successful_open= FALSE;
  DBUG_ENTER("read_table_log_header");

  bzero(file_entry, sizeof(global_table_log.file_entry));
@@ -439,10 +444,14 @@ read_table_log_header()
    {
      /* Write message into error log */
    }
    else
      successful_open= TRUE;
  }
  entry_no= uint4korr(&file_entry[0]);
  global_table_log.name_len= uint2korr(&file_entry[4]);
  global_table_log.handler_type_len= uint2korr(&file_entry[6]);
  if (successful_open) 
    global_table_log.io_size= uint4korr(&file_entry[8]);
  global_table_log.first_free= NULL;
  global_table_log.first_used= NULL;
  global_table_log.no_entries= 0;