Commit 8fcb05ee authored by unknown's avatar unknown
Browse files

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new

parents f8e16cfb 87e75980
Loading
Loading
Loading
Loading
+69 −8
Original line number Diff line number Diff line
@@ -1117,7 +1117,7 @@ NdbEventBuffer::nextEvent()
    m_available_data.remove_first();

    // add it to used list
    m_used_data.append(data);
    m_used_data.append_used_data(data);

#ifdef VM_TRACE
    op->m_data_done_count++;
@@ -1144,6 +1144,10 @@ NdbEventBuffer::nextEvent()
          (void)tBlob->atNextEvent();
          tBlob = tBlob->theNext;
        }
        EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
        while (gci_ops && op->getGCI() > gci_ops->m_gci)
          gci_ops = m_available_data.next_gci_ops();
        assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
	DBUG_RETURN_EVENT(op->m_facade);
      }
      // the next event belonged to an event op that is no
@@ -1158,15 +1162,21 @@ NdbEventBuffer::nextEvent()
#ifdef VM_TRACE
  m_latest_command= m_latest_command_save;
#endif

  // free all "per gci unique" collected operations
  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
  while (gci_ops)
    gci_ops = m_available_data.next_gci_ops();
  DBUG_RETURN_EVENT(0);
}

NdbEventOperationImpl*
NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{
  if (*iter < m_available_data.m_gci_op_count)
  EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
  if (*iter < gci_ops->m_gci_op_count)
  {
    EventBufData_list::Gci_op g = m_available_data.m_gci_op_list[(*iter)++];
    EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
    if (event_types != NULL)
      *event_types = g.event_types;
    return g.op;
@@ -1318,7 +1328,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
#ifdef VM_TRACE
	assert(bucket->m_data.m_count);
#endif
	m_complete_data.m_data.append(bucket->m_data);
	m_complete_data.m_data.append_list(&bucket->m_data, gci);
      }
      reportStatus();
      bzero(bucket, sizeof(Gci_container));
@@ -1389,7 +1399,7 @@ NdbEventBuffer::complete_outof_order_gcis()
#ifdef VM_TRACE
      assert(bucket->m_data.m_count);
#endif
      m_complete_data.m_data.append(bucket->m_data);
      m_complete_data.m_data.append_list(&bucket->m_data, start_gci);
#ifdef VM_TRACE
      ndbout_c(" moved %lld rows -> %lld", bucket->m_data.m_count,
	       m_complete_data.m_data.m_count);
@@ -1599,7 +1609,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
      data->m_event_op = op;
      if (! is_blob_event || ! is_data_event)
      {
        bucket->m_data.append(data);
        bucket->m_data.append_data(data);
      }
      else
      {
@@ -1615,7 +1625,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
        if (ret != 0) // main event was created
        {
          main_data->m_event_op = op->theMainOp;
          bucket->m_data.append(main_data);
          bucket->m_data.append_data(main_data);
          if (use_hash)
          {
            main_data->m_pkhash = main_hpos.pkhash;
@@ -2097,7 +2107,7 @@ NdbEventBuffer::move_data()
  if (!m_complete_data.m_data.is_empty())
  {
    // move this list to last in m_available_data
    m_available_data.append(m_complete_data.m_data);
    m_available_data.append_list(&m_complete_data.m_data, 0);

    bzero(&m_complete_data, sizeof(m_complete_data));
  }
@@ -2160,6 +2170,19 @@ NdbEventBuffer::free_list(EventBufData_list &list)
  list.m_count = list.m_sz = 0;
}

void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
{
  move_gci_ops(list, gci);

  if (m_tail)
    m_tail->m_next= list->m_head;
  else
    m_head= list->m_head;
  m_tail= list->m_tail;
  m_count+= list->m_count;
  m_sz+= list->m_sz;
}

void
EventBufData_list::add_gci_op(Gci_op g)
{
@@ -2188,6 +2211,44 @@ EventBufData_list::add_gci_op(Gci_op g)
  }
}

void
EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci)
{
  assert(!m_is_not_multi_list);
  if (!list->m_is_not_multi_list)
  {
    assert(gci == 0);
    if (m_gci_ops_list_tail)
      m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
    else
    {
      m_gci_ops_list =  list->m_gci_ops_list;
    }
    m_gci_ops_list_tail = list->m_gci_ops_list_tail;
    goto end;
  }
  {
    Gci_ops *new_gci_ops = new Gci_ops;
    if (m_gci_ops_list_tail)
      m_gci_ops_list_tail->m_next = new_gci_ops;
    else
    {
      assert(m_gci_ops_list == 0);
      m_gci_ops_list = new_gci_ops;
    }
    m_gci_ops_list_tail = new_gci_ops;
    
    new_gci_ops->m_gci_op_list = list->m_gci_op_list;
    new_gci_ops->m_gci_op_count = list->m_gci_op_count;
    new_gci_ops->m_gci = gci;
    new_gci_ops->m_next = 0;
  }
end:
  list->m_gci_op_list = 0;
  list->m_gci_ops_list_tail = 0;
  list->m_gci_op_alloc = 0;
}

NdbEventOperation*
NdbEventBuffer::createEventOperation(const char* eventName,
				     NdbError &theError)
+93 −24
Original line number Diff line number Diff line
@@ -68,8 +68,12 @@ public:
  ~EventBufData_list();

  void remove_first();
  void append(EventBufData *data);
  void append(const EventBufData_list &list);
  // append data and insert data into Gci_op list with add_gci_op
  void append_data(EventBufData *data);
  // append data and insert data but ignore Gci_op list
  void append_used_data(EventBufData *data);
  // append list to another, will call move_gci_ops
  void append_list(EventBufData_list *list, Uint64 gci);

  int is_empty();

@@ -77,13 +81,60 @@ public:
  unsigned m_count;
  unsigned m_sz;

  // distinct ops per gci (assume no hash needed)
  struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; };
  /*
    distinct ops per gci (assume no hash needed)

    list may be in 2 versions

    1. single list with on gci only
    - one linear array
    Gci_op  *m_gci_op_list;
    Uint32 m_gci_op_count;
  Uint32 m_gci_op_alloc;
    Uint32 m_gci_op_alloc != 0;

    2. multi list with several gcis
    - linked list of gci's
    - one linear array per gci
    Gci_ops *m_gci_ops_list;
    Gci_ops *m_gci_ops_list_tail;
    Uint32 m_is_not_multi_list == 0;

  */
  struct Gci_op                 // 1 + 2
  {
    NdbEventOperationImpl* op;
    Uint32 event_types;
  };
  struct Gci_ops                // 2
  {
    Uint64 m_gci;
    Gci_op *m_gci_op_list;
    Gci_ops *m_next;
    Uint32 m_gci_op_count;
  };
  union
  {
    Gci_op  *m_gci_op_list;      // 1
    Gci_ops *m_gci_ops_list;     // 2
  };
  union
  {
    Uint32 m_gci_op_count;       // 1
    Gci_ops *m_gci_ops_list_tail;// 2
  };
  union
  {
    Uint32 m_gci_op_alloc;       // 1
    Uint32 m_is_not_multi_list;  // 2
  };
  Gci_ops *first_gci_ops();
  Gci_ops *next_gci_ops();
private:
  // case 1 above; add Gci_op to single list
  void add_gci_op(Gci_op g);
  // case 2 above; move single list or multi list from
  // one list to another
  void move_gci_ops(EventBufData_list *list, Uint64 gci);
};

inline
@@ -92,7 +143,7 @@ EventBufData_list::EventBufData_list()
    m_count(0),
    m_sz(0),
    m_gci_op_list(NULL),
    m_gci_op_count(0),
    m_gci_ops_list_tail(0),
    m_gci_op_alloc(0)
{
}
@@ -100,7 +151,14 @@ EventBufData_list::EventBufData_list()
inline
EventBufData_list::~EventBufData_list()
{
  if (m_is_not_multi_list)
    delete [] m_gci_op_list;
  else
  {
    Gci_ops *op = first_gci_ops();
    while (op)
      op = next_gci_ops();
  }
}

inline
@@ -120,11 +178,8 @@ void EventBufData_list::remove_first()
}

inline
void EventBufData_list::append(EventBufData *data)
void EventBufData_list::append_used_data(EventBufData *data)
{
  Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
  add_gci_op(g);

  data->m_next= 0;
  if (m_tail)
    m_tail->m_next= data;
@@ -143,19 +198,33 @@ void EventBufData_list::append(EventBufData *data)
}

inline
void EventBufData_list::append(const EventBufData_list &list)
void EventBufData_list::append_data(EventBufData *data)
{
  Uint32 i;
  for (i = 0; i < list.m_gci_op_count; i++)
    add_gci_op(list.m_gci_op_list[i]);
  Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
  add_gci_op(g);

  if (m_tail)
    m_tail->m_next= list.m_head;
  else
    m_head= list.m_head;
  m_tail= list.m_tail;
  m_count+= list.m_count;
  m_sz+= list.m_sz;
  append_used_data(data);
}

inline EventBufData_list::Gci_ops *
EventBufData_list::first_gci_ops()
{
  assert(!m_is_not_multi_list);
  return m_gci_ops_list;
}

inline EventBufData_list::Gci_ops *
EventBufData_list::next_gci_ops()
{
  assert(!m_is_not_multi_list);
  Gci_ops *first = m_gci_ops_list;
  m_gci_ops_list = first->m_next;
  if (first->m_gci_op_list)
    delete [] first->m_gci_op_list;
  delete first;
  if (m_gci_ops_list == 0)
    m_gci_ops_list_tail = 0;
  return m_gci_ops_list;
}

// GCI bucket has also a hash over data, with key event op, table PK.