Commit c79d5d31, authored by Leif Walsh, committed by Yoni Fogel
Browse files

stress test runner: better logging, refactoring, and saving prepared environments between test runs

git-svn-id: file:///svn/toku/tokudb@42140 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1c7e9991
Loading
Loading
Loading
Loading
+126 −120
Original line number Diff line number Diff line
@@ -29,7 +29,6 @@ from shutil import copy, copytree, move, rmtree
from signal import signal, SIGHUP, SIGINT, SIGPIPE, SIGALRM, SIGTERM
from subprocess import call, Popen, PIPE, STDOUT
from tempfile import mkdtemp, mkstemp
from thread import get_ident
from threading import Event, Thread, Timer

__version__   = '$Id$'
@@ -53,6 +52,12 @@ def setlimits():
    setrlimit(RLIMIT_CORE, (-1, -1))
    os.nice(7)

def timestr(timeval):
    """Format a timestamp for display.

    A value equal to 0 means "no time recorded" and is rendered as the
    literal string 'None'; any other value is passed to time.ctime().
    """
    return 'None' if timeval == 0 else time.ctime(timeval)

class TestFailure(Exception):
    """Raised by a test runner when a child process ends with an unexpected exit status."""

@@ -60,7 +65,7 @@ class Killed(Exception):
    pass

class TestRunnerBase(object):
    def __init__(self, scheduler, tokudb, rev, jemalloc, execf, tsize, csize, test_time, savedir, log):
    def __init__(self, scheduler, tokudb, rev, jemalloc, execf, tsize, csize, test_time, savedir):
        self.scheduler = scheduler
        self.tokudb = tokudb
        self.rev = rev
@@ -69,8 +74,8 @@ class TestRunnerBase(object):
        self.csize = csize
        self.test_time = test_time
        self.savedir = savedir
        self.env = os.environ

        self.env = os.environ
        libpath = os.path.join(self.tokudb, 'lib')
        if 'LD_LIBRARY_PATH' in self.env:
            self.env['LD_LIBRARY_PATH'] = '%s:%s' % (libpath, self.env['LD_LIBRARY_PATH'])
@@ -84,129 +89,105 @@ class TestRunnerBase(object):
            else:
                self.env['LD_PRELOAD'] = preload

        loggername = '%s-%d-%d' % (self.execf, self.tsize, self.csize)
        self.logger = logging.getLogger(loggername)
        self.logger.propagate = False
        self.logger.setLevel(logging.INFO)
        logfile = os.path.join(log, loggername)
        self.logger.addHandler(logging.FileHandler(logfile))

        self.nruns = 0
        self.rundir = None
        self.tmplog = None
        self.tmplogname = None
        self.phase = 0
        self.times = [0, 0, 0]
        self.outf = None
        self.times = [0, 0]
        self.is_large = (tsize >= 10000000)

    def __str__(self):
        return 'TestRunner<%s, %d, %d>' % (self.execf, self.tsize, self.csize)

    def infostr(self):
        """Return a tab-separated record describing this runner.

        Columns, in order: execf, rev, tsize, csize, num_ptquery, num_update,
        time.  Each value is fetched through self[...] by the % operator.
        """
        record_fmt = '%(execf)s\t%(rev)s\t%(tsize)d\t%(csize)d\t%(num_ptquery)d\t%(num_update)d\t%(time)d'
        return record_fmt % self

    def __getitem__(self, k):
        if k == 'time1':
            return self.timestr(0)
        elif k == 'time2':
            return self.timestr(1)
        elif k == 'time3':
            return self.timestr(2)
        else:
            return self.__dict__[k]
        return self.__getattribute__(k)

    def run(self):
        srctests = os.path.join(self.tokudb, 'src', 'tests')
        self.rundir = mkdtemp(dir=srctests)
        (tmplog, self.tmplogname) = mkstemp()
        self.tmplog = os.fdopen(tmplog)
    @property
    def time(self):
        if self.times[0] != 0 and self.times[1] != 0:
            return self.times[1] - self.times[0]
        else:
            return 0

    @property
    def num_ptquery(self):
        if self.nruns % 2 < 1:
            self.ptquery = 1
            return 1
        else:
            self.ptquery = randrange(16)
            return randrange(16)

    @property
    def num_update(self):
        if self.nruns % 4 < 2:
            self.update = 1
            return 1
        else:
            self.update = randrange(16)
            return randrange(16)

        self.envdir = ('../%s-%d-%d-%d-%d-%d.dir' %
                       (self.execf, self.tsize, self.csize,
                        self.ptquery, self.update, get_ident()))
    @property
    def prepareloc(self):
        preparename = 'dir.%(execf)s-%(tsize)d-%(csize)d' % self
        return os.path.join(self.tokudb, 'src', 'tests', preparename)

    def prepare(self):
        """Provide an environment directory for the upcoming test run.

        When no saved environment exists at self.prepareloc, run_prepare()
        is invoked and <rundir>/<envdir> is then copied out to
        self.prepareloc.  Otherwise the saved copy at self.prepareloc is
        copied into <rundir>/<envdir>.
        """
        if not os.path.isdir(self.prepareloc):
            debug('%s preparing an environment.', self)
            self.run_prepare()
            debug('%s copying environment to %s.', self, self.prepareloc)
            copytree(os.path.join(self.rundir, self.envdir), self.prepareloc)
            return

        debug('%s found existing environment.', self)
        copytree(self.prepareloc, os.path.join(self.rundir, self.envdir))

    def delete_prepared_env(self):
        """Remove the saved prepared environment directory if it exists."""
        if not os.path.isdir(self.prepareloc):
            return
        rmtree(self.prepareloc)

    def run(self):
        srctests = os.path.join(self.tokudb, 'src', 'tests')
        self.rundir = mkdtemp(dir=srctests)
        self.envdir = '%(execf)s-%(tsize)d-%(csize)d-%(num_ptquery)d-%(num_update)d' % self

        try:
            outname = os.path.join(self.rundir, 'output.txt')
            self.outf = open(outname, 'w')

            try:
                self.times[0] = time.time()
                debug('%s preparing.', self)
                self.setup_test()
                self.times[1] = time.time()
                self.prepare()
                debug('%s testing.', self)
                self.times[0] = time.time()
                self.run_test()
                self.times[2] = time.time()
                self.times[1] = time.time()
                debug('%s done.', self)
            except Killed:
                pass
            except TestFailure:
                savedir = self.save()
                self.print_failure()
                self.scheduler.report_failure(self)
                warning('Saved environment to %s', savedir)
            else:
                self.print_success()
                self.scheduler.report_success(self)
        finally:
            fullenvdir = os.path.join(self.rundir, self.envdir)
            rmtree(fullenvdir, ignore_errors=True)
            self.envdir = None
            rmtree(self.rundir, ignore_errors=True)
            self.outf.close()
            rmtree(self.rundir)
            self.rundir = None
            self.tmplog.close()
            self.tmplog = None
            if os.path.exists(self.tmplogname):
                os.remove(self.tmplogname)
            self.tmplogname = None
            self.times = [0, 0, 0]
            self.times = [0, 0]
            self.nruns += 1

    def save(self):
        savepfx = ('%s-%s-%d-%d-%d-%d-%s-' %
                   (self.execf, self.rev, self.tsize, self.csize,
                    self.ptquery, self.update, self.phase))
        savepfx = '%(execf)s-%(rev)s-%(tsize)d-%(csize)d-%(num_ptquery)d-%(num_update)d-%(phase)s' % self
        savedir = mkdtemp(dir=self.savedir, prefix=savepfx)
        def targetfor(path):
            return os.path.join(savedir, os.path.basename(path))
        if os.path.exists(self.tmplogname):
            move(self.tmplogname, targetfor("output.txt"))
        for core in glob(os.path.join(self.rundir, "core*")):
            move(core, targetfor(core))
        if os.path.exists(self.envdir):
            copytree(self.envdir, targetfor(self.envdir))

        copytree(self.rundir, targetfor(self.rundir))
        fullexecf = os.path.join(self.tokudb, 'src', 'tests', self.execf)
        copy(fullexecf, targetfor(fullexecf))
        for lib in glob(os.path.join(self.tokudb, 'lib', '*')):
        for lib in glob(os.path.join(self.tokudb, 'lib', '*.so')):
            copy(lib, targetfor(lib))
        return savedir

    def timestr(self, phase):
        timeval = self.times[phase]
        if timeval == 0:
            return ''
        else:
            return time.ctime(timeval)

    def infostr(self, result):
        return ('[PASS=%d FAIL=%d] %s %s tsize=%d csize=%d ptquery=%d update=%d' %
                (self.scheduler.passed, self.scheduler.failed, result,
                 self.execf, self.tsize, self.csize, self.ptquery, self.update))

    def logstr(self, result):
        fmtstr = result + '\t%(execf)s\t%(rev)s\t%(tsize)d\t%(csize)d\t%(ptquery)d\t%(update)d\t%(time1)s\t%(time2)s\t%(time3)s'
        return fmtstr % self

    def print_success(self):
        self.scheduler.passed += 1
        self.logger.info(self.logstr('PASSED'))
        info(self.infostr('PASSED'))

    def print_failure(self):
        self.scheduler.failed += 1
        self.logger.warning(self.logstr('FAILED'))
        warning(self.infostr('FAILED'))
        return savedir

    def waitfor(self, proc):
        while proc.poll() is None:
@@ -215,49 +196,52 @@ class TestRunnerBase(object):
                os.kill(proc.pid, SIGTERM)
                raise Killed()

    def defaultargs(self, timed):
        a = ['-v',
             '--envdir', self.envdir,
             '--num_elements', str(self.tsize),
             '--cachetable_size', str(self.csize)]
        if timed:
            a += ['--num_seconds', str(self.test_time),
                  '--no-crash_on_update_failure',
                  '--num_ptquery_threads', str(self.ptquery),
                  '--num_update_threads', str(self.update)]
        return a

    def spawn_child(self, mode, timed):
        proc = Popen([self.execf, mode] + self.defaultargs(timed),
    def spawn_child(self, args):
        proc = Popen([self.execf] + args,
                     executable=os.path.join('..', self.execf),
                     env=self.env,
                     cwd=self.rundir,
                     preexec_fn=setlimits,
                     stdout=self.tmplog,
                     stdout=self.outf,
                     stderr=STDOUT)
        self.waitfor(proc)
        return proc.returncode

    def prepareargs(self):
        """Build the base child-process arguments.

        Returns a new list holding '-v' followed by the --envdir,
        --num_elements (from tsize) and --cachetable_size (from csize)
        options; the numeric values are converted with str().
        """
        argv = ['-v']
        argv += ['--envdir', self.envdir]
        argv += ['--num_elements', str(self.tsize)]
        argv += ['--cachetable_size', str(self.csize)]
        return argv

    def testargs(self):
        return ['--num_seconds', str(self.test_time),
                '--no-crash_on_update_failure',
                '--num_ptquery_threads', str(self.num_ptquery),
                '--num_update_threads', str(self.num_update)] + self.prepareargs()

class TestRunner(TestRunnerBase):
    def setup_test(self):
    def run_prepare(self):
        self.phase = "create"
        if self.spawn_child('--only_create', False) != 0:
        if self.spawn_child(['--only_create'] + self.prepareargs()) != 0:
            raise TestFailure('%s crashed during --only_create.' % self.execf)

    def run_test(self):
        self.phase = "stress"
        if self.spawn_child('--only_stress', True) != 0:
        if self.spawn_child(['--only_stress'] + self.testargs()) != 0:
            raise TestFailure('%s crashed during --only_stress.' % self.execf)

class RecoverTestRunner(TestRunnerBase):
    def setup_test(self):
        self.phase = "test"
        if self.spawn_child('--test', True) == 0:
            raise TestFailure('%s did not crash during --test' % self.execf)
    def run_prepare(self):
        self.phase = "create"
        if self.spawn_child(['--only_create', '--test'] + self.prepareargs()) != 0:
            raise TestFailure('%s crashed during --only_create --test.' % self.execf)

    def run_test(self):
        self.phase = "test"
        if self.spawn_child(['--only_stress', '--test'] + self.testargs()) == 0:
            raise TestFailure('%s did not crash during --only_stress --test' % self.execf)
        self.phase = "recover"
        if self.spawn_child('--recover', False) != 0:
        if self.spawn_child(['--recover'] + self.prepareargs()) != 0:
            raise TestFailure('%s crashed during --recover' % self.execf)

class Worker(Thread):
@@ -290,18 +274,19 @@ class Worker(Thread):
        debug('%s exiting.' % self)

class Scheduler(Queue):
    def __init__(self, nworkers, maxlarge):
    def __init__(self, nworkers, maxlarge, logger):
        Queue.__init__(self)
        info('Initializing scheduler with %d jobs.', nworkers)
        self.nworkers = nworkers
        self.logger = logger
        self.maxlarge = maxlarge
        self.nlarge = 0  # not thread safe, don't really care right now
        self.passed = 0
        self.failed = 0
        self.workers = []
        self.stopping = Event()
        self.timer = None
        self.error = None
        self.nlarge = 0  # not thread safe, don't really care right now
        self.maxlarge = maxlarge

    def run(self, timeout):
        info('Starting workers.')
@@ -338,6 +323,22 @@ class Scheduler(Queue):
        info('Stopping workers.')
        self.stopping.set()

    def __getitem__(self, k):
        return self.__dict__[k]

    def reportstr(self):
        """Return the running pass/fail tally, e.g. '[PASS=3 FAIL=1]'."""
        tally_fmt = '[PASS=%(passed)d FAIL=%(failed)d]'
        return tally_fmt % self

    def report_success(self, runner):
        """Count a passing run and report it to the pass/fail log and the console.

        runner -- the runner that just passed; only its infostr() is used.

        infostr() is evaluated exactly once and the result reused for both
        messages.  It formats the runner's num_ptquery/num_update values,
        which may be drawn from randrange(), so calling it twice could make
        the log-file line and the console line describe different records.
        """
        self.passed += 1
        record = runner.infostr()
        self.logger.info('PASSED %s', record)
        info('%s PASSED %s', self.reportstr(), record)

    def report_failure(self, runner):
        """Count a failing run and report it to the pass/fail log and the console.

        runner -- the runner that just failed; only its infostr() is used.

        infostr() is evaluated exactly once and the result reused for both
        messages.  It formats the runner's num_ptquery/num_update values,
        which may be drawn from randrange(), so calling it twice could make
        the log-file line and the console line describe different records.
        """
        self.failed += 1
        record = runner.infostr()
        self.logger.warning('FAILED %s', record)
        warning('%s FAILED %s', self.reportstr(), record)

def compiler_works(cc):
    try:
        devnull = open(os.devnull, 'w')
@@ -388,26 +389,29 @@ def main(opts):
    if opts.build:
        rebuild(opts.tokudb, opts.cc)
    rev = revfor(opts.tokudb)
    if not os.path.exists(opts.log):
        os.mkdir(opts.log)

    if not os.path.exists(opts.savedir):
        os.mkdir(opts.savedir)

    logger = logging.getLogger('stress')
    logger.propagate = False
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.FileHandler(opts.log))

    info('Saving pass/fail logs to %s.', opts.log)
    info('Saving failure environments to %s.', opts.savedir)

    scheduler = Scheduler(opts.jobs, opts.maxlarge)
    scheduler = Scheduler(opts.jobs, opts.maxlarge, logger)

    runners = []
    for tsize in [2000, 200000, 50000000]:
        for csize in [50 * tsize, 1000 ** 3]:
            for test in testnames:
                runners.append(TestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
                                          test, tsize, csize, opts.test_time,
                                          opts.savedir, opts.log))
                                          test, tsize, csize, opts.test_time, opts.savedir))
            for test in recover_testnames:
                runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
                                                 test, tsize, csize, opts.test_time,
                                                 opts.savedir, opts.log))
                                                 test, tsize, csize, opts.test_time, opts.savedir))

    shuffle(runners)

@@ -420,6 +424,8 @@ def main(opts):
            if scheduler.error is not None:
                error('Scheduler reported an error.')
                raise scheduler.error
            for runner in runners:
                runner.delete_prepared_env()
            rebuild(opts.tokudb, opts.cc)
            rev = revfor(opts.tokudb)
            for runner in runners: