Commit 475acacb authored by unknown's avatar unknown
Browse files

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new

parents e0a19fd9 7c45e615
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -183,3 +183,9 @@ connection master;

DROP TABLE IF EXISTS test.t1;
DROP TABLE IF EXISTS test.t2;
# ensure cleanup on slave as well:
# ndb blob tables consist of several tables
# if cluster is shutdown while not all tables are
# properly dropped, the table becomes inconsistent
# and wrecks later test cases
--sync_slave_with_master
+73 −20
Original line number Diff line number Diff line
@@ -4480,6 +4480,21 @@ int ha_ndbcluster::create(const char *name,
    DBUG_RETURN(my_errno);
  }

#ifdef HAVE_NDB_BINLOG
  /*
    Don't allow table creation unless
    schema distribution table is setup
    ( unless it is a creation of the schema dist table itself )
  */
  if (!schema_share &&
      !(strcmp(m_dbname, NDB_REP_DB) == 0 &&
        strcmp(m_tabname, NDB_SCHEMA_TABLE) == 0))
  {
    DBUG_PRINT("info", ("Schema distribution table not setup"));
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }
#endif /* HAVE_NDB_BINLOG */

  DBUG_PRINT("table", ("name: %s", m_tabname));  
  tab.setName(m_tabname);
  tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));    
@@ -5004,7 +5019,8 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
    is_old_table_tmpfile= 0;
    String event_name(INJECTOR_EVENT_LEN);
    ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
    ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share);
    ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share,
                                 "rename table");
  }

  if (!result && !IS_TMP_PREFIX(new_tabname))
@@ -5088,6 +5104,15 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
  DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
  NDBDICT *dict= ndb->getDictionary();
#ifdef HAVE_NDB_BINLOG
  /*
    Don't allow drop table unless
    schema distribution table is setup
  */
  if (!schema_share)
  {
    DBUG_PRINT("info", ("Schema distribution table not setup"));
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }
  NDB_SHARE *share= get_share(path, 0, false);
#endif

@@ -5156,7 +5181,7 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
    ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0);
    ndbcluster_handle_drop_table(ndb,
                                 table_dropped ? event_name.c_ptr() : 0,
                                 share);
                                 share, "delete table");
  }

  if (share)
@@ -5185,6 +5210,18 @@ int ha_ndbcluster::delete_table(const char *name)
  set_dbname(name);
  set_tabname(name);

#ifdef HAVE_NDB_BINLOG
  /*
    Don't allow drop table unless
    schema distribution table is setup
  */
  if (!schema_share)
  {
    DBUG_PRINT("info", ("Schema distribution table not setup"));
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }
#endif

  if (check_ndb_connection())
    DBUG_RETURN(HA_ERR_NO_CONNECTION);

@@ -5406,6 +5443,11 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
  if (!res)
    info(HA_STATUS_VARIABLE | HA_STATUS_CONST);

#ifdef HAVE_NDB_BINLOG
  if (!ndb_binlog_tables_inited && ndb_binlog_running)
    table->db_stat|= HA_READ_ONLY;
#endif

  DBUG_RETURN(res);
}

@@ -5704,6 +5746,19 @@ int ndbcluster_drop_database_impl(const char *path)

static void ndbcluster_drop_database(char *path)
{
  DBUG_ENTER("ndbcluster_drop_database");
#ifdef HAVE_NDB_BINLOG
  /*
    Don't allow drop database unless
    schema distribution table is setup
  */
  if (!schema_share)
  {
    DBUG_PRINT("info", ("Schema distribution table not setup"));
    DBUG_VOID_RETURN;
    //DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }
#endif
  ndbcluster_drop_database_impl(path);
#ifdef HAVE_NDB_BINLOG
  char db[FN_REFLEN];
@@ -5712,6 +5767,7 @@ static void ndbcluster_drop_database(char *path)
                           current_thd->query, current_thd->query_length,
                           db, "", 0, 0, SOT_DROP_DB);
#endif
  DBUG_VOID_RETURN;
}
/*
  find all tables in ndb and discover those needed
@@ -5740,29 +5796,30 @@ int ndbcluster_find_all_files(THD *thd)

  NDBDICT *dict= ndb->getDictionary();

  int unhandled, retries= 5;
  int unhandled, retries= 5, skipped;
  do
  {
    if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
      ERR_RETURN(dict->getNdbError());
    unhandled= 0;
    skipped= 0;
    retries--;
    for (uint i= 0 ; i < list.count ; i++)
    {
      NDBDICT::List::Element& elmt= list.elements[i];
      int do_handle_table= 0;
      if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
      {
        DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
        continue;
      }
      DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
      if (elmt.state == NDBOBJ::StateOnline ||
          elmt.state == NDBOBJ::StateBackup)
        do_handle_table= 1;
      else if (!(elmt.state == NDBOBJ::StateBuilding))
      if (elmt.state != NDBOBJ::StateOnline &&
          elmt.state != NDBOBJ::StateBackup &&
          elmt.state != NDBOBJ::StateBuilding)
      {
        sql_print_information("NDB: skipping setup table %s.%s, in state %d",
                              elmt.database, elmt.name, elmt.state);
        skipped++;
        continue;
      }

@@ -5771,7 +5828,7 @@ int ndbcluster_find_all_files(THD *thd)

      if (!(ndbtab= dict->getTable(elmt.name)))
      {
        if (do_handle_table)
        if (retries == 0)
          sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
                          elmt.database, elmt.name,
                          dict->getNdbError().code,
@@ -5840,9 +5897,9 @@ int ndbcluster_find_all_files(THD *thd)
      pthread_mutex_unlock(&LOCK_open);
    }
  }
  while (unhandled && retries--);
  while (unhandled && retries);

  DBUG_RETURN(0);
  DBUG_RETURN(-(skipped + unhandled));
}

int ndbcluster_find_files(THD *thd,const char *db,const char *path,
@@ -7706,6 +7763,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
    pthread_cond_wait(&COND_server_started, &LOCK_server_started);
  pthread_mutex_unlock(&LOCK_server_started);

  ndbcluster_util_inited= 1;

  /*
    Wait for cluster to start
  */
@@ -7737,6 +7796,8 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
  }

#ifdef HAVE_NDB_BINLOG
  if (ndb_extra_logging && ndb_binlog_running)
    sql_print_information("NDB Binlog: Ndb tables initially read only.");
  /* create tables needed by the replication */
  ndbcluster_setup_binlog_table_shares(thd);
#else
@@ -7746,17 +7807,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
  ndbcluster_find_all_files(thd);
#endif

  ndbcluster_util_inited= 1;

#ifdef HAVE_NDB_BINLOG
  /* Signal injector thread that all is setup */
  pthread_cond_signal(&injector_cond);
#endif

  set_timespec(abstime, 0);
  for (;!abort_loop;)
  {

    pthread_mutex_lock(&LOCK_ndb_util_thread);
    pthread_cond_timedwait(&COND_ndb_util_thread,
                           &LOCK_ndb_util_thread,
@@ -7774,7 +7827,7 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
      Check that the apply_status_share and schema_share has been created.
      If not try to create it
    */
    if (!apply_status_share || !schema_share)
    if (!ndb_binlog_tables_inited)
      ndbcluster_setup_binlog_table_shares(thd);
#endif

+103 −29
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ int ndb_binlog_thread_running= 0;
  FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
my_bool ndb_binlog_tables_inited= FALSE;

/*
  Global reference to the ndb injector thread THD oject
@@ -775,33 +776,51 @@ static int ndbcluster_create_schema_table(THD *thd)
  DBUG_RETURN(0);
}

void ndbcluster_setup_binlog_table_shares(THD *thd)
int ndbcluster_setup_binlog_table_shares(THD *thd)
{
  int done_find_all_files= 0;
  if (!schema_share &&
      ndbcluster_check_schema_share() == 0)
  {
    if (!done_find_all_files)
    pthread_mutex_lock(&LOCK_open);
    ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
    pthread_mutex_unlock(&LOCK_open);
    if (!schema_share)
    {
      ndbcluster_find_all_files(thd);
      done_find_all_files= 1;
    }
      ndbcluster_create_schema_table(thd);
      // always make sure we create the 'schema' first
      if (!schema_share)
      return;
        return 1;
    }
  }
  if (!apply_status_share &&
      ndbcluster_check_apply_status_share() == 0)
  {
    if (!done_find_all_files)
    pthread_mutex_lock(&LOCK_open);
    ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
    pthread_mutex_unlock(&LOCK_open);
    if (!apply_status_share)
    {
      ndbcluster_find_all_files(thd);
      done_find_all_files= 1;
    }
      ndbcluster_create_apply_status_table(thd);
      if (!apply_status_share)
        return 1;
    }
  }
  if (!ndbcluster_find_all_files(thd))
  {
    pthread_mutex_lock(&LOCK_open);
    ndb_binlog_tables_inited= TRUE;
    if (ndb_binlog_running)
    {
      if (ndb_extra_logging)
        sql_print_information("NDB Binlog: ndb tables writable");
      close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, TRUE);
    }
    pthread_mutex_unlock(&LOCK_open);
    /* Signal injector thread that all is setup */
    pthread_cond_signal(&injector_cond);
  }
  return 0;
}

/*
  Defines and struct for schema table.
@@ -936,6 +955,31 @@ static char *ndb_pack_varchar(const NDBCOL *col, char *buf,
/*
  log query in schema table
*/
static void ndb_report_waiting(const char *key,
                               int the_time,
                               const char *op,
                               const char *obj)
{
  ulonglong ndb_latest_epoch= 0;
  const char *proc_info= "<no info>";
  pthread_mutex_lock(&injector_mutex);
  if (injector_ndb)
    ndb_latest_epoch= injector_ndb->getLatestGCI();
  if (injector_thd)
    proc_info= injector_thd->proc_info;
  pthread_mutex_unlock(&injector_mutex);
  sql_print_information("NDB %s:"
                        " waiting max %u sec for %s %s."
                        "  epochs: (%u,%u,%u)"
                        "  injector proc_info: %s"
                        ,key, the_time, op, obj
                        ,(uint)ndb_latest_handled_binlog_epoch
                        ,(uint)ndb_latest_received_binlog_epoch
                        ,(uint)ndb_latest_epoch
                        ,proc_info
                        );
}

int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
                             const char *query, int query_length,
                             const char *db, const char *table_name,
@@ -965,6 +1009,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
  }

  char tmp_buf2[FN_REFLEN];
  const char *type_str;
  switch (type)
  {
  case SOT_DROP_TABLE:
@@ -975,6 +1020,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
    query= tmp_buf2;
    query_length= (uint) (strxmov(tmp_buf2, "drop table `",
                                  table_name, "`", NullS) - tmp_buf2);
    type_str= "drop table";
    break;
  case SOT_RENAME_TABLE:
    /* redo the rename table query as is may contain several tables */
@@ -982,20 +1028,28 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
    query_length= (uint) (strxmov(tmp_buf2, "rename table `",
                                  old_db, ".", old_table_name, "` to `",
                                  db, ".", table_name, "`", NullS) - tmp_buf2);
    type_str= "rename table";
    break;
  case SOT_CREATE_TABLE:
    // fall through
    type_str= "create table";
    break;
  case SOT_ALTER_TABLE:
    type_str= "create table";
    break;
  case SOT_DROP_DB:
    type_str= "drop db";
    break;
  case SOT_CREATE_DB:
    type_str= "create db";
    break;
  case SOT_ALTER_DB:
    type_str= "alter db";
    break;
  case SOT_TABLESPACE:
    type_str= "tablespace";
    break;
  case SOT_LOGFILE_GROUP:
    type_str= "logfile group";
    break;
  default:
    abort(); /* should not happen, programming error */
@@ -1201,13 +1255,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
      max_timeout--;
      if (max_timeout == 0)
      {
        sql_print_error("NDB create table: timed out. Ignoring...");
        sql_print_error("NDB %s: distibuting %s timed out. Ignoring...",
                        type_str, ndb_schema_object->key);
        break;
      }
      if (ndb_extra_logging)
        sql_print_information("NDB create table: "
                              "waiting max %u sec for create table %s.",
                              max_timeout, ndb_schema_object->key);
        ndb_report_waiting(type_str, max_timeout,
                           "distributing", ndb_schema_object->key);
    }
    (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
  }
@@ -1689,9 +1743,15 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
      // skip
      break;
    case NDBEVENT::TE_CLUSTER_FAILURE:
      // fall through
    case NDBEVENT::TE_DROP:
      if (ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
      free_share(&schema_share);
      schema_share= 0;
      ndb_binlog_tables_inited= FALSE;
      // fall through
    case NDBEVENT::TE_ALTER:
      ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
@@ -2494,9 +2554,15 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,

  get_share(share);
  if (do_apply_status_share)
  {
    apply_status_share= get_share(share);
    (void) pthread_cond_signal(&injector_cond);
  }
  else if (do_schema_share)
  {
    schema_share= get_share(share);
    (void) pthread_cond_signal(&injector_cond);
  }

  DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
                     share->key, share->op, share->use_count));
@@ -2513,7 +2579,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
*/
int
ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
                             NDB_SHARE *share)
                             NDB_SHARE *share, const char *type_str)
{
  DBUG_ENTER("ndbcluster_handle_drop_table");

@@ -2581,9 +2647,8 @@ ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
      break;
    }
    if (ndb_extra_logging)
      sql_print_information("NDB delete table: "
                            "waiting max %u sec for drop table %s.",
                            max_timeout, share->key);
      ndb_report_waiting(type_str, max_timeout,
                         type_str, share->key);
  }
  (void) pthread_mutex_unlock(&share->mutex);
#else
@@ -2660,13 +2725,18 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
  switch (type)
  {
  case NDBEVENT::TE_CLUSTER_FAILURE:
    if (ndb_extra_logging)
      sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
    if (apply_status_share == share)
    {
      if (ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
      free_share(&apply_status_share);
      apply_status_share= 0;
      ndb_binlog_tables_inited= FALSE;
    }
    if (ndb_extra_logging)
      sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
    DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
                        "%s  received share: 0x%lx  op: %lx  share op: %lx  "
                        "op_old: %lx",
@@ -2675,8 +2745,13 @@ ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
  case NDBEVENT::TE_DROP:
    if (apply_status_share == share)
    {
      if (ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
      free_share(&apply_status_share);
      apply_status_share= 0;
      ndb_binlog_tables_inited= FALSE;
    }
    /* ToDo: remove printout */
    if (ndb_extra_logging)
@@ -3087,7 +3162,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
  thd->proc_info= "Waiting for ndbcluster to start";

  pthread_mutex_lock(&injector_mutex);
  while (!ndbcluster_util_inited)
  while (!schema_share || !apply_status_share)
  {
    /* ndb not connected yet */
    struct timespec abstime;
@@ -3119,10 +3194,6 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
    thd->db= db;
    if (ndb_binlog_running)
      open_binlog_index(thd, &binlog_tables, &binlog_index);
    if (!apply_status_share)
    {
      sql_print_error("NDB: Could not get apply status share");
    }
    thd->db= db;
  }

@@ -3184,6 +3255,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)

    if (unlikely(schema_res > 0))
    {
      thd->proc_info= "Processing events from schema table";
      schema_ndb->
        setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
      schema_ndb->
@@ -3379,6 +3451,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
        if (trans.good())
        {
          //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
          thd->proc_info= "Committing events to binlog";
          injector::transaction::binlog_pos start= trans.start_pos();
          if (int r= trans.commit())
          {
@@ -3418,6 +3491,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
  }
err:
  DBUG_PRINT("info",("Shutting down cluster binlog thread"));
  thd->proc_info= "Shutting down";
  close_thread_tables(thd);
  pthread_mutex_lock(&injector_mutex);
  /* don't mess with the injector_ndb anymore from other threads */
+4 −2
Original line number Diff line number Diff line
@@ -101,7 +101,8 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
                             const char *old_db= 0,
                             const char *old_table_name= 0);
int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
                                 NDB_SHARE *share);
                                 NDB_SHARE *share,
                                 const char *type_str);
void ndb_rep_event_name(String *event_name,
                        const char *db, const char *tbl);
int ndb_create_table_from_engine(THD *thd, const char *db,
@@ -112,12 +113,13 @@ pthread_handler_t ndb_binlog_thread_func(void *arg);
/*
  table cluster_replication.apply_status
*/
void ndbcluster_setup_binlog_table_shares(THD *thd);
int ndbcluster_setup_binlog_table_shares(THD *thd);
extern NDB_SHARE *apply_status_share;
extern NDB_SHARE *schema_share;

extern THD *injector_thd;
extern my_bool ndb_binlog_running;
extern my_bool ndb_binlog_tables_inited;

bool
ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,