Commit 283c679d authored by pekka@clam.ndb.mysql.com/clam.(none)'s avatar pekka@clam.ndb.mysql.com/clam.(none)
Browse files

ndb - threads and timings to hugo* pk ops

parent 42000d3f
Loading
Loading
Loading
Loading
+15 −1
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@
#include <NDBT.hpp>
#include <HugoCalculator.hpp>
#include <HugoOperations.hpp>

class NDBT_Stats;

class HugoTransactions : public HugoOperations {
public:
@@ -109,10 +109,24 @@ public:
  void setRetryMax(int retryMax = 100) { m_retryMax = retryMax; }
  
  Uint32 m_latest_gci;

  void setStatsLatency(NDBT_Stats* stats) { m_stats_latency = stats; }

  // allows multiple threads to update separate batches
  void setThrInfo(int thr_count, int thr_no) {
    m_thr_count = thr_count;
    m_thr_no = thr_no;
  }

protected:  
  NDBT_ResultRow row;
  int m_defaultScanUpdateMethod;
  int m_retryMax;

  NDBT_Stats* m_stats_latency;

  int m_thr_count;      // 0 if no separation between threads
  int m_thr_no;
};


+226 −0
Original line number Diff line number Diff line
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef NDB_THREAD_HPP
#define NDB_THREAD_HPP

#include <NdbMutex.h>
#include <NdbCondition.h>
#include <NdbThread.h>

// NDBT_Thread ctor -> NDBT_Thread_run -> thr.run()
extern "C" {
static void* NDBT_Thread_run(void* arg);
}

// Function to run in a thread.

typedef void NDBT_ThreadFunc(class NDBT_Thread&);

/*
 * NDBT_Thread
 *
 * Represents a thread.  The thread pauses at startup.
 * Main process sets a function to run.  When the function
 * returns, the thread pauses again to wait for a command.
 * This allows main process to sync with the thread and
 * exchange data with it.
 *
 * Input to thread is typically options.  The input area
 * is read-only in the thread.  Output from thread is
 * results such as statistics.  Error code is handled
 * separately.
 *
 * Pointer to Ndb object and method to create it are
 * provided for convenience.
 */

class NDBT_ThreadSet;

class NDBT_Thread {
public:
  NDBT_Thread();
  NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no);
  void create(NDBT_ThreadSet* thread_set, int thread_no);
  ~NDBT_Thread();

  // if part of a set
  inline NDBT_ThreadSet& get_thread_set() const {
    assert(m_thread_set != 0);
    return *m_thread_set;
  }
  inline int get_thread_no() const {
    return m_thread_no;
  }

  // { Wait -> Start -> Stop }+ -> Exit
  enum State {
    Wait = 1,   // wait for command
    Start,      // run current function
    Stop,       // stopped (paused) when current function done
    Exit        // exit thread
  };

  // tell thread to start running current function
  void start();
  // wait for thread to stop when function is done
  void stop();
  // tell thread to exit
  void exit();
  // collect thread after exit
  void join();

  // set function to run
  inline void set_func(NDBT_ThreadFunc* func) {
    m_func = func;
  }

  // input area
  inline void set_input(const void* input) {
    m_input = input;
  }
  inline const void* get_input() const {
    return m_input;
  }

  // output area
  inline void set_output(void* output) {
    m_output = output;
  }
  inline void* get_output() const {
    return m_output;
  }
  template <class T> inline void set_output() {
    set_output(new T);
  }
  inline void delete_output() {
    delete m_output;
    m_output = 0;
  }

  // thread-specific Ndb object
  inline class Ndb* get_ndb() const {
    return m_ndb;
  }
  int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
  void disconnect();

  // error code (OS, Ndb, other)
  void clear_err() {
    m_err = 0;
  }
  void set_err(int err) {
    m_err = err;
  }
  int get_err() const {
    return m_err;
  }

private:
  friend class NDBT_ThreadSet;
  friend void* NDBT_Thread_run(void* arg);

  enum { Magic = 0xabacadae };
  Uint32 m_magic;

  State m_state;
  NDBT_ThreadSet* m_thread_set;
  int m_thread_no;

  NDBT_ThreadFunc* m_func;
  const void* m_input;
  void* m_output;
  class Ndb* m_ndb;
  int m_err;

  // run the thread
  void run();

  void lock() {
    NdbMutex_Lock(m_mutex);
  }
  void unlock() {
    NdbMutex_Unlock(m_mutex);
  }

  void wait() {
    NdbCondition_Wait(m_cond, m_mutex);
  }
  void signal() {
    NdbCondition_Signal(m_cond);
  }

  NdbMutex* m_mutex;
  NdbCondition* m_cond;
  NdbThread* m_thread;
  void* m_status;
};

/*
 * A set of threads, indexed from 0 to count-1.  Methods
 * are applied to each thread (serially).  Input area is
 * common to all threads.  Output areas are allocated
 * separately according to a template class.
 */

class NDBT_ThreadSet {
public:
  NDBT_ThreadSet(int count);
  ~NDBT_ThreadSet();

  inline int get_count() const {
    return m_count;
  }
  inline NDBT_Thread& get_thread(int n) {
    assert(n < m_count && m_thread[n] != 0);
    return *m_thread[n];
  }

  // tell each thread to start running
  void start();
  // wait for each thread to stop
  void stop();
  // tell each thread to exit
  void exit();
  // collect each thread after exit
  void join();

  // set function to run in each thread
  void set_func(NDBT_ThreadFunc* func);

  // set input area (same instance in each thread)
  void set_input(const void* input);

  // set output areas
  template <class T> inline void set_output() {
    for (int n = 0; n < m_count; n++) {
      NDBT_Thread& thr = *m_thread[n];
      thr.set_output<T>();
    }
  }
  void delete_output();

  // thread-specific Ndb objects
  int connect(class Ndb_cluster_connection*, const char* db = "TEST_DB");
  void disconnect();

  int get_err() const;

private:
  int m_count;
  NDBT_Thread** m_thread;
};

#endif
+74 −4
Original line number Diff line number Diff line
@@ -14,8 +14,9 @@
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "HugoTransactions.hpp"
#include <NDBT_Stats.hpp>
#include <NdbSleep.h>

#include <NdbTick.h>

HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,
				   const NdbDictionary::Index* idx):
@@ -24,6 +25,10 @@ HugoTransactions::HugoTransactions(const NdbDictionary::Table& _tab,

  m_defaultScanUpdateMethod = 3;
  setRetryMax();
  m_stats_latency = 0;

  m_thr_count = 0;
  m_thr_no = -1;
}

HugoTransactions::~HugoTransactions(){
@@ -820,6 +825,16 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
      return NDBT_FAILED;
    }

    MicroSecondTimer timer_start;
    MicroSecondTimer timer_stop;
    bool timer_active =
      m_stats_latency != 0 &&
      r >= batch &&             // first batch is "warmup"
      r + batch != records;     // last batch is usually partial

    if (timer_active)
      NdbTick_getMicroTimer(&timer_start);

    if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK)
    {
      ERR(pTrans->getNdbError());
@@ -892,6 +907,12 @@ HugoTransactions::pkReadRecords(Ndb* pNdb,
    }
    
    closeTransaction(pNdb);

    if (timer_active) {
      NdbTick_getMicroTimer(&timer_stop);
      NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
      m_stats_latency->addObservation((double)ticks);
    }
  }
  deallocRows();
  g_info << reads << " records read" << endl;
@@ -913,10 +934,18 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
  allocRows(batch);

  g_info << "|- Updating records (batch=" << batch << ")..." << endl;
  int batch_no = 0;
  while (r < records){
    if(r + batch > records)
      batch = records - r;

    if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
    {
      r += batch;
      batch_no++;
      continue;
    }
    
    if (retryAttempt >= m_retryMax){
      g_info << "ERROR: has retried this operation " << retryAttempt 
	     << " times, failing!" << endl;
@@ -963,6 +992,16 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
      return NDBT_FAILED;
    }

    MicroSecondTimer timer_start;
    MicroSecondTimer timer_stop;
    bool timer_active =
      m_stats_latency != 0 &&
      r >= batch &&             // first batch is "warmup"
      r + batch != records;     // last batch is usually partial

    if (timer_active)
      NdbTick_getMicroTimer(&timer_start);

    if(pIndexScanOp)
    {
      int rows_found = 0;
@@ -1040,7 +1079,14 @@ HugoTransactions::pkUpdateRecords(Ndb* pNdb,
    
    closeTransaction(pNdb);

    if (timer_active) {
      NdbTick_getMicroTimer(&timer_stop);
      NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
      m_stats_latency->addObservation((double)ticks);
    }

    r += batch; // Read next record
    batch_no++;
  }
  
  deallocRows();
@@ -1228,10 +1274,18 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
  int                  check;

  g_info << "|- Deleting records..." << endl;
  int batch_no = 0;
  while (r < records){
    if(r + batch > records)
      batch = records - r;

    if (m_thr_count != 0 && m_thr_no != batch_no % m_thr_count)
    {
      r += batch;
      batch_no++;
      continue;
    }

    if (retryAttempt >= m_retryMax){
      g_info << "ERROR: has retried this operation " << retryAttempt 
	     << " times, failing!" << endl;
@@ -1255,6 +1309,16 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
      return NDBT_FAILED;
    }

    MicroSecondTimer timer_start;
    MicroSecondTimer timer_stop;
    bool timer_active =
      m_stats_latency != 0 &&
      r >= batch &&             // first batch is "warmup"
      r + batch != records;     // last batch is usually partial

    if (timer_active)
      NdbTick_getMicroTimer(&timer_start);

    if(pkDeleteRecord(pNdb, r, batch) != NDBT_OK)
    {
      ERR(pTrans->getNdbError());
@@ -1304,8 +1368,14 @@ HugoTransactions::pkDelRecords(Ndb* pNdb,
    }
    closeTransaction(pNdb);

    r += batch; // Read next record
    if (timer_active) {
      NdbTick_getMicroTimer(&timer_stop);
      NDB_TICKS ticks = NdbTick_getMicrosPassed(timer_start, timer_stop);
      m_stats_latency->addObservation((double)ticks);
    }

    r += batch; // Read next record
    batch_no++;
  }

  g_info << "|- " << deleted << " records deleted" << endl;
+1 −1
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ libNDBT_a_SOURCES = \
	NdbRestarter.cpp NdbRestarts.cpp NDBT_Output.cpp \
	NdbBackup.cpp  NdbConfig.cpp NdbGrep.cpp NDBT_Table.cpp \
	NdbSchemaCon.cpp NdbSchemaOp.cpp getarg.c \
	CpcClient.cpp
	CpcClient.cpp NDBT_Thread.cpp

INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/common/mgmcommon -I$(top_srcdir)/storage/ndb/include/mgmcommon -I$(top_srcdir)/storage/ndb/include/kernel -I$(top_srcdir)/storage/ndb/src/mgmapi

+283 −0
Original line number Diff line number Diff line
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include <NDBT_Thread.hpp>
#include <NdbApi.hpp>

NDBT_Thread::NDBT_Thread()
{
  create(0, -1);
}

NDBT_Thread::NDBT_Thread(NDBT_ThreadSet* thread_set, int thread_no)
{
  create(thread_set, thread_no);
}

void
NDBT_Thread::create(NDBT_ThreadSet* thread_set, int thread_no)
{
  m_magic = NDBT_Thread::Magic;

  m_state = Wait;
  m_thread_set = thread_set;
  m_thread_no = thread_no;
  m_func = 0;
  m_input = 0;
  m_output = 0;
  m_ndb = 0;
  m_err = 0;

  m_mutex = NdbMutex_Create();
  assert(m_mutex != 0);
  m_cond = NdbCondition_Create();
  assert(m_cond != 0);

  char buf[20];
  sprintf(buf, "NDBT_%04u");
  const char* name = strdup(buf);
  assert(name != 0);

  unsigned stacksize = 512 * 1024;
  NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_LOW;
  m_thread = NdbThread_Create(NDBT_Thread_run,
                              (void**)this, stacksize, name, prio);
  assert(m_thread != 0);
}

NDBT_Thread::~NDBT_Thread()
{
  if (m_thread != 0) {
    NdbThread_Destroy(&m_thread);
    m_thread = 0;
  }
  if (m_cond != 0) {
    NdbCondition_Destroy(m_cond);
    m_cond = 0;
  }
  if (m_mutex != 0) {
    NdbMutex_Destroy(m_mutex);
    m_mutex = 0;
  }
}

static void*
NDBT_Thread_run(void* arg)
{
  assert(arg != 0);
  NDBT_Thread& thr = *(NDBT_Thread*)arg;
  assert(thr.m_magic == NDBT_Thread::Magic);
  thr.run();
  return 0;
}

void
NDBT_Thread::run()
{
  while (1) {
    lock();
    while (m_state != Start && m_state != Exit) {
      wait();
    }
    if (m_state == Exit) {
      unlock();
      break;
    }
    (*m_func)(*this);
    m_state = Stop;
    signal();
    unlock();
  }
}

// methods for main process

void
NDBT_Thread::start()
{
  lock();
  m_state = Start;
  signal();
  unlock();
}

void
NDBT_Thread::stop()
{
  lock();
  while (m_state != Stop)
    wait();
  m_state = Wait;
  unlock();
}

void
NDBT_Thread::exit()
{
  lock();
  m_state = Exit;
  signal();
  unlock();
};

void
NDBT_Thread::join()
{
  NdbThread_WaitFor(m_thread, &m_status);
  m_thread = 0;
}

int
NDBT_Thread::connect(class Ndb_cluster_connection* ncc, const char* db)
{
  m_ndb = new Ndb(ncc, db);
  if (m_ndb->init() == -1 ||
      m_ndb->waitUntilReady() == -1) {
    m_err = m_ndb->getNdbError().code;
    return -1;
  }
  return 0;
}

void
NDBT_Thread::disconnect()
{
  delete m_ndb;
  m_ndb = 0;
}

// set of threads

NDBT_ThreadSet::NDBT_ThreadSet(int count)
{
  m_count = count;
  m_thread = new NDBT_Thread* [count];
  for (int n = 0; n < count; n++) {
    m_thread[n] = new NDBT_Thread(this, n);
  }
}

NDBT_ThreadSet::~NDBT_ThreadSet()
{
  delete_output();
  for (int n = 0; n < m_count; n++) {
    delete m_thread[n];
    m_thread[n] = 0;
  }
  delete [] m_thread;
}

void
NDBT_ThreadSet::start()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.start();
  }
}

void
NDBT_ThreadSet::stop()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.stop();
  }
}

void
NDBT_ThreadSet::exit()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.exit();
  }
}

void
NDBT_ThreadSet::join()
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.join();
  }
}

void
NDBT_ThreadSet::set_func(NDBT_ThreadFunc* func)
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.set_func(func);
  }
}

void
NDBT_ThreadSet::set_input(const void* input)
{
  for (int n = 0; n < m_count; n++) {
    NDBT_Thread& thr = *m_thread[n];
    thr.set_input(input);
  }
}

void
NDBT_ThreadSet::delete_output()
{
  for (int n = 0; n < m_count; n++) {
    if (m_thread[n] != 0) {
      NDBT_Thread& thr = *m_thread[n];
      thr.delete_output();
    }
  }
}

int
NDBT_ThreadSet::connect(class Ndb_cluster_connection* ncc, const char* db)
{
  for (int n = 0; n < m_count; n++) {
    assert(m_thread[n] != 0);
    NDBT_Thread& thr = *m_thread[n];
    if (thr.connect(ncc, db) == -1)
      return -1;
  }
  return 0;
}

void
NDBT_ThreadSet::disconnect()
{
  for (int n = 0; n < m_count; n++) {
    if (m_thread[n] != 0) {
      NDBT_Thread& thr = *m_thread[n];
      thr.disconnect();
    }
  }
}

int
NDBT_ThreadSet::get_err() const
{
  for (int n = 0; n < m_count; n++) {
    if (m_thread[n] != 0) {
      NDBT_Thread& thr = *m_thread[n];
      int err = thr.get_err();
      if (err != 0)
        return err;
    }
  }
  return 0;
}
Loading