Commit 8b63333a authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel
Browse files

[t:4797] adding upgrade test runner stuff to run.stress-tests.py


git-svn-id: file:///svn/toku/tokudb@42669 c7de825b-a66e-492c-adef-691d508d4ae1
parent b124c11a
Loading
Loading
Loading
Loading
+202 −50
Original line number Diff line number Diff line
@@ -16,12 +16,13 @@ tests once a day.

import logging
import os
import re
import sys
import time

from glob import glob
from logging import debug, info, warning, error, exception
from optparse import OptionParser
from optparse import OptionGroup, OptionParser
from Queue import Queue
from random import randrange, shuffle
from resource import setrlimit, RLIMIT_CORE
@@ -41,13 +42,6 @@ __copyright__ = """Copyright (c) 2007-2012 Tokutek Inc. All rights reserved.
                No. 11/760379 and to the patents and/or patent
                applications resulting from it."""

testnames = ['test_stress1.tdb',
             'test_stress5.tdb',
             'test_stress6.tdb']
recover_testnames = ['recover-test_stress1.tdb',
                     'recover-test_stress2.tdb',
                     'recover-test_stress3.tdb']

def setlimits():
    setrlimit(RLIMIT_CORE, (-1, -1))
    os.nice(7)
@@ -88,6 +82,7 @@ class TestRunnerBase(object):
        self.outf = None
        self.times = [0, 0]
        self.is_large = (tsize >= 10000000)
        self.oldversionstr = 'noupgrade'

    def __str__(self):
        return 'TestRunner<%s, %d, %d>' % (self.execf, self.tsize, self.csize)
@@ -99,6 +94,7 @@ class TestRunnerBase(object):
        return '\t'.join(['%(execf)s',
                          '%(rev)s',
                          '%(tsize)d',
                          '%(oldversionstr)s',
                          '%(csize)d',
                          '%(num_ptquery)d',
                          '%(num_update)d',
@@ -141,6 +137,9 @@ class TestRunnerBase(object):
        else:
            debug('%s preparing an environment.', self)
            self.run_prepare()
            self.save_prepared_envdir()

    def save_prepared_envdir(self):
        debug('%s copying environment to %s.', self, self.prepareloc)
        copytree(self.envdir, self.prepareloc)

@@ -175,17 +174,26 @@ class TestRunnerBase(object):
            self.nruns += 1

    def save(self):
        savepfx = '%(execf)s-%(rev)s-%(tsize)d-%(csize)d-%(num_ptquery)d-%(num_update)d-%(phase)s' % self
        savepfx = '%(execf)s-%(rev)s-%(tsize)d-%(csize)d-%(num_ptquery)d-%(num_update)d-%(phase)s-' % self
        savedir = mkdtemp(dir=self.savedir, prefix=savepfx)
        def targetfor(path):
            return os.path.join(savedir, os.path.basename(path))

        copytree(self.rundir, targetfor(self.rundir))
        for f in glob(os.path.join(self.rundir, '*')):
            if os.path.isdir(f):
                copytree(f, targetfor(f))
            else:
                copy(f, targetfor(f))
        fullexecf = os.path.join(self.tokudb, 'src', 'tests', self.execf)
        copy(fullexecf, targetfor(fullexecf))
        for lib in glob(os.path.join(self.tokudb, 'lib', '*.so')):
            copy(lib, targetfor(lib))

        commandsf = open(targetfor("commands.txt"), "w")
        print >>commandsf, ' '.join([self.execf] + self.prepareargs)
        print >>commandsf, ' '.join([self.execf] + self.testargs)
        commandsf.close()

        return savedir

    def waitfor(self, proc):
@@ -196,6 +204,7 @@ class TestRunnerBase(object):
                raise Killed()

    def spawn_child(self, args):
        logging.debug('%s spawning %s', self, ' '.join([self.execf] + args))
        proc = Popen([self.execf] + args,
                     executable=os.path.join('..', self.execf),
                     env=self.env,
@@ -206,43 +215,99 @@ class TestRunnerBase(object):
        self.waitfor(proc)
        return proc.returncode

    @property
    def extraargs(self):
        # for overriding
        return []

    @property
    def prepareargs(self):
        return ['-v',
                '--envdir', 'envdir',
                '--num_elements', str(self.tsize),
                '--cachetable_size', str(self.csize)]
                '--cachetable_size', str(self.csize)] + self.extraargs

    @property
    def testargs(self):
        return ['--num_seconds', str(self.test_time),
                '--no-crash_on_update_failure',
                '--num_ptquery_threads', str(self.num_ptquery),
                '--num_update_threads', str(self.num_update)] + self.prepareargs()
                '--num_update_threads', str(self.num_update)] + self.prepareargs

class TestRunner(TestRunnerBase):
    def run_prepare(self):
        self.phase = "create"
        if self.spawn_child(['--only_create'] + self.prepareargs()) != 0:
        if self.spawn_child(['--only_create'] + self.prepareargs) != 0:
            raise TestFailure('%s crashed during --only_create.' % self.execf)

    def run_test(self):
        self.phase = "stress"
        if self.spawn_child(['--only_stress'] + self.testargs()) != 0:
        if self.spawn_child(['--only_stress'] + self.testargs) != 0:
            raise TestFailure('%s crashed during --only_stress.' % self.execf)

class RecoverTestRunner(TestRunnerBase):
    def run_prepare(self):
        self.phase = "create"
        if self.spawn_child(['--only_create', '--test'] + self.prepareargs()) != 0:
        if self.spawn_child(['--only_create', '--test'] + self.prepareargs) != 0:
            raise TestFailure('%s crashed during --only_create --test.' % self.execf)

    def run_test(self):
        self.phase = "test"
        if self.spawn_child(['--only_stress', '--test'] + self.testargs()) == 0:
        if self.spawn_child(['--only_stress', '--test'] + self.testargs) == 0:
            raise TestFailure('%s did not crash during --only_stress --test' % self.execf)
        self.phase = "recover"
        if self.spawn_child(['--recover'] + self.prepareargs()) != 0:
        if self.spawn_child(['--recover'] + self.prepareargs) != 0:
            raise TestFailure('%s crashed during --recover' % self.execf)

class UpgradeTestRunnerMixin(TestRunnerBase):
    def __init__(self, old_environments_dir, version, pristine_or_stressed, **kwargs):
        super(UpgradeTestRunnerMixin, self).__init__(**kwargs)
        self.version = version
        self.pristine_or_stressed = pristine_or_stressed
        self.old_env_dirs = os.path.join(old_environments_dir, version)
        self.oldversionstr = '%(version)s-%(pristine_or_stressed)s' % self

    @property
    def extraargs(self):
        return ['--num_DBs', '1']

    @property
    def old_envdir(self):
        oldname = 'saved%(pristine_or_stressed)s-%(tsize)d-dir' % self
        logging.debug('%s using old version environment %s from %s.', self, oldname, self.old_env_dirs)
        return os.path.join(self.old_env_dirs, oldname)

    def save_prepared_envdir(self):
        # no need to do this
        pass

    def run_prepare(self):
        self.phase = "create"
        copytree(self.old_envdir, self.envdir)

class DoubleTestRunnerMixin(TestRunnerBase):
    """Runs the test phase twice in a row.

    Good for upgrade tests, to run the test once to upgrade it and then
    again to make sure the upgrade left it in a good state.
    """

    def run_test(self):
        super(DoubleTestRunnerMixin, self).run_test()
        super(DoubleTestRunnerMixin, self).run_test()

class UpgradeTestRunner(UpgradeTestRunnerMixin, TestRunner):
    pass

class UpgradeRecoverTestRunner(UpgradeTestRunnerMixin, RecoverTestRunner):
    pass

class DoubleUpgradeTestRunner(DoubleTestRunnerMixin, UpgradeTestRunner):
    pass

class DoubleUpgradeRecoverTestRunner(DoubleTestRunnerMixin, UpgradeRecoverTestRunner):
    pass

class Worker(Thread):
    def __init__(self, scheduler):
        super(Worker, self).__init__()
@@ -348,7 +413,7 @@ def compiler_works(cc):
        exception('Error running %s.', cc)
        return False

def rebuild(tokudb, cc):
def rebuild(tokudb, cc, tests):
    env = os.environ
    env['CC'] = cc
    env['DEBUG'] = '0'
@@ -370,7 +435,7 @@ def rebuild(tokudb, cc):
    if r != 0:
        error('Building the fractal tree failed.')
        sys.exit(r)
    r = call(['make', '-s'] + testnames + recover_testnames,
    r = call(['make', '-s'] + tests,
             cwd=os.path.join(tokudb, 'src', 'tests'), env=env)
    if r != 0:
        error('Building the tests failed.')
@@ -386,7 +451,7 @@ def revfor(tokudb):

def main(opts):
    if opts.build:
        rebuild(opts.tokudb, opts.cc)
        rebuild(opts.tokudb, opts.cc, opts.testnames + opts.recover_testnames)
    rev = revfor(opts.tokudb)

    if not os.path.exists(opts.savedir):
@@ -405,12 +470,59 @@ def main(opts):
    runners = []
    for tsize in [2000, 200000, 50000000]:
        for csize in [50 * tsize, 1000 ** 3]:
            for test in testnames:
                runners.append(TestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
                                          test, tsize, csize, opts.test_time, opts.savedir))
            for test in recover_testnames:
                runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
                                                 test, tsize, csize, opts.test_time, opts.savedir))
            kwargs = {
                'scheduler': scheduler,
                'tokudb': opts.tokudb,
                'rev': rev,
                'jemalloc': opts.jemalloc,
                'tsize': tsize,
                'csize': csize,
                'test_time': opts.test_time,
                'savedir': opts.savedir
                }
            for test in opts.testnames:
                if opts.run_non_upgrade:
                    runners.append(TestRunner(execf=test, **kwargs))

                if opts.run_upgrade:
                    for version in opts.old_versions:
                        for pristine_or_stressed in ['pristine', 'stressed']:
                            upgrade_kwargs = {
                                'old_environments_dir': opts.old_environments_dir,
                                'version': version,
                                'pristine_or_stressed': pristine_or_stressed
                                }
                            upgrade_kwargs.update(kwargs)
                            if opts.double_upgrade:
                                runners.append(DoubleUpgradeTestRunner(
                                        execf=test,
                                        **upgrade_kwargs))
                            else:
                                runners.append(UpgradeTestRunner(
                                        execf=test,
                                        **upgrade_kwargs))

            for test in opts.recover_testnames:
                if opts.run_non_upgrade:
                    runners.append(RecoverTestRunner(execf=test, **kwargs))

                if opts.run_upgrade:
                    for version in opts.old_versions:
                        for pristine_or_stressed in ['pristine', 'stressed']:
                            upgrade_kwargs = {
                                'old_environments_dir': opts.old_environments_dir,
                                'version': version,
                                'pristine_or_stressed': pristine_or_stressed
                                }
                            upgrade_kwargs.update(kwargs)
                            if opts.double_upgrade:
                                runners.append(DoubleUpgradeRecoverTestRunner(
                                        execf=test,
                                        **upgrade_kwargs))
                            else:
                                runners.append(UpgradeRecoverTestRunner(
                                        execf=test,
                                        **upgrade_kwargs))

    shuffle(runners)

@@ -423,7 +535,7 @@ def main(opts):
            if scheduler.error is not None:
                error('Scheduler reported an error.')
                raise scheduler.error
            rebuild(opts.tokudb, opts.cc)
            rebuild(opts.tokudb, opts.cc, opts.testnames + opts.recover_testnames)
            rev = revfor(opts.tokudb)
            for runner in runners:
                runner.rev = rev
@@ -490,38 +602,78 @@ if __name__ == '__main__':
    a0 = os.path.abspath(sys.argv[0])
    usage = '%prog [options]\n' + __doc__
    parser = OptionParser(usage=usage)
    parser.add_option('-v', '--verbose', action='store_true', dest='verbose', default=False)
    parser.add_option('-d', '--debug', action='store_true', dest='debug', default=False)
    default_toplevel = os.path.dirname(os.path.dirname(a0))
    parser.add_option('-v', '--verbose', action='store_true', dest='verbose', default=False, help='show build status, passing tests, and other info')
    parser.add_option('-d', '--debug', action='store_true', dest='debug', default=False, help='show debugging info')
    parser.add_option('-l', '--log', type='string', dest='log',
                      default='/tmp/run.stress-tests.log',
                      help='where to save logfiles')
    parser.add_option('-s', '--savedir', type='string', dest='savedir',
                      default='/tmp/run.stress-tests.failures',
                      help='where to save environments and extra data for failed tests')
    default_toplevel = os.path.dirname(os.path.dirname(a0))
    parser.add_option('--tokudb', type='string', dest='tokudb',
                      default=default_toplevel,
                      help=('top of the tokudb tree (contains newbrt/ and src/) [default=%s]' % os.path.relpath(default_toplevel)))
    parser.add_option('-t', '--test_time', type='int', dest='test_time',

    test_group = OptionGroup(parser, 'Scheduler Options', 'Control how the scheduler runs jobs.')
    test_group.add_option('-t', '--test_time', type='int', dest='test_time',
                          default=600,
                          help='time to run each test, in seconds [default=600]'),
    parser.add_option('-j', '--jobs', type='int', dest='jobs', default=8,
    test_group.add_option('-j', '--jobs', type='int', dest='jobs', default=8,
                          help='how many concurrent tests to run [default=8]')
    parser.add_option('--maxlarge', type='int', dest='maxlarge', default=2,
    test_group.add_option('--maxlarge', type='int', dest='maxlarge', default=2,
                          help='maximum number of large tests to run concurrently (helps prevent swapping) [default=2]')
    parser.add_option('--no-build', action='store_false', dest='build', default=True,
                      help="don't build before testing [default=do build]")
    parser.add_option('--rebuild_period', type='int', dest='rebuild_period',
                      default=60 * 60 * 24,
                      help='how many seconds between svn up and rebuild, 0 means never rebuild [default=24 hours]')
    parser.add_option('--cc', type='string', dest='cc', default='icc',
    parser.add_option_group(test_group)


    default_testnames = ['test_stress1.tdb',
                         'test_stress5.tdb',
                         'test_stress6.tdb']
    default_recover_testnames = ['recover-test_stress1.tdb',
                                 'recover-test_stress2.tdb',
                                 'recover-test_stress3.tdb']
    build_group = OptionGroup(parser, 'Build Options', 'Control how the fractal tree and tests get built.')
    build_group.add_option('--skip_build', action='store_false', dest='build', default=True,
                           help='skip the svn up and build phase before testing [default=False]')
    build_group.add_option('--rebuild_period', type='int', dest='rebuild_period', default=60 * 60 * 24,
                           help='how many seconds between doing an svn up and rebuild, 0 means never rebuild [default=24 hours]')
    build_group.add_option('--cc', type='string', dest='cc', default='icc',
                           help='which compiler to use [default=icc]')
    parser.add_option('--jemalloc', type='string', dest='jemalloc',
    build_group.add_option('--jemalloc', type='string', dest='jemalloc',
                           help='a libjemalloc.so to put in LD_PRELOAD when running tests')
    build_group.add_option('--add_test', action='append', type='string', dest='testnames', default=default_testnames,
                           help=('add a stress test to run [default=%r]' % default_testnames))
    build_group.add_option('--add_recover_test', action='append', type='string', dest='recover_testnames', default=default_recover_testnames,
                           help=('add a recover stress test to run [default=%r]' % default_recover_testnames))
    parser.add_option_group(build_group)

    upgrade_group = OptionGroup(parser, 'Upgrade Options', 'Also run on environments from old versions of tokudb.')
    upgrade_group.add_option('--run_upgrade', action='store_true', dest='run_upgrade', default=False,
                             help='run the tests on old dictionaries as well, to test upgrade [default=False]')
    upgrade_group.add_option('--skip_non_upgrade', action='store_false', dest='run_non_upgrade', default=True,
                             help="skip the tests that don't involve upgrade [default=False]")
    upgrade_group.add_option('--double_upgrade', action='store_true', dest='double_upgrade', default=False,
                             help='run the upgrade tests twice in a row [default=False]')
    upgrade_group.add_option('--add_old_version', action='append', type='choice', dest='old_versions', choices=['4.2.0', '5.0.8', '5.2.7'],
                             help='which old versions to use for running the stress tests in upgrade mode. can be specified multiple times [options=4.2.0, 5.0.8, 5.2.7]')
    upgrade_group.add_option('--old_environments_dir', type='string', dest='old_environments_dir',
                             default='../../tokudb.data/stress_environments',
                             help='directory containing old version environments (should contain 5.0.8/, 5.2.7/, etc, and the environments should be in those) [default=../../tokudb.data/stress_environments]')
    parser.add_option_group(upgrade_group)

    (opts, args) = parser.parse_args()
    if len(args) > 0:
        parser.print_usage()
        sys.exit(1)
        parser.error('Invalid arguments: %r' % args)

    if opts.run_upgrade:
        if not os.path.isdir(opts.old_environments_dir):
            parser.error('You specified --run_upgrade but did not specify an --old_environments_dir that exists.')
        if len(opts.old_versions) < 1:
            parser.error('You specified --run_upgrade but gave no --old_versions to run against.')
        for version in opts.old_versions:
            version_dir = os.path.join(opts.old_environments_dir, version)
            if not os.path.isdir(version_dir):
                parser.error('You specified --run_upgrade but %s is not a directory.' % version_dir)

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)