Commit cfca0085 authored by unknown's avatar unknown
Browse files

added force send interface to scan

prepared for using query cache in ndb


ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  added force send interface to scan
ndb/include/ndbapi/NdbResultSet.hpp:
  added force send interface to scan
ndb/include/ndbapi/NdbScanOperation.hpp:
  added force send interface to scan
ndb/src/ndbapi/NdbResultSet.cpp:
  added force send interface to scan
ndb/src/ndbapi/NdbScanOperation.cpp:
  added force send interface to scan
parent 6aebd056
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -113,7 +113,7 @@ public:
   * Reset bounds and put operation in list that will be
   *   sent on next execute
   */
  int reset_bounds();
  int reset_bounds(bool forceSend = false);
  
  bool getSorted() const { return m_ordered; }
private:
@@ -127,8 +127,8 @@ private:
  virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);

  void fix_get_values();
  int next_result_ordered(bool fetchAllowed);
  int send_next_scan_ordered(Uint32 idx);
  int next_result_ordered(bool fetchAllowed, bool forceSend = false);
  int send_next_scan_ordered(Uint32 idx, bool forceSend = false);
  int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);

  Uint32 m_sort_columns;
+3 −3
Original line number Diff line number Diff line
@@ -89,17 +89,17 @@ public:
   * -   1: if there are no more tuples to scan.
   * -   2: if there are no more cached records in NdbApi
   */
  int nextResult(bool fetchAllowed = true);
  int nextResult(bool fetchAllowed = true, bool forceSend = false);

  /**
   * Close result set (scan)
   */
  void close();
  void close(bool forceSend = false);

  /**
   * Restart
   */
  int restart();
  int restart(bool forceSend = false);
  
  /**
   * Transfer scan operation to an updating transaction. Use this function 
+6 −5
Original line number Diff line number Diff line
@@ -90,11 +90,11 @@ protected:
  NdbScanOperation(Ndb* aNdb);
  virtual ~NdbScanOperation();

  int nextResult(bool fetchAllowed = true);
  int nextResult(bool fetchAllowed = true, bool forceSend = false);
  virtual void release();
  
  void closeScan();
  int close_impl(class TransporterFacade*);
  void closeScan(bool forceSend = false);
  int close_impl(class TransporterFacade*, bool forceSend = false);

  // Overloaded methods from NdbCursorOperation
  int executeCursor(int ProcessorId);
@@ -103,6 +103,7 @@ protected:
  int init(const NdbTableImpl* tab, NdbConnection* myConnection);
  int prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId);
  int doSend(int ProcessorId);
  void checkForceSend(bool forceSend);

  virtual void setErrorCode(int aErrorCode);
  virtual void setErrorCodeAbort(int aErrorCode);
@@ -138,7 +139,7 @@ protected:
  Uint32 m_sent_receivers_count;  // NOTE needs mutex to access
  NdbReceiver** m_sent_receivers; // receive thread puts them here
  
  int send_next_scan(Uint32 cnt, bool close);
  int send_next_scan(Uint32 cnt, bool close, bool forceSend = false);
  void receiver_delivered(NdbReceiver*);
  void receiver_completed(NdbReceiver*);
  void execCLOSE_SCAN_REP();
@@ -148,7 +149,7 @@ protected:
  
  Uint32 m_ordered;

  int restart();
  int restart(bool forceSend = false);
};

inline
+6 −6
Original line number Diff line number Diff line
@@ -44,10 +44,10 @@ void NdbResultSet::init()
{
}

int NdbResultSet::nextResult(bool fetchAllowed)
int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend)
{
  int res;
  if ((res = m_operation->nextResult(fetchAllowed)) == 0) {
  if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) {
    // handle blobs
    NdbBlob* tBlob = m_operation->theBlobList;
    while (tBlob != 0) {
@@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed)
  return res;
}

void NdbResultSet::close()
void NdbResultSet::close(bool forceSend)
{
  m_operation->closeScan();
  m_operation->closeScan(forceSend);
}

NdbOperation* 
@@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
}

int
NdbResultSet::restart(){
  return m_operation->restart();
NdbResultSet::restart(bool forceSend){
  return m_operation->restart(forceSend);
}
+34 −16
Original line number Diff line number Diff line
@@ -447,10 +447,11 @@ NdbScanOperation::executeCursor(int nodeId){

#define DEBUG_NEXT_RESULT 0

int NdbScanOperation::nextResult(bool fetchAllowed)
int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
{
  if(m_ordered)
    return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed);
    return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
							       forceSend);
  
  /**
   * Check current receiver
@@ -487,7 +488,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
  TransporterFacade* tp = TransporterFacade::instance();
  Guard guard(tp->theMutexPtr);
  Uint32 seq = theNdbCon->theNodeSequence;
  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){
  if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
							  forceSend) == 0){
      
    idx = m_current_api_receiver;
    last = m_api_receivers_count;
@@ -578,7 +580,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
}

int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){  
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
				 bool forceSend){  
  if(cnt > 0 || stopScanFlag){
    NdbApiSignal tSignal(theNdb->theMyRef);
    tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -618,6 +621,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
      ret = tp->sendSignal(&tSignal, nodeId);
    }

    if (!ret) checkForceSend(forceSend);

    m_sent_receivers_count = last + cnt + stopScanFlag;
    m_api_receivers_count -= cnt;
    m_current_api_receiver = 0;
@@ -627,6 +632,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
  return 0;
}

void NdbScanOperation::checkForceSend(bool forceSend)
{
  if (forceSend) {
    TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
  } else {
    TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
  }//if
}

int 
NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
{
@@ -642,7 +656,7 @@ NdbScanOperation::doSend(int ProcessorId)
  return 0;
}

void NdbScanOperation::closeScan()
void NdbScanOperation::closeScan(bool forceSend)
{
  if(m_transConnection){
    if(DEBUG_NEXT_RESULT)
@@ -657,7 +671,7 @@ void NdbScanOperation::closeScan()
    
    TransporterFacade* tp = TransporterFacade::instance();
    Guard guard(tp->theMutexPtr);
    close_impl(tp);
    close_impl(tp, forceSend);
    
  } while(0);
  
@@ -1293,7 +1307,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
}

int
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
					   bool forceSend){
  
  Uint32 u_idx = 0, u_last = 0;
  Uint32 s_idx   = m_current_api_receiver; // first sorted
@@ -1319,7 +1334,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
      Guard guard(tp->theMutexPtr);
      Uint32 seq = theNdbCon->theNodeSequence;
      Uint32 nodeId = theNdbCon->theDBnode;
      if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){
      if(seq == tp->getNodeSequence(nodeId) &&
	 !send_next_scan_ordered(s_idx, forceSend)){
	Uint32 tmp = m_sent_receivers_count;
	s_idx = m_current_api_receiver; 
	while(m_sent_receivers_count > 0 && !theError.code){
@@ -1408,7 +1424,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
}

int
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){  
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){  
  if(idx == theParallelism)
    return 0;
  
@@ -1440,11 +1456,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
  Uint32 nodeId = theNdbCon->theDBnode;
  TransporterFacade * tp = TransporterFacade::instance();
  tSignal.setLength(4+1);
  return tp->sendSignal(&tSignal, nodeId);
  int ret= tp->sendSignal(&tSignal, nodeId);
  if (!ret) checkForceSend(forceSend);
  return ret;
}

int
NdbScanOperation::close_impl(TransporterFacade* tp){
NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
  Uint32 seq = theNdbCon->theNodeSequence;
  Uint32 nodeId = theNdbCon->theDBnode;
  
@@ -1473,7 +1491,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){

  if(m_api_receivers_count+m_conf_receivers_count){
    // Send close scan
    if(send_next_scan(0, true) == -1){ // Close scan
    if(send_next_scan(0, true, forceSend) == -1){ // Close scan
      theNdbCon->theReleaseOnClose = true;
      return -1;
    }
@@ -1520,7 +1538,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
}

int
NdbScanOperation::restart()
NdbScanOperation::restart(bool forceSend)
{
  
  TransporterFacade* tp = TransporterFacade::instance();
@@ -1529,7 +1547,7 @@ NdbScanOperation::restart()
  
  {
    int res;
    if((res= close_impl(tp)))
    if((res= close_impl(tp, forceSend)))
    {
      return res;
    }
@@ -1548,13 +1566,13 @@ NdbScanOperation::restart()
}

int
NdbIndexScanOperation::reset_bounds(){
NdbIndexScanOperation::reset_bounds(bool forceSend){
  int res;
  
  {
    TransporterFacade* tp = TransporterFacade::instance();
    Guard guard(tp->theMutexPtr);
    res= close_impl(tp);
    res= close_impl(tp, forceSend);
  }

  if(!res)
Loading