Commit 12e660e8 authored by unknown's avatar unknown
Browse files

Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb_improved_on-line_discover

into  mysql.com:/usr/local/home/marty/MySQL/mysql-5.1-new-ndb_improved_on-line_discover


sql/ha_ndbcluster.cc:
  Auto merged
sql/ha_ndbcluster.h:
  Auto merged
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Auto merged
sql/ha_ndbcluster_binlog.cc:
  Merge
parents b6717496 9e348383
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -3,10 +3,19 @@ create table t1 ( a int primary key, b varchar(10), c varchar(10), index (b) )
engine=ndb;
insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three');
create index c on t1(c);
show indexes from t1;
Table	Non_unique	Key_name	Seq_in_index	Column_name	Collation	Cardinality	Sub_part	Packed	Null	Index_type	Comment
t1	0	PRIMARY	1	a	A	3	NULL	NULL		BTREE	
t1	1	b	1	b	A	3	NULL	NULL	YES	BTREE	
t1	1	c	1	c	A	3	NULL	NULL	YES	BTREE	
select * from t1 where c = 'two';
a	b	c
2	two	two
alter table t1 drop index c;
show indexes from t1;
Table	Non_unique	Key_name	Seq_in_index	Column_name	Collation	Cardinality	Sub_part	Packed	Null	Index_type	Comment
t1	0	PRIMARY	1	a	A	3	NULL	NULL		BTREE	
t1	1	b	1	b	A	3	NULL	NULL	YES	BTREE	
select * from t1 where c = 'two';
a	b	c
2	two	two
+2 −0
Original line number Diff line number Diff line
@@ -13,10 +13,12 @@ engine=ndb;
insert into t1 values (1,'one','one'), (2,'two','two'), (3,'three','three');
create index c on t1(c); 
connection server2;
show indexes from t1;
select * from t1 where c = 'two';
connection server1;
alter table t1 drop index c;
connection server2;
show indexes from t1;
select * from t1 where c = 'two';
connection server1;
drop table t1;
+10 −6
Original line number Diff line number Diff line
@@ -519,6 +519,7 @@ void ha_ndbcluster::invalidate_dictionary_cache(bool global)
  {
    NDBINDEX *index = (NDBINDEX *) m_index[i].index;
    NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
    if (!index && !unique_index) continue;
    NDB_INDEX_TYPE idx_type= m_index[i].type;

    switch (idx_type) {
@@ -991,7 +992,7 @@ bool ha_ndbcluster::uses_blob_value()
    -2   Meta data has changed; Re-read data and try again
*/

static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
            uint pack_length)
{
  DBUG_ENTER("cmp_frm");
@@ -1076,7 +1077,7 @@ int ha_ndbcluster::get_metadata(const char *path)
  m_table= (void *)tab; 
  m_table_info= NULL; // Set in external lock
  
  DBUG_RETURN(open_indexes(ndb, table));
  DBUG_RETURN(open_indexes(ndb, table, FALSE));
}

static int fix_unique_index_attr_order(NDB_INDEX_DATA &data,
@@ -1249,7 +1250,7 @@ int ha_ndbcluster::add_index_handle(THD *thd, NDBDICT *dict, KEY *key_info,
/*
  Associate index handles for each index of a table
*/
int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab)
int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error)
{
  uint i;
  int error= 0;
@@ -1263,6 +1264,9 @@ int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab)
  for (i= 0; i < tab->s->keys; i++, key_info++, key_name++)
  {
    if ((error= add_index_handle(thd, dict, key_info, *key_name, i)))
      if (ignore_error)
        m_index[i].index= m_index[i].unique_index= NULL;
      else
        break;
  }
  
@@ -3739,7 +3743,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
      {
        m_table= (void *)tab;
        m_table_version = tab->getObjectVersion();
        if (!(my_errno= open_indexes(ndb, table)))
        if (!(my_errno= open_indexes(ndb, table, FALSE)))
          DBUG_RETURN(my_errno);
      }
      m_table_info= tab_info;
+4 −1
Original line number Diff line number Diff line
@@ -691,6 +691,9 @@ static void set_tabname(const char *pathname, char *tabname);

private:
  friend int ndbcluster_drop_database_impl(const char *path);
  friend int ndb_handle_schema_change(THD *thd, 
                                      Ndb *ndb, NdbEventOperation *pOp,
                                      NDB_SHARE *share);
  int alter_table_name(const char *to);
  static int delete_table(ha_ndbcluster *h, Ndb *ndb,
			  const char *path,
@@ -708,7 +711,7 @@ static void set_tabname(const char *pathname, char *tabname);
  int create_indexes(Ndb *ndb, TABLE *tab);
  void clear_index(int i);
  void clear_indexes();
  int open_indexes(Ndb *ndb, TABLE *tab);
  int open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error);
  void renumber_indexes(Ndb *ndb, TABLE *tab);
  int drop_indexes(Ndb *ndb, TABLE *tab);
  int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
+165 −79
Original line number Diff line number Diff line
@@ -230,60 +230,13 @@ static void run_query(THD *thd, char *buf, char *end,
  }
}

/*
  Initialize the binlog part of the NDB_SHARE
*/
void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
{
  THD *thd= current_thd;
  MEM_ROOT *mem_root= &share->mem_root;
  int do_event_op= ndb_binlog_running;

  share->op= 0;
  share->table= 0;

  if (!schema_share &&
      strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_event_op= 1;

  {
    int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
    share->subscriber_bitmap= (MY_BITMAP*)
      alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
    for (i= 0; i < no_nodes; i++)
    {
      bitmap_init(&share->subscriber_bitmap[i],
                  (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
                  max_ndb_nodes, false);
      bitmap_clear_all(&share->subscriber_bitmap[i]);
    }
    bitmap_init(&share->slock_bitmap, share->slock,
                sizeof(share->slock)*8, false);
    bitmap_clear_all(&share->slock_bitmap);
  }

  if (!do_event_op)
  {
    if (_table)
    {
      if (_table->s->primary_key == MAX_KEY)
        share->flags|= NSF_HIDDEN_PK;
      if (_table->s->blob_fields != 0)
        share->flags|= NSF_BLOB_FLAG;
    }
    else
    {
      share->flags|= NSF_NO_BINLOG;
    }
    return;
  }
  while (1) 
int
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
                             TABLE_SHARE *table_share, TABLE *table)
{
    TABLE_SHARE *table_share= 
      (TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
    TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
  int error;
  MEM_ROOT *mem_root= &share->mem_root;
  DBUG_ENTER("ndbcluster_binlog_open_table");
  
  init_tmp_table_share(table_share, share->db, 0, share->table_name, 
                       share->key);
@@ -296,7 +249,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
    table_share= 0;
    my_free((gptr) table, MYF(0));
    table= 0;
      break;
    DBUG_RETURN(error);
  }
  if ((error= open_table_from_share(thd, table_share, "", 0, 
                                    (uint) READ_ALL, 0, table, FALSE)))
@@ -308,7 +261,7 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
    table_share= 0;
    my_free((gptr) table, MYF(0));
    table= 0;
      break;
    DBUG_RETURN(error);
  }
  assign_new_table_id(table);
  if (!table->record[1] || table->record[1] == table->record[0])
@@ -338,6 +291,67 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
  share->ndb_value[1]= (NdbValue*)
    alloc_root(mem_root, sizeof(NdbValue) * table->s->fields
               +1 /*extra for hidden key*/);

  DBUG_RETURN(0);
}


/*
  Initialize the binlog part of the NDB_SHARE
*/
void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
{
  THD *thd= current_thd;
  MEM_ROOT *mem_root= &share->mem_root;
  int do_event_op= ndb_binlog_running;

  share->op= 0;
  share->table= 0;

  if (!schema_share &&
      strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_event_op= 1;

  {
    int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
    share->subscriber_bitmap= (MY_BITMAP*)
      alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
    for (i= 0; i < no_nodes; i++)
    {
      bitmap_init(&share->subscriber_bitmap[i],
                  (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
                  max_ndb_nodes, false);
      bitmap_clear_all(&share->subscriber_bitmap[i]);
    }
    bitmap_init(&share->slock_bitmap, share->slock,
                sizeof(share->slock)*8, false);
    bitmap_clear_all(&share->slock_bitmap);
  }

  if (!do_event_op)
  {
    if (_table)
    {
      if (_table->s->primary_key == MAX_KEY)
        share->flags|= NSF_HIDDEN_PK;
      if (_table->s->blob_fields != 0)
        share->flags|= NSF_BLOB_FLAG;
    }
    else
    {
      share->flags|= NSF_NO_BINLOG;
    }
    return;
  }
  while (1) 
  {
    int error;
    TABLE_SHARE *table_share= 
      (TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
    TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
      break;
    if (table->s->primary_key == MAX_KEY)
      share->flags|= NSF_HIDDEN_PK;
    if (table->s->blob_fields != 0)
@@ -1285,24 +1299,85 @@ ndbcluster_update_slock(THD *thd,
/*
  Handle _non_ data events from the storage nodes
*/
static int
//static int
int
ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
                         NDB_SHARE *share)
{
  DBUG_ENTER("ndb_handle_schema_change");
  int remote_drop_table= 0, do_close_cached_tables= 0;
  const char *dbname= share->table->s->db.str;
  const char *tabname= share->table->s->table_name.str;
  bool online_alter_table= (pOp->getEventType() == NDBEVENT::TE_ALTER &&
                            pOp->tableFrmChanged());

  if (pOp->getEventType() != NDBEVENT::TE_CLUSTER_FAILURE &&
      pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
      (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id())
  {
    ndb->setDatabaseName(share->table->s->db.str);
    ha_ndbcluster::invalidate_dictionary_cache(share->table->s,
                                               ndb,
                                               share->table->s->db.str,
                                               share->table->s->table_name.str,
                                               TRUE);
    TABLE_SHARE *table_share= share->table->s; //share->table_share;
    TABLE* table= share->table;
    
    /* 
       Invalidate table and all it's indexes
    */
    ha_ndbcluster table_handler(table_share);
    table_handler.set_dbname(share->key);
    table_handler.set_tabname(share->key);
    table_handler.open_indexes(ndb, table, TRUE);
    table_handler.invalidate_dictionary_cache(TRUE);
    
    if (online_alter_table)
    {  
      char key[FN_REFLEN];
      const void *data= 0, *pack_data= 0;
      uint length, pack_length;
      int error;
      NDBDICT *dict= ndb->getDictionary();
      const NDBTAB *altered_table= pOp->getTable();

      DBUG_PRINT("info", ("Detected frm change of table %s.%s",
                          dbname, tabname));
      build_table_filename(key, FN_LEN-1, dbname, tabname, NullS);
      /*
        If the frm of the altered table is different than the one on
        disk then overwrite it with the new table definition
      */
      if (readfrm(key, &data, &length) == 0 &&
          packfrm(data, length, &pack_data, &pack_length) == 0 &&
          cmp_frm(altered_table, pack_data, pack_length))
      {
        DBUG_DUMP("frm", (char*)altered_table->getFrmData(), 
                  altered_table->getFrmLength());
        pthread_mutex_lock(&LOCK_open);
        dict->putTable(altered_table);
        
        if ((error= unpackfrm(&data, &length, altered_table->getFrmData())) ||
            (error= writefrm(key, data, length)))
        {
          sql_print_information("NDB: Failed write frm for %s.%s, error %d",
                                dbname, tabname, error);
        }
        pthread_mutex_unlock(&LOCK_open);
        close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
        /*
        if ((error= ndbcluster_binlog_open_table(thd, share, 
                                                 table_share, table)))
          sql_print_information("NDB: Failed to re-open table %s.%s",
                                dbname, tabname);
        */
      }
    }
    remote_drop_table= 1;
  }

  // If only frm was changed continue replicating
  if (online_alter_table)
  {
    /* Signal ha_ndbcluster::alter_table that drop is done */
    (void) pthread_cond_signal(&injector_cond);
    DBUG_RETURN(0);
  }

  (void) pthread_mutex_lock(&share->mutex);
  DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
  if (share->op_old == pOp)
@@ -1481,11 +1556,16 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
      // skip
      break;
    case NDBEVENT::TE_ALTER:
      if (pOp->tableNameChanged())
      {  
        DBUG_PRINT("info", ("Detected name change of table %s.%s",
                            share->db, share->table_name));
        /* do the rename of the table in the share */
        share->table->s->db.str= share->db;
        share->table->s->db.length= strlen(share->db);
        share->table->s->table_name.str= share->table_name;
        share->table->s->table_name.length= strlen(share->table_name);
      }
      ndb_handle_schema_change(thd, ndb, pOp, share);
      break;
    case NDBEVENT::TE_CLUSTER_FAILURE:
@@ -1766,6 +1846,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
  /* Handle any trailing share */
  NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
                                             (byte*) key, key_len);

  if (share && share_may_exist)
  {
    if (share->flags & NSF_NO_BINLOG ||
@@ -2362,6 +2443,10 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
                       share->key, share, pOp, share->op, share->op_old));
    break;
  case NDBEVENT::TE_ALTER:
    if (pOp->tableNameChanged())
    {
      DBUG_PRINT("info", ("Detected name change of table %s.%s",
                          share->db, share->table_name));
      /* ToDo: remove printout */
      if (ndb_extra_logging)
        sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
@@ -2373,6 +2458,7 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
      share->table->s->db.length= strlen(share->db);
      share->table->s->table_name.str= share->table_name;
      share->table->s->table_name.length= strlen(share->table_name);
    }
    goto drop_alter_common;
  case NDBEVENT::TE_DROP:
    if (apply_status_share == share)
Loading