Commit b9a7fe27 authored by unknown's avatar unknown
Browse files

WL#3337 (Event scheduler new architecture)

Cleaned up the code a bit. Fixed few leaks.
This code still does not load events on server startup
from disk. The problem is that there is a need for a THD instance, which
does not exist during server boot. This will be solved soon.
Still Event_timed is used both for the memory queue and for exectution.
This will be changed according to WL#3337 probably in the next commit.


sql/event_data_objects.cc:
  Strip unneeded stuff from class Event_timed
  Event_timed is still used for the queue and execution.
  That will be changed in the next commit.
sql/event_data_objects.h:
  Strip unneeded stuff from class Event_timed
  Event_timed is still used for the queue and execution.
  That will be changed in the next commit.
sql/event_db_repository.cc:
  Cosmetics.
  Add a new method load_named_event_job, to be made complete in the 
  next commit. It will load from disk an instance of Event_job_data to
  be used during execution.
sql/event_db_repository.h:
  find_event does not need MEM_ROOT anymore
  because the memory is allocated on Event's own root.
sql/event_queue.cc:
  Remove dead code.
  Move dumping of the queue to separate method.
  Make critical sections in create_event & update_event
  as small as possible - load the new event outside of the section
  and free the object also outside of it.
sql/event_queue.h:
  init -> init_queue -> easier for ctags
  deinit -> deinit_queue -> easier for ctags
sql/event_scheduler.cc:
  empty this file
sql/event_scheduler.h:
  empty this file
sql/event_scheduler_ng.cc:
  add back DBUG_RETURN(0) in thread handlers.
  We don't stop running events when stopping the scheduler. Therefore
  remove this method now. If it is needed later it can be added back.
sql/event_scheduler_ng.h:
  Remove stop_all_running_threads()
  init -> init_scheduler
  deinit -> deinit_scheduler
  easier for ctags
sql/events.cc:
  Cosmetics
sql/events.h:
  Cosmetics
sql/set_var.cc:
  Remove references to dead code
sql/sql_parse.cc:
  Reorganize a bit.
parent a5dfeb02
Loading
Loading
Loading
Loading
+5 −190
Original line number Diff line number Diff line
@@ -530,18 +530,14 @@ Event_parse_data::init_ends(THD *thd, Item *new_ends)
    Event_timed::Event_timed()
*/

Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),
                           running(0), thread_id(0), status_changed(false),
Event_timed::Event_timed():status_changed(false),
                           last_executed_changed(false), expression(0),
                           created(0), modified(0),
                           on_completion(Event_timed::ON_COMPLETION_DROP),
                           status(Event_timed::ENABLED), sphead(0),
                           sql_mode(0), dropped(false),
                           free_sphead_on_delete(true), flags(0)
                           sql_mode(0), dropped(false), flags(0)
                
{
  pthread_mutex_init(&this->LOCK_running, MY_MUTEX_INIT_FAST);
  pthread_cond_init(&this->COND_finished, NULL);
  init();
}

@@ -555,49 +551,11 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),

Event_timed::~Event_timed()
{    
  deinit_mutexes();
  free_root(&mem_root, MYF(0));

  if (free_sphead_on_delete)
  free_sp();
}


/*
  Destructor

  SYNOPSIS
    Event_timed::deinit_mutexes()
*/

void
Event_timed::deinit_mutexes()
{
  pthread_mutex_destroy(&this->LOCK_running);
  pthread_cond_destroy(&this->COND_finished);
}


/*
  Checks whether the event is running

  SYNOPSIS
    Event_timed::is_running()
*/

bool
Event_timed::is_running()
{
  bool ret;

  VOID(pthread_mutex_lock(&this->LOCK_running));
  ret= running;
  VOID(pthread_mutex_unlock(&this->LOCK_running));

  return ret;
}


/*
  Init all member variables

@@ -1253,7 +1211,7 @@ Event_timed::update_fields(THD *thd)
  Open_tables_state backup;
  int ret;

  DBUG_ENTER("Event_timed::update_time_fields");
  DBUG_ENTER("Event_timed::update_fields");

  DBUG_PRINT("enter", ("name: %*s", name.length, name.str));

@@ -1382,7 +1340,7 @@ Event_timed::get_create_event(THD *thd, String *buf)
  Executes the event (the underlying sp_head object);

  SYNOPSIS
    evex_fill_row()
    Event_timed::execute()
      thd       THD
      mem_root  If != NULL use it to compile the event on it

@@ -1607,149 +1565,6 @@ Event_timed::compile(THD *thd, MEM_ROOT *mem_root)
}


extern pthread_attr_t connection_attrib;

/*
  Checks whether is possible and forks a thread. Passes self as argument.

  RETURN VALUE
    EVENT_EXEC_STARTED       OK
    EVENT_EXEC_ALREADY_EXEC  Thread not forked, already working
    EVENT_EXEC_CANT_FORK     Unable to spawn thread (error)
*/

int
Event_timed::spawn_now(void * (*thread_func)(void*), void *arg)
{
  THD *thd= current_thd;
  int ret= EVENT_EXEC_STARTED;
  DBUG_ENTER("Event_timed::spawn_now");
  DBUG_PRINT("info", ("[%s.%s]", dbname.str, name.str));

  VOID(pthread_mutex_lock(&this->LOCK_running));

  DBUG_PRINT("info", ("SCHEDULER: execute_at of %s is %lld", name.str,
             TIME_to_ulonglong_datetime(&execute_at)));
  mark_last_executed(thd);
  if (compute_next_execution_time())
  {
    sql_print_error("SCHEDULER: Error while computing time of %s.%s . "
                    "Disabling after execution.", dbname.str, name.str);
    status= DISABLED;
  }
  DBUG_PRINT("evex manager", ("[%10s] next exec at [%llu]", name.str,
             TIME_to_ulonglong_datetime(&execute_at)));
   /*
    1. For one-time event : year is > 0 and expression is 0
    2. For recurring, expression is != -=> check execute_at_null in this case
  */
  if ((execute_at.year && !expression) || execute_at_null)
  {
    sql_print_information("SCHEDULER: [%s.%s of %s] no more executions "
                          "after this one", dbname.str, name.str,
                          definer.str);
    flags |= EVENT_EXEC_NO_MORE | EVENT_FREE_WHEN_FINISHED;
  }

  update_fields(thd);

  if (!in_spawned_thread)
  {
    pthread_t th;
    in_spawned_thread= true;

    if (pthread_create(&th, &connection_attrib, thread_func, arg))
    {
      DBUG_PRINT("info", ("problem while spawning thread"));
      ret= EVENT_EXEC_CANT_FORK;
      in_spawned_thread= false;
    }
  }
  else
  {
    DBUG_PRINT("info", ("already in spawned thread. skipping"));
    ret= EVENT_EXEC_ALREADY_EXEC;
  }
  VOID(pthread_mutex_unlock(&this->LOCK_running));

  DBUG_RETURN(ret);  
}


bool
Event_timed::spawn_thread_finish(THD *thd)
{
  bool should_free;
  DBUG_ENTER("Event_timed::spawn_thread_finish");
  VOID(pthread_mutex_lock(&LOCK_running));
  in_spawned_thread= false;
  DBUG_PRINT("info", ("Sending COND_finished for thread %d", thread_id));
  thread_id= 0;
  if (dropped)
    drop(thd);
  pthread_cond_broadcast(&COND_finished);
  should_free= flags & EVENT_FREE_WHEN_FINISHED;
  VOID(pthread_mutex_unlock(&LOCK_running));
  DBUG_RETURN(should_free);
}


/*
  Kills a running event
  SYNOPSIS
    Event_timed::kill_thread()
    
  RETURN VALUE 
     0    OK
    -1    EVEX_CANT_KILL
    !0   Error 
*/

int
Event_timed::kill_thread(THD *thd)
{
  int ret= 0;
  DBUG_ENTER("Event_timed::kill_thread");
  pthread_mutex_lock(&LOCK_running);
  DBUG_PRINT("info", ("thread_id=%lu", thread_id));

  if (thread_id == thd->thread_id)
  {
    /*
      We don't kill ourselves in cases like :
      alter event e_43 do alter event e_43 do set @a = 4 because
      we will never receive COND_finished.
    */
    DBUG_PRINT("info", ("It's not safe to kill ourselves in self altering queries"));
    ret= EVEX_CANT_KILL;
  }
  else if (thread_id && !(ret= kill_one_thread(thd, thread_id, false)))
  {
    thd->enter_cond(&COND_finished, &LOCK_running, "Waiting for finished");
    DBUG_PRINT("info", ("Waiting for COND_finished from thread %d", thread_id));
    while (thread_id)
      pthread_cond_wait(&COND_finished, &LOCK_running);

    DBUG_PRINT("info", ("Got COND_finished"));
    /* This will implicitly unlock LOCK_running. Hence we return before that */
    thd->exit_cond("");

    DBUG_RETURN(0);
  }
  else if (!thread_id && in_spawned_thread)
  {
    /*
      Because the manager thread waits for the forked thread to update thread_id
      this situation is impossible.
    */
    DBUG_ASSERT(0);
  }
  pthread_mutex_unlock(&LOCK_running);
  DBUG_PRINT("exit", ("%d", ret));
  DBUG_RETURN(ret);
}


/*
  Checks whether two events have the same name

+0 −25
Original line number Diff line number Diff line
@@ -63,12 +63,6 @@ class Event_timed
{
  Event_timed(const Event_timed &);	/* Prevent use of these */
  void operator=(Event_timed &);
  my_bool in_spawned_thread;
  ulong locked_by_thread_id;
  my_bool running;
  ulong thread_id;
  pthread_mutex_t LOCK_running;
  pthread_cond_t COND_finished;

  bool status_changed;
  bool last_executed_changed;
@@ -118,7 +112,6 @@ class Event_timed
  ulong sql_mode;

  bool dropped;
  bool free_sphead_on_delete;
  uint flags;//all kind of purposes

  static void *operator new(size_t size)
@@ -146,9 +139,6 @@ class Event_timed
  void
  init();

  void
  deinit_mutexes();

  int
  load_from_row(TABLE *table);

@@ -173,23 +163,8 @@ class Event_timed
  int
  compile(THD *thd, MEM_ROOT *mem_root);
  
  bool
  is_running();

  int
  spawn_now(void * (*thread_func)(void*), void *arg);
  
  bool
  spawn_thread_finish(THD *thd);
  
  void
  free_sp();

  int
  kill_thread(THD *thd);

  void
  set_thread_id(ulong tid) { thread_id= tid; }
};


+67 −17
Original line number Diff line number Diff line
@@ -374,8 +374,7 @@ Event_db_repository::table_scan_all_for_i_s(THD *thd, TABLE *schema_table,
    ret= read_record_info.read_record(&read_record_info);
    if (ret == 0)
      ret= copy_event_to_schema_table(thd, schema_table, event_table);
  }
  while (ret == 0);
  } while (ret == 0);

  DBUG_PRINT("info", ("Scan finished. ret=%d", ret));
  end_read_record(&read_record_info);
@@ -464,8 +463,7 @@ Event_db_repository::fill_schema_events(THD *thd, TABLE_LIST *tables, char *db)

int
Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
                                Event_timed **ett,
                                TABLE *tbl, MEM_ROOT *root)
                                Event_timed **ett, TABLE *tbl)
{
  TABLE *table;
  int ret;
@@ -505,7 +503,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
  if (ret)
  {
    delete et;
    et= 0;
    et= NULL;
  }
  /* don't close the table if we haven't opened it ourselves */
  if (!tbl && table)
@@ -518,7 +516,6 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
int
Event_db_repository::init_repository()
{
  init_alloc_root(&repo_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
  return 0;
}

@@ -526,7 +523,6 @@ Event_db_repository::init_repository()
void
Event_db_repository::deinit_repository()
{
  free_root(&repo_root, MYF(0));
}


@@ -731,7 +727,8 @@ Event_db_repository::create_event(THD *thd, Event_parse_data *parse_data,
             parse_data->name.str));

  DBUG_PRINT("info", ("check existance of an event with the same name"));
  if (!evex_db_find_event_by_name(thd, parse_data->dbname, parse_data->name, table))
  if (!evex_db_find_event_by_name(thd, parse_data->dbname,
                                  parse_data->name, table))
  {
    if (create_if_not)
    {
@@ -1026,14 +1023,12 @@ Event_db_repository::find_event_by_name(THD *thd, LEX_STRING db,
  */
  if (db.length > table->field[ET_FIELD_DB]->field_length ||
      name.length > table->field[ET_FIELD_NAME]->field_length)
      
    DBUG_RETURN(EVEX_KEY_NOT_FOUND);

  table->field[ET_FIELD_DB]->store(db.str, db.length, &my_charset_bin);
  table->field[ET_FIELD_NAME]->store(name.str, name.length, &my_charset_bin);

  key_copy(key, table->record[0], table->key_info,
           table->key_info->key_length);
  key_copy(key, table->record[0], table->key_info, table->key_info->key_length);

  if (table->file->index_read_idx(table->record[0], 0, key,
                                  table->key_info->key_length,
@@ -1125,7 +1120,7 @@ Event_db_repository::drop_events_by_field(THD *thd,
  the table, compiles and inserts it into the cache.

  SYNOPSIS
    Event_scheduler::load_named_event()
    Event_db_repository::load_named_event_timed()
      thd      THD
      etn      The name of the event to load and compile on scheduler's root
      etn_new  The loaded event
@@ -1136,7 +1131,8 @@ Event_db_repository::drop_events_by_field(THD *thd,
*/

int
Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
Event_db_repository::load_named_event_timed(THD *thd, LEX_STRING dbname,
                                            LEX_STRING name,
                                            Event_timed **etn_new)
{
  int ret= 0;
@@ -1144,12 +1140,12 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
  Event_timed *et_loaded= NULL;
  Open_tables_state backup;

  DBUG_ENTER("Event_db_repository::load_named_event");
  DBUG_ENTER("Event_db_repository::load_named_event_timed");
  DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));

  thd->reset_n_backup_open_tables_state(&backup);
  /* No need to use my_error() here because db_find_event() has done it */
  ret= find_event(thd, dbname, name, &et_loaded, NULL, &repo_root);
  ret= find_event(thd, dbname, name, &et_loaded, NULL);
  thd->restore_backup_open_tables_state(&backup);
  /* In this case no memory was allocated so we don't need to clean */
  if (ret)
@@ -1171,3 +1167,57 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na

  DBUG_RETURN(OP_OK);
}


/*
  Looks for a named event in mysql.event and then loads it from
  the table, compiles and inserts it into the cache.

  SYNOPSIS
    Event_db_repository::load_named_event_job()
      thd      THD
      etn      The name of the event to load and compile on scheduler's root
      etn_new  The loaded event

  RETURN VALUE
    NULL       Error during compile or the event is non-enabled.
    otherwise  Address
*/

int
Event_db_repository::load_named_event_job(THD *thd, LEX_STRING dbname,
                                          LEX_STRING name,
                                          Event_job_data **etn_new)
{
  int ret= 0;
  MEM_ROOT *tmp_mem_root;
  Event_timed *et_loaded= NULL;
  Open_tables_state backup;

  DBUG_ENTER("Event_db_repository::load_named_event_job");
  DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));
#if 0
  thd->reset_n_backup_open_tables_state(&backup);
  /* No need to use my_error() here because db_find_event() has done it */
  ret= find_event(thd, dbname, name, &et_loaded, NULL);
  thd->restore_backup_open_tables_state(&backup);
  /* In this case no memory was allocated so we don't need to clean */
  if (ret)
    DBUG_RETURN(OP_LOAD_ERROR);

  if (et_loaded->status != Event_timed::ENABLED)
  {
    /*
      We don't load non-enabled events.
      In db_find_event() `et_new` was allocated on the heap and not on
      scheduler_root therefore we delete it here.
    */
    delete et_loaded;
    DBUG_RETURN(OP_DISABLED_EVENT);
  }

  et_loaded->compute_next_execution_time();
  *etn_new= et_loaded;
#endif
  DBUG_RETURN(OP_OK);
}
+8 −4
Original line number Diff line number Diff line
@@ -56,6 +56,7 @@ fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */);
class Event_timed;
class Event_parse_data;
class Event_queue_element;
class Event_job_data;

class Event_db_repository
{
@@ -88,10 +89,15 @@ class Event_db_repository

  int
  find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **ett,
             TABLE *tbl, MEM_ROOT *root);
             TABLE *tbl);

  int
  load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING name, Event_timed **etn_new);
  load_named_event_timed(THD *thd, LEX_STRING dbname, LEX_STRING name,
                         Event_timed **etn_new);

  int
  load_named_event_job(THD *thd, LEX_STRING dbname, LEX_STRING name,
                       Event_job_data **etn_new);

  int
  find_event_by_name(THD *thd, LEX_STRING db, LEX_STRING name, TABLE *table);
@@ -116,8 +122,6 @@ class Event_db_repository
  static bool
  check_system_tables(THD *thd);

  MEM_ROOT repo_root;

  /* Prevent use of these */
  Event_db_repository(const Event_db_repository &);
  void operator=(Event_db_repository &);
+84 −165
Original line number Diff line number Diff line
@@ -90,34 +90,29 @@ Event_queue::Event_queue()
  RETURN VALUE
    OP_OK             OK or scheduler not working
    OP_LOAD_ERROR     Error during loading from disk
    OP_ALREADY_EXISTS Event already in the queue    
*/

int
Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
Event_queue::create_event(THD *thd, Event_parse_data *et)
{
  int res;
  Event_timed *et_new;
  DBUG_ENTER("Event_queue::create_event");
  DBUG_PRINT("enter", ("thd=%p et=%p lock=%p",thd,et, &LOCK_event_queue));

  res= db_repository->load_named_event_timed(thd, et->dbname, et->name, &et_new);
  LOCK_QUEUE_DATA();
  if (check_existence && find_event(et->dbname, et->name, FALSE))
  {
    res= OP_ALREADY_EXISTS;
    goto end;
  }

  if (!(res= db_repository->
                load_named_event(thd, et->dbname, et->name, &et_new)))
  if (!res)
  {
    DBUG_PRINT("info", ("new event in the queue %p", et_new));
    queue_insert_safe(&queue, (byte *) et_new);
    on_queue_change();
    notify_observers();
  }
  else if (res == OP_DISABLED_EVENT)
    res= OP_OK;
end:
  UNLOCK_QUEUE_DATA();

  DBUG_RETURN(res);
}

@@ -129,104 +124,54 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
    Event_queue::update_event()
      thd        Thread
      et         The event to replace(add) into the queue
      new_schema New schema
      new_name   New name
      new_schema New schema, in case of RENAME TO
      new_name   New name, in case of RENAME TO

  RETURN VALUE
    OP_OK             OK or scheduler not working
    OP_LOAD_ERROR     Error during loading from disk
    OP_ALREADY_EXISTS Event already in the queue    
*/

int
Event_queue::update_event(THD *thd, Event_parse_data *et,
                               LEX_STRING *new_schema,
                               LEX_STRING *new_name)
                          LEX_STRING *new_schema, LEX_STRING *new_name)
{
  int res= OP_OK;
  Event_timed *et_old, *et_new= NULL;
  LEX_STRING old_schema, old_name;

  LINT_INIT(old_schema.str);
  LINT_INIT(old_schema.length);
  LINT_INIT(old_name.str);
  LINT_INIT(old_name.length);
  int res;
  Event_timed *et_old= NULL, *et_new= NULL;

  DBUG_ENTER("Event_queue::update_event");
  DBUG_PRINT("enter", ("thd=%p et=%p et=[%s.%s] lock=%p",
             thd, et, et->dbname.str, et->name.str, &LOCK_event_queue));

  res= db_repository->
            load_named_event_timed(thd, new_schema?*new_schema:et->dbname,
                                   new_name? *new_name:et->name, &et_new);

  if (res && res != OP_DISABLED_EVENT)
    goto end;

  LOCK_QUEUE_DATA();
  if (!(et_old= find_event(et->dbname, et->name, TRUE)))
  {
    DBUG_PRINT("info", ("%s.%s not found cached, probably was DISABLED",
                        et->dbname.str, et->name.str));

  if (new_schema && new_name)
  {
    old_schema= et->dbname;
    old_name= et->name;
    et->dbname= *new_schema;
    et->name= *new_name;
  }

  if (!(res= db_repository->
            load_named_event(thd, et->dbname, et->name, &et_new)))
  if (!res)
  {
    DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
    queue_insert_safe(&queue, (byte *) et_new);
    on_queue_change();
  }
  else if (res == OP_DISABLED_EVENT)
    res= OP_OK;

  if (new_schema && new_name)
  {
    et->dbname= old_schema;
    et->name= old_name;
  }
  DBUG_PRINT("info", ("res=%d", res));
  UNLOCK_QUEUE_DATA();
  /*
    Andrey: Is this comment still truthful ???

    We don't move this code above because a potential kill_thread will call
    THD::awake(). Which in turn will try to acqure mysys_var->current_mutex,
    which is LOCK_event_queue on which the COND_new_work in ::run() locks.
    Hence, we try to acquire a lock which we have already acquired and we run
    into an assert. Holding LOCK_event_queue however is not needed because
    we don't touch any invariant of the scheduler anymore. ::drop_event() does
    the same.
  */

  notify_observers();

  if (et_old)
  {
    switch (et_old->kill_thread(thd)) {
    case EVEX_CANT_KILL:
      /* Don't delete but continue */
      et_old->flags |= EVENT_FREE_WHEN_FINISHED;
      break;
    case 0:
      /* 
        kill_thread() waits till the spawned thread finishes after it's
        killed. Hence, we delete here memory which is no more referenced from
        a running thread.
      */
    delete et_old;
      /*
        We don't signal COND_new_work here because:
        1. Even if the dropped event is on top of the queue this will not
          move another one to be executed before the time the one on the
          top (but could be at the same second as the dropped one)
        2. If this was the last event on the queue, then pthread_cond_timedwait
          in ::run() will finish and then see that the queue is empty and
          call cond_wait(). Hence, no need to interrupt the blocked
          ::run() thread.
      */
      break;
    default:
      DBUG_ASSERT(0);
    }
  }

end:
  DBUG_PRINT("info", ("res=%d", res));
  DBUG_RETURN(res);
}

@@ -256,40 +201,13 @@ Event_queue::drop_event(THD *thd, sp_name *name)
  LOCK_QUEUE_DATA();
  if (!(et_old= find_event(name->m_db, name->m_name, TRUE)))
    DBUG_PRINT("info", ("No such event found, probably DISABLED"));

  UNLOCK_QUEUE_DATA();

  /* See comments in ::replace_event() why this is split in two parts. */
  if (et_old)
  {
    switch ((res= et_old->kill_thread(thd))) {
    case EVEX_CANT_KILL:
      /* Don't delete but continue */
      et_old->flags |= EVENT_FREE_WHEN_FINISHED;
      break;
    case 0:
      /* 
        kill_thread() waits till the spawned thread finishes after it's
        killed. Hence, we delete here memory which is no more referenced from
        a running thread.
      */
    delete et_old;
  /*
        We don't signal COND_new_work here because:
        1. Even if the dropped event is on top of the queue this will not
          move another one to be executed before the time the one on the
          top (but could be at the same second as the dropped one)
        2. If this was the last event on the queue, then pthread_cond_timedwait
          in ::run() will finish and then see that the queue is empty and
          call cond_wait(). Hence, no need to interrupt the blocked
          ::run() thread.
    We don't signal here because the scheduler will catch the change
    next time it wakes up.
  */
      break;
    default:
      sql_print_error("SCHEDULER: Got unexpected error %d", res);
      DBUG_ASSERT(0);
    }
  }

  DBUG_RETURN(FALSE);
}
@@ -361,7 +279,7 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
  DBUG_ENTER("Event_queue::drop_matching_events");
  DBUG_PRINT("enter", ("pattern=%*s state=%d", pattern.length, pattern.str));

  uint i= 0, dropped= 0;   
  uint i= 0;   
  while (i < queue.elements)
  {
    Event_timed *et= (Event_timed *) queue_element(&queue, i);
@@ -375,30 +293,20 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
        counter and the (i < queue.elements) condition is ok.
      */
      queue_remove(&queue, i);

      /* See replace_event() */
      switch (et->kill_thread(thd)) {
      case EVEX_CANT_KILL:
        /* Don't delete but continue */
        et->flags |= EVENT_FREE_WHEN_FINISHED;
        ++dropped;
        break;
      case 0:
      delete et;
        ++dropped;
        break;
      default:
        DBUG_ASSERT(0);
      }
    }
    else
      i++;
  }
  DBUG_PRINT("info", ("Dropped %lu", dropped));
  /*
    Don't send COND_new_work because no need to wake up the scheduler thread.
    When it wakes next time up it will recalculate how much more it should
    sleep if the top of the queue has been changed by this method.
    We don't call notify_observers() . If we remove the top event:
    1. The queue is empty. The scheduler will wake up at some time and realize
       that the queue is empty. If create_event() comes inbetween it will
       signal the scheduler
    2. The queue is not empty, but the next event after the previous top, won't
       be executed any time sooner than the element we removed. Hence, we may
       not notify the scheduler and it will realize the change when it
       wakes up from timedwait.
  */
  
  DBUG_VOID_RETURN;
@@ -418,16 +326,14 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
    >=0  Number of dropped events
*/

int
void
Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
{
  int ret;
  DBUG_ENTER("Event_queue::drop_schema_events");
  LOCK_QUEUE_DATA();
  drop_matching_events(thd, schema, event_timed_db_equal);
  UNLOCK_QUEUE_DATA();

  DBUG_RETURN(ret);
  DBUG_VOID_RETURN;
}


@@ -744,13 +650,13 @@ Event_queue::deinit_mutexes()
  its state.

  SYNOPSIS
    Event_queue::on_queue_change()
    Event_queue::notify_observers()
*/

void
Event_queue::on_queue_change()
Event_queue::notify_observers()
{
  DBUG_ENTER("Event_queue::on_queue_change");
  DBUG_ENTER("Event_queue::notify_observers");
  DBUG_PRINT("info", ("Signalling change of the queue"));
  scheduler->queue_changed();
  DBUG_VOID_RETURN;
@@ -761,7 +667,7 @@ Event_queue::on_queue_change()
  The implementation of full-fledged initialization.

  SYNOPSIS
    Event_scheduler::init()
    Event_queue::init()

  RETURN VALUE
    FALSE  OK
@@ -769,15 +675,16 @@ Event_queue::on_queue_change()
*/

bool
Event_queue::init(Event_db_repository *db_repo)
Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler_ng *sched)
{
  int i= 0;
  bool ret= FALSE;
  DBUG_ENTER("Event_queue::init");
  DBUG_ENTER("Event_queue::init_queue");
  DBUG_PRINT("enter", ("this=%p", this));

  LOCK_QUEUE_DATA();
  db_repository= db_repo;
  scheduler= sched;

  if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
                    event_timed_compare_q, NULL, 30 /*auto_extent*/))
@@ -803,9 +710,9 @@ Event_queue::init(Event_db_repository *db_repo)


void
Event_queue::deinit()
Event_queue::deinit_queue()
{
  DBUG_ENTER("Event_queue::deinit");
  DBUG_ENTER("Event_queue::deinit_queue");

  LOCK_QUEUE_DATA();
  empty_queue();
@@ -833,6 +740,8 @@ void
Event_queue::empty_queue()
{
  uint i;
  DBUG_ENTER("Event_queue::empty_queue");
  DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements));
  /* empty the queue */
  for (i= 0; i < events_count_no_lock(); ++i)
  {
@@ -840,6 +749,7 @@ Event_queue::empty_queue()
    delete et;
  }
  resize_queue(&queue, 0);
  DBUG_VOID_RETURN;
}


@@ -864,6 +774,29 @@ Event_queue::top_changed()
}


inline void
Event_queue::dbug_dump_queue(time_t now)
{
#ifndef DBUG_OFF
  Event_timed *et;
  uint i;
  DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
  for (i = 0; i < queue.elements; i++)
  {
    et= ((Event_timed*)queue_element(&queue, i));
    DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
    DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
               " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
               TIME_to_ulonglong_datetime(&et->execute_at),
               TIME_to_ulonglong_datetime(&et->starts),
               TIME_to_ulonglong_datetime(&et->ends),
               et->expression, sec_since_epoch_TIME(&et->execute_at), now,
               (int)(sec_since_epoch_TIME(&et->execute_at) - now),
               sec_since_epoch_TIME(&et->execute_at) <= now));
  }
#endif
}

Event_timed *
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
                                           struct timespec *abstime)
@@ -876,35 +809,21 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
  LOCK_QUEUE_DATA();
  do {
    int res;
    Event_timed *et= NULL;
    if (!queue.elements)
    {
      abstime->tv_sec= 0;
      break;
    }
    int i;
    DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
    for (i = 0; i < queue.elements; i++)
    {
      et= ((Event_timed*)queue_element(&queue, i));
      DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
      DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
               " expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
               TIME_to_ulonglong_datetime(&et->execute_at),
               TIME_to_ulonglong_datetime(&et->starts),
               TIME_to_ulonglong_datetime(&et->ends),
               et->expression, sec_since_epoch_TIME(&et->execute_at), now,
               (int)(sec_since_epoch_TIME(&et->execute_at) - now),
               sec_since_epoch_TIME(&et->execute_at) <= now));
    }
    et= ((Event_timed*)queue_element(&queue, 0));
    dbug_dump_queue(now);

    Event_timed *et= ((Event_timed*)queue_element(&queue, 0));
    top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);

    if (top_time.tv_sec <= now)
    {
      DBUG_PRINT("info", ("Ready for execution"));
      abstime->tv_sec= 0;
      if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
      if ((res= db_repository->load_named_event_timed(thd, et->dbname, et->name,
                                                      &et_new)))
      {
        DBUG_ASSERT(0);
Loading