Commit 2ce03bb5 authored by unknown's avatar unknown
Browse files

ndb - rbr blobs etc: set db/schema in injector_ndb before calling NDB


storage/ndb/src/ndbapi/NdbBlob.cpp:
  DBUG
storage/ndb/include/ndbapi/Ndb.hpp:
  method to set db/schema from table name + format check in internalize_table_name
storage/ndb/include/ndbapi/NdbDictionary.hpp:
  method to set db/schema from table name + format check in internalize_table_name
storage/ndb/src/ndbapi/Ndb.cpp:
  method to set db/schema from table name + format check in internalize_table_name
sql/ha_ndbcluster_binlog.cc:
  set injector_ndb db/schema before calling NDB (may be more cases..).  only place to get it is table internal name
parent f8ddd2b2
Loading
Loading
Loading
Loading
+21 −2
Original line number Diff line number Diff line
@@ -2065,7 +2065,19 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
      DBUG_RETURN(-1);
    }

    NdbEventOperation *op= ndb->createEventOperation(event_name);
    NdbEventOperation* op;
    if (do_schema_share)
      op= ndb->createEventOperation(event_name);
    else
    {
      // set injector_ndb database/schema from table internal name
      int ret= ndb->setDatabaseAndSchemaName(ndbtab);
      assert(ret == 0);
      op= ndb->createEventOperation(event_name);
      // reset to catch errors
      ndb->setDatabaseName("");
      ndb->setDatabaseSchemaName("");
    }
    if (!op)
    {
      pthread_mutex_unlock(&injector_mutex);
@@ -2632,7 +2644,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
    goto err;
  }

  if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
  // empty database and schema
  if (!(ndb= new Ndb(g_ndb_cluster_connection, "", "")) ||
      ndb->init())
  {
    sql_print_error("NDB Binlog: Getting Ndb object failed");
@@ -2885,11 +2898,17 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
            DBUG_ASSERT(share != 0);
          }
#endif
          // set injector_ndb database/schema from table internal name
          int ret= ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
          assert(ret == 0);
          if ((unsigned) pOp->getEventType() <
              (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
            ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
          else
            ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
          // reset to catch errors
          ndb->setDatabaseName("");
          ndb->setDatabaseSchemaName("");

          pOp= ndb->nextEvent();
        } while (pOp && pOp->getGCI() == gci);
+9 −0
Original line number Diff line number Diff line
@@ -1147,6 +1147,15 @@ public:
   */
  void setDatabaseSchemaName(const char * aDatabaseSchemaName);

#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
  /** Set database and schema name to match previously retrieved table
   *
   * Returns non-zero if table internal name does not contain
   * non-empty database and schema names
   */
  int setDatabaseAndSchemaName(const NdbDictionary::Table* t);
#endif

  /**
   * Initializes the Ndb object
   *
+1 −0
Original line number Diff line number Diff line
@@ -884,6 +884,7 @@ public:

  private:
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
    friend class Ndb;
    friend class NdbDictionaryImpl;
    friend class NdbTableImpl;
    friend class NdbEventOperationImpl;
+33 −10
Original line number Diff line number Diff line
@@ -1035,39 +1035,37 @@ convertEndian(Uint32 Data)
  return Data;
#endif
}

// <internal>
const char * Ndb::getCatalogName() const
{
  return theImpl->m_dbname.c_str();
}


void Ndb::setCatalogName(const char * a_catalog_name)
{
  if (a_catalog_name)
  {
  // TODO can table_name_separator be escaped?
  if (a_catalog_name && ! strchr(a_catalog_name, table_name_separator)) {
    theImpl->m_dbname.assign(a_catalog_name);
    theImpl->update_prefix();
  }
}


const char * Ndb::getSchemaName() const
{
  return theImpl->m_schemaname.c_str();
}


void Ndb::setSchemaName(const char * a_schema_name)
{
  if (a_schema_name) {
  // TODO can table_name_separator be escaped?
  if (a_schema_name && ! strchr(a_schema_name, table_name_separator)) {
    theImpl->m_schemaname.assign(a_schema_name);
    theImpl->update_prefix();
  }
}
// </internal>
 
/*
Deprecated functions
*/
const char * Ndb::getDatabaseName() const
{
  return getCatalogName();
@@ -1088,6 +1086,24 @@ void Ndb::setDatabaseSchemaName(const char * a_schema_name)
  setSchemaName(a_schema_name);
}

int Ndb::setDatabaseAndSchemaName(const NdbDictionary::Table* t)
{
  const char* s0 = t->m_impl.m_internalName.c_str();
  const char* s1 = strchr(s0, table_name_separator);
  if (s1 && s1 != s0) {
    const char* s2 = strchr(s1 + 1, table_name_separator);
    if (s2 && s2 != s1 + 1) {
      char buf[200];
      sprintf(buf, "%.*s", s1 - s0, s0);
      setDatabaseName(buf);
      sprintf(buf, "%.*s", s2 - (s1 + 1), s1 + 1);
      setDatabaseSchemaName(buf);
      return 0;
    }
  }
  return -1;
}
 
bool Ndb::usingFullyQualifiedNames()
{
  return fullyQualifiedNames;
@@ -1149,9 +1165,16 @@ Ndb::internalize_table_name(const char *external_name) const
  if (fullyQualifiedNames)
  {
    /* Internal table name format <db>/<schema>/<table>
       <db>/<schema> is already available in m_prefix
       <db>/<schema>/ is already available in m_prefix
       so just concat the two strings
     */
#ifdef VM_TRACE
    // verify that m_prefix looks like abc/def/
    const char* s0 = theImpl->m_prefix.c_str();
    const char* s1 = s0 ? strchr(s0, table_name_separator) : 0;
    const char* s2 = s1 ? strchr(s1 + 1, table_name_separator) : 0;
    assert(s1 && s1 != s0 && s2 && s2 != s1 + 1 && *(s2 + 1) == 0);
#endif
    ret.assfmt("%s%s",
               theImpl->m_prefix.c_str(),
               external_name);
+8 −4
Original line number Diff line number Diff line
@@ -61,22 +61,26 @@ NdbBlob::setState(State newState)
int
NdbBlob::getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName)
{
  DBUG_ENTER("NdbBlob::getBlobTableName");
  NdbTableImpl* t = anNdb->theDictionary->m_impl.getTable(tableName);
  if (t == NULL)
    return -1;
    DBUG_RETURN(-1);
  NdbColumnImpl* c = t->getColumn(columnName);
  if (c == NULL)
    return -1;
    DBUG_RETURN(-1);
  getBlobTableName(btname, t, c);
  return 0;
  DBUG_RETURN(0);
}

void
NdbBlob::getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c)
{
  assert(t != 0 && c != 0 && c->getBlobType());
  DBUG_ENTER("NdbBlob::getBlobTableName");
  assert(t != 0 && c != 0 && c->getBlobType() && c->getPartSize() != 0);
  memset(btname, 0, NdbBlobImpl::BlobTableNameSize);
  sprintf(btname, "NDB$BLOB_%d_%d", (int)t->m_id, (int)c->m_column_no);
  DBUG_PRINT("info", ("blob table name: %s", btname));
  DBUG_VOID_RETURN;
}

void