Loading ndb/include/ndbapi/NdbIndexScanOperation.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -113,7 +113,7 @@ public: * Reset bounds and put operation in list that will be * sent on next execute */ int reset_bounds(); int reset_bounds(bool forceSend = false); bool getSorted() const { return m_ordered; } private: Loading @@ -127,8 +127,8 @@ private: virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*); void fix_get_values(); int next_result_ordered(bool fetchAllowed); int send_next_scan_ordered(Uint32 idx); int next_result_ordered(bool fetchAllowed, bool forceSend = false); int send_next_scan_ordered(Uint32 idx, bool forceSend = false); int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*); Uint32 m_sort_columns; Loading ndb/include/ndbapi/NdbResultSet.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -89,17 +89,17 @@ public: * - 1: if there are no more tuples to scan. * - 2: if there are no more cached records in NdbApi */ int nextResult(bool fetchAllowed = true); int nextResult(bool fetchAllowed = true, bool forceSend = false); /** * Close result set (scan) */ void close(); void close(bool forceSend = false); /** * Restart */ int restart(); int restart(bool forceSend = false); /** * Transfer scan operation to an updating transaction. Use this function Loading ndb/include/ndbapi/NdbScanOperation.hpp +6 −5 Original line number Diff line number Diff line Loading @@ -90,11 +90,11 @@ protected: NdbScanOperation(Ndb* aNdb); virtual ~NdbScanOperation(); int nextResult(bool fetchAllowed = true); int nextResult(bool fetchAllowed = true, bool forceSend = false); virtual void release(); void closeScan(); int close_impl(class TransporterFacade*); void closeScan(bool forceSend = false); int close_impl(class TransporterFacade*, bool forceSend = false); // Overloaded methods from NdbCursorOperation int executeCursor(int ProcessorId); Loading @@ -103,6 +103,7 @@ protected: int init(const NdbTableImpl* tab, NdbConnection* myConnection); int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId); int doSend(int ProcessorId); void checkForceSend(bool forceSend); virtual void setErrorCode(int aErrorCode); virtual void setErrorCodeAbort(int aErrorCode); Loading Loading @@ -138,7 +139,7 @@ protected: Uint32 m_sent_receivers_count; // NOTE needs mutex to access NdbReceiver** m_sent_receivers; // receive thread puts them here int send_next_scan(Uint32 cnt, bool close); int send_next_scan(Uint32 cnt, bool close, bool forceSend = false); void receiver_delivered(NdbReceiver*); void receiver_completed(NdbReceiver*); void execCLOSE_SCAN_REP(); Loading @@ -148,7 +149,7 @@ protected: Uint32 m_ordered; int restart(); int restart(bool forceSend = false); }; inline Loading ndb/src/ndbapi/NdbResultSet.cpp +6 −6 Original line number Diff line number Diff line Loading @@ -44,10 +44,10 @@ void NdbResultSet::init() { } int NdbResultSet::nextResult(bool fetchAllowed) int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend) { int res; if ((res = m_operation->nextResult(fetchAllowed)) == 0) { if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) { // handle blobs NdbBlob* tBlob = m_operation->theBlobList; while (tBlob != 0) { Loading @@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed) return res; } void NdbResultSet::close() void NdbResultSet::close(bool forceSend) { m_operation->closeScan(); m_operation->closeScan(forceSend); } NdbOperation* Loading Loading @@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){ } int NdbResultSet::restart(){ return m_operation->restart(); NdbResultSet::restart(bool forceSend){ return m_operation->restart(forceSend); } ndb/src/ndbapi/NdbScanOperation.cpp +36 −18 Original line number Diff line number Diff line Loading @@ -453,10 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){ return -1; } int NdbScanOperation::nextResult(bool fetchAllowed) int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) { if(m_ordered) return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed); return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed, forceSend); /** * Check current receiver Loading Loading @@ -493,7 +495,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, forceSend) == 0){ idx = m_current_api_receiver; last = m_api_receivers_count; Loading Loading @@ -584,9 +587,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed) } int NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ if(cnt > 0) { NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, bool forceSend){ if(cnt > 0){ NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); Loading Loading @@ -633,6 +636,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ } } if (!ret) checkForceSend(forceSend); m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; Loading @@ -642,6 +647,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ return 0; } void NdbScanOperation::checkForceSend(bool forceSend) { if (forceSend) { TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); } else { TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); }//if } int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) { Loading @@ -657,7 +671,7 @@ NdbScanOperation::doSend(int ProcessorId) return 0; } void NdbScanOperation::closeScan() void NdbScanOperation::closeScan(bool forceSend) { if(m_transConnection){ if(DEBUG_NEXT_RESULT) Loading @@ -672,7 +686,7 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); close_impl(tp); close_impl(tp, forceSend); } while(0); Loading Loading @@ -1309,7 +1323,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, } int NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, bool forceSend){ Uint32 u_idx = 0, u_last = 0; Uint32 s_idx = m_current_api_receiver; // first sorted Loading @@ -1335,7 +1350,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){ if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx, forceSend)){ Uint32 tmp = m_sent_receivers_count; s_idx = m_current_api_receiver; while(m_sent_receivers_count > 0 && !theError.code){ Loading Loading @@ -1424,7 +1440,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ } int NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ if(idx == theParallelism) return 0; Loading Loading @@ -1461,11 +1477,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); tSignal.setLength(4+1); return tp->sendSignal(&tSignal, nodeId); int ret= tp->sendSignal(&tSignal, nodeId); if (!ret) checkForceSend(forceSend); return ret; } int NdbScanOperation::close_impl(TransporterFacade* tp){ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; Loading Loading @@ -1532,7 +1550,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ } // Send close scan if(send_next_scan(api+conf, true) == -1) if(send_next_scan(api+conf, true, forceSend) == -1) { theNdbCon->theReleaseOnClose = true; return -1; Loading Loading @@ -1581,7 +1599,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ } int NdbScanOperation::restart() NdbScanOperation::restart(bool forceSend) { TransporterFacade* tp = TransporterFacade::instance(); Loading @@ -1590,7 +1608,7 @@ NdbScanOperation::restart() { int res; if((res= close_impl(tp))) if((res= close_impl(tp, forceSend))) { return res; } Loading @@ -1609,13 +1627,13 @@ NdbScanOperation::restart() } int NdbIndexScanOperation::reset_bounds(){ NdbIndexScanOperation::reset_bounds(bool forceSend){ int res; { TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); res= close_impl(tp); res= close_impl(tp, forceSend); } if(!res) Loading Loading
ndb/include/ndbapi/NdbIndexScanOperation.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -113,7 +113,7 @@ public: * Reset bounds and put operation in list that will be * sent on next execute */ int reset_bounds(); int reset_bounds(bool forceSend = false); bool getSorted() const { return m_ordered; } private: Loading @@ -127,8 +127,8 @@ private: virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*); void fix_get_values(); int next_result_ordered(bool fetchAllowed); int send_next_scan_ordered(Uint32 idx); int next_result_ordered(bool fetchAllowed, bool forceSend = false); int send_next_scan_ordered(Uint32 idx, bool forceSend = false); int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*); Uint32 m_sort_columns; Loading
ndb/include/ndbapi/NdbResultSet.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -89,17 +89,17 @@ public: * - 1: if there are no more tuples to scan. * - 2: if there are no more cached records in NdbApi */ int nextResult(bool fetchAllowed = true); int nextResult(bool fetchAllowed = true, bool forceSend = false); /** * Close result set (scan) */ void close(); void close(bool forceSend = false); /** * Restart */ int restart(); int restart(bool forceSend = false); /** * Transfer scan operation to an updating transaction. Use this function Loading
ndb/include/ndbapi/NdbScanOperation.hpp +6 −5 Original line number Diff line number Diff line Loading @@ -90,11 +90,11 @@ protected: NdbScanOperation(Ndb* aNdb); virtual ~NdbScanOperation(); int nextResult(bool fetchAllowed = true); int nextResult(bool fetchAllowed = true, bool forceSend = false); virtual void release(); void closeScan(); int close_impl(class TransporterFacade*); void closeScan(bool forceSend = false); int close_impl(class TransporterFacade*, bool forceSend = false); // Overloaded methods from NdbCursorOperation int executeCursor(int ProcessorId); Loading @@ -103,6 +103,7 @@ protected: int init(const NdbTableImpl* tab, NdbConnection* myConnection); int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId); int doSend(int ProcessorId); void checkForceSend(bool forceSend); virtual void setErrorCode(int aErrorCode); virtual void setErrorCodeAbort(int aErrorCode); Loading Loading @@ -138,7 +139,7 @@ protected: Uint32 m_sent_receivers_count; // NOTE needs mutex to access NdbReceiver** m_sent_receivers; // receive thread puts them here int send_next_scan(Uint32 cnt, bool close); int send_next_scan(Uint32 cnt, bool close, bool forceSend = false); void receiver_delivered(NdbReceiver*); void receiver_completed(NdbReceiver*); void execCLOSE_SCAN_REP(); Loading @@ -148,7 +149,7 @@ protected: Uint32 m_ordered; int restart(); int restart(bool forceSend = false); }; inline Loading
ndb/src/ndbapi/NdbResultSet.cpp +6 −6 Original line number Diff line number Diff line Loading @@ -44,10 +44,10 @@ void NdbResultSet::init() { } int NdbResultSet::nextResult(bool fetchAllowed) int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend) { int res; if ((res = m_operation->nextResult(fetchAllowed)) == 0) { if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) { // handle blobs NdbBlob* tBlob = m_operation->theBlobList; while (tBlob != 0) { Loading @@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed) return res; } void NdbResultSet::close() void NdbResultSet::close(bool forceSend) { m_operation->closeScan(); m_operation->closeScan(forceSend); } NdbOperation* Loading Loading @@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){ } int NdbResultSet::restart(){ return m_operation->restart(); NdbResultSet::restart(bool forceSend){ return m_operation->restart(forceSend); }
ndb/src/ndbapi/NdbScanOperation.cpp +36 −18 Original line number Diff line number Diff line Loading @@ -453,10 +453,12 @@ NdbScanOperation::executeCursor(int nodeId){ return -1; } int NdbScanOperation::nextResult(bool fetchAllowed) int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) { if(m_ordered) return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed); return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed, forceSend); /** * Check current receiver Loading Loading @@ -493,7 +495,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, forceSend) == 0){ idx = m_current_api_receiver; last = m_api_receivers_count; Loading Loading @@ -584,9 +587,9 @@ int NdbScanOperation::nextResult(bool fetchAllowed) } int NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ if(cnt > 0) { NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, bool forceSend){ if(cnt > 0){ NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); Loading Loading @@ -633,6 +636,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ } } if (!ret) checkForceSend(forceSend); m_sent_receivers_count = last + sent; m_api_receivers_count -= cnt; m_current_api_receiver = 0; Loading @@ -642,6 +647,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ return 0; } void NdbScanOperation::checkForceSend(bool forceSend) { if (forceSend) { TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); } else { TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); }//if } int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) { Loading @@ -657,7 +671,7 @@ NdbScanOperation::doSend(int ProcessorId) return 0; } void NdbScanOperation::closeScan() void NdbScanOperation::closeScan(bool forceSend) { if(m_transConnection){ if(DEBUG_NEXT_RESULT) Loading @@ -672,7 +686,7 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); close_impl(tp); close_impl(tp, forceSend); } while(0); Loading Loading @@ -1309,7 +1323,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, } int NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, bool forceSend){ Uint32 u_idx = 0, u_last = 0; Uint32 s_idx = m_current_api_receiver; // first sorted Loading @@ -1335,7 +1350,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){ if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx, forceSend)){ Uint32 tmp = m_sent_receivers_count; s_idx = m_current_api_receiver; while(m_sent_receivers_count > 0 && !theError.code){ Loading Loading @@ -1424,7 +1440,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ } int NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ if(idx == theParallelism) return 0; Loading Loading @@ -1461,11 +1477,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); tSignal.setLength(4+1); return tp->sendSignal(&tSignal, nodeId); int ret= tp->sendSignal(&tSignal, nodeId); if (!ret) checkForceSend(forceSend); return ret; } int NdbScanOperation::close_impl(TransporterFacade* tp){ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; Loading Loading @@ -1532,7 +1550,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ } // Send close scan if(send_next_scan(api+conf, true) == -1) if(send_next_scan(api+conf, true, forceSend) == -1) { theNdbCon->theReleaseOnClose = true; return -1; Loading Loading @@ -1581,7 +1599,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ } int NdbScanOperation::restart() NdbScanOperation::restart(bool forceSend) { TransporterFacade* tp = TransporterFacade::instance(); Loading @@ -1590,7 +1608,7 @@ NdbScanOperation::restart() { int res; if((res= close_impl(tp))) if((res= close_impl(tp, forceSend))) { return res; } Loading @@ -1609,13 +1627,13 @@ NdbScanOperation::restart() } int NdbIndexScanOperation::reset_bounds(){ NdbIndexScanOperation::reset_bounds(bool forceSend){ int res; { TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); res= close_impl(tp); res= close_impl(tp, forceSend); } if(!res) Loading