Commit e0cbeea1 authored by unknown's avatar unknown
Browse files

ndb - wl#3023 : pass tables per GCI to injector at epoch start


sql/ha_ndbcluster_binlog.cc:
  use_table at beginning of epoch
storage/ndb/include/ndbapi/Ndb.hpp:
  getGCIEventOperations: return distinct event ops at epoch start
storage/ndb/src/ndbapi/Ndb.cpp:
  getGCIEventOperations: return distinct event ops at epoch start
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  getGCIEventOperations: return distinct event ops at epoch start
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp:
  getGCIEventOperations: return distinct event ops at epoch start
parent 5cdf49f0
Loading
Loading
Loading
Loading
+31 −0
Original line number Diff line number Diff line
@@ -2829,10 +2829,41 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
        assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
        bzero((char*) &row, sizeof(row));
        injector::transaction trans= inj->new_trans(thd);
        { // pass table map before epoch
          Uint32 iter=0;
          const NdbEventOperation* gci_op;
          Uint32 event_types;
          while ((gci_op=ndb->getGCIEventOperations(&iter, &event_types))
              != NULL)
          {
            NDB_SHARE* share=(NDB_SHARE*)gci_op->getCustomData();
            DBUG_PRINT("info", ("per gci op %p share %p event types 0x%x",
                                gci_op, share, event_types));
            // this should not happen
            if (share == NULL || share->table == NULL)
            {
              DBUG_PRINT("info", ("no share or table !"));
              continue;
            }
            TABLE* table=share->table;
            const LEX_STRING& name=table->s->table_name;
            DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str));
            injector::transaction::table tbl(table, true);
            // TODO enable when mats patch pushed
            //trans.use_table(::server_id, tbl);
          }
        }
        gci= pOp->getGCI();
        if (apply_status_share)
        {
          TABLE *table= apply_status_share->table;

          const LEX_STRING& name=table->s->table_name;
          DBUG_PRINT("info", ("use_table: %.*s", name.length, name.str));
          injector::transaction::table tbl(table, true);
          // TODO enable when mats patch pushed
          //trans.use_table(::server_id, tbl);

          MY_BITMAP b;
          uint32 bitbuf;
          DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
+12 −0
Original line number Diff line number Diff line
@@ -1240,6 +1240,18 @@ public:
   */
  NdbEventOperation *nextEvent();

  /**
   * Iterate over distinct event operations which are part of current
   * GCI.  Valid after nextEvent.  Used to get summary information for
   * the epoch (e.g. list of all tables) before processing event data.
   *
   * Set *iter=0 to start.  Returns NULL when no more.  If event_types
   * is not NULL, it returns bitmask of received event types.
   */
  const NdbEventOperation*
    getGCIEventOperations(Uint32* iter, Uint32* event_types);
  

#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
  NdbEventOperation *getEventOperation(NdbEventOperation* eventOp= 0);
  Uint64 getLatestGCI();
+10 −0
Original line number Diff line number Diff line
@@ -1293,6 +1293,16 @@ NdbEventOperation *Ndb::nextEvent()
  return theEventBuffer->nextEvent();
}

const NdbEventOperation*
Ndb::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
  NdbEventOperationImpl* op =
    theEventBuffer->getGCIEventOperations(iter, event_types);
  if (op != NULL)
    return op->m_facade;
  return NULL;
}

Uint64 Ndb::getLatestGCI()
{
  return theEventBuffer->getLatestGCI();
+43 −1
Original line number Diff line number Diff line
@@ -1081,6 +1081,19 @@ NdbEventBuffer::nextEvent()
  DBUG_RETURN_EVENT(0);
}

NdbEventOperationImpl*
NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
  if (*iter < m_available_data.m_gci_op_count)
  {
    EventBufData_list::Gci_op g = m_available_data.m_gci_op_list[(*iter)++];
    if (event_types != NULL)
      *event_types = g.event_types;
    return g.op;
  }
  return NULL;
}

void
NdbEventBuffer::lock()
{
@@ -2061,7 +2074,36 @@ NdbEventBuffer::free_list(EventBufData_list &list)
  }

  // list returned to m_free_data
  new (&list) EventBufData_list;
  list.m_head = list.m_tail = NULL;
  list.m_count = list.m_sz = 0;
}

void
EventBufData_list::add_gci_op(Gci_op g)
{
  assert(g.op != NULL);
  Uint32 i;
  for (i = 0; i < m_gci_op_count; i++) {
    if (m_gci_op_list[i].op == g.op)
      break;
  }
  if (i < m_gci_op_count) {
    m_gci_op_list[i].event_types |= g.event_types;
  } else {
    if (m_gci_op_count == m_gci_op_alloc) {
      Uint32 n = 1 + 2 * m_gci_op_alloc;
      Gci_op* old_list = m_gci_op_list;
      m_gci_op_list = new Gci_op [n];
      if (m_gci_op_alloc != 0) {
        Uint32 bytes = m_gci_op_alloc * sizeof(Gci_op);
        memcpy(m_gci_op_list, old_list, bytes);
        delete [] old_list;
      }
      m_gci_op_alloc = n;
    }
    assert(m_gci_op_count < m_gci_op_alloc);
    m_gci_op_list[m_gci_op_count++] = g;
  }
}

NdbEventOperation*
+22 −2
Original line number Diff line number Diff line
@@ -76,19 +76,31 @@ public:
  EventBufData *m_head, *m_tail;
  unsigned m_count;
  unsigned m_sz;

  // distinct ops per gci (assume no hash needed)
  struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; };
  Gci_op* m_gci_op_list;
  Uint32 m_gci_op_count;
  Uint32 m_gci_op_alloc;
private:
  void add_gci_op(Gci_op g);
};

inline
EventBufData_list::EventBufData_list()
  : m_head(0), m_tail(0),
    m_count(0),
    m_sz(0)
    m_sz(0),
    m_gci_op_list(NULL),
    m_gci_op_count(0),
    m_gci_op_alloc(0)
{
}

inline
EventBufData_list::~EventBufData_list()
{
  delete [] m_gci_op_list;
}

inline
@@ -110,6 +122,9 @@ void EventBufData_list::remove_first()
inline
void EventBufData_list::append(EventBufData *data)
{
  Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
  add_gci_op(g);

  data->m_next= 0;
  if (m_tail)
    m_tail->m_next= data;
@@ -130,6 +145,10 @@ void EventBufData_list::append(EventBufData *data)
inline
void EventBufData_list::append(const EventBufData_list &list)
{
  Uint32 i;
  for (i = 0; i < list.m_gci_op_count; i++)
    add_gci_op(list.m_gci_op_list[i]);

  if (m_tail)
    m_tail->m_next= list.m_head;
  else
@@ -265,7 +284,6 @@ private:
  void receive_data(NdbRecAttr *r, const Uint32 *data, Uint32 sz);
};


class NdbEventBuffer {
public:
  NdbEventBuffer(Ndb*);
@@ -303,6 +321,8 @@ public:

  int pollEvents(int aMillisecondNumber, Uint64 *latestGCI= 0);
  NdbEventOperation *nextEvent();
  NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
                                               Uint32* event_types);

  NdbEventOperationImpl *move_data();