Commit ab191165 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel
Browse files

better logging, exception handling for stress test runner

git-svn-id: file:///svn/toku/tokudb@42103 c7de825b-a66e-492c-adef-691d508d4ae1
parent 178ac5b7
Loading
Loading
Loading
Loading
+103 −93
Original line number Diff line number Diff line
@@ -14,22 +14,23 @@ By default, we stop everything, update from svn, rebuild, and restart the
tests once a day.
"""

import logging
import os
import sys
import time

from glob import glob
from logging import debug, info, warning, error, exception
from optparse import OptionParser
import os
from Queue import Queue
from random import randrange
from resource import setrlimit, RLIMIT_CORE
from shutil import copy, copytree, move, rmtree
from signal import signal, SIGHUP, SIGINT, SIGPIPE, SIGALRM, SIGTERM
from subprocess import call, Popen, PIPE, STDOUT
import sys
from tempfile import mkdtemp, mkstemp
import time
import thread
from thread import get_ident
from threading import Event, Thread, Timer
import threading
import traceback

__version__   = '$Id$'
__copyright__ = """Copyright (c) 2007-2012 Tokutek Inc.  All rights reserved.
@@ -48,12 +49,6 @@ recover_testnames = ['recover-test_stress1.tdb',
                     'recover-test_stress2.tdb',
                     'recover-test_stress3.tdb']

verbose = False
debug = False
def dbg(a):
    if debug:
        print >>sys.stderr, a

def setlimits():
    setrlimit(RLIMIT_CORE, (-1, -1))
    os.nice(7)
@@ -74,12 +69,18 @@ class TestRunnerBase(object):
        self.csize = csize
        self.test_time = test_time
        self.savedir = savedir
        self.logfile = os.path.join(log, ('%s-%d-%d' %
                                          (self.execf, self.tsize, self.csize)))
        self.nruns = 0
        self.env = os.environ
        self.env['LD_LIBRARY_PATH'] = (os.path.join(self.tokudb, 'lib') +
                                       ':' + self.env['LD_LIBRARY_PATH'])
        self.env['LD_LIBRARY_PATH'] = '%s:%s' % (os.path.join(self.tokudb, 'lib'),
                                                 self.env['LD_LIBRARY_PATH'])

        loggername = '%s-%d-%d' % (self.execf, self.tsize, self.csize)
        self.logger = logging.getLogger(loggername)
        self.logger.propagate = False
        self.logger.setLevel(logging.INFO)
        logfile = os.path.join(log, loggername)
        self.logger.addHandler(logging.FileHandler(logfile))

        self.nruns = 0
        self.rundir = None
        self.tmplog = None
        self.tmplogname = None
@@ -106,7 +107,7 @@ class TestRunnerBase(object):

        self.envdir = ('../%s-%d-%d-%d-%d-%d.dir' %
                       (self.execf, self.tsize, self.csize,
                        self.ptquery, self.update, thread.get_ident()))
                        self.ptquery, self.update, get_ident()))
        os.mkdir(self.envdir)

        try:
@@ -121,6 +122,7 @@ class TestRunnerBase(object):
            except TestFailure:
                savedir = self.save()
                self.print_failure(savedir)
                warning('Saved environment to %s', savedir)
            else:
                self.print_success()
        finally:
@@ -138,13 +140,9 @@ class TestRunnerBase(object):
            self.nruns += 1

    def save(self):
        savepfx = ("%s-%s-%d-%d-%d-%d-%s-" % (self.execf,
                                             self.rev,
                                             self.tsize,
                                             self.csize,
                                             self.ptquery,
                                             self.update,
                                             self.phase))
        savepfx = ('%s-%s-%d-%d-%d-%d-%s-' %
                   (self.execf, self.rev, self.tsize, self.csize,
                    self.ptquery, self.update, self.phase))
        savedir = mkdtemp(dir=self.savedir, prefix=savepfx)
        def targetfor(path):
            return os.path.join(savedir, os.path.basename(path))
@@ -160,49 +158,42 @@ class TestRunnerBase(object):
            copy(lib, targetfor(lib))
        return savedir

    def timestr(self, phase):
        timeval = self.times[phase]
        if timeval == 0:
            return ''
        else:
            return time.ctime(timeval)

    def logstr(self, passed):
        if passed:
            result = 'PASS'
        else:
            result = 'FAIL'
        return ('"%s",%s,%d,%d,%d,%d,%s,%s,%s,%s' %
                (self.execf, self.rev, self.tsize, self.csize,
                 self.ptquery, self.update,
                 self.timestr(0), self.timestr(1), self.timestr(2),
                 result))

    def infostr(self, passed):
        if passed:
            result = 'PASSED'
        else:
            result = 'FAILED'
        return ('[PASS=%d FAIL=%d] PASSED %s tsize=%d csize=%d ptquery=%d update=%d' %
                (self.scheduler.passed, self.scheduler.failed,
                 self.execf, self.tsize, self.csize, self.ptquery, self.update))

    def print_success(self):
        f = open(self.logfile, 'w+')
        print >>f, ('"%s",%s,%d,%d,%d,%d,%s,%s,%s,PASS' %
                    (self.execf,
                     self.rev,
                     self.tsize,
                     self.csize,
                     self.ptquery,
                     self.update,
                     time.ctime(self.times[0]),
                     time.ctime(self.times[1]),
                     time.ctime(self.times[2])))
        f.close()
        self.scheduler.passed += 1
        if verbose:
            print >>sys.stderr, ('[PASS=%d FAIL=%d] '
                                 'PASSED %s tsize=%d csize=%d '
                                 'ptquery=%d update=%d' %
                                 (self.scheduler.passed, self.scheduler.failed,
                                  self.execf, self.tsize, self.csize,
                                  self.ptquery, self.update))

    def print_failure(self, savedir):
        f = open(self.logfile, 'w+')
        print >>f, ('"%s",%s,%d,%d,%d,%d,%s,%s,%s,PASS' %
                    (self.execf,
                     self.rev,
                     self.tsize,
                     self.csize,
                     self.ptquery,
                     self.update,
                     time.ctime(self.times[0]),
                     time.ctime(self.times[1]),
                     time.ctime(self.times[2])))
        f.close()
        self.logger.info(self.logstr(True))
        info(self.infostr(True))

    def print_failure(self):
        self.scheduler.failed += 1
        print >>sys.stderr, ('[PASS=%d FAIL=%d] '
                             'FAILED %s tsize=%d csize=%d '
                             'ptquery=%d update=%d' %
                             (self.scheduler.passed, self.scheduler.failed,
                              self.execf, self.tsize, self.csize,
                              self.ptquery, self.update))
        print >>sys.stderr, 'Saved environment to %s' % savedir
        self.logger.warning(self.logstr(False))
        warning(self.infostr(False))

    def waitfor(self, proc):
        while proc.poll() is None:
@@ -262,26 +253,24 @@ class Worker(Thread):
        self.scheduler = scheduler

    def run(self):
        dbg('%s starting.' % self)
        debug('%s starting.' % self)
        while not self.scheduler.stopping.isSet():
            test_runner = self.scheduler.get()
            try:
                test_runner.run()
            except Exception, e:
                print >>sys.stderr, 'Fatal error in worker thread.'
                traceback.print_exc()
                print >>sys.stderr, 'Killing all workers.'
                exception('Fatal error in worker thread.')
                info('Killing all workers.')
                self.scheduler.error = e
                self.scheduler.stop()
            if not self.scheduler.stopping.isSet():
                self.scheduler.put(test_runner)
        dbg('%s exiting.' % self)
        debug('%s exiting.' % self)

class Scheduler(Queue):
    def __init__(self, nworkers):
        Queue.__init__(self)
        if verbose:
            print >>sys.stderr, 'Initializing scheduler with %d jobs.' % nworkers
        info('Initializing scheduler with %d jobs.', nworkers)
        self.nworkers = nworkers
        self.passed = 0
        self.failed = 0
@@ -291,8 +280,7 @@ class Scheduler(Queue):
        self.error = None

    def run(self, timeout):
        if verbose:
            print >>sys.stderr, 'Starting workers.'
        info('Starting workers.')
        self.stopping.clear()
        for i in range(self.nworkers):
            w = Worker(self)
@@ -308,10 +296,12 @@ class Scheduler(Queue):
                        break
                    w.join(timeout=1.0)
            except (KeyboardInterrupt, SystemExit):
                debug('Scheduler interrupted.  Stopping and joining threads.')
                self.stop()
                self.join()
                sys.exit(0)
        else:
            debug('Scheduler stopped by someone else.  Joining threads.')
            self.join()

    def join(self):
@@ -321,33 +311,45 @@ class Scheduler(Queue):
            self.workers.pop().join()

    def stop(self):
        if verbose:
            print >>sys.stderr, 'Stopping workers.'
        info('Stopping workers.')
        self.stopping.set()

def compiler_works(cc):
    try:
        devnull = open(os.devnull, 'w')
        r = call([cc, '-v'], stdout=devnull, stderr=STDOUT)
        devnull.close()
        return r == 0
    except OSError:
        exception('Error running %s.', cc)
        return False

def rebuild(tokudb, cc):
    env = os.environ
    env['CC'] = cc
    env['DEBUG'] = '0'
    if verbose:
        print >>sys.stderr, 'Updating from svn.'
    call(['svn', 'up'], cwd=tokudb)
    if verbose:
        print >>sys.stderr, 'Building tokudb.'
    info('Updating from svn.')
    devnull = open(os.devnull, 'w')
    call(['svn', 'up'], stdout=devnull, stderr=STDOUT, cwd=tokudb)
    devnull.close()
    if not compiler_works(cc):
        error('Cannot find working compiler named "%s".  Try sourcing the icc env script or providing another compiler with --cc.', cc)
        sys.exit(r)
    info('Building tokudb.')
    r = call(['make', '-s', 'clean'],
             cwd=tokudb, env=env)
    if r != 0:
        print >>sys.stderr, 'Build failed.  Do you need to source the icc env script?'
        error('Cleaning the source tree failed.')
        sys.exit(r)
    r = call(['make', 'fastbuild'],
    r = call(['make', '-s', 'fastbuild'],
             cwd=tokudb, env=env)
    if r != 0:
        print >>sys.stderr, 'Build failed.  Do you need to source the icc env script?'
        error('Building the fractal tree failed.')
        sys.exit(r)
    r = call(['make'] + testnames + recover_testnames,
    r = call(['make', '-s'] + testnames + recover_testnames,
             cwd=os.path.join(tokudb, 'src', 'tests'), env=env)
    if r != 0:
        print >>sys.stderr, 'Build failed.  Do you need to source the icc env script?'
        error('Building the tests failed.')
        sys.exit(r)

def revfor(tokudb):
@@ -355,8 +357,7 @@ def revfor(tokudb):
                 shell=True, cwd=tokudb, stdout=PIPE)
    (out, err) = proc.communicate()
    rev = out.strip()
    if verbose:
        print >>sys.stderr, 'Using tokudb at r%s.' % rev
    info('Using tokudb at r%s.', rev)
    return rev

def main(opts):
@@ -367,9 +368,8 @@ def main(opts):
        os.mkdir(opts.log)
    if not os.path.exists(opts.savedir):
        os.mkdir(opts.savedir)
    if verbose:
        print >>sys.stderr, 'Saving pass/fail logs to %s.' % opts.log
        print >>sys.stderr, 'Saving failure environments to %s.' % opts.savedir
    info('Saving pass/fail logs to %s.', opts.log)
    info('Saving failure environments to %s.', opts.savedir)

    scheduler = Scheduler(opts.jobs)

@@ -392,6 +392,7 @@ def main(opts):
        while scheduler.error is None:
            scheduler.run(opts.rebuild_period)
            if scheduler.error is not None:
                error('Scheduler reported an error.')
                raise scheduler.error
            rebuild(opts.tokudb, opts.cc)
            rev = revfor(opts.tokudb)
@@ -399,6 +400,9 @@ def main(opts):
                runner.rev = rev
    except (KeyboardInterrupt, SystemExit):
        sys.exit(0)
    except Exception, e:
        exception('Unhandled exception caught in main.')
        raise e

# relpath implementation for python <2.6
# from http://unittest-ext.googlecode.com/hg-history/1df911640f7be239e58fb185b06ac2a8489dcdc4/unittest2/unittest2/compatibility.py
@@ -486,6 +490,12 @@ if __name__ == '__main__':
    if len(args) > 0:
        parser.print_usage()
        sys.exit(1)
    debug = opts.debug
    verbose = opts.verbose or opts.debug

    if opts.debug:
        logging.basicConfig(level=logging.DEBUG)
    elif opts.verbose:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig(level=logging.WARNING)

    main(opts)