Commit df3494ca authored by unknown's avatar unknown
Browse files

Bug #26021 - ndb: valgrind warning handle_trailing_share/ndbcluster_free_share invalid read


sql/ha_ndbcluster.cc:
  Bug #26021: make sure ndbcluster_create_event_ops even if no binlogging as flags NSF_NO_BINLOG is set there
  set NSS_DROPPED state in handle_trailing_share if appropriate
  added dbug printout and comments for NDB_SHARE references
sql/ha_ndbcluster_binlog.cc:
  Bug #26021: set NSF_NO_BINLOG if !ndb_binlog_running
  Bug #26021: only reuse share if share_may_exist
  setup apply_status table to be logged always, even if no binlogging
  added dbug printout and comments for NDB_SHARE references
parent c0cc9468
Loading
Loading
Loading
Loading
+144 −19
Original line number Diff line number Diff line
@@ -5009,11 +5009,17 @@ int ha_ndbcluster::create(const char *name,
      get a new share
    */

    /* ndb_share reference create */
    if (!(share= get_share(name, form, true, true)))
    {
      sql_print_error("NDB: allocating table share for %s failed", name);
      /* my_errno is set */
    }
    else
    {
      DBUG_PRINT("NDB_SHARE", ("%s binlog create  use_count: %u",
                               share->key, share->use_count));
    }
    pthread_mutex_unlock(&ndbcluster_mutex);

    while (!IS_TMP_PREFIX(m_tabname))
@@ -5037,7 +5043,7 @@ int ha_ndbcluster::create(const char *name,
        if (ndb_extra_logging)
          sql_print_information("NDB Binlog: CREATE TABLE Event: %s",
                                event_name.c_ptr());
        if (share && do_event_op &&
        if (share && 
            ndbcluster_create_event_ops(share, m_table, event_name.c_ptr()))
        {
          sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
@@ -5129,6 +5135,9 @@ int ha_ndbcluster::create_handler_files(const char *file,
  }
  
  set_ndb_share_state(m_share, NSS_INITIAL);
  /* ndb_share reference schema(?) free */
  DBUG_PRINT("NDB_SHARE", ("%s binlog schema(?) free  use_count: %u",
                           m_share->key, m_share->use_count));
  free_share(&m_share); // Decrease ref_count

  DBUG_RETURN(error);
@@ -5255,7 +5264,10 @@ int ha_ndbcluster::create_ndb_index(const char *name,
*/ 
void ha_ndbcluster::prepare_for_alter()
{
  /* ndb_share reference schema */
  ndbcluster_get_share(m_share); // Increase ref_count
  DBUG_PRINT("NDB_SHARE", ("%s binlog schema  use_count: %u",
                           m_share->key, m_share->use_count));
  set_ndb_share_state(m_share, NSS_ALTERED);
}

@@ -5289,6 +5301,9 @@ int ha_ndbcluster::add_index(TABLE *table_arg,
  if (error)
  {
    set_ndb_share_state(m_share, NSS_INITIAL);
    /* ndb_share reference schema free */
    DBUG_PRINT("NDB_SHARE", ("%s binlog schema free  use_count: %u",
                             m_share->key, m_share->use_count));
    free_share(&m_share); // Decrease ref_count
  }
  DBUG_RETURN(error);  
@@ -5333,6 +5348,9 @@ int ha_ndbcluster::final_drop_index(TABLE *table_arg)
  if((error= drop_indexes(ndb, table_arg)))
  {
    m_share->state= NSS_INITIAL;
    /* ndb_share reference schema free */
    DBUG_PRINT("NDB_SHARE", ("%s binlog schema free  use_count: %u",
                             m_share->key, m_share->use_count));
    free_share(&m_share); // Decrease ref_count
  }
  DBUG_RETURN(error);
@@ -5374,9 +5392,12 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
  int ndb_table_id= orig_tab->getObjectId();
  int ndb_table_version= orig_tab->getObjectVersion();

  /* ndb_share reference temporary */
  NDB_SHARE *share= get_share(from, 0, false);
  if (share)
  {
    DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                             share->key, share->use_count));
    int r= rename_share(share, to);
    DBUG_ASSERT(r == 0);
  }
@@ -5400,6 +5421,9 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
    {
      int r= rename_share(share, from);
      DBUG_ASSERT(r == 0);
      /* ndb_share reference temporary free */
      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share);
    }
#endif
@@ -5412,7 +5436,12 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
    // ToDo in 4.1 should rollback alter table...
#ifdef HAVE_NDB_BINLOG
    if (share)
    {
      /* ndb_share reference temporary free */
      DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                               share->key, share->use_count));
      free_share(&share);
    }
#endif
    DBUG_RETURN(result);
  }
@@ -5446,7 +5475,7 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
      if (ndb_extra_logging)
        sql_print_information("NDB Binlog: RENAME Event: %s",
                              event_name.c_ptr());
      if (share && ndb_binlog_running &&
      if (share &&
          ndbcluster_create_event_ops(share, ndbtab, event_name.c_ptr()))
      {
        sql_print_error("NDB Binlog: FAILED create event operations "
@@ -5493,7 +5522,12 @@ int ha_ndbcluster::rename_table(const char *from, const char *to)
    }
  }
  if (share)
  {
    /* ndb_share reference temporary free */
    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                             share->key, share->use_count));
    free_share(&share);
  }
#endif

  DBUG_RETURN(result);
@@ -5528,7 +5562,13 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
    DBUG_PRINT("info", ("Schema distribution table not setup"));
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }
  /* ndb_share reference temporary */
  NDB_SHARE *share= get_share(path, 0, false);
  if (share)
  {
    DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                             share->key, share->use_count));
  }
#endif

  /* Drop the table from NDB */
@@ -5608,9 +5648,14 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
          The share kept by the server has not been freed, free it
        */
        share->state= NSS_DROPPED;
        /* ndb_share reference create free */
        DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                                 share->key, share->use_count));
        free_share(&share, TRUE);
      }
      /* free the share taken above */
      /* ndb_share reference temporary free */
      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share, TRUE);
      pthread_mutex_unlock(&ndbcluster_mutex);
    }
@@ -5660,9 +5705,14 @@ ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
        The share kept by the server has not been freed, free it
      */
      share->state= NSS_DROPPED;
      /* ndb_share reference create free */
      DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share, TRUE);
    }
    /* free the share taken above */
    /* ndb_share reference temporary free */
    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                             share->key, share->use_count));
    free_share(&share, TRUE);
    pthread_mutex_unlock(&ndbcluster_mutex);
  }
@@ -5838,6 +5888,9 @@ ha_ndbcluster::~ha_ndbcluster()

  if (m_share)
  {
    /* ndb_share reference handler free */
    DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
                             m_share->key, m_share->use_count));
    free_share(&m_share);
  }
  release_metadata(thd, ndb);
@@ -5900,14 +5953,21 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
  DBUG_PRINT("info", ("ref_length: %d", ref_length));

  // Init table lock structure 
  /* ndb_share reference handler */
  if (!(m_share=get_share(name, table)))
    DBUG_RETURN(1);
  DBUG_PRINT("NDB_SHARE", ("%s handler  use_count: %u",
                           m_share->key, m_share->use_count));
  thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
  
  set_dbname(name);
  set_tabname(name);
  
  if (check_ndb_connection()) {
  if (check_ndb_connection())
  {
    /* ndb_share reference handler free */
    DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
                             m_share->key, m_share->use_count));
    free_share(&m_share);
    m_share= 0;
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -5968,6 +6028,9 @@ int ha_ndbcluster::close(void)
  DBUG_ENTER("close");
  THD *thd= current_thd;
  Ndb *ndb= thd ? check_ndb_in_thd(thd) : g_ndb;
  /* ndb_share reference handler free */
  DBUG_PRINT("NDB_SHARE", ("%s handler free  use_count: %u",
                           m_share->key, m_share->use_count));
  free_share(&m_share);
  m_share= 0;
  release_metadata(thd, ndb);
@@ -6074,7 +6137,13 @@ int ndbcluster_discover(handlerton *hton, THD* thd, const char *db,
  ndb->setDatabaseName(db);
  NDBDICT* dict= ndb->getDictionary();
  build_table_filename(key, sizeof(key), db, name, "", 0);
  /* ndb_share reference temporary */
  NDB_SHARE *share= get_share(key, 0, false);
  if (share)
  {
    DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                             share->key, share->use_count));
  }
  if (share && get_ndb_share_state(share) == NSS_ALTERED)
  {
    // Frm has been altered on disk, but not yet written to ndb
@@ -6120,12 +6189,22 @@ int ndbcluster_discover(handlerton *hton, THD* thd, const char *db,
  *frmblob= data;
  
  if (share)
  {
    /* ndb_share reference temporary free */
    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                             share->key, share->use_count));
    free_share(&share);
  }

  DBUG_RETURN(0);
err:
  if (share)
  {
    /* ndb_share reference temporary free */
    DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                             share->key, share->use_count));
    free_share(&share);
  }
  if (ndb_error.code)
  {
    ERR_RETURN(ndb_error);
@@ -6362,7 +6441,13 @@ int ndbcluster_find_all_files(THD *thd)
      }
      else if (cmp_frm(ndbtab, pack_data, pack_length))
      {
        /* ndb_share reference temporary */
        NDB_SHARE *share= get_share(key, 0, false);
        if (share)
        {
          DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                   share->key, share->use_count));
        }
        if (!share || get_ndb_share_state(share) != NSS_ALTERED)
        {
          discover= 1;
@@ -6370,8 +6455,13 @@ int ndbcluster_find_all_files(THD *thd)
                                elmt.database, elmt.name);
        }
        if (share)
        {
          /* ndb_share reference temporary free */
          DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                   share->key, share->use_count));
          free_share(&share);
        }
      }
      my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
      my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));

@@ -7175,7 +7265,10 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
    DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", name));
    DBUG_RETURN(1);
  }
  /* ndb_share reference temporary, free below */
  share->use_count++;
  DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                           share->key, share->use_count));
  pthread_mutex_unlock(&ndbcluster_mutex);

  pthread_mutex_lock(&share->mutex);
@@ -7188,6 +7281,9 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
      DBUG_PRINT("info", ("Getting commit_count: %s from share",
                          llstr(share->commit_count, buff)));
      pthread_mutex_unlock(&share->mutex);
      /* ndb_share reference temporary free */
      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share);
      DBUG_RETURN(0);
    }
@@ -7206,6 +7302,9 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
    if (ndbtab_g.get_table() == 0
        || ndb_get_table_statistics(NULL, false, ndb, ndbtab_g.get_table(), &stat))
    {
      /* ndb_share reference temporary free */
      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share);
      DBUG_RETURN(1);
    }
@@ -7226,6 +7325,9 @@ uint ndb_get_commitcount(THD *thd, char *dbname, char *tabname,
    *commit_count= 0;
  }
  pthread_mutex_unlock(&share->mutex);
  /* ndb_share reference temporary free */
  DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                           share->key, share->use_count));
  free_share(&share);
  DBUG_RETURN(0);
}
@@ -7442,7 +7544,10 @@ int handle_trailing_share(NDB_SHARE *share)
  static ulong trailing_share_id= 0;
  DBUG_ENTER("handle_trailing_share");

  /* ndb_share reference temporary, free below */
  ++share->use_count;
  DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                           share->key, share->use_count));
  pthread_mutex_unlock(&ndbcluster_mutex);

  TABLE_LIST table_list;
@@ -7453,10 +7558,14 @@ int handle_trailing_share(NDB_SHARE *share)
  close_cached_tables(thd, 0, &table_list, TRUE);

  pthread_mutex_lock(&ndbcluster_mutex);
  /* ndb_share reference temporary free */
  DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                           share->key, share->use_count));
  if (!--share->use_count)
  {
    if (ndb_extra_logging)
      sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) "
      sql_print_information("NDB_SHARE: trailing share "
                            "%s(connect_count: %u) "
                            "released by close_cached_tables at "
                            "connect_count: %u",
                            share->key,
@@ -7470,10 +7579,19 @@ int handle_trailing_share(NDB_SHARE *share)
    share still exists, if share has not been dropped by server
    release that share
  */
  if (share->state != NSS_DROPPED && !--share->use_count)
  if (share->state != NSS_DROPPED)
  {
    share->state= NSS_DROPPED;
    /* ndb_share reference create free */
    DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                             share->key, share->use_count));
    --share->use_count;

    if (share->use_count == 0)
    {
      if (ndb_extra_logging)
      sql_print_information("NDB_SHARE: trailing share %s(connect_count: %u) "
        sql_print_information("NDB_SHARE: trailing share "
                              "%s(connect_count: %u) "
                              "released after NSS_DROPPED check "
                              "at connect_count: %u",
                              share->key,
@@ -7482,8 +7600,7 @@ int handle_trailing_share(NDB_SHARE *share)
      ndbcluster_real_free_share(&share);
      DBUG_RETURN(0);
    }
  DBUG_PRINT("error", ("NDB_SHARE: %s already exists  use_count=%d.",
                       share->key, share->use_count));
  }

  sql_print_error("NDB_SHARE: %s already exists  use_count=%d."
                  " Moving away for safety, but possible memleak.",
@@ -8540,7 +8657,10 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
        continue; // injector thread is the only user, skip statistics
      share->util_lock= current_thd; // Mark that util thread has lock
#endif /* HAVE_NDB_BINLOG */
      /* ndb_share reference temporary, free below */
      share->use_count++; /* Make sure the table can't be closed */
      DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                               share->key, share->use_count));
      DBUG_PRINT("ndb_util_thread",
                 ("Found open table[%d]: %s, use_count: %d",
                  i, share->table_name, share->use_count));
@@ -8561,6 +8681,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
        /*
          Util thread and injector thread is the only user, skip statistics
	*/
        /* ndb_share reference temporary free */
        DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                 share->key, share->use_count));
        free_share(&share);
        continue;
      }
@@ -8604,7 +8727,9 @@ pthread_handler_t ndb_util_thread_func(void *arg __attribute__((unused)))
        share->commit_count= stat.commit_count;
      pthread_mutex_unlock(&share->mutex);

      /* Decrease the use count and possibly free share */
      /* ndb_share reference temporary free */
      DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share);
    }

+100 −9
Original line number Diff line number Diff line
@@ -371,6 +371,10 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
      strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_event_op= 1;
  else if (!ndb_apply_status_share &&
           strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
    do_event_op= 1;

  {
    int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
@@ -606,7 +610,14 @@ static int ndbcluster_binlog_end(THD *thd)
        DBUG_PRINT("share",
                   ("table->s->db.table_name: %s.%s",
                    share->table->s->db.str, share->table->s->table_name.str));
      if (share->state != NSS_DROPPED && !--share->use_count)
      /* ndb_share reference create free */
      if (share->state != NSS_DROPPED)
      {
        DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                                 share->key, share->use_count));
        --share->use_count;
      }
      if (share->use_count == 0)
        ndbcluster_real_free_share(&share);
      else
      {
@@ -1716,15 +1727,25 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
  (void) pthread_cond_signal(&injector_cond);

  pthread_mutex_lock(&ndbcluster_mutex);
  /* ndb_share reference binlog free */
  DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                           share->key, share->use_count));
  free_share(&share, TRUE);
  if (is_remote_change && share && share->state != NSS_DROPPED)
  {
    DBUG_PRINT("info", ("remote change"));
    share->state= NSS_DROPPED;
    if (share->use_count != 1)
    {
      /* open handler holding reference */
      /* wait with freeing create ndb_share to below */
      do_close_cached_tables= TRUE;
    }
    else
    {
      /* ndb_share reference create free */
      DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share, TRUE);
      share= 0;
    }
@@ -1747,6 +1768,9 @@ ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
    table_list.db= (char *)dbname;
    table_list.alias= table_list.table_name= (char *)tabname;
    close_cached_tables(thd, 0, &table_list);
    /* ndb_share reference create free */
    DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
                             share->key, share->use_count));
    free_share(&share);
  }
  DBUG_RETURN(0);
@@ -1814,7 +1838,13 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
          char key[FN_REFLEN];
          build_table_filename(key, sizeof(key),
                               schema->db, schema->name, "", 0);
          /* ndb_share reference temporary, free below */
          NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
          if (share)
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                     share->key, share->use_count));
          }
          // invalidation already handled by binlog thread
          if (!share || !share->op)
          {
@@ -1830,9 +1860,14 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
            table_list.alias= table_list.table_name= schema->name;
            close_cached_tables(thd, 0, &table_list, FALSE);
          }
          /* ndb_share reference temporary free */
          if (share)
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                     share->key, share->use_count));
            free_share(&share);
          }
        }
        // fall through
        case SOT_CREATE_TABLE:
          pthread_mutex_lock(&LOCK_open);
@@ -1940,6 +1975,10 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,

      /* begin protect ndb_schema_share */
      pthread_mutex_lock(&ndb_schema_share_mutex);
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               ndb_schema_share->key,
                               ndb_schema_share->use_count));
      free_share(&ndb_schema_share);
      ndb_schema_share= 0;
      pthread_mutex_unlock(&ndb_schema_share_mutex);
@@ -2066,7 +2105,13 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
        pthread_mutex_unlock(&ndbcluster_mutex);
        continue;
      }
      /* ndb_share reference temporary, free below */
      NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
      if (share)
      {
        DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                 share->key, share->use_count));
      }
      switch (schema_type)
      {
      case SOT_DROP_DB:
@@ -2111,6 +2156,9 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
          */
          if (share)
          {
            /* ndb_share reference temporary free */
            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                     share->key, share->use_count));
            free_share(&share);
            share= 0;
          }
@@ -2143,6 +2191,9 @@ ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
      }
      if (share)
      {
        /* ndb_share reference temporary free */
        DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                 share->key, share->use_count));
        free_share(&share);
        share= 0;
      }
@@ -2445,10 +2496,8 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
      pthread_mutex_unlock(&ndbcluster_mutex);
      DBUG_RETURN(1);
    }
#ifdef NOT_YET
    if (share->connect_count != 
    if (!share_may_exist || share->connect_count != 
        g_ndb_cluster_connection->get_connect_count())
#endif
    {
      handle_trailing_share(share);
      share= NULL;
@@ -2456,23 +2505,33 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
  }

  /* Create share which is needed to hold replication information */
#ifdef NOT_YET
  if (share)
  {
    /* ndb_share reference create */
    ++share->use_count;
    DBUG_PRINT("NDB_SHARE", ("%s create  use_count: %u",
                             share->key, share->use_count));
  }
  else
#endif
  if (!(share= get_share(key, 0, TRUE, TRUE)))
  /* ndb_share reference create */
  else if (!(share= get_share(key, 0, TRUE, TRUE)))
  {
    sql_print_error("NDB Binlog: "
                    "allocating table share for %s failed", key);
  }
  else
  {
    DBUG_PRINT("NDB_SHARE", ("%s create  use_count: %u",
                             share->key, share->use_count));
  }

  if (!ndb_schema_share &&
      strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_event_op= 1;
  else if (!ndb_apply_status_share &&
           strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
    do_event_op= 1;

  if (!do_event_op)
  {
@@ -2744,7 +2803,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
  else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
    do_ndb_apply_status_share= 1;
  else if (!binlog_filter->db_ok(share->db))
  else if (!binlog_filter->db_ok(share->db) || !ndb_binlog_running)
  {
    share->flags|= NSF_NO_BINLOG;
    DBUG_RETURN(0);
@@ -2756,6 +2815,9 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,

    DBUG_ASSERT(share->use_count > 1);
    sql_print_error("NDB Binlog: discover reusing old ev op");
    /* ndb_share reference ToDo free */
    DBUG_PRINT("NDB_SHARE", ("%s ToDo free  use_count: %u",
                             share->key, share->use_count));
    free_share(&share); // old event op already has reference
    DBUG_RETURN(0);
  }
@@ -2898,15 +2960,24 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
    break;
  }

  /* ndb_share reference binlog */
  get_share(share);
  DBUG_PRINT("NDB_SHARE", ("%s binlog  use_count: %u",
                           share->key, share->use_count));
  if (do_ndb_apply_status_share)
  {
    /* ndb_share reference binlog extra */
    ndb_apply_status_share= get_share(share);
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
                             share->key, share->use_count));
    (void) pthread_cond_signal(&injector_cond);
  }
  else if (do_ndb_schema_share)
  {
    /* ndb_share reference binlog extra */
    ndb_schema_share= get_share(share);
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
                             share->key, share->use_count));
    (void) pthread_cond_signal(&injector_cond);
  }

@@ -3093,6 +3164,9 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               share->key, share->use_count));
      free_share(&ndb_apply_status_share);
      ndb_apply_status_share= 0;
    }
@@ -3109,6 +3183,9 @@ ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               share->key, share->use_count));
      free_share(&ndb_apply_status_share);
      ndb_apply_status_share= 0;
    }
@@ -4009,6 +4086,10 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)

  if (ndb_apply_status_share)
  {
    /* ndb_share reference binlog extra free */
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                             ndb_apply_status_share->key,
                             ndb_apply_status_share->use_count));
    free_share(&ndb_apply_status_share);
    ndb_apply_status_share= 0;
  }
@@ -4016,6 +4097,10 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
  {
    /* begin protect ndb_schema_share */
    pthread_mutex_lock(&ndb_schema_share_mutex);
    /* ndb_share reference binlog extra free */
    DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                             ndb_schema_share->key,
                             ndb_schema_share->use_count));
    free_share(&ndb_schema_share);
    ndb_schema_share= 0;
    pthread_mutex_unlock(&ndb_schema_share_mutex);
@@ -4037,6 +4122,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
      DBUG_ASSERT(share->op == op ||
                  share->op_old == op);
      share->op= share->op_old= 0;
      /* ndb_share reference binlog free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share);
      s_ndb->dropEventOperation(op);
    }
@@ -4057,6 +4145,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
      DBUG_ASSERT(share->op == op ||
                  share->op_old == op);
      share->op= share->op_old= 0;
      /* ndb_share reference binlog free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
                               share->key, share->use_count));
      free_share(&share);
      i_ndb->dropEventOperation(op);
    }