Loading sql/ha_ndbcluster_binlog.cc +58 −22 Original line number Diff line number Diff line Loading @@ -746,10 +746,10 @@ static int ndbcluster_create_schema_table(THD *thd) */ end= strmov(buf, "CREATE TABLE IF NOT EXISTS " NDB_REP_DB "." NDB_SCHEMA_TABLE " ( db VARCHAR(63) NOT NULL," " name VARCHAR(63) NOT NULL," " ( db VARBINARY(63) NOT NULL," " name VARBINARY(63) NOT NULL," " slock BINARY(32) NOT NULL," " query VARCHAR(4094) NOT NULL," " query BLOB NOT NULL," " node_id INT UNSIGNED NOT NULL," " epoch BIGINT UNSIGNED NOT NULL," " id INT UNSIGNED NOT NULL," Loading Loading @@ -802,7 +802,6 @@ void ndbcluster_setup_binlog_table_shares(THD *thd) #define SCHEMA_TYPE_I 8u #define SCHEMA_SIZE 9u #define SCHEMA_SLOCK_SIZE 32u #define SCHEMA_QUERY_SIZE 4096u struct Cluster_schema { Loading @@ -813,7 +812,7 @@ struct Cluster_schema unsigned char slock_length; uint32 slock[SCHEMA_SLOCK_SIZE/4]; unsigned short query_length; char query[SCHEMA_QUERY_SIZE]; char *query; Uint64 epoch; uint32 node_id; uint32 id; Loading @@ -824,10 +823,26 @@ struct Cluster_schema /* Transfer schema table data into corresponding struct */ static void ndbcluster_get_schema(TABLE *table, static void ndbcluster_get_schema(NDB_SHARE *share, Cluster_schema *s) { TABLE *table= share->table; Field **field; /* unpack blob values */ byte* blobs_buffer= 0; uint blobs_buffer_size= 0; { ptrdiff_t ptrdiff= 0; int ret= get_ndb_blobs_value(table, share->ndb_value[0], blobs_buffer, blobs_buffer_size, ptrdiff); if (ret != 0) { my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); DBUG_PRINT("info", ("blob read error")); DBUG_ASSERT(false); } } /* db varchar 1 length byte */ field= table->field; s->db_length= *(uint8*)(*field)->ptr; Loading @@ -847,13 +862,19 @@ static void ndbcluster_get_schema(TABLE *table, s->slock_length= (*field)->field_length; DBUG_ASSERT((*field)->field_length == sizeof(s->slock)); memcpy(s->slock, (*field)->ptr, s->slock_length); /* query varchar 2 length bytes */ /* query blob */ field++; s->query_length= uint2korr((*field)->ptr); DBUG_ASSERT(s->query_length <= (*field)->field_length); DBUG_ASSERT((*field)->field_length + 2 == sizeof(s->query)); memcpy(s->query, (*field)->ptr + 2, s->query_length); s->query[s->query_length]= 0; { Field_blob *field_blob= (Field_blob*)(*field); uint blob_len= field_blob->get_length((*field)->ptr); char *blob_ptr= 0; field_blob->get_ptr(&blob_ptr); assert(blob_len == 0 || blob_ptr != 0); s->query_length= blob_len; s->query= sql_alloc(blob_len+1); memcpy(s->query, blob_ptr, blob_len); s->query[blob_len]= 0; } /* node_id */ field++; s->node_id= ((Field_long *)*field)->val_int(); Loading @@ -869,6 +890,8 @@ static void ndbcluster_get_schema(TABLE *table, /* type */ field++; s->type= ((Field_long *)*field)->val_int(); /* free blobs buffer */ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); } /* Loading Loading @@ -1013,7 +1036,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, char save_db[FN_REFLEN]; strcpy(save_db, ndb->getDatabaseName()); char tmp_buf[SCHEMA_QUERY_SIZE]; char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Loading @@ -1037,10 +1060,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, for (i= 0; i < SCHEMA_SIZE; i++) { col[i]= ndbtab->getColumn(i); if (i != SCHEMA_QUERY_I) { sz[i]= col[i]->getLength(); DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); } } } while (1) { Loading Loading @@ -1068,9 +1094,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap); DBUG_ASSERT(r == 0); /* query */ ndb_pack_varchar(col[SCHEMA_QUERY_I], tmp_buf, query, query_length); r|= op->setValue(SCHEMA_QUERY_I, tmp_buf); { NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I); DBUG_ASSERT(ndb_blob != 0); uint blob_len= query_length; const char* blob_ptr= query; r|= ndb_blob->setValue(blob_ptr, blob_len); DBUG_ASSERT(r == 0); } /* node_id */ r|= op->setValue(SCHEMA_NODE_ID_I, node_id); DBUG_ASSERT(r == 0); Loading Loading @@ -1203,7 +1234,7 @@ ndbcluster_update_slock(THD *thd, char save_db[FN_HEADLEN]; strcpy(save_db, ndb->getDatabaseName()); char tmp_buf[SCHEMA_QUERY_SIZE]; char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Loading @@ -1227,10 +1258,13 @@ ndbcluster_update_slock(THD *thd, for (i= 0; i < SCHEMA_SIZE; i++) { col[i]= ndbtab->getColumn(i); if (i != SCHEMA_QUERY_I) { sz[i]= col[i]->getLength(); DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); } } } while (1) { Loading Loading @@ -1506,7 +1540,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, MY_BITMAP slock; bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); uint node_id= g_ndb_cluster_connection->node_id(); ndbcluster_get_schema(share->table, schema); ndbcluster_get_schema(share, schema); if (schema->node_id != node_id) { int log_query= 0, post_epoch_unlock= 0; Loading Loading @@ -2265,6 +2299,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ DBUG_ENTER("ndbcluster_create_event_ops"); DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name)); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); DBUG_ASSERT(share != 0); Loading Loading @@ -2374,6 +2409,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, else { DBUG_PRINT("info", ("%s blob", col_name)); DBUG_ASSERT(share->flags & NSF_BLOB_FLAG); attr0.blob= op->getBlobHandle(col_name); attr1.blob= op->getPreBlobHandle(col_name); if (attr0.blob == NULL || attr1.blob == NULL) Loading storage/ndb/include/ndbapi/NdbDictionary.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -1164,7 +1164,7 @@ public: */ enum EventReport { ER_UPDATED = 0, ER_ALL = 1, ER_ALL = 1, // except not-updated blob inlines ER_SUBSCRIBE = 2 }; Loading storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -878,6 +878,7 @@ ArrayPool<TupTriggerData> c_triggerPool; {} Bitmask<MAXNROFATTRIBUTESINWORDS> notNullAttributeMask; Bitmask<MAXNROFATTRIBUTESINWORDS> blobAttributeMask; ReadFunction* readFunctionArray; UpdateFunction* updateFunctionArray; Loading storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp +6 −0 Original line number Diff line number Diff line Loading @@ -201,6 +201,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) regTabPtr.p->m_no_of_attributes= noOfAttributes; regTabPtr.p->notNullAttributeMask.clear(); regTabPtr.p->blobAttributeMask.clear(); Uint32 offset[10]; Uint32 tableDescriptorRef= allocTabDescr(regTabPtr.p, offset); Loading Loading @@ -286,6 +287,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec); Uint32 attrId = signal->theData[2]; Uint32 attrDescriptor = signal->theData[3]; Uint32 extType = AttributeDescriptor::getType(attrDescriptor); // DICT sends charset number in upper half Uint32 csNumber = (signal->theData[4] >> 16); Loading Loading @@ -353,6 +355,10 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) regTabPtr.p->notNullAttributeMask.set(attrId); } if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT) { regTabPtr.p->blobAttributeMask.set(attrId); } switch (AttributeDescriptor::getArrayType(attrDescriptor)) { case NDB_ARRAYTYPE_FIXED: { Loading storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp +12 −3 Original line number Diff line number Diff line Loading @@ -867,10 +867,19 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, } else { ljam(); //-------------------------------------------------------------------- // All others send all attributes that are monitored // All others send all attributes that are monitored, except: // Omit unchanged blob inlines on update i.e. // attributeMask & ~ (blobAttributeMask & ~ changeMask) //-------------------------------------------------------------------- numAttrsToRead = setAttrIds(trigPtr->attributeMask, regTabPtr->m_no_of_attributes, &readBuffer[0]); Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask; attributeMask = trigPtr->attributeMask; if (regOperPtr->op_struct.op_type == ZUPDATE) { Bitmask<MAXNROFATTRIBUTESINWORDS> tmpMask = regTabPtr->blobAttributeMask; tmpMask.bitANDC(req_struct->changeMask); attributeMask.bitANDC(tmpMask); } numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes, &readBuffer[0]); } ndbrequire(numAttrsToRead < MAX_ATTRIBUTES_IN_TABLE); //-------------------------------------------------------------------- Loading Loading
sql/ha_ndbcluster_binlog.cc +58 −22 Original line number Diff line number Diff line Loading @@ -746,10 +746,10 @@ static int ndbcluster_create_schema_table(THD *thd) */ end= strmov(buf, "CREATE TABLE IF NOT EXISTS " NDB_REP_DB "." NDB_SCHEMA_TABLE " ( db VARCHAR(63) NOT NULL," " name VARCHAR(63) NOT NULL," " ( db VARBINARY(63) NOT NULL," " name VARBINARY(63) NOT NULL," " slock BINARY(32) NOT NULL," " query VARCHAR(4094) NOT NULL," " query BLOB NOT NULL," " node_id INT UNSIGNED NOT NULL," " epoch BIGINT UNSIGNED NOT NULL," " id INT UNSIGNED NOT NULL," Loading Loading @@ -802,7 +802,6 @@ void ndbcluster_setup_binlog_table_shares(THD *thd) #define SCHEMA_TYPE_I 8u #define SCHEMA_SIZE 9u #define SCHEMA_SLOCK_SIZE 32u #define SCHEMA_QUERY_SIZE 4096u struct Cluster_schema { Loading @@ -813,7 +812,7 @@ struct Cluster_schema unsigned char slock_length; uint32 slock[SCHEMA_SLOCK_SIZE/4]; unsigned short query_length; char query[SCHEMA_QUERY_SIZE]; char *query; Uint64 epoch; uint32 node_id; uint32 id; Loading @@ -824,10 +823,26 @@ struct Cluster_schema /* Transfer schema table data into corresponding struct */ static void ndbcluster_get_schema(TABLE *table, static void ndbcluster_get_schema(NDB_SHARE *share, Cluster_schema *s) { TABLE *table= share->table; Field **field; /* unpack blob values */ byte* blobs_buffer= 0; uint blobs_buffer_size= 0; { ptrdiff_t ptrdiff= 0; int ret= get_ndb_blobs_value(table, share->ndb_value[0], blobs_buffer, blobs_buffer_size, ptrdiff); if (ret != 0) { my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); DBUG_PRINT("info", ("blob read error")); DBUG_ASSERT(false); } } /* db varchar 1 length byte */ field= table->field; s->db_length= *(uint8*)(*field)->ptr; Loading @@ -847,13 +862,19 @@ static void ndbcluster_get_schema(TABLE *table, s->slock_length= (*field)->field_length; DBUG_ASSERT((*field)->field_length == sizeof(s->slock)); memcpy(s->slock, (*field)->ptr, s->slock_length); /* query varchar 2 length bytes */ /* query blob */ field++; s->query_length= uint2korr((*field)->ptr); DBUG_ASSERT(s->query_length <= (*field)->field_length); DBUG_ASSERT((*field)->field_length + 2 == sizeof(s->query)); memcpy(s->query, (*field)->ptr + 2, s->query_length); s->query[s->query_length]= 0; { Field_blob *field_blob= (Field_blob*)(*field); uint blob_len= field_blob->get_length((*field)->ptr); char *blob_ptr= 0; field_blob->get_ptr(&blob_ptr); assert(blob_len == 0 || blob_ptr != 0); s->query_length= blob_len; s->query= sql_alloc(blob_len+1); memcpy(s->query, blob_ptr, blob_len); s->query[blob_len]= 0; } /* node_id */ field++; s->node_id= ((Field_long *)*field)->val_int(); Loading @@ -869,6 +890,8 @@ static void ndbcluster_get_schema(TABLE *table, /* type */ field++; s->type= ((Field_long *)*field)->val_int(); /* free blobs buffer */ my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); } /* Loading Loading @@ -1013,7 +1036,7 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, char save_db[FN_REFLEN]; strcpy(save_db, ndb->getDatabaseName()); char tmp_buf[SCHEMA_QUERY_SIZE]; char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Loading @@ -1037,10 +1060,13 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, for (i= 0; i < SCHEMA_SIZE; i++) { col[i]= ndbtab->getColumn(i); if (i != SCHEMA_QUERY_I) { sz[i]= col[i]->getLength(); DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); } } } while (1) { Loading Loading @@ -1068,9 +1094,14 @@ int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share, r|= op->setValue(SCHEMA_SLOCK_I, (char*)schema_subscribers.bitmap); DBUG_ASSERT(r == 0); /* query */ ndb_pack_varchar(col[SCHEMA_QUERY_I], tmp_buf, query, query_length); r|= op->setValue(SCHEMA_QUERY_I, tmp_buf); { NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I); DBUG_ASSERT(ndb_blob != 0); uint blob_len= query_length; const char* blob_ptr= query; r|= ndb_blob->setValue(blob_ptr, blob_len); DBUG_ASSERT(r == 0); } /* node_id */ r|= op->setValue(SCHEMA_NODE_ID_I, node_id); DBUG_ASSERT(r == 0); Loading Loading @@ -1203,7 +1234,7 @@ ndbcluster_update_slock(THD *thd, char save_db[FN_HEADLEN]; strcpy(save_db, ndb->getDatabaseName()); char tmp_buf[SCHEMA_QUERY_SIZE]; char tmp_buf[FN_REFLEN]; NDBDICT *dict= ndb->getDictionary(); ndb->setDatabaseName(NDB_REP_DB); const NDBTAB *ndbtab= dict->getTable(NDB_SCHEMA_TABLE); Loading @@ -1227,10 +1258,13 @@ ndbcluster_update_slock(THD *thd, for (i= 0; i < SCHEMA_SIZE; i++) { col[i]= ndbtab->getColumn(i); if (i != SCHEMA_QUERY_I) { sz[i]= col[i]->getLength(); DBUG_ASSERT(sz[i] <= sizeof(tmp_buf)); } } } while (1) { Loading Loading @@ -1506,7 +1540,7 @@ ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb, MY_BITMAP slock; bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, false); uint node_id= g_ndb_cluster_connection->node_id(); ndbcluster_get_schema(share->table, schema); ndbcluster_get_schema(share, schema); if (schema->node_id != node_id) { int log_query= 0, post_epoch_unlock= 0; Loading Loading @@ -2265,6 +2299,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ DBUG_ENTER("ndbcluster_create_event_ops"); DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name)); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); DBUG_ASSERT(share != 0); Loading Loading @@ -2374,6 +2409,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, else { DBUG_PRINT("info", ("%s blob", col_name)); DBUG_ASSERT(share->flags & NSF_BLOB_FLAG); attr0.blob= op->getBlobHandle(col_name); attr1.blob= op->getPreBlobHandle(col_name); if (attr0.blob == NULL || attr1.blob == NULL) Loading
storage/ndb/include/ndbapi/NdbDictionary.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -1164,7 +1164,7 @@ public: */ enum EventReport { ER_UPDATED = 0, ER_ALL = 1, ER_ALL = 1, // except not-updated blob inlines ER_SUBSCRIBE = 2 }; Loading
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -878,6 +878,7 @@ ArrayPool<TupTriggerData> c_triggerPool; {} Bitmask<MAXNROFATTRIBUTESINWORDS> notNullAttributeMask; Bitmask<MAXNROFATTRIBUTESINWORDS> blobAttributeMask; ReadFunction* readFunctionArray; UpdateFunction* updateFunctionArray; Loading
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp +6 −0 Original line number Diff line number Diff line Loading @@ -201,6 +201,7 @@ void Dbtup::execTUPFRAGREQ(Signal* signal) regTabPtr.p->m_no_of_attributes= noOfAttributes; regTabPtr.p->notNullAttributeMask.clear(); regTabPtr.p->blobAttributeMask.clear(); Uint32 offset[10]; Uint32 tableDescriptorRef= allocTabDescr(regTabPtr.p, offset); Loading Loading @@ -286,6 +287,7 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) ptrCheckGuard(fragOperPtr, cnoOfFragoprec, fragoperrec); Uint32 attrId = signal->theData[2]; Uint32 attrDescriptor = signal->theData[3]; Uint32 extType = AttributeDescriptor::getType(attrDescriptor); // DICT sends charset number in upper half Uint32 csNumber = (signal->theData[4] >> 16); Loading Loading @@ -353,6 +355,10 @@ void Dbtup::execTUP_ADD_ATTRREQ(Signal* signal) regTabPtr.p->notNullAttributeMask.set(attrId); } if (extType == NDB_TYPE_BLOB || extType == NDB_TYPE_TEXT) { regTabPtr.p->blobAttributeMask.set(attrId); } switch (AttributeDescriptor::getArrayType(attrDescriptor)) { case NDB_ARRAYTYPE_FIXED: { Loading
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp +12 −3 Original line number Diff line number Diff line Loading @@ -867,10 +867,19 @@ bool Dbtup::readTriggerInfo(TupTriggerData* const trigPtr, } else { ljam(); //-------------------------------------------------------------------- // All others send all attributes that are monitored // All others send all attributes that are monitored, except: // Omit unchanged blob inlines on update i.e. // attributeMask & ~ (blobAttributeMask & ~ changeMask) //-------------------------------------------------------------------- numAttrsToRead = setAttrIds(trigPtr->attributeMask, regTabPtr->m_no_of_attributes, &readBuffer[0]); Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask; attributeMask = trigPtr->attributeMask; if (regOperPtr->op_struct.op_type == ZUPDATE) { Bitmask<MAXNROFATTRIBUTESINWORDS> tmpMask = regTabPtr->blobAttributeMask; tmpMask.bitANDC(req_struct->changeMask); attributeMask.bitANDC(tmpMask); } numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes, &readBuffer[0]); } ndbrequire(numAttrsToRead < MAX_ATTRIBUTES_IN_TABLE); //-------------------------------------------------------------------- Loading