Loading storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +125 −46 Original line number Diff line number Diff line Loading @@ -65,6 +65,39 @@ print_std(const SubTableData * sdata, LinearSectionPtr ptr[3]) } #endif // EventBufData Uint32 EventBufData::get_blob_part_no() const { assert(ptr[0].sz > 2); Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() + AttributeHeader(ptr[0].p[1]).getDataSize(); Uint32 no = ptr[1].p[pos]; return no; } void EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const { Uint32 tmp_count = 0; Uint32 tmp_sz = 0; const EventBufData* data2 = m_next_blob; while (data2 != 0) { tmp_count++; tmp_sz += data2->sz; const EventBufData* data3 = data2->m_next; while (data3 != 0) { tmp_count++; tmp_sz += data3->sz; data3 = data3->m_next; } data2 = data2->m_next_blob; } full_count += tmp_count; full_sz += tmp_sz; } /* * Class NdbEventOperationImpl * Loading Loading @@ -1162,11 +1195,12 @@ NdbEventBuffer::nextEvent() // set NdbEventOperation data op->m_data_item= data; // remove item from m_available_data m_available_data.remove_first(); // remove item from m_available_data and return size Uint32 full_count, full_sz; m_available_data.remove_first(full_count, full_sz); // add it to used list m_used_data.append_used_data(data); m_used_data.append_used_data(data, full_count, full_sz); #ifdef VM_TRACE op->m_data_done_count++; Loading Loading @@ -1840,7 +1874,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, op->m_has_error = 2; DBUG_RETURN_EVENT(-1); } if (unlikely(copy_data(sdata, ptr, data))) if (unlikely(copy_data(sdata, ptr, data, NULL))) { op->m_has_error = 3; DBUG_RETURN_EVENT(-1); Loading Loading @@ -1872,7 +1906,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, } } // link blob event under main event add_blob_data(main_data, data); add_blob_data(bucket, main_data, data); } if (use_hash) { Loading @@ -1886,7 +1920,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, else { // event with same op, PK found, merge into old buffer if (unlikely(merge_data(sdata, ptr, data))) if (unlikely(merge_data(sdata, ptr, data, & bucket->m_data.m_sz))) { op->m_has_error = 3; DBUG_RETURN_EVENT(-1); Loading @@ -1909,6 +1943,9 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, } } } #ifdef NDB_EVENT_VERIFY_SIZE verify_size(bucket->m_data); #endif DBUG_RETURN_EVENT(0); } Loading Loading @@ -1962,8 +1999,21 @@ NdbEventBuffer::alloc_data() } // remove data from free list if (data->m_next_blob == 0) m_free_data = data->m_next; else { EventBufData* data2 = data->m_next_blob; if (data2->m_next == 0) { data->m_next_blob = data2->m_next_blob; data = data2; } else { EventBufData* data3 = data2->m_next; data2->m_next = data3->m_next; data = data3; } } data->m_next = 0; data->m_next_blob = 0; #ifdef VM_TRACE m_free_data_count--; assert(m_free_data_sz >= data->sz); Loading @@ -1975,7 +2025,9 @@ NdbEventBuffer::alloc_data() // allocate initial or bigger memory area in EventBufData // takes sizes from given ptr and sets up data->ptr int NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3], Uint32 * change_sz) { DBUG_ENTER("NdbEventBuffer::alloc_mem"); DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz)); Loading @@ -1988,6 +2040,8 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) if (data->sz < alloc_size) { Uint32 add_sz = alloc_size - data->sz; NdbMem_Free((char*)data->memory); assert(m_total_alloc >= data->sz); m_total_alloc -= data->sz; Loading @@ -1999,6 +2053,9 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) DBUG_RETURN(-1); data->sz = alloc_size; m_total_alloc += data->sz; if (change_sz != NULL) *change_sz += add_sz; } Uint32* memptr = data->memory; Loading @@ -2014,14 +2071,30 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) DBUG_RETURN(0); } void NdbEventBuffer::dealloc_mem(EventBufData* data, Uint32 * change_sz) { NdbMem_Free((char*)data->memory); assert(m_total_alloc >= data->sz); m_total_alloc -= data->sz; if (change_sz != NULL) { assert(*change_sz >= data->sz); *change_sz -= data->sz; } data->memory = 0; data->sz = 0; } int NdbEventBuffer::copy_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data) EventBufData* data, Uint32 * change_sz) { DBUG_ENTER_EVENT("NdbEventBuffer::copy_data"); if (alloc_mem(data, ptr) != 0) if (alloc_mem(data, ptr, change_sz) != 0) DBUG_RETURN_EVENT(-1); memcpy(data->sdata, sdata, sizeof(SubTableData)); int i; Loading Loading @@ -2093,7 +2166,8 @@ copy_attr(AttributeHeader ah, int NdbEventBuffer::merge_data(const SubTableData * const sdata, LinearSectionPtr ptr2[3], EventBufData* data) EventBufData* data, Uint32 * change_sz) { DBUG_ENTER_EVENT("NdbEventBuffer::merge_data"); Loading @@ -2102,7 +2176,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, int t1 = SubTableData::getOperation(data->sdata->requestInfo); int t2 = SubTableData::getOperation(sdata->requestInfo); if (t1 == Ev_t::enum_NUL) DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data)); DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data, change_sz)); Ev_t* tp = 0; int i; Loading Loading @@ -2142,6 +2216,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, DBUG_RETURN_EVENT(0); } // TODO: use old data items, avoid malloc/free on each merge // save old data EventBufData olddata = *data; data->memory = 0; Loading @@ -2158,7 +2234,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, { if (loop == 1) { if (alloc_mem(data, ptr) != 0) if (alloc_mem(data, ptr, change_sz) != 0) { result = -1; goto end; Loading Loading @@ -2277,11 +2353,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, } end: // free old data NdbMem_Free((char*)olddata.memory); assert(m_total_alloc >= olddata.sz); m_total_alloc -= olddata.sz; dealloc_mem(&olddata, change_sz); DBUG_RETURN_EVENT(result); } Loading Loading @@ -2357,7 +2429,7 @@ NdbEventBuffer::get_main_data(Gci_container* bucket, SubTableData sdata = *blob_data->sdata; sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id; SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL); if (copy_data(&sdata, ptr, main_data) != 0) if (copy_data(&sdata, ptr, main_data, NULL) != 0) DBUG_RETURN_EVENT(-1); hpos.data = main_data; Loading @@ -2365,7 +2437,8 @@ NdbEventBuffer::get_main_data(Gci_container* bucket, } void NdbEventBuffer::add_blob_data(EventBufData* main_data, NdbEventBuffer::add_blob_data(Gci_container* bucket, EventBufData* main_data, EventBufData* blob_data) { DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data"); Loading @@ -2389,6 +2462,9 @@ NdbEventBuffer::add_blob_data(EventBufData* main_data, blob_data->m_next = head->m_next; head->m_next = blob_data; } // adjust data list size bucket->m_data.m_count += 1; bucket->m_data.m_sz += blob_data->sz; DBUG_VOID_RETURN_EVENT; } Loading Loading @@ -2424,6 +2500,9 @@ NdbEventBuffer::move_data() void NdbEventBuffer::free_list(EventBufData_list &list) { #ifdef NDB_EVENT_VERIFY_SIZE verify_size(list); #endif // return list to m_free_data list.m_tail->m_next= m_free_data; m_free_data= list.m_head; Loading @@ -2432,38 +2511,15 @@ NdbEventBuffer::free_list(EventBufData_list &list) #endif m_free_data_sz+= list.m_sz; // free blobs XXX unacceptable performance, fix later { EventBufData* data = list.m_head; while (1) { while (data->m_next_blob != NULL) { EventBufData* blob_head = data->m_next_blob; data->m_next_blob = blob_head->m_next_blob; blob_head->m_next_blob = NULL; while (blob_head != NULL) { EventBufData* blob_part = blob_head; blob_head = blob_head->m_next; blob_part->m_next = m_free_data; m_free_data = blob_part; #ifdef VM_TRACE m_free_data_count++; #endif m_free_data_sz += blob_part->sz; } } if (data == list.m_tail) break; data = data->m_next; } } // list returned to m_free_data list.m_head = list.m_tail = NULL; list.m_count = list.m_sz = 0; } void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci) { #ifdef NDB_EVENT_VERIFY_SIZE NdbEventBuffer::verify_size(*list); #endif move_gci_ops(list, gci); if (m_tail) Loading Loading @@ -2702,6 +2758,29 @@ NdbEventBuffer::reportStatus() #endif } #ifdef VM_TRACE void NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz) { Uint32 tmp_count = 0; Uint32 tmp_sz = 0; while (data != 0) { Uint32 full_count, full_sz; data->get_full_size(full_count, full_sz); tmp_count += full_count; tmp_sz += full_sz; data = data->m_next; } assert(tmp_count == count); assert(tmp_sz == sz); } void NdbEventBuffer::verify_size(const EventBufData_list & list) { verify_size(list.m_head, list.m_count, list.m_sz); } #endif // hash table routines // could optimize the all-fixed case Loading storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +79 −31 Original line number Diff line number Diff line Loading @@ -40,6 +40,12 @@ #define DBUG_DUMP_EVENT(A,B,C) #endif #undef NDB_EVENT_VERIFY_SIZE #ifdef VM_TRACE // not much effect on performance, leave on #define NDB_EVENT_VERIFY_SIZE #endif class NdbEventOperationImpl; struct EventBufData Loading @@ -54,9 +60,13 @@ struct EventBufData /* * Blobs are stored in blob list (m_next_blob) where each entry * is list of parts (m_next) in part number order. * is list of parts (m_next). TODO order by part number * * Processed data (m_used_data, m_free_data) keeps the old blob * list intact. It is reconsumed when new data items are needed. * * TODO order by part no and link for fast read and free_list * Data item lists keep track of item count and sum(sz) and * these include both main items and blob parts. */ EventBufData *m_next; // Next wrt to global order or Next blob part Loading @@ -66,14 +76,22 @@ struct EventBufData Uint32 m_pkhash; // PK hash (without op) for fast compare EventBufData() {} // Get blob part number from blob data Uint32 get_blob_part_no() { assert(ptr[0].sz > 2); Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() + AttributeHeader(ptr[0].p[1]).getDataSize(); Uint32 no = ptr[1].p[pos]; return no; Uint32 get_blob_part_no() const; /* * Main item does not include summary of parts (space / performance * tradeoff). The summary is needed when moving single data item. * It is not needed when moving entire list. */ void get_full_size(Uint32 & full_count, Uint32 & full_sz) const { full_count = 1; full_sz = sz; if (m_next_blob != 0) add_part_size(full_count, full_sz); } void add_part_size(Uint32 & full_count, Uint32 & full_sz) const; }; class EventBufData_list Loading @@ -82,19 +100,22 @@ public: EventBufData_list(); ~EventBufData_list(); void remove_first(); // append data and insert data into Gci_op list with add_gci_op void append_data(EventBufData *data); // remove first and return its size void remove_first(Uint32 & full_count, Uint32 & full_sz); // for remove+append avoid double call to get_full_size() void append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz); // append data and insert data but ignore Gci_op list void append_used_data(EventBufData *data); // append data and insert data into Gci_op list with add_gci_op void append_data(EventBufData *data); // append list to another, will call move_gci_ops void append_list(EventBufData_list *list, Uint64 gci); int is_empty(); EventBufData *m_head, *m_tail; unsigned m_count; unsigned m_sz; Uint32 m_count; Uint32 m_sz; /* distinct ops per gci (assume no hash needed) Loading Loading @@ -193,17 +214,22 @@ int EventBufData_list::is_empty() } inline void EventBufData_list::remove_first() void EventBufData_list::remove_first(Uint32 & full_count, Uint32 & full_sz) { m_count--; m_sz-= m_head->sz; m_head->get_full_size(full_count, full_sz); #ifdef VM_TRACE assert(m_count >= full_count); assert(m_sz >= full_sz); #endif m_count -= full_count; m_sz -= full_sz; m_head = m_head->m_next; if (m_head == 0) m_tail = 0; } inline void EventBufData_list::append_used_data(EventBufData *data) void EventBufData_list::append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz) { data->m_next = 0; if (m_tail) Loading @@ -211,6 +237,7 @@ void EventBufData_list::append_used_data(EventBufData *data) else { #ifdef VM_TRACE assert(m_head == 0); assert(m_count == 0); assert(m_sz == 0); #endif Loading @@ -218,8 +245,16 @@ void EventBufData_list::append_used_data(EventBufData *data) } m_tail = data; m_count++; m_sz+= data->sz; m_count += full_count; m_sz += full_sz; } inline void EventBufData_list::append_used_data(EventBufData *data) { Uint32 full_count, full_sz; data->get_full_size(full_count, full_sz); append_used_data(data, full_count, full_sz); } inline Loading Loading @@ -442,17 +477,24 @@ public: // routines to copy/merge events EventBufData* alloc_data(); int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]); int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3], Uint32 * change_sz); void dealloc_mem(EventBufData* data, Uint32 * change_sz); int copy_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data); EventBufData* data, Uint32 * change_sz); int merge_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data); EventBufData* data, Uint32 * change_sz); int get_main_data(Gci_container* bucket, EventBufData_hash::Pos& hpos, EventBufData* blob_data); void add_blob_data(EventBufData* main_data, void add_blob_data(Gci_container* bucket, EventBufData* main_data, EventBufData* blob_data); void free_list(EventBufData_list &list); Loading @@ -478,9 +520,9 @@ public: Gci_container m_complete_data; EventBufData *m_free_data; #ifdef VM_TRACE unsigned m_free_data_count; Uint32 m_free_data_count; #endif unsigned m_free_data_sz; Uint32 m_free_data_sz; // user thread EventBufData_list m_available_data; Loading @@ -493,6 +535,12 @@ public: unsigned m_gci_slip_thresh; NdbError m_error; #ifdef VM_TRACE static void verify_size(const EventBufData* data, Uint32 count, Uint32 sz); static void verify_size(const EventBufData_list & list); #endif private: int expand(unsigned sz); Loading storage/ndb/test/ndbapi/test_event_merge.cpp +36 −25 Original line number Diff line number Diff line Loading @@ -161,6 +161,7 @@ static void errdb() { uint any = 0; // g_ncc return no error... if (g_ndb != 0) { const NdbError& e = g_ndb->getNdbError(); if (e.code != 0) Loading Loading @@ -359,9 +360,9 @@ createtable(Tab& t) } static int createtable() createtables() { ll1("createtable"); ll1("createtables"); for (uint i = 0; i < maxtab(); i++) chkrc(createtable(tab(i)) == 0); return 0; Loading @@ -381,9 +382,9 @@ droptable(Tab& t) } static int droptable() droptables() { ll1("droptable"); ll1("droptables"); for (uint i = 0; i < maxtab(); i++) chkrc(droptable(tab(i)) == 0); return 0; Loading Loading @@ -419,9 +420,9 @@ createevent(Tab& t) } static int createevent() createevents() { ll1("createevent"); ll1("createevents"); for (uint i = 0; i < maxtab(); i++) chkrc(createevent(tab(i)) == 0); return 0; Loading @@ -439,11 +440,14 @@ dropevent(Tab& t, bool force = false) } static int dropevent(bool force = false) dropevents(bool force = false) { ll1("dropevent"); for (uint i = 0; i < maxtab(); i++) ll1("dropevents"); for (uint i = 0; i < maxtab(); i++) { if (force && g_tablst[i] == 0) continue; chkrc(dropevent(tab(i), force) == 0 || force); } return 0; } Loading Loading @@ -1173,8 +1177,11 @@ static int dropeventops(bool force = false) { ll1("dropeventops"); for (uint i = 0; i < maxrun(); i++) for (uint i = 0; i < maxrun(); i++) { if (force && g_runlst[i] == 0) continue; chkrc(dropeventop(run(i), force) == 0 || force); } return 0; } Loading Loading @@ -2139,8 +2146,8 @@ runtest() { setseed(-1); initrun(); chkrc(createtable() == 0); chkrc(createevent() == 0); chkrc(createtables() == 0); chkrc(createevents() == 0); for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) { ll0("=== loop " << g_loop << " ==="); setseed(g_loop); Loading @@ -2164,8 +2171,8 @@ runtest() // time erases everything.. chkrc(waitgci(1) == 0); } chkrc(dropevent() == 0); chkrc(droptable() == 0); chkrc(dropevents() == 0); chkrc(droptables() == 0); resetmem(); deleteops(); return 0; Loading Loading @@ -2287,6 +2294,16 @@ checkopts() return 0; } static int doconnect() { g_ncc = new Ndb_cluster_connection(); chkdb(g_ncc->connect(30) == 0); g_ndb = new Ndb(g_ncc, "TEST_DB"); chkdb(g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0); return 0; } int main(int argc, char** argv) { Loading @@ -2302,19 +2319,13 @@ main(int argc, char** argv) ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option); if (ret != 0 || argc != 0 || checkopts() != 0) return NDBT_ProgramExit(NDBT_WRONGARGS); g_ncc = new Ndb_cluster_connection(); if (g_ncc->connect(30) == 0) { g_ndb = new Ndb(g_ncc, "TEST_DB"); if (g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0) { if (runtest() == 0) { if (doconnect() == 0 && runtest() == 0) { delete g_ndb; delete g_ncc; return NDBT_ProgramExit(NDBT_OK); } } } dropeventops(true); dropevent(true); dropevents(true); delete g_ndb; delete g_ncc; return NDBT_ProgramExit(NDBT_FAILED); Loading Loading
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp +125 −46 Original line number Diff line number Diff line Loading @@ -65,6 +65,39 @@ print_std(const SubTableData * sdata, LinearSectionPtr ptr[3]) } #endif // EventBufData Uint32 EventBufData::get_blob_part_no() const { assert(ptr[0].sz > 2); Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() + AttributeHeader(ptr[0].p[1]).getDataSize(); Uint32 no = ptr[1].p[pos]; return no; } void EventBufData::add_part_size(Uint32 & full_count, Uint32 & full_sz) const { Uint32 tmp_count = 0; Uint32 tmp_sz = 0; const EventBufData* data2 = m_next_blob; while (data2 != 0) { tmp_count++; tmp_sz += data2->sz; const EventBufData* data3 = data2->m_next; while (data3 != 0) { tmp_count++; tmp_sz += data3->sz; data3 = data3->m_next; } data2 = data2->m_next_blob; } full_count += tmp_count; full_sz += tmp_sz; } /* * Class NdbEventOperationImpl * Loading Loading @@ -1162,11 +1195,12 @@ NdbEventBuffer::nextEvent() // set NdbEventOperation data op->m_data_item= data; // remove item from m_available_data m_available_data.remove_first(); // remove item from m_available_data and return size Uint32 full_count, full_sz; m_available_data.remove_first(full_count, full_sz); // add it to used list m_used_data.append_used_data(data); m_used_data.append_used_data(data, full_count, full_sz); #ifdef VM_TRACE op->m_data_done_count++; Loading Loading @@ -1840,7 +1874,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, op->m_has_error = 2; DBUG_RETURN_EVENT(-1); } if (unlikely(copy_data(sdata, ptr, data))) if (unlikely(copy_data(sdata, ptr, data, NULL))) { op->m_has_error = 3; DBUG_RETURN_EVENT(-1); Loading Loading @@ -1872,7 +1906,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, } } // link blob event under main event add_blob_data(main_data, data); add_blob_data(bucket, main_data, data); } if (use_hash) { Loading @@ -1886,7 +1920,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, else { // event with same op, PK found, merge into old buffer if (unlikely(merge_data(sdata, ptr, data))) if (unlikely(merge_data(sdata, ptr, data, & bucket->m_data.m_sz))) { op->m_has_error = 3; DBUG_RETURN_EVENT(-1); Loading @@ -1909,6 +1943,9 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, } } } #ifdef NDB_EVENT_VERIFY_SIZE verify_size(bucket->m_data); #endif DBUG_RETURN_EVENT(0); } Loading Loading @@ -1962,8 +1999,21 @@ NdbEventBuffer::alloc_data() } // remove data from free list if (data->m_next_blob == 0) m_free_data = data->m_next; else { EventBufData* data2 = data->m_next_blob; if (data2->m_next == 0) { data->m_next_blob = data2->m_next_blob; data = data2; } else { EventBufData* data3 = data2->m_next; data2->m_next = data3->m_next; data = data3; } } data->m_next = 0; data->m_next_blob = 0; #ifdef VM_TRACE m_free_data_count--; assert(m_free_data_sz >= data->sz); Loading @@ -1975,7 +2025,9 @@ NdbEventBuffer::alloc_data() // allocate initial or bigger memory area in EventBufData // takes sizes from given ptr and sets up data->ptr int NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3], Uint32 * change_sz) { DBUG_ENTER("NdbEventBuffer::alloc_mem"); DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz)); Loading @@ -1988,6 +2040,8 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) if (data->sz < alloc_size) { Uint32 add_sz = alloc_size - data->sz; NdbMem_Free((char*)data->memory); assert(m_total_alloc >= data->sz); m_total_alloc -= data->sz; Loading @@ -1999,6 +2053,9 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) DBUG_RETURN(-1); data->sz = alloc_size; m_total_alloc += data->sz; if (change_sz != NULL) *change_sz += add_sz; } Uint32* memptr = data->memory; Loading @@ -2014,14 +2071,30 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]) DBUG_RETURN(0); } void NdbEventBuffer::dealloc_mem(EventBufData* data, Uint32 * change_sz) { NdbMem_Free((char*)data->memory); assert(m_total_alloc >= data->sz); m_total_alloc -= data->sz; if (change_sz != NULL) { assert(*change_sz >= data->sz); *change_sz -= data->sz; } data->memory = 0; data->sz = 0; } int NdbEventBuffer::copy_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data) EventBufData* data, Uint32 * change_sz) { DBUG_ENTER_EVENT("NdbEventBuffer::copy_data"); if (alloc_mem(data, ptr) != 0) if (alloc_mem(data, ptr, change_sz) != 0) DBUG_RETURN_EVENT(-1); memcpy(data->sdata, sdata, sizeof(SubTableData)); int i; Loading Loading @@ -2093,7 +2166,8 @@ copy_attr(AttributeHeader ah, int NdbEventBuffer::merge_data(const SubTableData * const sdata, LinearSectionPtr ptr2[3], EventBufData* data) EventBufData* data, Uint32 * change_sz) { DBUG_ENTER_EVENT("NdbEventBuffer::merge_data"); Loading @@ -2102,7 +2176,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, int t1 = SubTableData::getOperation(data->sdata->requestInfo); int t2 = SubTableData::getOperation(sdata->requestInfo); if (t1 == Ev_t::enum_NUL) DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data)); DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data, change_sz)); Ev_t* tp = 0; int i; Loading Loading @@ -2142,6 +2216,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, DBUG_RETURN_EVENT(0); } // TODO: use old data items, avoid malloc/free on each merge // save old data EventBufData olddata = *data; data->memory = 0; Loading @@ -2158,7 +2234,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, { if (loop == 1) { if (alloc_mem(data, ptr) != 0) if (alloc_mem(data, ptr, change_sz) != 0) { result = -1; goto end; Loading Loading @@ -2277,11 +2353,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata, } end: // free old data NdbMem_Free((char*)olddata.memory); assert(m_total_alloc >= olddata.sz); m_total_alloc -= olddata.sz; dealloc_mem(&olddata, change_sz); DBUG_RETURN_EVENT(result); } Loading Loading @@ -2357,7 +2429,7 @@ NdbEventBuffer::get_main_data(Gci_container* bucket, SubTableData sdata = *blob_data->sdata; sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id; SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL); if (copy_data(&sdata, ptr, main_data) != 0) if (copy_data(&sdata, ptr, main_data, NULL) != 0) DBUG_RETURN_EVENT(-1); hpos.data = main_data; Loading @@ -2365,7 +2437,8 @@ NdbEventBuffer::get_main_data(Gci_container* bucket, } void NdbEventBuffer::add_blob_data(EventBufData* main_data, NdbEventBuffer::add_blob_data(Gci_container* bucket, EventBufData* main_data, EventBufData* blob_data) { DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data"); Loading @@ -2389,6 +2462,9 @@ NdbEventBuffer::add_blob_data(EventBufData* main_data, blob_data->m_next = head->m_next; head->m_next = blob_data; } // adjust data list size bucket->m_data.m_count += 1; bucket->m_data.m_sz += blob_data->sz; DBUG_VOID_RETURN_EVENT; } Loading Loading @@ -2424,6 +2500,9 @@ NdbEventBuffer::move_data() void NdbEventBuffer::free_list(EventBufData_list &list) { #ifdef NDB_EVENT_VERIFY_SIZE verify_size(list); #endif // return list to m_free_data list.m_tail->m_next= m_free_data; m_free_data= list.m_head; Loading @@ -2432,38 +2511,15 @@ NdbEventBuffer::free_list(EventBufData_list &list) #endif m_free_data_sz+= list.m_sz; // free blobs XXX unacceptable performance, fix later { EventBufData* data = list.m_head; while (1) { while (data->m_next_blob != NULL) { EventBufData* blob_head = data->m_next_blob; data->m_next_blob = blob_head->m_next_blob; blob_head->m_next_blob = NULL; while (blob_head != NULL) { EventBufData* blob_part = blob_head; blob_head = blob_head->m_next; blob_part->m_next = m_free_data; m_free_data = blob_part; #ifdef VM_TRACE m_free_data_count++; #endif m_free_data_sz += blob_part->sz; } } if (data == list.m_tail) break; data = data->m_next; } } // list returned to m_free_data list.m_head = list.m_tail = NULL; list.m_count = list.m_sz = 0; } void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci) { #ifdef NDB_EVENT_VERIFY_SIZE NdbEventBuffer::verify_size(*list); #endif move_gci_ops(list, gci); if (m_tail) Loading Loading @@ -2702,6 +2758,29 @@ NdbEventBuffer::reportStatus() #endif } #ifdef VM_TRACE void NdbEventBuffer::verify_size(const EventBufData* data, Uint32 count, Uint32 sz) { Uint32 tmp_count = 0; Uint32 tmp_sz = 0; while (data != 0) { Uint32 full_count, full_sz; data->get_full_size(full_count, full_sz); tmp_count += full_count; tmp_sz += full_sz; data = data->m_next; } assert(tmp_count == count); assert(tmp_sz == sz); } void NdbEventBuffer::verify_size(const EventBufData_list & list) { verify_size(list.m_head, list.m_count, list.m_sz); } #endif // hash table routines // could optimize the all-fixed case Loading
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp +79 −31 Original line number Diff line number Diff line Loading @@ -40,6 +40,12 @@ #define DBUG_DUMP_EVENT(A,B,C) #endif #undef NDB_EVENT_VERIFY_SIZE #ifdef VM_TRACE // not much effect on performance, leave on #define NDB_EVENT_VERIFY_SIZE #endif class NdbEventOperationImpl; struct EventBufData Loading @@ -54,9 +60,13 @@ struct EventBufData /* * Blobs are stored in blob list (m_next_blob) where each entry * is list of parts (m_next) in part number order. * is list of parts (m_next). TODO order by part number * * Processed data (m_used_data, m_free_data) keeps the old blob * list intact. It is reconsumed when new data items are needed. * * TODO order by part no and link for fast read and free_list * Data item lists keep track of item count and sum(sz) and * these include both main items and blob parts. */ EventBufData *m_next; // Next wrt to global order or Next blob part Loading @@ -66,14 +76,22 @@ struct EventBufData Uint32 m_pkhash; // PK hash (without op) for fast compare EventBufData() {} // Get blob part number from blob data Uint32 get_blob_part_no() { assert(ptr[0].sz > 2); Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() + AttributeHeader(ptr[0].p[1]).getDataSize(); Uint32 no = ptr[1].p[pos]; return no; Uint32 get_blob_part_no() const; /* * Main item does not include summary of parts (space / performance * tradeoff). The summary is needed when moving single data item. * It is not needed when moving entire list. */ void get_full_size(Uint32 & full_count, Uint32 & full_sz) const { full_count = 1; full_sz = sz; if (m_next_blob != 0) add_part_size(full_count, full_sz); } void add_part_size(Uint32 & full_count, Uint32 & full_sz) const; }; class EventBufData_list Loading @@ -82,19 +100,22 @@ public: EventBufData_list(); ~EventBufData_list(); void remove_first(); // append data and insert data into Gci_op list with add_gci_op void append_data(EventBufData *data); // remove first and return its size void remove_first(Uint32 & full_count, Uint32 & full_sz); // for remove+append avoid double call to get_full_size() void append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz); // append data and insert data but ignore Gci_op list void append_used_data(EventBufData *data); // append data and insert data into Gci_op list with add_gci_op void append_data(EventBufData *data); // append list to another, will call move_gci_ops void append_list(EventBufData_list *list, Uint64 gci); int is_empty(); EventBufData *m_head, *m_tail; unsigned m_count; unsigned m_sz; Uint32 m_count; Uint32 m_sz; /* distinct ops per gci (assume no hash needed) Loading Loading @@ -193,17 +214,22 @@ int EventBufData_list::is_empty() } inline void EventBufData_list::remove_first() void EventBufData_list::remove_first(Uint32 & full_count, Uint32 & full_sz) { m_count--; m_sz-= m_head->sz; m_head->get_full_size(full_count, full_sz); #ifdef VM_TRACE assert(m_count >= full_count); assert(m_sz >= full_sz); #endif m_count -= full_count; m_sz -= full_sz; m_head = m_head->m_next; if (m_head == 0) m_tail = 0; } inline void EventBufData_list::append_used_data(EventBufData *data) void EventBufData_list::append_used_data(EventBufData *data, Uint32 full_count, Uint32 full_sz) { data->m_next = 0; if (m_tail) Loading @@ -211,6 +237,7 @@ void EventBufData_list::append_used_data(EventBufData *data) else { #ifdef VM_TRACE assert(m_head == 0); assert(m_count == 0); assert(m_sz == 0); #endif Loading @@ -218,8 +245,16 @@ void EventBufData_list::append_used_data(EventBufData *data) } m_tail = data; m_count++; m_sz+= data->sz; m_count += full_count; m_sz += full_sz; } inline void EventBufData_list::append_used_data(EventBufData *data) { Uint32 full_count, full_sz; data->get_full_size(full_count, full_sz); append_used_data(data, full_count, full_sz); } inline Loading Loading @@ -442,17 +477,24 @@ public: // routines to copy/merge events EventBufData* alloc_data(); int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3]); int alloc_mem(EventBufData* data, LinearSectionPtr ptr[3], Uint32 * change_sz); void dealloc_mem(EventBufData* data, Uint32 * change_sz); int copy_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data); EventBufData* data, Uint32 * change_sz); int merge_data(const SubTableData * const sdata, LinearSectionPtr ptr[3], EventBufData* data); EventBufData* data, Uint32 * change_sz); int get_main_data(Gci_container* bucket, EventBufData_hash::Pos& hpos, EventBufData* blob_data); void add_blob_data(EventBufData* main_data, void add_blob_data(Gci_container* bucket, EventBufData* main_data, EventBufData* blob_data); void free_list(EventBufData_list &list); Loading @@ -478,9 +520,9 @@ public: Gci_container m_complete_data; EventBufData *m_free_data; #ifdef VM_TRACE unsigned m_free_data_count; Uint32 m_free_data_count; #endif unsigned m_free_data_sz; Uint32 m_free_data_sz; // user thread EventBufData_list m_available_data; Loading @@ -493,6 +535,12 @@ public: unsigned m_gci_slip_thresh; NdbError m_error; #ifdef VM_TRACE static void verify_size(const EventBufData* data, Uint32 count, Uint32 sz); static void verify_size(const EventBufData_list & list); #endif private: int expand(unsigned sz); Loading
storage/ndb/test/ndbapi/test_event_merge.cpp +36 −25 Original line number Diff line number Diff line Loading @@ -161,6 +161,7 @@ static void errdb() { uint any = 0; // g_ncc return no error... if (g_ndb != 0) { const NdbError& e = g_ndb->getNdbError(); if (e.code != 0) Loading Loading @@ -359,9 +360,9 @@ createtable(Tab& t) } static int createtable() createtables() { ll1("createtable"); ll1("createtables"); for (uint i = 0; i < maxtab(); i++) chkrc(createtable(tab(i)) == 0); return 0; Loading @@ -381,9 +382,9 @@ droptable(Tab& t) } static int droptable() droptables() { ll1("droptable"); ll1("droptables"); for (uint i = 0; i < maxtab(); i++) chkrc(droptable(tab(i)) == 0); return 0; Loading Loading @@ -419,9 +420,9 @@ createevent(Tab& t) } static int createevent() createevents() { ll1("createevent"); ll1("createevents"); for (uint i = 0; i < maxtab(); i++) chkrc(createevent(tab(i)) == 0); return 0; Loading @@ -439,11 +440,14 @@ dropevent(Tab& t, bool force = false) } static int dropevent(bool force = false) dropevents(bool force = false) { ll1("dropevent"); for (uint i = 0; i < maxtab(); i++) ll1("dropevents"); for (uint i = 0; i < maxtab(); i++) { if (force && g_tablst[i] == 0) continue; chkrc(dropevent(tab(i), force) == 0 || force); } return 0; } Loading Loading @@ -1173,8 +1177,11 @@ static int dropeventops(bool force = false) { ll1("dropeventops"); for (uint i = 0; i < maxrun(); i++) for (uint i = 0; i < maxrun(); i++) { if (force && g_runlst[i] == 0) continue; chkrc(dropeventop(run(i), force) == 0 || force); } return 0; } Loading Loading @@ -2139,8 +2146,8 @@ runtest() { setseed(-1); initrun(); chkrc(createtable() == 0); chkrc(createevent() == 0); chkrc(createtables() == 0); chkrc(createevents() == 0); for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) { ll0("=== loop " << g_loop << " ==="); setseed(g_loop); Loading @@ -2164,8 +2171,8 @@ runtest() // time erases everything.. chkrc(waitgci(1) == 0); } chkrc(dropevent() == 0); chkrc(droptable() == 0); chkrc(dropevents() == 0); chkrc(droptables() == 0); resetmem(); deleteops(); return 0; Loading Loading @@ -2287,6 +2294,16 @@ checkopts() return 0; } static int doconnect() { g_ncc = new Ndb_cluster_connection(); chkdb(g_ncc->connect(30) == 0); g_ndb = new Ndb(g_ncc, "TEST_DB"); chkdb(g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0); return 0; } int main(int argc, char** argv) { Loading @@ -2302,19 +2319,13 @@ main(int argc, char** argv) ret = handle_options(&argc, &argv, my_long_options, ndb_std_get_one_option); if (ret != 0 || argc != 0 || checkopts() != 0) return NDBT_ProgramExit(NDBT_WRONGARGS); g_ncc = new Ndb_cluster_connection(); if (g_ncc->connect(30) == 0) { g_ndb = new Ndb(g_ncc, "TEST_DB"); if (g_ndb->init() == 0 && g_ndb->waitUntilReady(30) == 0) { if (runtest() == 0) { if (doconnect() == 0 && runtest() == 0) { delete g_ndb; delete g_ncc; return NDBT_ProgramExit(NDBT_OK); } } } dropeventops(true); dropevent(true); dropevents(true); delete g_ndb; delete g_ncc; return NDBT_ProgramExit(NDBT_FAILED); Loading