Commit 6302d0d0 authored by unknown's avatar unknown
Browse files

Added support for HA_READ_ORDER +

Added table statistics

parent b7ffde86
Loading
Loading
Loading
Loading
+95 −11
Original line number Diff line number Diff line
@@ -86,6 +86,9 @@ static int packfrm(const void *data, uint len, const void **pack_data, uint *pac
static int unpackfrm(const void **data, uint *len,
		     const void* pack_data);

static int ndb_get_table_statistics(Ndb*, const char *, 
				     Uint64* rows, Uint64* commits);

/*
  Error handling functions
*/
@@ -551,6 +554,10 @@ int ha_ndbcluster::get_metadata(const char *path)

  // All checks OK, lets use the table
  m_table= (void*)tab;
  Uint64 rows;
  if(ndb_get_table_statistics(m_ndb, m_tabname, &rows, 0) == 0){
    records= rows;
  }
  
  DBUG_RETURN(build_index_list(table, ILBP_OPEN));  
}
@@ -710,18 +717,21 @@ static const ulong index_type_flags[]=
  */
  // HA_KEYREAD_ONLY | 
  HA_READ_NEXT |
  HA_READ_RANGE,
  HA_READ_RANGE |
  HA_READ_ORDER,

  /* UNIQUE_INDEX */
  HA_ONLY_WHOLE_INDEX,

  /* UNIQUE_ORDERED_INDEX */
  HA_READ_NEXT |
  HA_READ_RANGE,
  HA_READ_RANGE |
  HA_READ_ORDER,

  /* ORDERED_INDEX */
  HA_READ_NEXT |
  HA_READ_RANGE,
  HA_READ_RANGE |
  HA_READ_ORDER
};

static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
@@ -1956,7 +1966,10 @@ int ha_ndbcluster::index_first(byte *buf)
{
  DBUG_ENTER("index_first");
  statistic_increment(ha_read_first_count,&LOCK_status);
  DBUG_RETURN(1);
  // Start the ordered index scan and fetch the first row

  // Only HA_READ_ORDER indexes get called by index_first
  DBUG_RETURN(ordered_index_scan(0, 0, true, buf));
}


@@ -1964,6 +1977,16 @@ int ha_ndbcluster::index_last(byte *buf)
{
  DBUG_ENTER("index_last");
  statistic_increment(ha_read_last_count,&LOCK_status);
  int res;
  if((res= ordered_index_scan(0, 0, true, buf)) == 0){
    NdbResultSet *cursor= m_active_cursor; 
    while((res= cursor->nextResult(true)) == 0);
    if(res == 1){
      unpack_record(buf);
      table->status= 0;     
      DBUG_RETURN(0);
    }
  }
  DBUG_RETURN(1);
}

@@ -2397,7 +2420,11 @@ const char **ha_ndbcluster::bas_ext() const

double ha_ndbcluster::scan_time()
{
  return rows2double(records*1000);
  DBUG_ENTER("ha_ndbcluster::scan_time()");
  double res= rows2double(records*1000);
  DBUG_PRINT("exit", ("table: %s value: %f", 
		      m_tabname, res));
  DBUG_RETURN(res);
}


@@ -3579,8 +3606,6 @@ ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
  NDB_INDEX_TYPE idx_type= get_index_type(inx);  

  DBUG_ENTER("records_in_range");
  DBUG_PRINT("enter", ("inx: %u", inx));

  // Prevent partial read of hash indexes by returning HA_POS_ERROR
  if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
      ((min_key && min_key->length < key_length) ||
@@ -3755,4 +3780,63 @@ static int unpackfrm(const void **unpack_data, uint *unpack_len,

   DBUG_RETURN(0);
}

static 
int
ndb_get_table_statistics(Ndb* ndb, const char * table, 
			 Uint64* row_count, Uint64* commit_count)
{
  DBUG_ENTER("ndb_get_table_statistics");
  DBUG_PRINT("enter", ("table: %s", table));
  
  do 
  {
    NdbConnection* pTrans= ndb->startTransaction();
    if (pTrans == NULL)
      break;
    
    NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
    if (pOp == NULL)
      break;
    
    NdbResultSet* rs= pOp->readTuples(NdbScanOperation::LM_Dirty); 
    if (rs == 0)
      break;
    
    int check= pOp->interpret_exit_last_row();
    if (check == -1)
      break;
    
    Uint64 rows, commits;
    pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
    pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
    
    check= pTrans->execute(NoCommit);
    if (check == -1)
      break;
    
    Uint64 sum_rows= 0;
    Uint64 sum_commits= 0;
    while((check= rs->nextResult(true)) == 0)
    {
      sum_rows+= rows;
      sum_commits+= commits;
    }
    
    if (check == -1)
      break;

    ndb->closeTransaction(pTrans);
    if(row_count)
      * row_count= sum_rows;
    if(commit_count)
      * commit_count= sum_commits;
    DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
    DBUG_RETURN(0);
  } while(0);

  DBUG_PRINT("exit", ("failed"));
  DBUG_RETURN(-1);
}

#endif /* HAVE_NDBCLUSTER_DB */