Commit 60d827b6 authored by unknown's avatar unknown
Browse files

Merge baker@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  zim.(none):/home/brian/mysql/archive-5.1

parents 63a33172 6d8f89b8
Loading
Loading
Loading
Loading
+9 −1
Original line number Diff line number Diff line
@@ -253,7 +253,15 @@ class Field
      ptr-=row_offset;
      return tmp;
    }

  inline longlong val_int(char *new_ptr)
  {
    char *old_ptr= ptr;
    longlong return_value;
    ptr= new_ptr;
    return_value= val_int();
    ptr= old_ptr;
    return return_value;
  }
  inline String *val_str(String *str, char *new_ptr)
  {
    char *old_ptr= ptr;
+239 −25
Original line number Diff line number Diff line
@@ -104,6 +104,7 @@
  rows - This is an unsigned long long which is the number of rows in the data
         file.
  check point - Reserved for future use
  auto increment - MAX value for autoincrement
  dirty - Status of the file, whether or not its values are the latest. This
          flag is what causes a repair to occur

@@ -125,9 +126,11 @@ static HASH archive_open_tables;
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file
/*
  uchar + uchar + ulonglong + ulonglong + uchar
  uchar + uchar + ulonglong + ulonglong + ulonglong + uchar
*/
#define META_BUFFER_SIZE 19      // Size of the data used in the meta file
#define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
  + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar)

/*
  uchar + uchar
*/
@@ -300,9 +303,11 @@ int ha_archive::write_data_header(azio_stream *file_to_write)
  This method reads the header of a meta file and returns whether or not it was successful.
  *rows will contain the current number of rows in the data file upon success.
*/
int ha_archive::read_meta_file(File meta_file, ha_rows *rows)
int ha_archive::read_meta_file(File meta_file, ha_rows *rows, 
                               ulonglong *auto_increment)
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;
  ulonglong check_point;

  DBUG_ENTER("ha_archive::read_meta_file");
@@ -314,17 +319,24 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows)
  /*
    Parse out the meta data, we ignore version at the moment
  */
  *rows= (ha_rows)uint8korr(meta_buffer + 2);
  check_point= uint8korr(meta_buffer + 10);

  ptr+= sizeof(uchar)*2; // Move past header
  *rows= (ha_rows)uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past rows
  check_point= uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past check_point
  *auto_increment= uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past auto_increment

  DBUG_PRINT("ha_archive::read_meta_file", ("Check %d", (uint)meta_buffer[0]));
  DBUG_PRINT("ha_archive::read_meta_file", ("Version %d", (uint)meta_buffer[1]));
  DBUG_PRINT("ha_archive::read_meta_file", ("Rows %lld", *rows));
  DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %lld", check_point));
  DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)meta_buffer[18]));
  DBUG_PRINT("ha_archive::read_meta_file", ("Rows %llu", *rows));
  DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %llu", check_point));
  DBUG_PRINT("ha_archive::read_meta_file", ("Auto-Increment %llu", *auto_increment));
  DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr)));

  if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) || 
      ((bool)meta_buffer[18] == TRUE))
      ((bool)(*ptr)== TRUE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  my_sync(meta_file, MYF(MY_WME));
@@ -337,22 +349,34 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows)
  By setting dirty you say whether or not the file represents the actual state of the data file.
  Upon ::open() we set to dirty, and upon ::close() we set to clean.
*/
int ha_archive::write_meta_file(File meta_file, ha_rows rows, bool dirty)
int ha_archive::write_meta_file(File meta_file, ha_rows rows, 
                                ulonglong auto_increment, bool dirty)
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;
  ulonglong check_point= 0; //Reserved for the future

  DBUG_ENTER("ha_archive::write_meta_file");

  meta_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER;
  meta_buffer[1]= (uchar)ARCHIVE_VERSION;
  int8store(meta_buffer + 2, (ulonglong)rows); 
  int8store(meta_buffer + 10, check_point); 
  *(meta_buffer + 18)= (uchar)dirty;
  DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", (uint)ARCHIVE_CHECK_HEADER));
  DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", (uint)ARCHIVE_VERSION));
  *ptr= (uchar)ARCHIVE_CHECK_HEADER;
  ptr += sizeof(uchar);
  *ptr= (uchar)ARCHIVE_VERSION;
  ptr += sizeof(uchar);
  int8store(ptr, (ulonglong)rows); 
  ptr += sizeof(ulonglong);
  int8store(ptr, check_point); 
  ptr += sizeof(ulonglong);
  int8store(ptr, auto_increment); 
  ptr += sizeof(ulonglong);
  *ptr= (uchar)dirty;
  DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", 
                                             (uint)ARCHIVE_CHECK_HEADER));
  DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", 
                                             (uint)ARCHIVE_VERSION));
  DBUG_PRINT("ha_archive::write_meta_file", ("Rows %llu", (ulonglong)rows));
  DBUG_PRINT("ha_archive::write_meta_file", ("Checkpoint %llu", check_point));
  DBUG_PRINT("ha_archive::write_meta_file", ("Auto Increment %llu",
                                             auto_increment));
  DBUG_PRINT("ha_archive::write_meta_file", ("Dirty %d", (uint)dirty));

  VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
@@ -414,17 +438,19 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table)
      opposite. If the meta file will not open we assume it is crashed and
      leave it up to the user to fix.
    */
    if (read_meta_file(share->meta_file, &share->rows_recorded))
    if (read_meta_file(share->meta_file, &share->rows_recorded, 
                       &share->auto_increment_value))
      share->crashed= TRUE;
    else
      (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);

      (void)write_meta_file(share->meta_file, share->rows_recorded,
                            share->auto_increment_value, TRUE);
    /* 
      It is expensive to open and close the data files and since you can't have
      a gzip file that can be both read and written we keep a writer open
      that is shared amoung all open tables.
    */
    if (!(azopen(&(share->archive_write), share->data_file_name, O_WRONLY|O_APPEND|O_BINARY)))
    if (!(azopen(&(share->archive_write), share->data_file_name, 
                 O_WRONLY|O_APPEND|O_BINARY)))
    {
      DBUG_PRINT("info", ("Could not open archive write file"));
      share->crashed= TRUE;
@@ -452,7 +478,8 @@ int ha_archive::free_share(ARCHIVE_SHARE *share)
    hash_delete(&archive_open_tables, (byte*) share);
    thr_lock_delete(&share->lock);
    VOID(pthread_mutex_destroy(&share->mutex));
    (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
    (void)write_meta_file(share->meta_file, share->rows_recorded, 
                          share->auto_increment_value, FALSE);
    if (azclose(&(share->archive_write)))
      rc= 1;
    if (my_close(share->meta_file, MYF(0)))
@@ -561,7 +588,27 @@ int ha_archive::create(const char *name, TABLE *table_arg,
    error= my_errno;
    goto error;
  }
  write_meta_file(create_file, 0, FALSE);

  for (uint key= 0; key < table_arg->s->keys; key++)
  {
    KEY *pos= table_arg->key_info+key;
    KEY_PART_INFO *key_part=     pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

    DBUG_PRINT("info", ("Looking at field  index%s", field->field_name));
      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        error= -1;
        goto error;
      }
    }
  }

  write_meta_file(create_file, 0, 0, FALSE);
  my_close(create_file,MYF(0));

  /* 
@@ -614,7 +661,8 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
  DBUG_ENTER("ha_archive::real_write_row");

  written= azwrite(writer, buf, table->s->reclength);
  DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", written, table->s->reclength));
  DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", 
                                            written, table->s->reclength));
  if (!delayed_insert || !bulk_insert)
    share->dirty= TRUE;

@@ -655,6 +703,8 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
int ha_archive::write_row(byte *buf)
{
  int rc;
  byte *read_buf= NULL;
  ulonglong temp_auto;
  DBUG_ENTER("ha_archive::write_row");

  if (share->crashed)
@@ -664,13 +714,164 @@ int ha_archive::write_row(byte *buf)
  if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
    table->timestamp_field->set_time();
  pthread_mutex_lock(&share->mutex);

  if (table->next_number_field)
  {
    KEY *mkey= &table->s->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= table->next_number_field->val_int();
    DBUG_PRINT("info", ("archive would see %d and %d", 
                        temp_auto, share->auto_increment_value));

    /*
      Bad news, this will cause a search for the unique value which is very 
      expensive since we will have to do a table scan which will lock up 
      all other writers during this period. This could perhaps be optimized 
      in the future.
    */
    if (temp_auto == share->auto_increment_value && 
        mkey->flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }

    if (temp_auto < share->auto_increment_value && 
        mkey->flags & HA_NOSAME)
    {
      /* 
        First we create a buffer that we can use for reading rows, and can pass
        to get_row().
      */
      if (!(read_buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
      {
        rc= HA_ERR_OUT_OF_MEM;
        goto error;
      }
       /* 
         All of the buffer must be written out or we won't see all of the
         data 
       */
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      /*
        Set the position of the local read thread to the beginning postion.
      */
      if (read_data_header(&archive))
      {
        rc= HA_ERR_CRASHED_ON_USAGE;
        goto error;
      }

      /*
        Now we read and check all of the rows.
        if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length()))
      */
      Field *mfield= table->next_number_field;

      while (!(get_row(&archive, read_buf)))
      {
        if ((longlong)temp_auto == 
            mfield->val_int((char*)(read_buf + mfield->offset())))
        {
          rc= HA_ERR_FOUND_DUPP_KEY;
          goto error;
        }
      }
    }
    else
    {
      auto_increment_value= share->auto_increment_value= temp_auto;
    }
  }

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */

  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));
error:
  pthread_mutex_unlock(&share->mutex);
  if (read_buf)
    my_free(read_buf, MYF(0));

  DBUG_RETURN(rc);
}


ulonglong ha_archive::get_auto_increment()
{
  return auto_increment_value= ++share->auto_increment_value;
}

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
{
  DBUG_ENTER("ha_archive::index_init");
  active_index= keynr;
  DBUG_RETURN(0);
}


/*
  No indexes, so if we get a request for an index search since we tell
  the optimizer that we have unique indexes, we scan
*/
int ha_archive::index_read(byte *buf, const byte *key,
                             uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  DBUG_ENTER("ha_archive::index_read");
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
  DBUG_RETURN(rc);
}


int ha_archive::index_read_idx(byte *buf, uint index, const byte *key,
                                 uint key_len, enum ha_rkey_function find_flag)
{
  int rc= 0;
  bool found= 0;
  KEY *mkey= &table->s->key_info[index];
  uint k_offset= mkey->key_part->offset;


  DBUG_ENTER("ha_archive::index_read_idx");

  /* 
    All of the buffer must be written out or we won't see all of the
    data 
  */
  pthread_mutex_lock(&share->mutex);
  azflush(&(share->archive_write), Z_SYNC_FLUSH);
  pthread_mutex_unlock(&share->mutex);

  /*
    Set the position of the local read thread to the beginning postion.
  */
  if (read_data_header(&archive))
  {
    rc= HA_ERR_CRASHED_ON_USAGE;
    goto error;
  }

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(key, buf+k_offset, key_len))
    {
      found= 1;
      break;
    }
  }

  if (found)
    DBUG_RETURN(0);

error:
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
}

/*
  All calls that need to scan the table start with this method. If we are told
  that it is a table scan we rewind the file to the beginning, otherwise
@@ -726,7 +927,8 @@ int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
  DBUG_ENTER("ha_archive::get_row");

  read= azread(file_to_read, buf, table->s->reclength);
  DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, table->s->reclength));
  DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, 
                                     table->s->reclength));

  if (read == Z_STREAM_ERROR)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
@@ -912,9 +1114,18 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
    if (!rc)
    {
      share->rows_recorded= 0;
      auto_increment_value= share->auto_increment_value= 0;
      while (!(rc= get_row(&archive, buf)))
      {
        real_write_row(buf, &writer);
        if (table->found_next_number_field)
        {
          Field *field= table->found_next_number_field;
          if (share->auto_increment_value < 
              field->val_int((char*)(buf + field->offset())))
            auto_increment_value= share->auto_increment_value=
              field->val_int((char*)(buf + field->offset()));
        }
        share->rows_recorded++;
      }
    }
@@ -1028,6 +1239,9 @@ void ha_archive::info(uint flag)
  delete_length= 0;
  index_file_length=0;

  if (flag & HA_STATUS_AUTO)
    auto_increment_value= share->auto_increment_value;

  DBUG_VOID_RETURN;
}

+17 −5
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@
#pragma interface			/* gcc class implementation */
#endif

#include <values.h>
#include <zlib.h>
#include "../storage/archive/azlib.h"

@@ -38,13 +39,14 @@ typedef struct st_archive_share {
  bool dirty;               /* Flag for if a flush should occur */
  bool crashed;             /* Meta file is crashed */
  ha_rows rows_recorded;    /* Number of rows in tables */
  ulonglong auto_increment_value;
} ARCHIVE_SHARE;

/*
  Version for file format.
  1 - Initial Version
*/
#define ARCHIVE_VERSION 1
#define ARCHIVE_VERSION 2

class ha_archive: public handler
{
@@ -68,13 +70,22 @@ class ha_archive: public handler
  const char **bas_ext() const;
  ulong table_flags() const
  {
    return (HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT | HA_NO_AUTO_INCREMENT |
    return (HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT |
            HA_FILE_BASED | HA_CAN_INSERT_DELAYED | HA_CAN_GEOMETRY);
  }
  ulong index_flags(uint idx, uint part, bool all_parts) const
  {
    return 0;
    return HA_ONLY_WHOLE_INDEX;
  }
  ulonglong get_auto_increment();
  uint max_supported_keys()          const { return 1; }
  uint max_supported_key_length()    const { return sizeof(ulonglong); }
  uint max_supported_key_part_length() const { return sizeof(ulonglong); }
  int index_init(uint keynr, bool sorted);
  virtual int index_read(byte * buf, const byte * key,
			 uint key_len, enum ha_rkey_function find_flag);
  virtual int index_read_idx(byte * buf, uint index, const byte * key,
			     uint key_len, enum ha_rkey_function find_flag);
  int open(const char *name, int mode, uint test_if_locked);
  int close(void);
  int write_row(byte * buf);
@@ -84,8 +95,9 @@ class ha_archive: public handler
  int rnd_next(byte *buf);
  int rnd_pos(byte * buf, byte *pos);
  int get_row(azio_stream *file_to_read, byte *buf);
  int read_meta_file(File meta_file, ha_rows *rows);
  int write_meta_file(File meta_file, ha_rows rows, bool dirty);
  int read_meta_file(File meta_file, ha_rows *rows, ulonglong *auto_increment);
  int write_meta_file(File meta_file, ha_rows rows, 
                      ulonglong auto_increment, bool dirty);
  ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table);
  int free_share(ARCHIVE_SHARE *share);
  bool auto_repair() const { return 1; } // For the moment we just do this