Commit 572f03cb authored by unknown's avatar unknown
Browse files

ndb - bug#17761 blob tables patch 3a [requires next patch 3b]


storage/ndb/include/kernel/signaldata/DictTabInfo.hpp:
  parse blob table name (temp hack)
storage/ndb/src/common/debugger/signaldata/DictTabInfo.cpp:
  parse blob table name (temp hack)
storage/ndb/src/ndbapi/DictCache.cpp:
  do not put blob tables in ndb api dict cache
  main table is cached and blob tables are owned by its blob columns
storage/ndb/src/ndbapi/Ndb.cpp:
  do not put blob tables in ndb api dict cache
  main table is cached and blob tables are owned by its blob columns
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  do not put blob tables in ndb api dict cache
  main table is cached and blob tables are owned by its blob columns
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp:
  do not put blob tables in ndb api dict cache
  main table is cached and blob tables are owned by its blob columns
parent 40b55ac6
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -197,6 +197,10 @@ public:
    Undofile = 23           ///< Undofile
  };

  // used 1) until type BlobTable added 2) in upgrade code
  static bool
  isBlobTableName(const char* name, Uint32* ptab_id = 0, Uint32* pcol_no = 0);
  
  static inline bool
  isTable(int tableType) {
    return
+30 −0
Original line number Diff line number Diff line
@@ -260,3 +260,33 @@ DictFilegroupInfo::File::init(){
  FileSizeLo = 0;
  FileFreeExtents = 0;
}

// blob table name hack

bool
DictTabInfo::isBlobTableName(const char* name, Uint32* ptab_id, Uint32* pcol_no)
{ 
  const char* const prefix = "NDB$BLOB_";
  const char* s = strrchr(name, table_name_separator);
  s = (s == NULL ? name : s + 1);
  if (strncmp(s, prefix, strlen(prefix)) != 0)
    return false;
  s += strlen(prefix);
  uint i, n;
  for (i = 0, n = 0; '0' <= s[i] && s[i] <= '9'; i++)
    n += 10 * n + (s[i] - '0');
  if (i == 0 || s[i] != '_')
    return false;
  const uint tab_id = n;
  s = &s[i + 1];
  for (i = 0, n = 0; '0' <= s[i] && s[i] <= '9'; i++)
    n += 10 * n + (s[i] - '0');
  if (i == 0 || s[i] != 0)
    return false;
  const uint col_no = n;
  if (ptab_id)
    *ptab_id = tab_id;
  if (pcol_no)
    *pcol_no = col_no;
  return true;
}
+10 −0
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ static NdbTableImpl f_altered_table;
Ndb_local_table_info *
Ndb_local_table_info::create(NdbTableImpl *table_impl, Uint32 sz)
{
  assert(! is_ndb_blob_table(table_impl));
  Uint32 tot_size= sizeof(Ndb_local_table_info) - sizeof(Uint64)
    + ((sz+7) & ~7); // round to Uint64
  void *data= malloc(tot_size);
@@ -44,6 +45,7 @@ void Ndb_local_table_info::destroy(Ndb_local_table_info *info)

Ndb_local_table_info::Ndb_local_table_info(NdbTableImpl *table_impl)
{
  assert(! is_ndb_blob_table(table_impl));
  m_table_impl= table_impl;
}

@@ -61,18 +63,21 @@ LocalDictCache::~LocalDictCache(){

Ndb_local_table_info * 
LocalDictCache::get(const char * name){
  assert(! is_ndb_blob_table(name));
  const Uint32 len = strlen(name);
  return m_tableHash.getData(name, len);
}

void 
LocalDictCache::put(const char * name, Ndb_local_table_info * tab_info){
  assert(! is_ndb_blob_table(name));
  const Uint32 id = tab_info->m_table_impl->m_id;
  m_tableHash.insertKey(name, strlen(name), id, tab_info);
}

void
LocalDictCache::drop(const char * name){
  assert(! is_ndb_blob_table(name));
  Ndb_local_table_info *info= m_tableHash.deleteKey(name, strlen(name));
  DBUG_ASSERT(info != 0);
  Ndb_local_table_info::destroy(info);
@@ -142,6 +147,7 @@ GlobalDictCache::get(const char * name)
{
  DBUG_ENTER("GlobalDictCache::get");
  DBUG_PRINT("enter", ("name: %s", name));
  assert(! is_ndb_blob_table(name));

  const Uint32 len = strlen(name);
  Vector<TableVersion> * versions = 0;
@@ -196,6 +202,7 @@ GlobalDictCache::put(const char * name, NdbTableImpl * tab)
                       tab ? tab->m_internalName.c_str() : "tab NULL",
                       tab ? tab->m_version & 0xFFFFFF : 0,
                       tab ? tab->m_version >> 24 : 0));
  assert(! is_ndb_blob_table(name));

  const Uint32 len = strlen(name);
  Vector<TableVersion> * vers = m_tableHash.getData(name, len);
@@ -261,6 +268,7 @@ GlobalDictCache::drop(NdbTableImpl * tab)
{
  DBUG_ENTER("GlobalDictCache::drop");
  DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
  assert(! is_ndb_blob_table(tab));

  unsigned i;
  const Uint32 len = strlen(tab->m_internalName.c_str());
@@ -316,6 +324,7 @@ GlobalDictCache::release(NdbTableImpl * tab)
{
  DBUG_ENTER("GlobalDictCache::release");
  DBUG_PRINT("enter", ("internal_name: %s", tab->m_internalName.c_str()));
  assert(! is_ndb_blob_table(tab));

  unsigned i;
  const Uint32 len = strlen(tab->m_internalName.c_str());
@@ -365,6 +374,7 @@ GlobalDictCache::alter_table_rep(const char * name,
				 Uint32 tableVersion,
				 bool altered)
{
  assert(! is_ndb_blob_table(name));
  const Uint32 len = strlen(name);
  Vector<TableVersion> * vers = 
    m_tableHash.getData(name, len);
+2 −2
Original line number Diff line number Diff line
@@ -754,7 +754,7 @@ Ndb::getAutoIncrementValue(const char* aTableName, Uint32 cacheSize)
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname, false);
    theDictionary->get_local_table_info(internal_tabname);
  if (info == 0)
    DBUG_RETURN(~(Uint64)0);
  const NdbTableImpl *table= info->m_table_impl;
@@ -846,7 +846,7 @@ Ndb::setAutoIncrementValue(const char* aTableName, Uint64 val, bool increase)
  BaseString internal_tabname(internalize_table_name(aTableName));

  Ndb_local_table_info *info=
    theDictionary->get_local_table_info(internal_tabname, false);
    theDictionary->get_local_table_info(internal_tabname);
  if (info == 0) {
    theError= theDictionary->getNdbError();
    DBUG_RETURN(false);
+80 −67
Original line number Diff line number Diff line
@@ -49,6 +49,18 @@

extern Uint64 g_latest_trans_gci;

bool
is_ndb_blob_table(const char* name)
{
  return DictTabInfo::isBlobTableName(name);
}

bool
is_ndb_blob_table(const NdbTableImpl* t)
{
  return is_ndb_blob_table(t->m_internalName.c_str());
}

//#define EVENT_DEBUG

/**
@@ -239,6 +251,9 @@ NdbColumnImpl::init(Type t)

NdbColumnImpl::~NdbColumnImpl()
{
  if (m_blobTable != NULL)
    delete m_blobTable;
  m_blobTable = NULL;
}

bool
@@ -1282,6 +1297,14 @@ NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
  if (impl == 0){
    impl = m_receiver.getTable(internalTableName,
			       m_ndb.usingFullyQualifiedNames());
    if (impl != 0) {
      int ret = getBlobTables(*impl);
      if (ret != 0) {
        delete impl;
        return 0;
      }
    }

    m_globalHash->lock();
    m_globalHash->put(internalTableName.c_str(), impl);
    m_globalHash->unlock();
@@ -1307,6 +1330,9 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
{
  NdbTableImpl *old;

  int ret = getBlobTables(*impl);
  assert(ret == 0);

  m_globalHash->lock();
  if ((old= m_globalHash->get(impl->m_internalName.c_str())))
  {
@@ -1322,12 +1348,41 @@ NdbDictionaryImpl::putTable(NdbTableImpl *impl)
  
  m_localHash.put(impl->m_internalName.c_str(), info);

  addBlobTables(*impl);

  m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
  m_ndb.theLastTupleId[impl->getTableId()]  = ~0;
}

int
NdbDictionaryImpl::getBlobTables(NdbTableImpl &t)
{
  unsigned n= t.m_noOfBlobs;
  DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
  // optimized for blob column being the last one
  // and not looking for more than one if not neccessary
  for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
    i--;
    NdbColumnImpl & c = *t.m_columns[i];
    if (! c.getBlobType() || c.getPartSize() == 0)
      continue;
    n--;
    // retrieve blob table def from DICT - by-pass cache
    char btname[NdbBlobImpl::BlobTableNameSize];
    NdbBlob::getBlobTableName(btname, &t, &c);
    BaseString btname_internal = m_ndb.internalize_table_name(btname);
    NdbTableImpl* bt =
      m_receiver.getTable(btname_internal, m_ndb.usingFullyQualifiedNames());
    if (bt == NULL)
      DBUG_RETURN(-1);

    // TODO check primary id/version when returned by DICT

    // the blob column owns the blob table
    assert(c.m_blobTable == NULL);
    c.m_blobTable = bt;
  }
  DBUG_RETURN(0); 
}

#if 0
bool
NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
@@ -2148,31 +2203,6 @@ NdbDictionaryImpl::createBlobTables(NdbTableImpl &t)
  DBUG_RETURN(0); 
}

int
NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
{
  unsigned n= t.m_noOfBlobs;
  DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
  // optimized for blob column being the last one
  // and not looking for more than one if not neccessary
  for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
    i--;
    NdbColumnImpl & c = *t.m_columns[i];
    if (! c.getBlobType() || c.getPartSize() == 0)
      continue;
    n--;
    char btname[NdbBlobImpl::BlobTableNameSize];
    NdbBlob::getBlobTableName(btname, &t, &c);
    // Save BLOB table handle
    NdbTableImpl * cachedBlobTable = getTable(btname);
    if (cachedBlobTable == 0) {
      DBUG_RETURN(-1);
    }
    c.m_blobTable = cachedBlobTable;
  }
  DBUG_RETURN(0); 
}

int 
NdbDictInterface::createTable(Ndb & ndb,
			      NdbTableImpl & impl)
@@ -2188,7 +2218,7 @@ int NdbDictionaryImpl::alterTable(NdbTableImpl &impl)

  DBUG_ENTER("NdbDictionaryImpl::alterTable");
  Ndb_local_table_info * local = 0;
  if((local= get_local_table_info(originalInternalName, false)) == 0)
  if((local= get_local_table_info(originalInternalName)) == 0)
  {
    m_error.code = 709;
    DBUG_RETURN(-1);
@@ -2727,15 +2757,21 @@ NdbDictionaryImpl::dropBlobTables(NdbTableImpl & t)
    NdbColumnImpl & c = *t.m_columns[i];
    if (! c.getBlobType() || c.getPartSize() == 0)
      continue;
    char btname[NdbBlobImpl::BlobTableNameSize];
    NdbBlob::getBlobTableName(btname, &t, &c);
    if (dropTable(btname) != 0) {
      if (m_error.code != 709 && m_error.code != 723){
	DBUG_PRINT("exit",("error %u - exiting",m_error.code));
    NdbTableImpl* bt = c.m_blobTable;
    if (bt == NULL) {
      DBUG_PRINT("info", ("col %s: blob table pointer is NULL",
                          c.m_name.c_str()));
      continue; // "force" mode on
    }
    // drop directly - by-pass cache
    int ret = m_receiver.dropTable(*c.m_blobTable);
    if (ret != 0) {
      DBUG_PRINT("info", ("col %s: blob table %s: error %d",
                 c.m_name.c_str(), bt->m_internalName.c_str(), m_error.code));
      if (! (ret == 709 || ret == 723)) // "force" mode on
        DBUG_RETURN(-1);
    }
      DBUG_PRINT("info",("error %u - continuing",m_error.code));
    }
    // leave c.m_blobTable defined
  }
  DBUG_RETURN(0);
}
@@ -2794,52 +2830,30 @@ NdbDictInterface::execDROP_TABLE_REF(NdbApiSignal * signal,
}

int
NdbDictionaryImpl::invalidateObject(NdbTableImpl & impl, bool lock)
NdbDictionaryImpl::invalidateObject(NdbTableImpl & impl)
{
  const char * internalTableName = impl.m_internalName.c_str();
  DBUG_ENTER("NdbDictionaryImpl::invalidateObject");
  DBUG_PRINT("enter", ("internal_name: %s", internalTableName));

  if (lock)
    m_globalHash->lock();
  if (impl.m_noOfBlobs != 0) {
    for (uint i = 0; i < impl.m_columns.size(); i++) {
      NdbColumnImpl& c = *impl.m_columns[i];
      if (! c.getBlobType() || c.getPartSize() == 0)
        continue;
      assert(c.m_blobTable != NULL);
      invalidateObject(*c.m_blobTable, false);
    }
  }
  m_localHash.drop(internalTableName);
  m_globalHash->lock();
  impl.m_status = NdbDictionary::Object::Invalid;
  m_globalHash->drop(&impl);
  if (lock)
  m_globalHash->unlock();
  DBUG_RETURN(0);
}

int
NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl, bool lock)
NdbDictionaryImpl::removeCachedObject(NdbTableImpl & impl)
{
  const char * internalTableName = impl.m_internalName.c_str();
  DBUG_ENTER("NdbDictionaryImpl::removeCachedObject");
  DBUG_PRINT("enter", ("internal_name: %s", internalTableName));

  if (lock)
    m_globalHash->lock();
  if (impl.m_noOfBlobs != 0) {
    for (uint i = 0; i < impl.m_columns.size(); i++) {
      NdbColumnImpl& c = *impl.m_columns[i];
      if (! c.getBlobType() || c.getPartSize() == 0)
        continue;
      assert(c.m_blobTable != NULL);
      removeCachedObject(*c.m_blobTable, false);
    }
  }
  m_localHash.drop(internalTableName);  
  m_globalHash->lock();
  m_globalHash->release(&impl);
  if (lock)
  m_globalHash->unlock();
  DBUG_RETURN(0);
}
@@ -2851,8 +2865,7 @@ NdbIndexImpl*
NdbDictionaryImpl::getIndexImpl(const char * externalName,
				const BaseString& internalName)
{
  Ndb_local_table_info * info = get_local_table_info(internalName,
						     false);
  Ndb_local_table_info * info = get_local_table_info(internalName);
  if(info == 0){
    m_error.code = 4243;
    return 0;
@@ -3503,7 +3516,7 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
    // We only have the table name with internal name
    DBUG_PRINT("info",("table %s", ev->getTableName()));
    Ndb_local_table_info *info;
    info= get_local_table_info(ev->getTableName(), true);
    info= get_local_table_info(ev->getTableName());
    if (info == 0)
    {
      DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
@@ -3513,7 +3526,7 @@ NdbDictionaryImpl::getEvent(const char * eventName, NdbTableImpl* tab)
    if (info->m_table_impl->m_status != NdbDictionary::Object::Retrieved)
    {
      removeCachedObject(*info->m_table_impl);
      info= get_local_table_info(ev->getTableName(), true);
      info= get_local_table_info(ev->getTableName());
      if (info == 0)
      {
        DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
Loading