Commit e698a39d authored by unknown's avatar unknown
Browse files

Merge mysql.com:/space/pekka/ndb/version/my51

into  mysql.com:/space/pekka/ndb/version/my51-rbr


storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  Auto merged
parents 61946d22 efdeeca2
Loading
Loading
Loading
Loading
+37 −1
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ class NdbOperation;
class NdbRecAttr;
class NdbTableImpl;
class NdbColumnImpl;
class NdbEventOperationImpl;

/**
 * @class NdbBlob
@@ -71,6 +72,10 @@ class NdbColumnImpl;
 * writes.  It avoids execute penalty if nothing is pending.  It is not
 * needed after execute (obviously) or after next scan result.
 *
 * NdbBlob also supports reading post or pre blob data from events.  The
 * handle can be read after next event on main table has been retrieved.
 * The data is available immediately.  See NdbEventOperation.
 *
 * NdbBlob methods return -1 on error and 0 on success, and use output
 * parameters when necessary.
 *
@@ -145,6 +150,12 @@ public:
   * then the callback is invoked.
   */
  int setActiveHook(ActiveHook* activeHook, void* arg);
  /**
   * Check if blob value is defined (NULL or not).  Used as first call
   * on event based blob.  The argument is set to -1 for not defined.
   * Unlike getNull() this does not cause error on the handle.
   */
  int getDefined(int& isNull);
  /**
   * Check if blob is null.
   */
@@ -191,6 +202,11 @@ public:
   * Get blob parts table name.  Useful only to test programs.
   */
  static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName);
  /**
   * Get blob event name.  The blob event is created if the main event
   * monitors the blob column.  The name includes main event name.
   */
  static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName);
  /**
   * Return error object.  The error may be blob specific (below) or may
   * be copied from a failed implicit operation.
@@ -217,17 +233,29 @@ private:
  friend class NdbScanOperation;
  friend class NdbDictionaryImpl;
  friend class NdbResultSet; // atNextResult
  friend class NdbEventBuffer;
  friend class NdbEventOperationImpl;
#endif
  // state
  State theState;
  void setState(State newState);
  // quick and dirty support for events (consider subclassing)
  int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event
  // define blob table
  static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c);
  static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c);
  static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c);
  static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c);
  // ndb api stuff
  Ndb* theNdb;
  NdbTransaction* theNdbCon;
  NdbOperation* theNdbOp;
  NdbEventOperationImpl* theEventOp;
  NdbEventOperationImpl* theBlobEventOp;
  NdbRecAttr* theBlobEventPkRecAttr;
  NdbRecAttr* theBlobEventDistRecAttr;
  NdbRecAttr* theBlobEventPartRecAttr;
  NdbRecAttr* theBlobEventDataRecAttr;
  const NdbTableImpl* theTable;
  const NdbTableImpl* theAccessTable;
  const NdbTableImpl* theBlobTable;
@@ -263,6 +291,8 @@ private:
  Buf theHeadInlineBuf;
  Buf theHeadInlineCopyBuf;     // for writeTuple
  Buf thePartBuf;
  Buf theBlobEventDataBuf;
  Uint32 thePartNumber;         // for event
  Head* theHead;
  char* theInlineData;
  NdbRecAttr* theHeadInlineRecAttr;
@@ -306,6 +336,8 @@ private:
  int readDataPrivate(char* buf, Uint32& bytes);
  int writeDataPrivate(const char* buf, Uint32 bytes);
  int readParts(char* buf, Uint32 part, Uint32 count);
  int readTableParts(char* buf, Uint32 part, Uint32 count);
  int readEventParts(char* buf, Uint32 part, Uint32 count);
  int insertParts(const char* buf, Uint32 part, Uint32 count);
  int updateParts(const char* buf, Uint32 part, Uint32 count);
  int deleteParts(Uint32 part, Uint32 count);
@@ -317,19 +349,23 @@ private:
  int invokeActiveHook();
  // blob handle maintenance
  int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn);
  int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version);
  int prepareColumn();
  int preExecute(NdbTransaction::ExecType anExecType, bool& batch);
  int postExecute(NdbTransaction::ExecType anExecType);
  int preCommit();
  int atNextResult();
  int atNextEvent();
  // errors
  void setErrorCode(int anErrorCode, bool invalidFlag = true);
  void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
  void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true);
  void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true);
#ifdef VM_TRACE
  int getOperationType() const;
  friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
#endif

  // list stuff
  void next(NdbBlob* obj) { theNext= obj;}
  NdbBlob* next() { return theNext;}
  friend struct Ndb_free_list_t<NdbBlob>;
+19 −1
Original line number Diff line number Diff line
@@ -1124,7 +1124,7 @@ public:
      _TE_NODE_FAILURE=10,
      _TE_SUBSCRIBE=11,
      _TE_UNSUBSCRIBE=12,
      _TE_NUL=13 // internal (INS o DEL within same GCI)
      _TE_NUL=13 // internal (e.g. INS o DEL within same GCI)
    };
#endif
    /**
@@ -1261,6 +1261,24 @@ public:
     */
    int getNoOfEventColumns() const;

    /**
     * The merge events flag is false by default.  Setting it true
     * implies that events are merged in following ways:
     *
     * - for given NdbEventOperation associated with this event,
     *   events on same PK within same GCI are merged into single event
     *
     * - a blob table event is created for each blob attribute
     *   and blob events are handled as part of main table events
     *
     * - blob post/pre data from the blob part events can be read
     *   via NdbBlob methods as a single value
     *
     * NOTE: Currently this flag is not inherited by NdbEventOperation
     * and must be set on NdbEventOperation explicitly.
     */
    void mergeEvents(bool flag);

    /**
     * Get object status
     */
+8 −0
Original line number Diff line number Diff line
@@ -150,6 +150,14 @@ public:
   */
  NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0);

  /**
   * These methods replace getValue/getPreValue for blobs.  Each
   * method creates a blob handle NdbBlob.  The handle supports only
   * read operations.  See NdbBlob.
   */
  NdbBlob* getBlobHandle(const char *anAttrName);
  NdbBlob* getPreBlobHandle(const char *anAttrName);

  int isOverrun() const;

  /**
+3 −3
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@ OBJS = ndbapi_event.o
CXX = g++ -g
CFLAGS = -c -Wall -fno-rtti -fno-exceptions
CXXFLAGS = 
DEBUG = 
DEBUG =# -DVM_TRACE
LFLAGS = -Wall
TOP_SRCDIR = ../../../..
INCLUDE_DIR = $(TOP_SRCDIR)/storage/ndb/include
@@ -16,8 +16,8 @@ SYS_LIB =
$(TARGET): $(OBJS)
	$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)

$(TARGET).o: $(SRCS)
	$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
$(TARGET).o: $(SRCS) Makefile
	$(CXX) $(CFLAGS) $(DEBUG) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)

clean:
	rm -f *.o $(TARGET)
+117 −52
Original line number Diff line number Diff line
@@ -54,26 +54,32 @@
#include <stdio.h>
#include <iostream>
#include <unistd.h>
#ifdef VM_TRACE
#include <my_global.h>
#endif
#ifndef assert
#include <assert.h>
#endif


/**
 *
 * Assume that there is a table t0 which is being updated by 
 * Assume that there is a table which is being updated by 
 * another process (e.g. flexBench -l 0 -stdtables).
 * We want to monitor what happens with columns c0,c1,c2,c3.
 * We want to monitor what happens with column values.
 *
 * or together with the mysql client;
 * Or using the mysql client:
 *
 * shell> mysql -u root
 * mysql> create database TEST_DB;
 * mysql> use TEST_DB;
 * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
 * mysql> create table t0
 *        (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
 *        primary key(c0, c2)) engine ndb charset latin1;
 *
 * In another window start ndbapi_event, wait until properly started
 *
   insert into t0 values (1, 2, 'a', 'b');
   insert into t0 values (3, 4, 'c', 'd');
 
   insert into t0 values (1, 2, 'a', 'b', null);
   insert into t0 values (3, 4, 'c', 'd', null);
   update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
   update t0 set c3 = 'f'; -- use scan
   update t0 set c3 = 'F'; -- use scan update to 'same'
@@ -81,7 +87,18 @@
   update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
   update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
   delete from t0;
 *

   insert ...; update ...; -- see events w/ same pk merged (if -m option)
   delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
   update ...; update ...;

   -- text requires -m flag
   set @a = repeat('a',256); -- inline size
   set @b = repeat('b',2000); -- part size
   set @c = repeat('c',2000*30); -- 30 parts

   -- update the text field using combinations of @a, @b, @c ...
 
 * you should see the data popping up in the example window
 *
 */
@@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb,
		  const char *eventName,
		  const char *eventTableName,
		  const char **eventColumnName,
		  const int noEventColumnName);
		  const int noEventColumnName,
                  bool merge_events);

int main(int argc, char** argv)
{
  ndb_init();
  bool merge_events = argc > 1 && strcmp(argv[1], "-m") == 0;
  bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0;
#ifdef VM_TRACE
  bool dbug = argc > 1 && strchr(argv[1], 'd') != 0;
  if (dbug) DBUG_PUSH("d:t:");
  if (dbug) putenv("API_SIGNAL_LOG=-");
#endif

  Ndb_cluster_connection *cluster_connection=
    new Ndb_cluster_connection(); // Object representing the cluster
@@ -134,12 +157,13 @@ int main(int argc, char** argv)

  const char *eventName= "CHNG_IN_t0";
  const char *eventTableName= "t0";
  const int noEventColumnName= 4;
  const int noEventColumnName= 5;
  const char *eventColumnName[noEventColumnName]=
    {"c0",
     "c1",
     "c2",
     "c3"
     "c3",
     "c4"
    };
  
  // Create events
@@ -147,9 +171,14 @@ int main(int argc, char** argv)
		eventName,
		eventTableName,
		eventColumnName,
		noEventColumnName);
		noEventColumnName,
                merge_events);

  int j= 0;
  // Normal values and blobs are unfortunately handled differently..
  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

  int i, j, k, l;
  j = 0;
  while (j < 99) {

    // Start "transaction" for handling events
@@ -160,12 +189,17 @@ int main(int argc, char** argv)
    op->mergeEvents(merge_events);

    printf("get values\n");
    NdbRecAttr* recAttr[noEventColumnName];
    NdbRecAttr* recAttrPre[noEventColumnName];
    RA_BH recAttr[noEventColumnName];
    RA_BH recAttrPre[noEventColumnName];
    // primary keys should always be a part of the result
    for (int i = 0; i < noEventColumnName; i++) {
      recAttr[i]    = op->getValue(eventColumnName[i]);
      recAttrPre[i] = op->getPreValue(eventColumnName[i]);
    for (i = 0; i < noEventColumnName; i++) {
      if (i < 4) {
        recAttr[i].ra    = op->getValue(eventColumnName[i]);
        recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
      } else if (merge_events) {
        recAttr[i].bh    = op->getBlobHandle(eventColumnName[i]);
        recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
      }
    }

    // set up the callbacks
@@ -174,13 +208,16 @@ int main(int argc, char** argv)
    if (op->execute())
      APIERROR(op->getNdbError());

    int i= 0;
    NdbEventOperation* the_op = op;

    i= 0;
    while (i < 40) {
      // printf("now waiting for event...\n");
      int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
      if (r > 0) {
	// printf("got data! %d\n", r);
	while ((op= myNdb->nextEvent())) {
          assert(the_op == op);
	  i++;
	  switch (op->getEventType()) {
	  case NdbDictionary::Event::TE_INSERT:
@@ -195,40 +232,66 @@ int main(int argc, char** argv)
	  default:
	    abort(); // should not happen
	  }
          printf(" gci=%d\n", op->getGCI());
          printf("post:  ");
	  for (int i = 0; i < noEventColumnName; i++) {
	    if (recAttr[i]->isNULL() >= 0) { // we have a value
	      if (recAttr[i]->isNULL() == 0) { // we have a non-null value
                if (i < 2)
                  printf("%-5u", recAttr[i]->u_32_value());
          printf(" gci=%d\n", (int)op->getGCI());
          for (k = 0; k <= 1; k++) {
            printf(k == 0 ? "post: " : "pre : ");
            for (l = 0; l < noEventColumnName; l++) {
              if (l < 4) {
                NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
                if (ra->isNULL() >= 0) { // we have a value
                  if (ra->isNULL() == 0) { // we have a non-null value
                    if (l < 2)
                      printf("%-5u", ra->u_32_value());
                    else
                  printf("%-5.4s", recAttr[i]->aRef());
              } else                           // we have a null value
                      printf("%-5.4s", ra->aRef());
                  } else
                    printf("%-5s", "NULL");
                } else
              printf("%-5s", "-");
                  printf("%-5s", "-"); // no value
              } else if (merge_events) {
                int isNull;
                NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
                bh->getDefined(isNull);
                if (isNull >= 0) { // we have a value
                  if (! isNull) { // we have a non-null value
                    Uint64 length = 0;
                    bh->getLength(length);
                    // read into buffer
                    unsigned char* buf = new unsigned char [length];
                    memset(buf, 'X', length);
                    Uint32 n = length;
                    bh->readData(buf, n); // n is in/out
                    assert(n == length);
                    // pretty-print
                    bool first = true;
                    Uint32 i = 0;
                    while (i < n) {
                      unsigned char c = buf[i++];
                      Uint32 m = 1;
                      while (i < n && buf[i] == c)
                        i++, m++;
                      if (! first)
                        printf("+");
                      printf("%u%c", m, c);
                      first = false;
                    }
          printf("\npre :  ");
	  for (int i = 0; i < noEventColumnName; i++) {
	    if (recAttrPre[i]->isNULL() >= 0) { // we have a value
	      if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
                if (i < 2)
                  printf("%-5u", recAttrPre[i]->u_32_value());
                else
                  printf("%-5.4s", recAttrPre[i]->aRef());
              } else                              // we have a null value
                    printf("[%u]", n);
                    delete [] buf;
                  } else
                    printf("%-5s", "NULL");
                } else
              printf("%-5s", "-");
                  printf("%-5s", "-"); // no value
              }
            }
            printf("\n");
          }
	}
      } else
	;//printf("timed out\n");
    }
    // don't want to listen to events anymore
    if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError());
    if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
    the_op = 0;

    j++;
  }
@@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb,
		  const char *eventName,
		  const char *eventTableName,
		  const char **eventColumnNames,
		  const int noEventColumnNames)
		  const int noEventColumnNames,
                  bool merge_events)
{
  NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
  if (!myDict) APIERROR(myNdb->getNdbError());
@@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb,
  //  myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);

  myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
  myEvent.mergeEvents(merge_events);

  // Add event to database
  if (myDict->createEvent(myEvent) == 0)
Loading