Commit d913b14a authored by unknown's avatar unknown
Browse files

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql51

parents 07104f36 04120069
Loading
Loading
Loading
Loading
+16 −48
Original line number Diff line number Diff line
@@ -4189,8 +4189,8 @@ int ha_ndbcluster::create(const char *name,
    if ((my_errno= write_ndb_file(name)))
      DBUG_RETURN(my_errno);
#ifdef HAVE_NDB_BINLOG
    if (ndb_binlog_thread_running > 0)
      ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname, 0);
    ndbcluster_create_binlog_setup(get_ndb(), name2, strlen(name2),
                                   m_dbname, m_tabname, FALSE);
#endif /* HAVE_NDB_BINLOG */
    DBUG_RETURN(my_errno);
  }
@@ -4386,6 +4386,8 @@ int ha_ndbcluster::create(const char *name,
                        " Event: %s", name2);
        /* a warning has been issued to the client */
      }
      if (share && ndb_binlog_thread_running <= 0)
        share->flags|= NSF_NO_BINLOG;
      ndbcluster_log_schema_op(current_thd, share,
                               current_thd->query, current_thd->query_length,
                               share->db, share->table_name,
@@ -5460,7 +5462,7 @@ int ndbcluster_find_all_files(THD *thd)
        /* no such database defined, skip table */
        continue;
      }
      strxnmov(end, FN_LEN-1-(key-end), "/", elmt.name, NullS);
      end= strxnmov(end, FN_LEN-1-(end-key), "/", elmt.name, NullS);
      const void *data= 0, *pack_data= 0;
      uint length, pack_length;
      int discover= 0;
@@ -5486,41 +5488,25 @@ int ndbcluster_find_all_files(THD *thd)
      my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
      my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));

      pthread_mutex_lock(&LOCK_open);
      if (discover)
      {
        /* ToDo 4.1 database needs to be created if missing */
        pthread_mutex_lock(&LOCK_open);
        if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
        {
          /* ToDo 4.1 handle error */
        }
        pthread_mutex_unlock(&LOCK_open);
      }
#ifdef HAVE_NDB_BINLOG
      else if (ndb_binlog_thread_running > 0)
      else
      {
        /* set up replication for this table */
        NDB_SHARE *share;
        pthread_mutex_lock(&ndbcluster_mutex);
        if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
                                            (byte*) key, strlen(key)))
              && share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
            || share == 0)
        {
          /*
            there is no binlog creation setup for this table
            attempt to do it
          */
          pthread_mutex_unlock(&ndbcluster_mutex);
          pthread_mutex_lock(&LOCK_open);
          ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name,
                                         share);
          pthread_mutex_unlock(&LOCK_open);
        }
        else
          pthread_mutex_unlock(&ndbcluster_mutex);
        ndbcluster_create_binlog_setup(ndb, key, end-key,
                                       elmt.database, elmt.name,
                                       TRUE);
      }
#endif
      pthread_mutex_unlock(&LOCK_open);
    }
  }
  while (unhandled && retries--);
@@ -5635,36 +5621,18 @@ int ndbcluster_find_files(THD *thd,const char *db,const char *path,

#ifdef HAVE_NDB_BINLOG
  /* setup logging to binlog for all discovered tables */
  if (ndb_binlog_thread_running > 0)
  {
    char *end;
    char *end1=
    char *end, *end1=
      strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS);
    NDB_SHARE *share;
    pthread_mutex_lock(&ndbcluster_mutex);
    for (i= 0; i < ok_tables.records; i++)
    {
      file_name= (char*)hash_element(&ok_tables, i);
      end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS);
      if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
                                          (byte*)name, end - name))
          && share->op == 0 && share->op_old == 0 && ! (share->flags & NSF_NO_BINLOG))
      {
        /*
          there is no binlog creation setup for this table
          attempt to do it
	*/
        
        pthread_mutex_unlock(&ndbcluster_mutex);
      pthread_mutex_lock(&LOCK_open);
        ndbcluster_create_binlog_setup(ndb, name, db, file_name, share);
      ndbcluster_create_binlog_setup(ndb, name, end-name,
                                     db, file_name, TRUE);
      pthread_mutex_unlock(&LOCK_open);
        pthread_mutex_lock(&ndbcluster_mutex);
    }
      /* Table existed in the mysqld so there should be a share */
      DBUG_ASSERT(share != NULL);
    }
    pthread_mutex_unlock(&ndbcluster_mutex);
  }
#endif

+39 −13
Original line number Diff line number Diff line
@@ -239,11 +239,17 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
  share->table= 0;
  if (ndb_binlog_thread_running <= 0)
  {
    DBUG_ASSERT(_table != 0);
    if (_table)
    {
      if (_table->s->primary_key == MAX_KEY)
        share->flags|= NSF_HIDDEN_PK;
      if (_table->s->blob_fields != 0)
        share->flags|= NSF_BLOB_FLAG;
    }
    else
    {
      share->flags|= NSF_NO_BINLOG;
    }
    return;
  }
  while (1) 
@@ -1626,25 +1632,37 @@ ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
  create/discover.
*/
int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
                                   uint key_len,
                                   const char *db,
                                   const char *table_name,
                                   NDB_SHARE *share)
                                   my_bool share_may_exist)
{
  DBUG_ENTER("ndbcluster_create_binlog_setup");
  DBUG_PRINT("enter",("key: %s  key_len: %d  %s.%s  share_may_exist: %d",
                      key, key_len, db, table_name, share_may_exist));
  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
  DBUG_ASSERT(strlen(key) == key_len);

  pthread_mutex_lock(&ndbcluster_mutex);

  /* Handle any trailing share */
  if (share == 0)
  NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
                                             (byte*) key, key_len);
  if (share && share_may_exist)
  {
    if (share->flags & NSF_NO_BINLOG ||
        share->op != 0 ||
        share->op_old != 0)
    {
    share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
                                    (byte*) key, strlen(key));
      pthread_mutex_unlock(&ndbcluster_mutex);
      DBUG_RETURN(0); // replication already setup, or should not
    }
  }

  if (share)
  {
    handle_trailing_share(share);
  }
  else
    handle_trailing_share(share);

  /* Create share which is needed to hold replication information */
  if (!(share= get_share(key, 0, true, true)))
@@ -1652,6 +1670,13 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
    sql_print_error("NDB Binlog: "
                    "allocating table share for %s failed", key);
  }

  if (ndb_binlog_thread_running <= 0)
  {
    share->flags|= NSF_NO_BINLOG;
    pthread_mutex_unlock(&ndbcluster_mutex);
    DBUG_RETURN(0);
  }
  pthread_mutex_unlock(&ndbcluster_mutex);

  while (share && !IS_TMP_PREFIX(table_name))
@@ -1749,6 +1774,7 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
      sql_print_error("NDB Binlog: logging of table %s "
                      "with no PK and blob attributes is not supported",
                      share->key);
      share->flags|= NSF_NO_BINLOG;
      DBUG_RETURN(-1);
    }
    /* No primary key, subscribe for all attributes */
+2 −1
Original line number Diff line number Diff line
@@ -72,9 +72,10 @@ void ndbcluster_binlog_init_handlerton();
void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *table);

int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
                                   uint key_len,
                                   const char *db,
                                   const char *table_name,
                                   NDB_SHARE *share);
                                   my_bool share_may_exist);
int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table,
                            const char *event_name, NDB_SHARE *share);
int ndbcluster_create_event_ops(NDB_SHARE *share,