Loading mysql-test/t/disabled.def +4 −0 Original line number Diff line number Diff line Loading @@ -29,3 +29,7 @@ ndb_autodiscover : Needs to be fixed w.r.t binlog ndb_autodiscover2 : Needs to be fixed w.r.t binlog system_mysql_db : Needs fixing system_mysql_db_fix : Needs fixing #ndb_alter_table_row : sometimes wrong error 1015!=1046 ndb_gis : garbled msgs from corrupt THD* + partitioning problem # vim: set filetype=conf: sql/ha_ndbcluster.cc +67 −36 Original line number Diff line number Diff line Loading @@ -35,6 +35,11 @@ #include "ha_ndbcluster_binlog.h" #ifdef ndb_dynamite #undef assert #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0) #endif // options from from mysqld.cc extern my_bool opt_ndb_optimized_node_selection; extern const char *opt_ndbcluster_connectstring; Loading Loading @@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) if (ndb_blob->blobsNextBlob() != NULL) DBUG_RETURN(0); ha_ndbcluster *ha= (ha_ndbcluster *)arg; DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob)); int ret= get_ndb_blobs_value(ha->table, ha->m_value, ha->m_blobs_buffer, ha->m_blobs_buffer_size, 0); DBUG_RETURN(ret); } int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) /* This routine is shared by injector. There is no common blobs buffer so the buffer and length are passed by reference. Injector also passes a record pointer diff. */ int get_ndb_blobs_value(TABLE* table, NdbValue* value_array, byte*& buffer, uint& buffer_size, my_ptrdiff_t ptrdiff) { DBUG_ENTER("get_ndb_blobs_value"); Loading @@ -803,14 +818,17 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) for (int loop= 0; loop <= 1; loop++) { uint32 offset= 0; for (uint i= 0; i < table_share->fields; i++) for (uint i= 0; i < table->s->fields; i++) { Field *field= table->field[i]; NdbValue value= m_value[i]; NdbValue value= value_array[i]; if (value.ptr != NULL && (field->flags & BLOB_FLAG)) { Field_blob *field_blob= (Field_blob *)field; NdbBlob *ndb_blob= value.blob; int isNull; ndb_blob->getDefined(isNull); if (isNull == 0) { // XXX -1 should be allowed only for events Uint64 blob_len= 0; if (ndb_blob->getLength(blob_len) != 0) DBUG_RETURN(-1); Loading @@ -820,27 +838,31 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) blob_size+= 8 - blob_size % 8; if (loop == 1) { char *buf= m_blobs_buffer + offset; char *buf= buffer + offset; uint32 len= 0xffffffff; // Max uint32 DBUG_PRINT("value", ("read blob ptr=%lx len=%u", DBUG_PRINT("info", ("read blob ptr=%p len=%u", buf, (uint) blob_len)); if (ndb_blob->readData(buf, len) != 0) DBUG_RETURN(-1); DBUG_ASSERT(len == blob_len); // Ugly hack assumes only ptr needs to be changed field_blob->ptr += ptrdiff; field_blob->set_ptr(len, buf); field_blob->ptr -= ptrdiff; } offset+= blob_size; } } if (loop == 0 && offset > m_blobs_buffer_size) } if (loop == 0 && offset > buffer_size) { my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer_size= 0; DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); m_blobs_buffer= my_malloc(offset, MYF(MY_WME)); if (m_blobs_buffer == NULL) my_free(buffer, MYF(MY_ALLOW_ZERO_PTR)); buffer_size= 0; DBUG_PRINT("info", ("allocate blobs buffer size %u", offset)); buffer= my_malloc(offset, MYF(MY_WME)); if (buffer == NULL) DBUG_RETURN(-1); m_blobs_buffer_size= offset; buffer_size= offset; } } DBUG_RETURN(0); Loading Loading @@ -2713,15 +2735,23 @@ void ndb_unpack_record(TABLE *table, NdbValue *value, else { NdbBlob *ndb_blob= (*value).blob; bool isNull= TRUE; #ifndef DBUG_OFF int ret= #endif ndb_blob->getNull(isNull); DBUG_ASSERT(ret == 0); if (isNull) int isNull; ndb_blob->getDefined(isNull); if (isNull != 0) { uint col_no = ndb_blob->getColumn()->getColumnNo(); if (isNull == 1) { DBUG_PRINT("info",("[%u] NULL", col_no)) field->set_null(row_offset); } else { DBUG_PRINT("info",("[%u] UNDEFINED", col_no)); bitmap_clear_bit(defined, col_no); } } } } } DBUG_VOID_RETURN; Loading Loading @@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to) NDBDICT *dict= ndb->getDictionary(); const NDBTAB *orig_tab= (const NDBTAB *) m_table; DBUG_ENTER("alter_table_name"); DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to)); NdbDictionary::Table new_tab= *orig_tab; new_tab.setName(to); Loading sql/ha_ndbcluster.h +8 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,9 @@ #pragma interface /* gcc class implementation */ #endif /* Blob tables and events are internal to NDB and must never be accessed */ #define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB") #include <NdbApi.hpp> #include <ndbapi_limits.h> Loading Loading @@ -78,6 +81,10 @@ typedef struct ndb_index_data { typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; int get_ndb_blobs_value(TABLE* table, NdbValue* value_array, byte*& buffer, uint& buffer_size, my_ptrdiff_t ptrdiff); typedef enum { NSS_INITIAL= 0, NSS_DROPPED, Loading Loading @@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share { #ifdef HAVE_NDB_BINLOG /* NDB_SHARE.flags */ #define NSF_HIDDEN_PK 1 /* table has hidden primary key */ #define NSF_BLOB_FLAG 2 /* table has blob attributes */ #define NSF_NO_BINLOG 4 /* table should not be binlogged */ #endif Loading sql/ha_ndbcluster_binlog.cc +113 −29 Original line number Diff line number Diff line Loading @@ -23,6 +23,11 @@ #include "slave.h" #include "ha_ndbcluster_binlog.h" #ifdef ndb_dynamite #undef assert #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0) #endif /* defines for cluster replication table names */ Loading Loading @@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) DBUG_ASSERT(_table != 0); if (_table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; if (_table->s->blob_fields != 0) share->flags|= NSF_BLOB_FLAG; return; } while (1) Loading Loading @@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) } if (table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; if (table->s->blob_fields != 0) share->flags|= NSF_BLOB_FLAG; break; } } Loading Loading @@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, NDB_SHARE *share) { DBUG_ENTER("ndbcluster_create_binlog_setup"); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name)); pthread_mutex_lock(&ndbcluster_mutex); Loading Loading @@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, const char *event_name, NDB_SHARE *share) { DBUG_ENTER("ndbcluster_create_event"); DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s", ndbtab->getName(), ndbtab->getObjectVersion(), event_name, share ? share->key : "(nil)")); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); if (!share) { DBUG_PRINT("info", ("share == NULL")); Loading @@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, my_event.addTableEvent(NDBEVENT::TE_ALL); if (share->flags & NSF_HIDDEN_PK) { /* No primary key, susbscribe for all attributes */ if (share->flags & NSF_BLOB_FLAG) { sql_print_error("NDB Binlog: logging of table %s " "with no PK and blob attributes is not supported", share->key); DBUG_RETURN(-1); } /* No primary key, subscribe for all attributes */ my_event.setReport(NDBEVENT::ER_ALL); DBUG_PRINT("info", ("subscription all")); } Loading @@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, DBUG_PRINT("info", ("subscription all and subscribe")); } } if (share->flags & NSF_BLOB_FLAG) my_event.mergeEvents(true); /* add all columns to the event */ int n_cols= ndbtab->getNoOfColumns(); Loading Loading @@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ DBUG_ENTER("ndbcluster_create_event_ops"); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); DBUG_ASSERT(share != 0); Loading @@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, } TABLE *table= share->table; if (table) { /* Logging of blob tables is not yet implemented, it would require: 1. setup of events also on the blob attribute tables 2. collect the pieces of the blob into one from an epoch to provide a full blob to binlog */ if (table->s->blob_fields) { sql_print_error("NDB Binlog: logging of blob table %s " "is not supported", share->key); share->flags|= NSF_NO_BINLOG; DBUG_RETURN(0); } } int do_schema_share= 0, do_apply_status_share= 0; int retries= 100; Loading Loading @@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, DBUG_RETURN(-1); } if (share->flags & NSF_BLOB_FLAG) op->mergeEvents(true); // currently not inherited from event if (share->flags & NSF_BLOB_FLAG) { /* * Given servers S1 S2, following results in out-of-date * event->m_tableImpl and column->m_blobTable. * * S1: create table t1(a int primary key); * S2: drop table t1; * S1: create table t2(a int primary key, b blob); * S1: alter table t2 add x int; * S1: alter table t2 drop x; * * TODO fix at right place before we get here */ ndb->getDictionary()->fix_blob_events(ndbtab, event_name); } int n_columns= ndbtab->getNoOfColumns(); int n_fields= table ? table->s->fields : 0; int n_fields= table ? table->s->fields : 0; // XXX ??? for (int j= 0; j < n_columns; j++) { const char *col_name= ndbtab->getColumn(j)->getName(); NdbRecAttr *attr0, *attr1; NdbValue attr0, attr1; if (j < n_fields) { Field *f= share->table->field[j]; if (is_ndb_compatible_type(f)) { DBUG_PRINT("info", ("%s compatible", col_name)); attr0= op->getValue(col_name, f->ptr); attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) + attr0.rec= op->getValue(col_name, f->ptr); attr1.rec= op->getPreValue(col_name, (f->ptr - share->table->record[0]) + share->table->record[1]); } else else if (! (f->flags & BLOB_FLAG)) { DBUG_PRINT("info", ("%s non compatible", col_name)); attr0= op->getValue(col_name); attr1= op->getPreValue(col_name); attr0.rec= op->getValue(col_name); attr1.rec= op->getPreValue(col_name); } else { DBUG_PRINT("info", ("%s blob", col_name)); attr0.blob= op->getBlobHandle(col_name); attr1.blob= op->getPreBlobHandle(col_name); } } else { DBUG_PRINT("info", ("%s hidden key", col_name)); attr0= op->getValue(col_name); attr1= op->getPreValue(col_name); attr0.rec= op->getValue(col_name); attr1.rec= op->getPreValue(col_name); } share->ndb_value[0][j].rec= attr0; share->ndb_value[1][j].rec= attr1; share->ndb_value[0][j].ptr= attr0.ptr; share->ndb_value[1][j].ptr= attr1.ptr; } op->setCustomData((void *) share); // set before execute share->op= op; // assign op in NDB_SHARE Loading Loading @@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, (saves moving data about many times) */ /* for now malloc/free blobs buffer each time TODO if possible share single permanent buffer with handlers */ byte* blobs_buffer[2] = { 0, 0 }; uint blobs_buffer_size[2] = { 0, 0 }; switch(pOp->getEventType()) { case NDBEVENT::TE_INSERT: row.n_inserts++; DBUG_PRINT("info", ("INSERT INTO %s", share->key)); { if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= 0; int ret= get_ndb_blobs_value(table, share->ndb_value[0], blobs_buffer[0], blobs_buffer_size[0], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); trans.write_row(::server_id, injector::transaction::table(table, true), &b, n_fields, table->record[0]); Loading @@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, key */ if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= table->record[n] - table->record[0]; int ret= get_ndb_blobs_value(table, share->ndb_value[n], blobs_buffer[n], blobs_buffer_size[n], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); print_records(table, table->record[n]); trans.delete_row(::server_id, injector::transaction::table(table, true), Loading @@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, row.n_updates++; DBUG_PRINT("info", ("UPDATE %s", share->key)); { if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= 0; int ret= get_ndb_blobs_value(table, share->ndb_value[0], blobs_buffer[0], blobs_buffer_size[0], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); print_records(table, table->record[0]); if (table->s->primary_key != MAX_KEY) { /* since table has a primary key, we can to a write since table has a primary key, we can do a write using only after values */ trans.write_row(::server_id, injector::transaction::table(table, true), Loading @@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, mysql server cannot handle the ndb hidden key and therefore needs the before image as well */ if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= table->record[1] - table->record[0]; int ret= get_ndb_blobs_value(table, share->ndb_value[1], blobs_buffer[1], blobs_buffer_size[1], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]); print_records(table, table->record[1]); trans.update_row(::server_id, Loading @@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, break; } if (share->flags & NSF_BLOB_FLAG) { my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR)); my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR)); } return 0; } Loading Loading @@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Binlog_index_row row; while (pOp != NULL) { // sometimes get TE_ALTER with invalid table DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER || ! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName())); ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); Loading Loading @@ -2684,6 +2767,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) DBUG_PRINT("info",("removing all event operations")); while ((op= ndb->getEventOperation())) { DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); Loading storage/ndb/include/ndbapi/NdbBlob.hpp +37 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ class NdbOperation; class NdbRecAttr; class NdbTableImpl; class NdbColumnImpl; class NdbEventOperationImpl; /** * @class NdbBlob Loading Loading @@ -71,6 +72,10 @@ class NdbColumnImpl; * writes. It avoids execute penalty if nothing is pending. It is not * needed after execute (obviously) or after next scan result. * * NdbBlob also supports reading post or pre blob data from events. The * handle can be read after next event on main table has been retrieved. * The data is available immediately. See NdbEventOperation. * * NdbBlob methods return -1 on error and 0 on success, and use output * parameters when necessary. * Loading Loading @@ -145,6 +150,12 @@ public: * then the callback is invoked. */ int setActiveHook(ActiveHook* activeHook, void* arg); /** * Check if blob value is defined (NULL or not). Used as first call * on event based blob. The argument is set to -1 for not defined. * Unlike getNull() this does not cause error on the handle. */ int getDefined(int& isNull); /** * Check if blob is null. */ Loading Loading @@ -191,6 +202,11 @@ public: * Get blob parts table name. Useful only to test programs. */ static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName); /** * Get blob event name. The blob event is created if the main event * monitors the blob column. The name includes main event name. */ static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName); /** * Return error object. The error may be blob specific (below) or may * be copied from a failed implicit operation. Loading @@ -217,17 +233,29 @@ private: friend class NdbScanOperation; friend class NdbDictionaryImpl; friend class NdbResultSet; // atNextResult friend class NdbEventBuffer; friend class NdbEventOperationImpl; #endif // state State theState; void setState(State newState); // quick and dirty support for events (consider subclassing) int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event // define blob table static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c); static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c); // ndb api stuff Ndb* theNdb; NdbTransaction* theNdbCon; NdbOperation* theNdbOp; NdbEventOperationImpl* theEventOp; NdbEventOperationImpl* theBlobEventOp; NdbRecAttr* theBlobEventPkRecAttr; NdbRecAttr* theBlobEventDistRecAttr; NdbRecAttr* theBlobEventPartRecAttr; NdbRecAttr* theBlobEventDataRecAttr; const NdbTableImpl* theTable; const NdbTableImpl* theAccessTable; const NdbTableImpl* theBlobTable; Loading Loading @@ -263,6 +291,8 @@ private: Buf theHeadInlineBuf; Buf theHeadInlineCopyBuf; // for writeTuple Buf thePartBuf; Buf theBlobEventDataBuf; Uint32 thePartNumber; // for event Head* theHead; char* theInlineData; NdbRecAttr* theHeadInlineRecAttr; Loading Loading @@ -306,6 +336,8 @@ private: int readDataPrivate(char* buf, Uint32& bytes); int writeDataPrivate(const char* buf, Uint32 bytes); int readParts(char* buf, Uint32 part, Uint32 count); int readTableParts(char* buf, Uint32 part, Uint32 count); int readEventParts(char* buf, Uint32 part, Uint32 count); int insertParts(const char* buf, Uint32 part, Uint32 count); int updateParts(const char* buf, Uint32 part, Uint32 count); int deleteParts(Uint32 part, Uint32 count); Loading @@ -317,19 +349,23 @@ private: int invokeActiveHook(); // blob handle maintenance int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn); int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version); int prepareColumn(); int preExecute(NdbTransaction::ExecType anExecType, bool& batch); int postExecute(NdbTransaction::ExecType anExecType); int preCommit(); int atNextResult(); int atNextEvent(); // errors void setErrorCode(int anErrorCode, bool invalidFlag = true); void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true); void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true); #ifdef VM_TRACE int getOperationType() const; friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); #endif // list stuff void next(NdbBlob* obj) { theNext= obj;} NdbBlob* next() { return theNext;} friend struct Ndb_free_list_t<NdbBlob>; Loading Loading
mysql-test/t/disabled.def +4 −0 Original line number Diff line number Diff line Loading @@ -29,3 +29,7 @@ ndb_autodiscover : Needs to be fixed w.r.t binlog ndb_autodiscover2 : Needs to be fixed w.r.t binlog system_mysql_db : Needs fixing system_mysql_db_fix : Needs fixing #ndb_alter_table_row : sometimes wrong error 1015!=1046 ndb_gis : garbled msgs from corrupt THD* + partitioning problem # vim: set filetype=conf:
sql/ha_ndbcluster.cc +67 −36 Original line number Diff line number Diff line Loading @@ -35,6 +35,11 @@ #include "ha_ndbcluster_binlog.h" #ifdef ndb_dynamite #undef assert #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0) #endif // options from from mysqld.cc extern my_bool opt_ndb_optimized_node_selection; extern const char *opt_ndbcluster_connectstring; Loading Loading @@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) if (ndb_blob->blobsNextBlob() != NULL) DBUG_RETURN(0); ha_ndbcluster *ha= (ha_ndbcluster *)arg; DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob)); int ret= get_ndb_blobs_value(ha->table, ha->m_value, ha->m_blobs_buffer, ha->m_blobs_buffer_size, 0); DBUG_RETURN(ret); } int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) /* This routine is shared by injector. There is no common blobs buffer so the buffer and length are passed by reference. Injector also passes a record pointer diff. */ int get_ndb_blobs_value(TABLE* table, NdbValue* value_array, byte*& buffer, uint& buffer_size, my_ptrdiff_t ptrdiff) { DBUG_ENTER("get_ndb_blobs_value"); Loading @@ -803,14 +818,17 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) for (int loop= 0; loop <= 1; loop++) { uint32 offset= 0; for (uint i= 0; i < table_share->fields; i++) for (uint i= 0; i < table->s->fields; i++) { Field *field= table->field[i]; NdbValue value= m_value[i]; NdbValue value= value_array[i]; if (value.ptr != NULL && (field->flags & BLOB_FLAG)) { Field_blob *field_blob= (Field_blob *)field; NdbBlob *ndb_blob= value.blob; int isNull; ndb_blob->getDefined(isNull); if (isNull == 0) { // XXX -1 should be allowed only for events Uint64 blob_len= 0; if (ndb_blob->getLength(blob_len) != 0) DBUG_RETURN(-1); Loading @@ -820,27 +838,31 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob) blob_size+= 8 - blob_size % 8; if (loop == 1) { char *buf= m_blobs_buffer + offset; char *buf= buffer + offset; uint32 len= 0xffffffff; // Max uint32 DBUG_PRINT("value", ("read blob ptr=%lx len=%u", DBUG_PRINT("info", ("read blob ptr=%p len=%u", buf, (uint) blob_len)); if (ndb_blob->readData(buf, len) != 0) DBUG_RETURN(-1); DBUG_ASSERT(len == blob_len); // Ugly hack assumes only ptr needs to be changed field_blob->ptr += ptrdiff; field_blob->set_ptr(len, buf); field_blob->ptr -= ptrdiff; } offset+= blob_size; } } if (loop == 0 && offset > m_blobs_buffer_size) } if (loop == 0 && offset > buffer_size) { my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer_size= 0; DBUG_PRINT("value", ("allocate blobs buffer size %u", offset)); m_blobs_buffer= my_malloc(offset, MYF(MY_WME)); if (m_blobs_buffer == NULL) my_free(buffer, MYF(MY_ALLOW_ZERO_PTR)); buffer_size= 0; DBUG_PRINT("info", ("allocate blobs buffer size %u", offset)); buffer= my_malloc(offset, MYF(MY_WME)); if (buffer == NULL) DBUG_RETURN(-1); m_blobs_buffer_size= offset; buffer_size= offset; } } DBUG_RETURN(0); Loading Loading @@ -2713,15 +2735,23 @@ void ndb_unpack_record(TABLE *table, NdbValue *value, else { NdbBlob *ndb_blob= (*value).blob; bool isNull= TRUE; #ifndef DBUG_OFF int ret= #endif ndb_blob->getNull(isNull); DBUG_ASSERT(ret == 0); if (isNull) int isNull; ndb_blob->getDefined(isNull); if (isNull != 0) { uint col_no = ndb_blob->getColumn()->getColumnNo(); if (isNull == 1) { DBUG_PRINT("info",("[%u] NULL", col_no)) field->set_null(row_offset); } else { DBUG_PRINT("info",("[%u] UNDEFINED", col_no)); bitmap_clear_bit(defined, col_no); } } } } } DBUG_VOID_RETURN; Loading Loading @@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to) NDBDICT *dict= ndb->getDictionary(); const NDBTAB *orig_tab= (const NDBTAB *) m_table; DBUG_ENTER("alter_table_name"); DBUG_PRINT("info", ("from: %s to: %s", orig_tab->getName(), to)); NdbDictionary::Table new_tab= *orig_tab; new_tab.setName(to); Loading
sql/ha_ndbcluster.h +8 −0 Original line number Diff line number Diff line Loading @@ -25,6 +25,9 @@ #pragma interface /* gcc class implementation */ #endif /* Blob tables and events are internal to NDB and must never be accessed */ #define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB") #include <NdbApi.hpp> #include <ndbapi_limits.h> Loading Loading @@ -78,6 +81,10 @@ typedef struct ndb_index_data { typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue; int get_ndb_blobs_value(TABLE* table, NdbValue* value_array, byte*& buffer, uint& buffer_size, my_ptrdiff_t ptrdiff); typedef enum { NSS_INITIAL= 0, NSS_DROPPED, Loading Loading @@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share { #ifdef HAVE_NDB_BINLOG /* NDB_SHARE.flags */ #define NSF_HIDDEN_PK 1 /* table has hidden primary key */ #define NSF_BLOB_FLAG 2 /* table has blob attributes */ #define NSF_NO_BINLOG 4 /* table should not be binlogged */ #endif Loading
sql/ha_ndbcluster_binlog.cc +113 −29 Original line number Diff line number Diff line Loading @@ -23,6 +23,11 @@ #include "slave.h" #include "ha_ndbcluster_binlog.h" #ifdef ndb_dynamite #undef assert #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0) #endif /* defines for cluster replication table names */ Loading Loading @@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) DBUG_ASSERT(_table != 0); if (_table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; if (_table->s->blob_fields != 0) share->flags|= NSF_BLOB_FLAG; return; } while (1) Loading Loading @@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table) } if (table->s->primary_key == MAX_KEY) share->flags|= NSF_HIDDEN_PK; if (table->s->blob_fields != 0) share->flags|= NSF_BLOB_FLAG; break; } } Loading Loading @@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key, NDB_SHARE *share) { DBUG_ENTER("ndbcluster_create_binlog_setup"); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name)); pthread_mutex_lock(&ndbcluster_mutex); Loading Loading @@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, const char *event_name, NDB_SHARE *share) { DBUG_ENTER("ndbcluster_create_event"); DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s", ndbtab->getName(), ndbtab->getObjectVersion(), event_name, share ? share->key : "(nil)")); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); if (!share) { DBUG_PRINT("info", ("share == NULL")); Loading @@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, my_event.addTableEvent(NDBEVENT::TE_ALL); if (share->flags & NSF_HIDDEN_PK) { /* No primary key, susbscribe for all attributes */ if (share->flags & NSF_BLOB_FLAG) { sql_print_error("NDB Binlog: logging of table %s " "with no PK and blob attributes is not supported", share->key); DBUG_RETURN(-1); } /* No primary key, subscribe for all attributes */ my_event.setReport(NDBEVENT::ER_ALL); DBUG_PRINT("info", ("subscription all")); } Loading @@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab, DBUG_PRINT("info", ("subscription all and subscribe")); } } if (share->flags & NSF_BLOB_FLAG) my_event.mergeEvents(true); /* add all columns to the event */ int n_cols= ndbtab->getNoOfColumns(); Loading Loading @@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, */ DBUG_ENTER("ndbcluster_create_event_ops"); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); DBUG_ASSERT(share != 0); Loading @@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, } TABLE *table= share->table; if (table) { /* Logging of blob tables is not yet implemented, it would require: 1. setup of events also on the blob attribute tables 2. collect the pieces of the blob into one from an epoch to provide a full blob to binlog */ if (table->s->blob_fields) { sql_print_error("NDB Binlog: logging of blob table %s " "is not supported", share->key); share->flags|= NSF_NO_BINLOG; DBUG_RETURN(0); } } int do_schema_share= 0, do_apply_status_share= 0; int retries= 100; Loading Loading @@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab, DBUG_RETURN(-1); } if (share->flags & NSF_BLOB_FLAG) op->mergeEvents(true); // currently not inherited from event if (share->flags & NSF_BLOB_FLAG) { /* * Given servers S1 S2, following results in out-of-date * event->m_tableImpl and column->m_blobTable. * * S1: create table t1(a int primary key); * S2: drop table t1; * S1: create table t2(a int primary key, b blob); * S1: alter table t2 add x int; * S1: alter table t2 drop x; * * TODO fix at right place before we get here */ ndb->getDictionary()->fix_blob_events(ndbtab, event_name); } int n_columns= ndbtab->getNoOfColumns(); int n_fields= table ? table->s->fields : 0; int n_fields= table ? table->s->fields : 0; // XXX ??? for (int j= 0; j < n_columns; j++) { const char *col_name= ndbtab->getColumn(j)->getName(); NdbRecAttr *attr0, *attr1; NdbValue attr0, attr1; if (j < n_fields) { Field *f= share->table->field[j]; if (is_ndb_compatible_type(f)) { DBUG_PRINT("info", ("%s compatible", col_name)); attr0= op->getValue(col_name, f->ptr); attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) + attr0.rec= op->getValue(col_name, f->ptr); attr1.rec= op->getPreValue(col_name, (f->ptr - share->table->record[0]) + share->table->record[1]); } else else if (! (f->flags & BLOB_FLAG)) { DBUG_PRINT("info", ("%s non compatible", col_name)); attr0= op->getValue(col_name); attr1= op->getPreValue(col_name); attr0.rec= op->getValue(col_name); attr1.rec= op->getPreValue(col_name); } else { DBUG_PRINT("info", ("%s blob", col_name)); attr0.blob= op->getBlobHandle(col_name); attr1.blob= op->getPreBlobHandle(col_name); } } else { DBUG_PRINT("info", ("%s hidden key", col_name)); attr0= op->getValue(col_name); attr1= op->getPreValue(col_name); attr0.rec= op->getValue(col_name); attr1.rec= op->getPreValue(col_name); } share->ndb_value[0][j].rec= attr0; share->ndb_value[1][j].rec= attr1; share->ndb_value[0][j].ptr= attr0.ptr; share->ndb_value[1][j].ptr= attr1.ptr; } op->setCustomData((void *) share); // set before execute share->op= op; // assign op in NDB_SHARE Loading Loading @@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, (saves moving data about many times) */ /* for now malloc/free blobs buffer each time TODO if possible share single permanent buffer with handlers */ byte* blobs_buffer[2] = { 0, 0 }; uint blobs_buffer_size[2] = { 0, 0 }; switch(pOp->getEventType()) { case NDBEVENT::TE_INSERT: row.n_inserts++; DBUG_PRINT("info", ("INSERT INTO %s", share->key)); { if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= 0; int ret= get_ndb_blobs_value(table, share->ndb_value[0], blobs_buffer[0], blobs_buffer_size[0], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); trans.write_row(::server_id, injector::transaction::table(table, true), &b, n_fields, table->record[0]); Loading @@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, key */ if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= table->record[n] - table->record[0]; int ret= get_ndb_blobs_value(table, share->ndb_value[n], blobs_buffer[n], blobs_buffer_size[n], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); print_records(table, table->record[n]); trans.delete_row(::server_id, injector::transaction::table(table, true), Loading @@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, row.n_updates++; DBUG_PRINT("info", ("UPDATE %s", share->key)); { if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= 0; int ret= get_ndb_blobs_value(table, share->ndb_value[0], blobs_buffer[0], blobs_buffer_size[0], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); print_records(table, table->record[0]); if (table->s->primary_key != MAX_KEY) { /* since table has a primary key, we can to a write since table has a primary key, we can do a write using only after values */ trans.write_row(::server_id, injector::transaction::table(table, true), Loading @@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, mysql server cannot handle the ndb hidden key and therefore needs the before image as well */ if (share->flags & NSF_BLOB_FLAG) { my_ptrdiff_t ptrdiff= table->record[1] - table->record[0]; int ret= get_ndb_blobs_value(table, share->ndb_value[1], blobs_buffer[1], blobs_buffer_size[1], ptrdiff); DBUG_ASSERT(ret == 0); } ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]); print_records(table, table->record[1]); trans.update_row(::server_id, Loading @@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, break; } if (share->flags & NSF_BLOB_FLAG) { my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR)); my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR)); } return 0; } Loading Loading @@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) Binlog_index_row row; while (pOp != NULL) { // sometimes get TE_ALTER with invalid table DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER || ! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName())); ndb-> setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip); ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage); Loading Loading @@ -2684,6 +2767,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg) DBUG_PRINT("info",("removing all event operations")); while ((op= ndb->getEventOperation())) { DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName())); DBUG_PRINT("info",("removing event operation on %s", op->getEvent()->getName())); NDB_SHARE *share= (NDB_SHARE*) op->getCustomData(); Loading
storage/ndb/include/ndbapi/NdbBlob.hpp +37 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ class NdbOperation; class NdbRecAttr; class NdbTableImpl; class NdbColumnImpl; class NdbEventOperationImpl; /** * @class NdbBlob Loading Loading @@ -71,6 +72,10 @@ class NdbColumnImpl; * writes. It avoids execute penalty if nothing is pending. It is not * needed after execute (obviously) or after next scan result. * * NdbBlob also supports reading post or pre blob data from events. The * handle can be read after next event on main table has been retrieved. * The data is available immediately. See NdbEventOperation. * * NdbBlob methods return -1 on error and 0 on success, and use output * parameters when necessary. * Loading Loading @@ -145,6 +150,12 @@ public: * then the callback is invoked. */ int setActiveHook(ActiveHook* activeHook, void* arg); /** * Check if blob value is defined (NULL or not). Used as first call * on event based blob. The argument is set to -1 for not defined. * Unlike getNull() this does not cause error on the handle. */ int getDefined(int& isNull); /** * Check if blob is null. */ Loading Loading @@ -191,6 +202,11 @@ public: * Get blob parts table name. Useful only to test programs. */ static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName); /** * Get blob event name. The blob event is created if the main event * monitors the blob column. The name includes main event name. */ static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName); /** * Return error object. The error may be blob specific (below) or may * be copied from a failed implicit operation. Loading @@ -217,17 +233,29 @@ private: friend class NdbScanOperation; friend class NdbDictionaryImpl; friend class NdbResultSet; // atNextResult friend class NdbEventBuffer; friend class NdbEventOperationImpl; #endif // state State theState; void setState(State newState); // quick and dirty support for events (consider subclassing) int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event // define blob table static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c); static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c); // ndb api stuff Ndb* theNdb; NdbTransaction* theNdbCon; NdbOperation* theNdbOp; NdbEventOperationImpl* theEventOp; NdbEventOperationImpl* theBlobEventOp; NdbRecAttr* theBlobEventPkRecAttr; NdbRecAttr* theBlobEventDistRecAttr; NdbRecAttr* theBlobEventPartRecAttr; NdbRecAttr* theBlobEventDataRecAttr; const NdbTableImpl* theTable; const NdbTableImpl* theAccessTable; const NdbTableImpl* theBlobTable; Loading Loading @@ -263,6 +291,8 @@ private: Buf theHeadInlineBuf; Buf theHeadInlineCopyBuf; // for writeTuple Buf thePartBuf; Buf theBlobEventDataBuf; Uint32 thePartNumber; // for event Head* theHead; char* theInlineData; NdbRecAttr* theHeadInlineRecAttr; Loading Loading @@ -306,6 +336,8 @@ private: int readDataPrivate(char* buf, Uint32& bytes); int writeDataPrivate(const char* buf, Uint32 bytes); int readParts(char* buf, Uint32 part, Uint32 count); int readTableParts(char* buf, Uint32 part, Uint32 count); int readEventParts(char* buf, Uint32 part, Uint32 count); int insertParts(const char* buf, Uint32 part, Uint32 count); int updateParts(const char* buf, Uint32 part, Uint32 count); int deleteParts(Uint32 part, Uint32 count); Loading @@ -317,19 +349,23 @@ private: int invokeActiveHook(); // blob handle maintenance int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn); int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version); int prepareColumn(); int preExecute(NdbTransaction::ExecType anExecType, bool& batch); int postExecute(NdbTransaction::ExecType anExecType); int preCommit(); int atNextResult(); int atNextEvent(); // errors void setErrorCode(int anErrorCode, bool invalidFlag = true); void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true); void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true); #ifdef VM_TRACE int getOperationType() const; friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); #endif // list stuff void next(NdbBlob* obj) { theNext= obj;} NdbBlob* next() { return theNext;} friend struct Ndb_free_list_t<NdbBlob>; Loading