Commit f1056028 authored by unknown's avatar unknown
Browse files

ndb - bug#20252

  allow user to specify scan batch size in readTuples


ndb/include/ndbapi/NdbIndexScanOperation.hpp:
  Allow user to specify batch size
ndb/include/ndbapi/NdbScanOperation.hpp:
  Allow user to specify batch size
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp:
  Fix so that last row works even if batch is complete
ndb/src/ndbapi/NdbReceiver.cpp:
  Allow user yo specify batch size
ndb/src/ndbapi/NdbScanOperation.cpp:
  Allow user to specify batchsize
parent 841a5b7a
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
@@ -41,7 +41,9 @@ public:
   * @param parallel No of fragments to scan in parallel (0=max)
   */ 
  virtual int readTuples(LockMode lock_mode = LM_Read, 
                         Uint32 scan_flags = 0, Uint32 parallel = 0);
                         Uint32 scan_flags = 0, 
			 Uint32 parallel = 0,
			 Uint32 batch = 0);

#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
  /**
@@ -66,7 +68,7 @@ public:
      (SF_OrderBy & -(Int32)order_by) |
      (SF_Descending & -(Int32)order_desc) |
      (SF_ReadRangeNo & -(Int32)read_range_no);
    return readTuples(lock_mode, scan_flags, parallel);
    return readTuples(lock_mode, scan_flags, parallel, batch);
  }
#endif

+3 −1
Original line number Diff line number Diff line
@@ -56,7 +56,9 @@ public:
   */ 
  virtual
  int readTuples(LockMode lock_mode = LM_Read, 
                 Uint32 scan_flags = 0, Uint32 parallel = 0);
                 Uint32 scan_flags = 0, 
		 Uint32 parallel = 0,
		 Uint32 batch = 0);

#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
  /**
+4 −4
Original line number Diff line number Diff line
@@ -7349,15 +7349,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
      scanptr.p->m_curr_batch_size_rows = 0;
      scanptr.p->m_curr_batch_size_bytes = 0;
      closeScanLab(signal);
    } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
      jam();
      closeScanLab(signal);
      return;
    } else if (scanptr.p->check_scan_batch_completed() &&
               scanptr.p->scanLockHold != ZTRUE) {
      jam();
      scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
      sendScanFragConf(signal, ZFALSE);
    } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
      jam();
      closeScanLab(signal);
      return;
    } else {
      jam();
      /*
+9 −1
Original line number Diff line number Diff line
@@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
   * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
   * batch.
   */
  if (batch_size == 0)
  {
    batch_byte_size= max_batch_byte_size;
  }
  else
  {
    batch_byte_size= batch_size * tot_size;
  }
  
  if (batch_byte_size * parallelism > max_scan_batch_size) {
    batch_byte_size= max_scan_batch_size / parallelism;
  }
+15 −8
Original line number Diff line number Diff line
@@ -117,7 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
int 
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
			     Uint32 scan_flags, 
			     Uint32 parallel)
			     Uint32 parallel,
			     Uint32 batch)
{
  m_ordered = m_descending = false;
  Uint32 fragCount = m_currentTable->m_fragmentCount;
@@ -182,6 +183,9 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
  if (tupScan && rangeScan)
    tupScan = false;

  if (rangeScan && (scan_flags & SF_OrderBy))
    parallel = fragCount;
  
  theParallelism = parallel;    
  
  if(fix_receivers(parallel) == -1){
@@ -202,6 +206,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
  req->tableSchemaVersion = m_accessTable->m_version;
  req->storedProcId = 0xFFFF;
  req->buddyConPtr = theNdbCon->theBuddyConPtr;
  req->first_batch_size = batch; // Save user specified batch size
  
  Uint32 reqInfo = 0;
  ScanTabReq::setParallelism(reqInfo, parallel);
@@ -750,13 +755,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
   * The number of records sent by each LQH is calculated and the kernel
   * is informed of this number by updating the SCAN_TABREQ signal
   */
  Uint32 batch_size, batch_byte_size, first_batch_size;
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  Uint32 batch_size = req->first_batch_size; // User specified
  Uint32 batch_byte_size, first_batch_size;
  theReceiver.calculate_batch_size(key_size,
                                   theParallelism,
                                   batch_size,
                                   batch_byte_size,
                                   first_batch_size);
  ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  ScanTabReq::setScanBatch(req->requestInfo, batch_size);
  req->batch_byte_size= batch_byte_size;
  req->first_batch_size= first_batch_size;
@@ -1206,13 +1212,14 @@ NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
int
NdbIndexScanOperation::readTuples(LockMode lm,
				  Uint32 scan_flags,
				  Uint32 parallel)
				  Uint32 parallel,
				  Uint32 batch)
{
  const bool order_by = scan_flags & SF_OrderBy;
  const bool order_desc = scan_flags & SF_Descending;
  const bool read_range_no = scan_flags & SF_ReadRangeNo;
  
  int res = NdbScanOperation::readTuples(lm, scan_flags, 0);
  int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
  if(!res && read_range_no)
  {
    m_read_range_no = 1;