Commit 6c2b5613 authored by unknown's avatar unknown
Browse files

Bug #17415 special character tables are not handled correctly in ndb binlog/schema dist

parent 36017fce
Loading
Loading
Loading
Loading
+30 −31
Original line number Diff line number Diff line
@@ -477,8 +477,7 @@ ha_ndbcluster::invalidate_dictionary_cache(TABLE_SHARE *share, Ndb *ndb,

#ifdef HAVE_NDB_BINLOG
  char key[FN_REFLEN];
  strxnmov(key, FN_LEN-1, mysql_data_home, "/",
           dbname, "/", tabname, NullS);
  build_table_filename(key, sizeof(key), dbname, tabname, "");
  DBUG_PRINT("info", ("Getting ndbcluster mutex"));
  pthread_mutex_lock(&ndbcluster_mutex);
  NDB_SHARE *ndb_share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
@@ -4191,16 +4190,14 @@ int ha_ndbcluster::create(const char *name,
  NDBCOL col;
  uint pack_length, length, i, pk_length= 0;
  const void *data, *pack_data;
  char name2[FN_HEADLEN];
  bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE);

  DBUG_ENTER("ha_ndbcluster::create");
  DBUG_PRINT("enter", ("name: %s", name));

  strcpy(name2, name);
  DBUG_ASSERT(*fn_rext((char*)name2) == 0);
  set_dbname(name2);
  set_tabname(name2);    
  DBUG_ASSERT(*fn_rext((char*)name) == 0);
  set_dbname(name);
  set_tabname(name);

  table= form;
  if (create_from_engine)
@@ -4213,7 +4210,7 @@ int ha_ndbcluster::create(const char *name,
    if ((my_errno= write_ndb_file(name)))
      DBUG_RETURN(my_errno);
#ifdef HAVE_NDB_BINLOG
    ndbcluster_create_binlog_setup(get_ndb(), name2, strlen(name2),
    ndbcluster_create_binlog_setup(get_ndb(), name, strlen(name),
                                   m_dbname, m_tabname, FALSE);
#endif /* HAVE_NDB_BINLOG */
    DBUG_RETURN(my_errno);
@@ -4361,18 +4358,18 @@ int ha_ndbcluster::create(const char *name,
      First make sure we get a "fresh" share here, not an old trailing one...
    */
    {
      const char *key= name2;
      uint length= (uint) strlen(key);
      uint length= (uint) strlen(name);
      if ((share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
                                           (byte*) key, length)))
                                           (byte*) name, length)))
        handle_trailing_share(share);
    }
    /*
      get a new share
    */
    if (!(share= get_share(name2, form, true, true)))

    if (!(share= get_share(name, form, true, true)))
    {
      sql_print_error("NDB: allocating table share for %s failed", name2);
      sql_print_error("NDB: allocating table share for %s failed", name);
      /* my_errno is set */
    }
    pthread_mutex_unlock(&ndbcluster_mutex);
@@ -4413,7 +4410,7 @@ int ha_ndbcluster::create(const char *name,
          ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0)
      {
        sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
                        " Event: %s", name2);
                        " Event: %s", name);
        /* a warning has been issued to the client */
      }
      if (share && !do_event_op)
@@ -5285,7 +5282,7 @@ int ndbcluster_discover(THD* thd, const char *db, const char *name,
  NDBDICT* dict= ndb->getDictionary();
  dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
  dict->invalidateTable(name);
  strxnmov(key, FN_LEN-1, mysql_data_home, "/", db, "/", name, NullS);
  build_table_filename(key, sizeof(key), db, name, "");
  NDB_SHARE *share= get_share(key, 0, false);
  if (share && get_ndb_share_state(share) == NSS_ALTERED)
  {
@@ -5419,13 +5416,14 @@ int ndbcluster_drop_database_impl(const char *path)
  }
  // Drop any tables belonging to database
  char full_path[FN_REFLEN];
  char *tmp= strxnmov(full_path, FN_REFLEN-1, share_prefix, dbname, "/",
                      NullS);
  char *tmp= full_path +
    build_table_filename(full_path, sizeof(full_path), dbname, "", "");

  ndb->setDatabaseName(dbname);
  List_iterator_fast<char> it(drop_list);
  while ((tabname=it++))
  {
    strxnmov(tmp, FN_REFLEN - (tmp - full_path)-1, tabname, NullS);
    tablename_to_filename(tabname, tmp, FN_REFLEN - (tmp - full_path)-1);
    if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
    {
      const NdbError err= dict->getNdbError();
@@ -5518,14 +5516,16 @@ int ndbcluster_find_all_files(THD *thd)
        continue;
    
      /* check if database exists */
      char *end= strxnmov(key, FN_LEN-1, mysql_data_home, "/",
                          elmt.database, NullS);
      char *end= key +
        build_table_filename(key, sizeof(key), elmt.database, "", "");
      if (my_access(key, F_OK))
      {
        /* no such database defined, skip table */
        continue;
      }
      end= strxnmov(end, FN_LEN-1-(end-key), "/", elmt.name, NullS);
      /* finalize construction of path */
      end+= tablename_to_filename(elmt.name, end,
                                  sizeof(key)-(end-key));
      const void *data= 0, *pack_data= 0;
      uint length, pack_length;
      int discover= 0;
@@ -5660,10 +5660,9 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
    }
    
    // File is not in NDB, check for .ndb file with this name
    (void)strxnmov(name, FN_REFLEN-1,
                   mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
    build_table_filename(name, sizeof(name), db, file_name, ha_ndb_ext);
    DBUG_PRINT("info", ("Check access for %s", name));
    if (access(name, F_OK))
    if (my_access(name, F_OK))
    {
      DBUG_PRINT("info", ("%s did not exist on disk", name));     
      // .ndb file did not exist on disk, another table type
@@ -5685,12 +5684,13 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
#ifdef HAVE_NDB_BINLOG
  /* setup logging to binlog for all discovered tables */
  {
    char *end, *end1=
      strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS);
    char *end, *end1= name +
      build_table_filename(name, sizeof(name), db, "", "");
    for (i= 0; i < ok_tables.records; i++)
    {
      file_name= (char*)hash_element(&ok_tables, i);
      end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS);
      end= end1 +
        tablename_to_filename(file_name, end1, sizeof(name) - (end1 - name));
      pthread_mutex_lock(&LOCK_open);
      ndbcluster_create_binlog_setup(ndb, name, end-name,
                                     db, file_name, TRUE);
@@ -5707,9 +5707,8 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,
    file_name= hash_element(&ndb_tables, i);
    if (!hash_search(&ok_tables, file_name, strlen(file_name)))
    {
      strxnmov(name, sizeof(name)-1,
               mysql_data_home, "/", db, "/", file_name, reg_ext, NullS);
      if (access(name, F_OK))
      build_table_filename(name, sizeof(name), db, file_name, reg_ext);
      if (my_access(name, F_OK))
      {
        DBUG_PRINT("info", ("%s must be discovered", file_name));
        // File is in list of ndb tables and not in ok_tables
@@ -6243,7 +6242,7 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
  NDB_SHARE *share;
  DBUG_ENTER("ndb_get_commitcount");

  (void)strxnmov(name, FN_REFLEN-1, share_prefix, dbname, "/", tabname, NullS);
  build_table_filename(name, sizeof(name), dbname, tabname, "");
  DBUG_PRINT("enter", ("name: %s", name));
  pthread_mutex_lock(&ndbcluster_mutex);
  if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+7 −14
Original line number Diff line number Diff line
@@ -659,11 +659,8 @@ static int ndbcluster_create_apply_status_table(THD *thd)
    if so, remove it since there is none in Ndb
  */
  {
    strxnmov(buf, sizeof(buf),
             mysql_data_home,
             "/" NDB_REP_DB "/" NDB_APPLY_TABLE,
             reg_ext, NullS);
    unpack_filename(buf,buf);
    build_table_filename(buf, sizeof(buf),
                         NDB_REP_DB, NDB_APPLY_TABLE, reg_ext);
    my_delete(buf, MYF(0));
  }

@@ -711,11 +708,8 @@ static int ndbcluster_create_schema_table(THD *thd)
    if so, remove it since there is none in Ndb
  */
  {
    strxnmov(buf, sizeof(buf),
             mysql_data_home,
             "/" NDB_REP_DB "/" NDB_SCHEMA_TABLE,
             reg_ext, NullS);
    unpack_filename(buf,buf);
    build_table_filename(buf, sizeof(buf),
                         NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext);
    my_delete(buf, MYF(0));
  }

@@ -940,8 +934,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
  if (get_a_share)
  {
    char key[FN_REFLEN];
    (void)strxnmov(key, FN_REFLEN, share_prefix, db,
                   "/", table_name, NullS);
    build_table_filename(key, sizeof(key), db, table_name, "");
    share= get_share(key, 0, false, false);
  }

@@ -1434,8 +1427,8 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
        case SOT_CLEAR_SLOCK:
        {
          char key[FN_REFLEN];
          (void)strxnmov(key, FN_REFLEN, share_prefix, schema->db,
                         "/", schema->name, NullS);
          build_table_filename(key, sizeof(key),
                               schema->db, schema->name, "");
          NDB_SHARE *share= get_share(key, 0, false, false);
          if (share)
          {