Loading storage/ndb/include/ndbapi/NdbBlob.hpp +37 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ class NdbOperation; class NdbRecAttr; class NdbTableImpl; class NdbColumnImpl; class NdbEventOperationImpl; /** * @class NdbBlob Loading Loading @@ -71,6 +72,10 @@ class NdbColumnImpl; * writes. It avoids execute penalty if nothing is pending. It is not * needed after execute (obviously) or after next scan result. * * NdbBlob also supports reading post or pre blob data from events. The * handle can be read after next event on main table has been retrieved. * The data is available immediately. See NdbEventOperation. * * NdbBlob methods return -1 on error and 0 on success, and use output * parameters when necessary. * Loading Loading @@ -145,6 +150,12 @@ public: * then the callback is invoked. */ int setActiveHook(ActiveHook* activeHook, void* arg); /** * Check if blob value is defined (NULL or not). Used as first call * on event based blob. The argument is set to -1 for not defined. * Unlike getNull() this does not cause error on the handle. */ int getDefined(int& isNull); /** * Check if blob is null. */ Loading Loading @@ -191,6 +202,11 @@ public: * Get blob parts table name. Useful only to test programs. */ static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName); /** * Get blob event name. The blob event is created if the main event * monitors the blob column. The name includes main event name. */ static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName); /** * Return error object. The error may be blob specific (below) or may * be copied from a failed implicit operation. Loading @@ -217,17 +233,29 @@ private: friend class NdbScanOperation; friend class NdbDictionaryImpl; friend class NdbResultSet; // atNextResult friend class NdbEventBuffer; friend class NdbEventOperationImpl; #endif // state State theState; void setState(State newState); // quick and dirty support for events (consider subclassing) int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event // define blob table static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c); static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c); // ndb api stuff Ndb* theNdb; NdbTransaction* theNdbCon; NdbOperation* theNdbOp; NdbEventOperationImpl* theEventOp; NdbEventOperationImpl* theBlobEventOp; NdbRecAttr* theBlobEventPkRecAttr; NdbRecAttr* theBlobEventDistRecAttr; NdbRecAttr* theBlobEventPartRecAttr; NdbRecAttr* theBlobEventDataRecAttr; const NdbTableImpl* theTable; const NdbTableImpl* theAccessTable; const NdbTableImpl* theBlobTable; Loading Loading @@ -263,6 +291,8 @@ private: Buf theHeadInlineBuf; Buf theHeadInlineCopyBuf; // for writeTuple Buf thePartBuf; Buf theBlobEventDataBuf; Uint32 thePartNumber; // for event Head* theHead; char* theInlineData; NdbRecAttr* theHeadInlineRecAttr; Loading Loading @@ -306,6 +336,8 @@ private: int readDataPrivate(char* buf, Uint32& bytes); int writeDataPrivate(const char* buf, Uint32 bytes); int readParts(char* buf, Uint32 part, Uint32 count); int readTableParts(char* buf, Uint32 part, Uint32 count); int readEventParts(char* buf, Uint32 part, Uint32 count); int insertParts(const char* buf, Uint32 part, Uint32 count); int updateParts(const char* buf, Uint32 part, Uint32 count); int deleteParts(Uint32 part, Uint32 count); Loading @@ -317,19 +349,23 @@ private: int invokeActiveHook(); // blob handle maintenance int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn); int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version); int prepareColumn(); int preExecute(NdbTransaction::ExecType anExecType, bool& batch); int postExecute(NdbTransaction::ExecType anExecType); int preCommit(); int atNextResult(); int atNextEvent(); // errors void setErrorCode(int anErrorCode, bool invalidFlag = true); void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true); void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true); #ifdef VM_TRACE int getOperationType() const; friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); #endif // list stuff void next(NdbBlob* obj) { theNext= obj;} NdbBlob* next() { return theNext;} friend struct Ndb_free_list_t<NdbBlob>; Loading storage/ndb/include/ndbapi/NdbDictionary.hpp +19 −1 Original line number Diff line number Diff line Loading @@ -1124,7 +1124,7 @@ public: _TE_NODE_FAILURE=10, _TE_SUBSCRIBE=11, _TE_UNSUBSCRIBE=12, _TE_NUL=13 // internal (INS o DEL within same GCI) _TE_NUL=13 // internal (e.g. INS o DEL within same GCI) }; #endif /** Loading Loading @@ -1261,6 +1261,24 @@ public: */ int getNoOfEventColumns() const; /** * The merge events flag is false by default. Setting it true * implies that events are merged in following ways: * * - for given NdbEventOperation associated with this event, * events on same PK within same GCI are merged into single event * * - a blob table event is created for each blob attribute * and blob events are handled as part of main table events * * - blob post/pre data from the blob part events can be read * via NdbBlob methods as a single value * * NOTE: Currently this flag is not inherited by NdbEventOperation * and must be set on NdbEventOperation explicitly. */ void mergeEvents(bool flag); /** * Get object status */ Loading storage/ndb/include/ndbapi/NdbEventOperation.hpp +8 −0 Original line number Diff line number Diff line Loading @@ -150,6 +150,14 @@ public: */ NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0); /** * These methods replace getValue/getPreValue for blobs. Each * method creates a blob handle NdbBlob. The handle supports only * read operations. See NdbBlob. */ NdbBlob* getBlobHandle(const char *anAttrName); NdbBlob* getPreBlobHandle(const char *anAttrName); int isOverrun() const; /** Loading storage/ndb/ndbapi-examples/ndbapi_event/Makefile +3 −3 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ OBJS = ndbapi_event.o CXX = g++ -g CFLAGS = -c -Wall -fno-rtti -fno-exceptions CXXFLAGS = DEBUG = DEBUG =# -DVM_TRACE LFLAGS = -Wall TOP_SRCDIR = ../../../.. INCLUDE_DIR = $(TOP_SRCDIR)/storage/ndb/include Loading @@ -16,8 +16,8 @@ SYS_LIB = $(TARGET): $(OBJS) $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS) $(TARGET).o: $(SRCS) Makefile $(CXX) $(CFLAGS) $(DEBUG) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS) clean: rm -f *.o $(TARGET) storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp +117 −52 Original line number Diff line number Diff line Loading @@ -54,26 +54,32 @@ #include <stdio.h> #include <iostream> #include <unistd.h> #ifdef VM_TRACE #include <my_global.h> #endif #ifndef assert #include <assert.h> #endif /** * * Assume that there is a table t0 which is being updated by * Assume that there is a table which is being updated by * another process (e.g. flexBench -l 0 -stdtables). * We want to monitor what happens with columns c0,c1,c2,c3. * We want to monitor what happens with column values. * * or together with the mysql client; * Or using the mysql client: * * shell> mysql -u root * mysql> create database TEST_DB; * mysql> use TEST_DB; * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4), * mysql> create table t0 * (c0 int, c1 int, c2 char(4), c3 char(4), c4 text, * primary key(c0, c2)) engine ndb charset latin1; * * In another window start ndbapi_event, wait until properly started * insert into t0 values (1, 2, 'a', 'b'); insert into t0 values (3, 4, 'c', 'd'); insert into t0 values (1, 2, 'a', 'b', null); insert into t0 values (3, 4, 'c', 'd', null); update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk update t0 set c3 = 'f'; -- use scan update t0 set c3 = 'F'; -- use scan update to 'same' Loading @@ -81,7 +87,18 @@ update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same' update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK delete from t0; * insert ...; update ...; -- see events w/ same pk merged (if -m option) delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU update ...; update ...; -- text requires -m flag set @a = repeat('a',256); -- inline size set @b = repeat('b',2000); -- part size set @c = repeat('c',2000*30); -- 30 parts -- update the text field using combinations of @a, @b, @c ... * you should see the data popping up in the example window * */ Loading @@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnName, const int noEventColumnName); const int noEventColumnName, bool merge_events); int main(int argc, char** argv) { ndb_init(); bool merge_events = argc > 1 && strcmp(argv[1], "-m") == 0; bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0; #ifdef VM_TRACE bool dbug = argc > 1 && strchr(argv[1], 'd') != 0; if (dbug) DBUG_PUSH("d:t:"); if (dbug) putenv("API_SIGNAL_LOG=-"); #endif Ndb_cluster_connection *cluster_connection= new Ndb_cluster_connection(); // Object representing the cluster Loading Loading @@ -134,12 +157,13 @@ int main(int argc, char** argv) const char *eventName= "CHNG_IN_t0"; const char *eventTableName= "t0"; const int noEventColumnName= 4; const int noEventColumnName= 5; const char *eventColumnName[noEventColumnName]= {"c0", "c1", "c2", "c3" "c3", "c4" }; // Create events Loading @@ -147,9 +171,14 @@ int main(int argc, char** argv) eventName, eventTableName, eventColumnName, noEventColumnName); noEventColumnName, merge_events); int j= 0; // Normal values and blobs are unfortunately handled differently.. typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH; int i, j, k, l; j = 0; while (j < 99) { // Start "transaction" for handling events Loading @@ -160,12 +189,17 @@ int main(int argc, char** argv) op->mergeEvents(merge_events); printf("get values\n"); NdbRecAttr* recAttr[noEventColumnName]; NdbRecAttr* recAttrPre[noEventColumnName]; RA_BH recAttr[noEventColumnName]; RA_BH recAttrPre[noEventColumnName]; // primary keys should always be a part of the result for (int i = 0; i < noEventColumnName; i++) { recAttr[i] = op->getValue(eventColumnName[i]); recAttrPre[i] = op->getPreValue(eventColumnName[i]); for (i = 0; i < noEventColumnName; i++) { if (i < 4) { recAttr[i].ra = op->getValue(eventColumnName[i]); recAttrPre[i].ra = op->getPreValue(eventColumnName[i]); } else if (merge_events) { recAttr[i].bh = op->getBlobHandle(eventColumnName[i]); recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]); } } // set up the callbacks Loading @@ -174,13 +208,16 @@ int main(int argc, char** argv) if (op->execute()) APIERROR(op->getNdbError()); int i= 0; NdbEventOperation* the_op = op; i= 0; while (i < 40) { // printf("now waiting for event...\n"); int r = myNdb->pollEvents(1000); // wait for event or 1000 ms if (r > 0) { // printf("got data! %d\n", r); while ((op= myNdb->nextEvent())) { assert(the_op == op); i++; switch (op->getEventType()) { case NdbDictionary::Event::TE_INSERT: Loading @@ -195,40 +232,66 @@ int main(int argc, char** argv) default: abort(); // should not happen } printf(" gci=%d\n", op->getGCI()); printf("post: "); for (int i = 0; i < noEventColumnName; i++) { if (recAttr[i]->isNULL() >= 0) { // we have a value if (recAttr[i]->isNULL() == 0) { // we have a non-null value if (i < 2) printf("%-5u", recAttr[i]->u_32_value()); printf(" gci=%d\n", (int)op->getGCI()); for (k = 0; k <= 1; k++) { printf(k == 0 ? "post: " : "pre : "); for (l = 0; l < noEventColumnName; l++) { if (l < 4) { NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra; if (ra->isNULL() >= 0) { // we have a value if (ra->isNULL() == 0) { // we have a non-null value if (l < 2) printf("%-5u", ra->u_32_value()); else printf("%-5.4s", recAttr[i]->aRef()); } else // we have a null value printf("%-5.4s", ra->aRef()); } else printf("%-5s", "NULL"); } else printf("%-5s", "-"); printf("%-5s", "-"); // no value } else if (merge_events) { int isNull; NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh; bh->getDefined(isNull); if (isNull >= 0) { // we have a value if (! isNull) { // we have a non-null value Uint64 length = 0; bh->getLength(length); // read into buffer unsigned char* buf = new unsigned char [length]; memset(buf, 'X', length); Uint32 n = length; bh->readData(buf, n); // n is in/out assert(n == length); // pretty-print bool first = true; Uint32 i = 0; while (i < n) { unsigned char c = buf[i++]; Uint32 m = 1; while (i < n && buf[i] == c) i++, m++; if (! first) printf("+"); printf("%u%c", m, c); first = false; } printf("\npre : "); for (int i = 0; i < noEventColumnName; i++) { if (recAttrPre[i]->isNULL() >= 0) { // we have a value if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value if (i < 2) printf("%-5u", recAttrPre[i]->u_32_value()); else printf("%-5.4s", recAttrPre[i]->aRef()); } else // we have a null value printf("[%u]", n); delete [] buf; } else printf("%-5s", "NULL"); } else printf("%-5s", "-"); printf("%-5s", "-"); // no value } } printf("\n"); } } } else ;//printf("timed out\n"); } // don't want to listen to events anymore if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError()); if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError()); the_op = 0; j++; } Loading @@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnNames, const int noEventColumnNames) const int noEventColumnNames, bool merge_events) { NdbDictionary::Dictionary *myDict= myNdb->getDictionary(); if (!myDict) APIERROR(myNdb->getNdbError()); Loading @@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb, // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE); myEvent.addEventColumns(noEventColumnNames, eventColumnNames); myEvent.mergeEvents(merge_events); // Add event to database if (myDict->createEvent(myEvent) == 0) Loading Loading
storage/ndb/include/ndbapi/NdbBlob.hpp +37 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ class NdbOperation; class NdbRecAttr; class NdbTableImpl; class NdbColumnImpl; class NdbEventOperationImpl; /** * @class NdbBlob Loading Loading @@ -71,6 +72,10 @@ class NdbColumnImpl; * writes. It avoids execute penalty if nothing is pending. It is not * needed after execute (obviously) or after next scan result. * * NdbBlob also supports reading post or pre blob data from events. The * handle can be read after next event on main table has been retrieved. * The data is available immediately. See NdbEventOperation. * * NdbBlob methods return -1 on error and 0 on success, and use output * parameters when necessary. * Loading Loading @@ -145,6 +150,12 @@ public: * then the callback is invoked. */ int setActiveHook(ActiveHook* activeHook, void* arg); /** * Check if blob value is defined (NULL or not). Used as first call * on event based blob. The argument is set to -1 for not defined. * Unlike getNull() this does not cause error on the handle. */ int getDefined(int& isNull); /** * Check if blob is null. */ Loading Loading @@ -191,6 +202,11 @@ public: * Get blob parts table name. Useful only to test programs. */ static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName); /** * Get blob event name. The blob event is created if the main event * monitors the blob column. The name includes main event name. */ static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName); /** * Return error object. The error may be blob specific (below) or may * be copied from a failed implicit operation. Loading @@ -217,17 +233,29 @@ private: friend class NdbScanOperation; friend class NdbDictionaryImpl; friend class NdbResultSet; // atNextResult friend class NdbEventBuffer; friend class NdbEventOperationImpl; #endif // state State theState; void setState(State newState); // quick and dirty support for events (consider subclassing) int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event // define blob table static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c); static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c); static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c); // ndb api stuff Ndb* theNdb; NdbTransaction* theNdbCon; NdbOperation* theNdbOp; NdbEventOperationImpl* theEventOp; NdbEventOperationImpl* theBlobEventOp; NdbRecAttr* theBlobEventPkRecAttr; NdbRecAttr* theBlobEventDistRecAttr; NdbRecAttr* theBlobEventPartRecAttr; NdbRecAttr* theBlobEventDataRecAttr; const NdbTableImpl* theTable; const NdbTableImpl* theAccessTable; const NdbTableImpl* theBlobTable; Loading Loading @@ -263,6 +291,8 @@ private: Buf theHeadInlineBuf; Buf theHeadInlineCopyBuf; // for writeTuple Buf thePartBuf; Buf theBlobEventDataBuf; Uint32 thePartNumber; // for event Head* theHead; char* theInlineData; NdbRecAttr* theHeadInlineRecAttr; Loading Loading @@ -306,6 +336,8 @@ private: int readDataPrivate(char* buf, Uint32& bytes); int writeDataPrivate(const char* buf, Uint32 bytes); int readParts(char* buf, Uint32 part, Uint32 count); int readTableParts(char* buf, Uint32 part, Uint32 count); int readEventParts(char* buf, Uint32 part, Uint32 count); int insertParts(const char* buf, Uint32 part, Uint32 count); int updateParts(const char* buf, Uint32 part, Uint32 count); int deleteParts(Uint32 part, Uint32 count); Loading @@ -317,19 +349,23 @@ private: int invokeActiveHook(); // blob handle maintenance int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn); int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version); int prepareColumn(); int preExecute(NdbTransaction::ExecType anExecType, bool& batch); int postExecute(NdbTransaction::ExecType anExecType); int preCommit(); int atNextResult(); int atNextEvent(); // errors void setErrorCode(int anErrorCode, bool invalidFlag = true); void setErrorCode(NdbOperation* anOp, bool invalidFlag = true); void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true); void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true); #ifdef VM_TRACE int getOperationType() const; friend class NdbOut& operator<<(NdbOut&, const NdbBlob&); #endif // list stuff void next(NdbBlob* obj) { theNext= obj;} NdbBlob* next() { return theNext;} friend struct Ndb_free_list_t<NdbBlob>; Loading
storage/ndb/include/ndbapi/NdbDictionary.hpp +19 −1 Original line number Diff line number Diff line Loading @@ -1124,7 +1124,7 @@ public: _TE_NODE_FAILURE=10, _TE_SUBSCRIBE=11, _TE_UNSUBSCRIBE=12, _TE_NUL=13 // internal (INS o DEL within same GCI) _TE_NUL=13 // internal (e.g. INS o DEL within same GCI) }; #endif /** Loading Loading @@ -1261,6 +1261,24 @@ public: */ int getNoOfEventColumns() const; /** * The merge events flag is false by default. Setting it true * implies that events are merged in following ways: * * - for given NdbEventOperation associated with this event, * events on same PK within same GCI are merged into single event * * - a blob table event is created for each blob attribute * and blob events are handled as part of main table events * * - blob post/pre data from the blob part events can be read * via NdbBlob methods as a single value * * NOTE: Currently this flag is not inherited by NdbEventOperation * and must be set on NdbEventOperation explicitly. */ void mergeEvents(bool flag); /** * Get object status */ Loading
storage/ndb/include/ndbapi/NdbEventOperation.hpp +8 −0 Original line number Diff line number Diff line Loading @@ -150,6 +150,14 @@ public: */ NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0); /** * These methods replace getValue/getPreValue for blobs. Each * method creates a blob handle NdbBlob. The handle supports only * read operations. See NdbBlob. */ NdbBlob* getBlobHandle(const char *anAttrName); NdbBlob* getPreBlobHandle(const char *anAttrName); int isOverrun() const; /** Loading
storage/ndb/ndbapi-examples/ndbapi_event/Makefile +3 −3 Original line number Diff line number Diff line Loading @@ -4,7 +4,7 @@ OBJS = ndbapi_event.o CXX = g++ -g CFLAGS = -c -Wall -fno-rtti -fno-exceptions CXXFLAGS = DEBUG = DEBUG =# -DVM_TRACE LFLAGS = -Wall TOP_SRCDIR = ../../../.. INCLUDE_DIR = $(TOP_SRCDIR)/storage/ndb/include Loading @@ -16,8 +16,8 @@ SYS_LIB = $(TARGET): $(OBJS) $(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET) $(TARGET).o: $(SRCS) $(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS) $(TARGET).o: $(SRCS) Makefile $(CXX) $(CFLAGS) $(DEBUG) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS) clean: rm -f *.o $(TARGET)
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp +117 −52 Original line number Diff line number Diff line Loading @@ -54,26 +54,32 @@ #include <stdio.h> #include <iostream> #include <unistd.h> #ifdef VM_TRACE #include <my_global.h> #endif #ifndef assert #include <assert.h> #endif /** * * Assume that there is a table t0 which is being updated by * Assume that there is a table which is being updated by * another process (e.g. flexBench -l 0 -stdtables). * We want to monitor what happens with columns c0,c1,c2,c3. * We want to monitor what happens with column values. * * or together with the mysql client; * Or using the mysql client: * * shell> mysql -u root * mysql> create database TEST_DB; * mysql> use TEST_DB; * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4), * mysql> create table t0 * (c0 int, c1 int, c2 char(4), c3 char(4), c4 text, * primary key(c0, c2)) engine ndb charset latin1; * * In another window start ndbapi_event, wait until properly started * insert into t0 values (1, 2, 'a', 'b'); insert into t0 values (3, 4, 'c', 'd'); insert into t0 values (1, 2, 'a', 'b', null); insert into t0 values (3, 4, 'c', 'd', null); update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk update t0 set c3 = 'f'; -- use scan update t0 set c3 = 'F'; -- use scan update to 'same' Loading @@ -81,7 +87,18 @@ update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same' update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK delete from t0; * insert ...; update ...; -- see events w/ same pk merged (if -m option) delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU update ...; update ...; -- text requires -m flag set @a = repeat('a',256); -- inline size set @b = repeat('b',2000); -- part size set @c = repeat('c',2000*30); -- 30 parts -- update the text field using combinations of @a, @b, @c ... * you should see the data popping up in the example window * */ Loading @@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnName, const int noEventColumnName); const int noEventColumnName, bool merge_events); int main(int argc, char** argv) { ndb_init(); bool merge_events = argc > 1 && strcmp(argv[1], "-m") == 0; bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0; #ifdef VM_TRACE bool dbug = argc > 1 && strchr(argv[1], 'd') != 0; if (dbug) DBUG_PUSH("d:t:"); if (dbug) putenv("API_SIGNAL_LOG=-"); #endif Ndb_cluster_connection *cluster_connection= new Ndb_cluster_connection(); // Object representing the cluster Loading Loading @@ -134,12 +157,13 @@ int main(int argc, char** argv) const char *eventName= "CHNG_IN_t0"; const char *eventTableName= "t0"; const int noEventColumnName= 4; const int noEventColumnName= 5; const char *eventColumnName[noEventColumnName]= {"c0", "c1", "c2", "c3" "c3", "c4" }; // Create events Loading @@ -147,9 +171,14 @@ int main(int argc, char** argv) eventName, eventTableName, eventColumnName, noEventColumnName); noEventColumnName, merge_events); int j= 0; // Normal values and blobs are unfortunately handled differently.. typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH; int i, j, k, l; j = 0; while (j < 99) { // Start "transaction" for handling events Loading @@ -160,12 +189,17 @@ int main(int argc, char** argv) op->mergeEvents(merge_events); printf("get values\n"); NdbRecAttr* recAttr[noEventColumnName]; NdbRecAttr* recAttrPre[noEventColumnName]; RA_BH recAttr[noEventColumnName]; RA_BH recAttrPre[noEventColumnName]; // primary keys should always be a part of the result for (int i = 0; i < noEventColumnName; i++) { recAttr[i] = op->getValue(eventColumnName[i]); recAttrPre[i] = op->getPreValue(eventColumnName[i]); for (i = 0; i < noEventColumnName; i++) { if (i < 4) { recAttr[i].ra = op->getValue(eventColumnName[i]); recAttrPre[i].ra = op->getPreValue(eventColumnName[i]); } else if (merge_events) { recAttr[i].bh = op->getBlobHandle(eventColumnName[i]); recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]); } } // set up the callbacks Loading @@ -174,13 +208,16 @@ int main(int argc, char** argv) if (op->execute()) APIERROR(op->getNdbError()); int i= 0; NdbEventOperation* the_op = op; i= 0; while (i < 40) { // printf("now waiting for event...\n"); int r = myNdb->pollEvents(1000); // wait for event or 1000 ms if (r > 0) { // printf("got data! %d\n", r); while ((op= myNdb->nextEvent())) { assert(the_op == op); i++; switch (op->getEventType()) { case NdbDictionary::Event::TE_INSERT: Loading @@ -195,40 +232,66 @@ int main(int argc, char** argv) default: abort(); // should not happen } printf(" gci=%d\n", op->getGCI()); printf("post: "); for (int i = 0; i < noEventColumnName; i++) { if (recAttr[i]->isNULL() >= 0) { // we have a value if (recAttr[i]->isNULL() == 0) { // we have a non-null value if (i < 2) printf("%-5u", recAttr[i]->u_32_value()); printf(" gci=%d\n", (int)op->getGCI()); for (k = 0; k <= 1; k++) { printf(k == 0 ? "post: " : "pre : "); for (l = 0; l < noEventColumnName; l++) { if (l < 4) { NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra; if (ra->isNULL() >= 0) { // we have a value if (ra->isNULL() == 0) { // we have a non-null value if (l < 2) printf("%-5u", ra->u_32_value()); else printf("%-5.4s", recAttr[i]->aRef()); } else // we have a null value printf("%-5.4s", ra->aRef()); } else printf("%-5s", "NULL"); } else printf("%-5s", "-"); printf("%-5s", "-"); // no value } else if (merge_events) { int isNull; NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh; bh->getDefined(isNull); if (isNull >= 0) { // we have a value if (! isNull) { // we have a non-null value Uint64 length = 0; bh->getLength(length); // read into buffer unsigned char* buf = new unsigned char [length]; memset(buf, 'X', length); Uint32 n = length; bh->readData(buf, n); // n is in/out assert(n == length); // pretty-print bool first = true; Uint32 i = 0; while (i < n) { unsigned char c = buf[i++]; Uint32 m = 1; while (i < n && buf[i] == c) i++, m++; if (! first) printf("+"); printf("%u%c", m, c); first = false; } printf("\npre : "); for (int i = 0; i < noEventColumnName; i++) { if (recAttrPre[i]->isNULL() >= 0) { // we have a value if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value if (i < 2) printf("%-5u", recAttrPre[i]->u_32_value()); else printf("%-5.4s", recAttrPre[i]->aRef()); } else // we have a null value printf("[%u]", n); delete [] buf; } else printf("%-5s", "NULL"); } else printf("%-5s", "-"); printf("%-5s", "-"); // no value } } printf("\n"); } } } else ;//printf("timed out\n"); } // don't want to listen to events anymore if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError()); if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError()); the_op = 0; j++; } Loading @@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb, const char *eventName, const char *eventTableName, const char **eventColumnNames, const int noEventColumnNames) const int noEventColumnNames, bool merge_events) { NdbDictionary::Dictionary *myDict= myNdb->getDictionary(); if (!myDict) APIERROR(myNdb->getNdbError()); Loading @@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb, // myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE); myEvent.addEventColumns(noEventColumnNames, eventColumnNames); myEvent.mergeEvents(merge_events); // Add event to database if (myDict->createEvent(myEvent) == 0) Loading