Commit 8bc7214f authored by unknown's avatar unknown
Browse files

ndb - bug#17813 schema.query => blob


storage/ndb/include/ndbapi/NdbDictionary.hpp:
  ER_ALL: omit unchanged blob inlines
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  ER_ALL: omit unchanged blob inlines
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp:
  ER_ALL: omit unchanged blob inlines
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp:
  ER_ALL: omit unchanged blob inlines
storage/ndb/src/ndbapi/NdbBlob.cpp:
  atNextEvent: ignore non-data event
sql/ha_ndbcluster_binlog.cc:
  schema.query => blob
storage/ndb/tools/restore/Restore.cpp:
  allow 'system tables' to have blob attrs
storage/ndb/tools/restore/Restore.hpp:
  allow 'system tables' to have blob attrs
storage/ndb/tools/restore/restore_main.cpp:
  allow 'system tables' to have blob attrs
parent 187ff695
Loading
Loading
Loading
Loading
+58 −22
Original line number Diff line number Diff line
@@ -746,10 +746,10 @@ static int ndbcluster_create_schema_table(THD *thd)
  */
  end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
                   NDB_REP_DB "." NDB_SCHEMA_TABLE
                   " ( db VARCHAR(63) NOT NULL,"
                   " name VARCHAR(63) NOT NULL,"
                   " ( db VARBINARY(63) NOT NULL,"
                   " name VARBINARY(63) NOT NULL,"
                   " slock BINARY(32) NOT NULL,"
                   " query VARCHAR(4094) NOT NULL,"
                   " query BLOB NOT NULL,"
                   " node_id INT UNSIGNED NOT NULL,"
                   " epoch BIGINT UNSIGNED NOT NULL,"
                   " id INT UNSIGNED NOT NULL,"
@@ -802,7 +802,6 @@ void ndbcluster_setup_binlog_table_shares(THD *thd)
#define SCHEMA_TYPE_I 8u
#define SCHEMA_SIZE 9u
#define SCHEMA_SLOCK_SIZE 32u
#define SCHEMA_QUERY_SIZE 4096u

struct Cluster_schema
{
@@ -813,7 +812,7 @@ struct Cluster_schema
  unsigned char slock_length;
  uint32 slock[SCHEMA_SLOCK_SIZE/4];
  unsigned short query_length;
  char query[SCHEMA_QUERY_SIZE];
  char *query;
  Uint64 epoch;
  uint32 node_id;
  uint32 id;
@@ -824,10 +823,26 @@ struct Cluster_schema
/*
  Transfer schema table data into corresponding struct
*/
static void ndbcluster_get_schema(TABLE *table,
static void ndbcluster_get_schema(NDB_SHARE *share,
                                  Cluster_schema *s)
{
  TABLE *table= share->table;
  Field **field;
  /* unpack blob values */
  byte* blobs_buffer= 0;
  uint blobs_buffer_size= 0;
  {
    ptrdiff_t ptrdiff= 0;
    int ret= get_ndb_blobs_value(table, share->ndb_value[0],
                                 blobs_buffer, blobs_buffer_size,
                                 ptrdiff);
    if (ret != 0)
    {
      my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
      DBUG_PRINT("info", ("blob read error"));
      DBUG_ASSERT(false);
    }
  }
  /* db varchar 1 length byte */
  field= table->field;
  s->db_length= *(uint8*)(*field)->ptr;
@@ -847,13 +862,19 @@ static void ndbcluster_get_schema(TABLE *table,
  s->slock_length= (*field)->field_length;
  DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
  memcpy(s->slock, (*field)->ptr, s->slock_length);
  /* query varchar 2 length bytes */
  /* query blob */
  field++;
  s->query_length= uint2korr((*field)->ptr);
  DBUG_ASSERT(s->query_length <= (*field)->field_length);
  DBUG_ASSERT((*field)->field_length + 2 == sizeof(s->query));
  memcpy(s->query, (*field)->ptr + 2, s->query_length);
  s->query[s->query_length]= 0;
  {
    Field_blob *field_blob= (Field_blob*)(*field);
    uint blob_len= field_blob->get_length((*field)->ptr);
    char *blob_ptr= 0;
    field_blob->get_ptr(&blob_ptr);
    assert(blob_len == 0 || blob_ptr != 0);
    s->query_length= blob_len;
    s->query= sql_alloc(blob_len+1);
    memcpy(s->query, blob_ptr, blob_len);
    s->query[blob_len]= 0;
  }
  /* node_id */
  field++;
  s->node_id= ((Field_long *)*field)->val_int();
@@ -869,6 +890,8 @@ static void ndbcluster_get_schema(TABLE *table,
  /* type */
  field++;
  s->type= ((Field_long *)*field)->val_int();
  /* free blobs buffer */
  my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
}

/*
@@ -1013,7 +1036,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
  char save_db[FN_REFLEN];
  strcpy(save_db, ndb->getDatabaseName());

  char tmp_buf[SCHEMA_QUERY_SIZE];
  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
@@ -1037,10 +1060,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  while (1)
  {
@@ -1068,9 +1094,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
      r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap);
      DBUG_ASSERT(r == 0);
      /* query */
      ndb_pack_varchar(col[SCHEMA_QUERY_I], tmp_buf, query, query_length);
      r|= op->setValue(SCHEMA_QUERY_I, tmp_buf);
      {
        NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
        DBUG_ASSERT(ndb_blob != 0);
        uint blob_len= query_length;
        const char* blob_ptr= query;
        r|= ndb_blob->setValue(blob_ptr, blob_len);
        DBUG_ASSERT(r == 0);
      }
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      DBUG_ASSERT(r == 0);
@@ -1203,7 +1234,7 @@ ndbcluster_update_slock(THD *thd,
  char save_db[FN_HEADLEN];
  strcpy(save_db, ndb->getDatabaseName());

  char tmp_buf[SCHEMA_QUERY_SIZE];
  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE);
@@ -1227,10 +1258,13 @@ ndbcluster_update_slock(THD *thd,
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  while (1)
  {
@@ -1506,7 +1540,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
      MY_BITMAP slock;
      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false);
      uint node_id= g_ndb_cluster_connection->node_id();
      ndbcluster_get_schema(share->table, schema);
      ndbcluster_get_schema(share, schema);
      if (schema->node_id != node_id)
      {
        int log_query= 0, post_epoch_unlock= 0;
@@ -2265,6 +2299,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
  */

  DBUG_ENTER("ndbcluster_create_event_ops");
  DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));

  DBUG_ASSERT(share != 0);
@@ -2374,6 +2409,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
        else
        {
          DBUG_PRINT("info", ("%s blob", col_name));
          DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
          attr0.blob= op->getBlobHandle(col_name);
          attr1.blob= op->getPreBlobHandle(col_name);
          if (attr0.blob == NULL || attr1.blob == NULL)
+1 −1
Original line number Diff line number Diff line
@@ -1164,7 +1164,7 @@ public:
     */
    enum EventReport {
      ER_UPDATED = 0,
      ER_ALL = 1,
      ER_ALL = 1, // except not-updated blob inlines
      ER_SUBSCRIBE = 2
    };

+1 −0
Original line number Diff line number Diff line
@@ -878,6 +878,7 @@ ArrayPool<TupTriggerData> c_triggerPool;
      {}
    
    Bitmask<MAXNROFATTRIBUTESINWORDS> notNullAttributeMask;
    Bitmask<MAXNROFATTRIBUTESINWORDS> blobAttributeMask;
    
    ReadFunction* readFunctionArray;
    UpdateFunction* updateFunctionArray;
+6 −0
Original line number Diff line number Diff line
@@ -201,6 +201,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal)
    regTabPtr.p->m_no_of_attributes= noOfAttributes;
    
    regTabPtr.p->notNullAttributeMask.clear();
    regTabPtr.p->blobAttributeMask.clear();
    
    Uint32 offset[10];
    Uint32 tableDescriptorRef= allocTabDescr(regTabPtr.p, offset);
@@ -286,6 +287,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
  ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec);
  Uint32 attrId = signal->theData[2];
  Uint32 attrDescriptor = signal->theData[3];
  Uint32 extType = AttributeDescriptor::getType(attrDescriptor);
  // DICT sends charset number in upper half
  Uint32 csNumber = (signal->theData[4] >> 16);

@@ -353,6 +355,10 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal)
      regTabPtr.p->notNullAttributeMask.set(attrId);
    }

    if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT) {
      regTabPtr.p->blobAttributeMask.set(attrId);
    }

    switch (AttributeDescriptor::getArrayType(attrDescriptor)) {
    case NDB_ARRAYTYPE_FIXED:
    {
+12 −3
Original line number Diff line number Diff line
@@ -867,10 +867,19 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr,
  } else {
    ljam();
//--------------------------------------------------------------------
// All others send all attributes that are monitored
// All others send all attributes that are monitored, except:
// Omit unchanged blob inlines on update i.e.
// attributeMask & ~ (blobAttributeMask & ~ changeMask)
//--------------------------------------------------------------------
    numAttrsToRead = setAttrIds(trigPtr->attributeMask, 
				regTabPtr->m_no_of_attributes, &readBuffer[0]);
    Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
    attributeMask = trigPtr->attributeMask;
    if (regOperPtr->op_struct.op_type == ZUPDATE) {
      Bitmask<MAXNROFATTRIBUTESINWORDS> tmpMask = regTabPtr->blobAttributeMask;
      tmpMask.bitANDC(req_struct->changeMask);
      attributeMask.bitANDC(tmpMask);
    }
    numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes,
                                &readBuffer[0]);
  }
  ndbrequire(numAttrsToRead < MAX_ATTRIBUTES_IN_TABLE);
//--------------------------------------------------------------------
Loading