Commit 3a12408a authored by unknown's avatar unknown
Browse files

WL #1034 (update)

- improve the stability of the executor
- make create event if not exists work as before


sql/event.cc:
  refactoring:
  - have only 1 routine for comparing TIME structures 
  fix:
  - after previous refactoring IF NOT EXISTS of CREATE EVENT did not work 
    anymore. Now it will work.
sql/event.h:
  update definitions
sql/event_executor.cc:
  - don't load DISABLED events
  - if an event is being disabled because of time restrictions - drop it from the 
    prio queue
  - move dropping to the worker process
sql/event_priv.h:
  - remove unneeded func
sql/share/errmsg.txt:
  fix error message
sql/sql_parse.cc:
  - support 0 rows affected when CREATE EVENT IF NOT EXISTS
parent 9b323387
Loading
Loading
Loading
Loading
+39 −108
Original line number Diff line number Diff line
@@ -71,9 +71,8 @@ MEM_ROOT evex_mem_root;
void
evex_queue_init(EVEX_QUEUE_TYPE *queue)
{
  if (init_queue_ex(queue, 100 /*num_el*/, 0 /*offset*/, 
                   0 /*smallest_on_top*/, event_timed_compare_q, NULL,
                   100 /*auto_extent*/))
  if (init_queue_ex(queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
                    event_timed_compare_q, NULL, 30 /*auto_extent*/))
    sql_print_error("Insufficient memory to initialize executing queue.");
}

@@ -81,8 +80,7 @@ evex_queue_init(EVEX_QUEUE_TYPE *queue)
static
int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
{
 return cs->coll->strnncollsp(cs,
                              (unsigned char *) s.str,s.length,
 return cs->coll->strnncollsp(cs, (unsigned char *) s.str,s.length,
                                  (unsigned char *) t.str,t.length, 0);
}

@@ -90,94 +88,29 @@ int sortcmp_lex_string(LEX_STRING s, LEX_STRING t, CHARSET_INFO *cs)
int
my_time_compare(TIME *a, TIME *b)
{
/*
 Or maybe it is faster to use TIME_to_ulonglong_datetime
 for "a" and "b"
*/

  DBUG_ENTER("my_time_compare");

  if (a->year > b->year)
    DBUG_RETURN(1);
  
  if (a->year < b->year)
    DBUG_RETURN(-1);

  if (a->month > b->month)
    DBUG_RETURN(1);
  
  if (a->month < b->month)
    DBUG_RETURN(-1);

  if (a->day > b->day)
    DBUG_RETURN(1);
  
  if (a->day < b->day)
    DBUG_RETURN(-1);

  if (a->hour > b->hour)
    DBUG_RETURN(1);
  
  if (a->hour < b->hour)
    DBUG_RETURN(-1);

  if (a->minute > b->minute)
    DBUG_RETURN(1);
  
  if (a->minute < b->minute)
    DBUG_RETURN(-1);

  if (a->second > b->second)
    DBUG_RETURN(1);
  
  if (a->second < b->second)
    DBUG_RETURN(-1);


  if (a->second_part > b->second_part)
    DBUG_RETURN(1);
  
  if (a->second_part < b->second_part)
    DBUG_RETURN(-1);


  DBUG_RETURN(0);
}


int
evex_time_diff(TIME *a, TIME *b)
{
  my_bool in_gap;
  DBUG_ENTER("my_time_diff");
  
  return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b);
}


inline int
event_timed_compare(event_timed **a, event_timed **b)
{
  my_ulonglong a_t, b_t;
  a_t= TIME_to_ulonglong_datetime(&(*a)->execute_at)*100L + 
       (*a)->execute_at.second_part;
  b_t= TIME_to_ulonglong_datetime(&(*b)->execute_at)*100L + 
       (*b)->execute_at.second_part;
  my_ulonglong a_t= TIME_to_ulonglong_datetime(a)*100L + a->second_part;
  my_ulonglong b_t= TIME_to_ulonglong_datetime(b)*100L + b->second_part;

  if (a_t > b_t)
    return 1;
  else if (a_t < b_t)
    return -1;
  else

  return 0;
}


inline int
event_timed_compare(event_timed *a, event_timed *b)
{
  return my_time_compare(&a->execute_at, &b->execute_at);
}


int 
event_timed_compare_q(void *vptr, byte* a, byte *b)
{
  return event_timed_compare((event_timed **)&a, (event_timed **)&b);
  return event_timed_compare((event_timed *)a, (event_timed *)b);
}


@@ -371,6 +304,8 @@ evex_fill_row(THD *thd, TABLE *table, event_timed *et, my_bool is_update)
     db_create_event()
       thd             THD
       et              event_timed object containing information for the event
       create_if_not - if an warning should be generated in case event exists
       rows_affected - how many rows were affected
   
     Return value
                        0 - OK
@@ -381,9 +316,10 @@ evex_fill_row(THD *thd, TABLE *table, event_timed *et, my_bool is_update)
*/

static int
db_create_event(THD *thd, event_timed *et)
db_create_event(THD *thd, event_timed *et, my_bool create_if_not,
                uint *rows_affected)
{
  int ret= EVEX_OK;
  int ret= 0;
  TABLE *table;
  char definer[HOSTNAME_LENGTH+USERNAME_LENGTH+2];
  char olddb[128];
@@ -391,7 +327,7 @@ db_create_event(THD *thd, event_timed *et)
  DBUG_ENTER("db_create_event");
  DBUG_PRINT("enter", ("name: %.*s", et->name.length, et->name.str));


  *rows_affected= 0;
  DBUG_PRINT("info", ("open mysql.event for update"));
  if (evex_open_event_table(thd, TL_WRITE, &table))
  {
@@ -402,8 +338,10 @@ db_create_event(THD *thd, event_timed *et)
  DBUG_PRINT("info", ("check existance of an event with the same name"));
  if (!evex_db_find_event_aux(thd, et->dbname, et->name, table))
  {
    my_error(ER_EVENT_ALREADY_EXISTS, MYF(0), et->name.str);
    goto err;    
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
		      ER_EVENT_ALREADY_EXISTS, ER(ER_EVENT_ALREADY_EXISTS),
		      et->name.str);
    goto ok;    
  }

  DBUG_PRINT("info", ("non-existant, go forward"));
@@ -462,6 +400,8 @@ db_create_event(THD *thd, event_timed *et)
    mysql_bin_log.write(&qinfo);
  }
  
  *rows_affected= 1;
ok:
  if (dbchanged)
    (void) mysql_change_db(thd, olddb, 1);
  if (table)
@@ -755,6 +695,7 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock)
       et             event's data
       create_options Options specified when in the query. We are
                      interested whether there is IF NOT EXISTS
       rows_affected  How many rows were affected
          
   NOTES
     - in case there is an event with the same name (db) and 
@@ -762,7 +703,8 @@ evex_remove_from_cache(LEX_STRING *db, LEX_STRING *name, bool use_lock)
*/

int
evex_create_event(THD *thd, event_timed *et, uint create_options)
evex_create_event(THD *thd, event_timed *et, uint create_options,
                  uint *rows_affected)
{
  int ret = 0;

@@ -770,22 +712,9 @@ evex_create_event(THD *thd, event_timed *et, uint create_options)
  DBUG_PRINT("enter", ("name: %*s options:%d", et->name.length,
                et->name.str, create_options));

  if ((ret = db_create_event(thd, et)) == EVEX_WRITE_ROW_FAILED && 
        (create_options & HA_LEX_CREATE_IF_NOT_EXISTS))
  {
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
		      ER_DB_CREATE_EXISTS, ER(ER_DB_CREATE_EXISTS),
		      "EVENT", et->name.str);
    ret= 0;
    goto done;
  }
  /*
    A warning is thrown only when create_options is set to 
    HA_LEX_CREATE_IF_NOT_EXISTS. In this case if EVEX_WRITE_ROW_FAILED,
    which means that we have duplicated key -> warning. In all
    other cases -> error.
  */
  if (ret)
  if ((ret = db_create_event(thd, et,
                             create_options & HA_LEX_CREATE_IF_NOT_EXISTS,
                             rows_affected)))
    goto done;

  VOID(pthread_mutex_lock(&LOCK_evex_running));
@@ -819,7 +748,8 @@ evex_create_event(THD *thd, event_timed *et, uint create_options)
*/

int
evex_update_event(THD *thd, event_timed *et, sp_name *new_name)
evex_update_event(THD *thd, event_timed *et, sp_name *new_name,
                  uint *rows_affected)
{
  int ret, i;
  bool need_second_pass= true;
@@ -873,7 +803,8 @@ evex_update_event(THD *thd, event_timed *et, sp_name *new_name)
*/

int
evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists)
evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists,
                uint *rows_affected)
{
  TABLE *table;
  int ret= EVEX_OPEN_TABLE_FAILED;
+6 −3
Original line number Diff line number Diff line
@@ -173,13 +173,16 @@ class event_timed


int
evex_create_event(THD *thd, event_timed *et, uint create_options);
evex_create_event(THD *thd, event_timed *et, uint create_options,
                  uint *rows_affected);

int
evex_update_event(THD *thd, event_timed *et, sp_name *new_name);
evex_update_event(THD *thd, event_timed *et, sp_name *new_name,
                  uint *rows_affected);

int
evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists);
evex_drop_event(THD *thd, event_timed *et, bool drop_if_exists,
                uint *rows_affected);


int
+29 −22
Original line number Diff line number Diff line
@@ -61,8 +61,15 @@ event_executor_worker(void *arg);
pthread_handler_t
event_executor_main(void *arg);

static
void evex_init_mutexes()
static int
evex_time_diff(TIME *a, TIME *b)
{
  return sec_since_epoch_TIME(a) - sec_since_epoch_TIME(b);
}


static void
evex_init_mutexes()
{
  if (evex_mutexes_initted)
    return;
@@ -239,8 +246,6 @@ event_executor_main(void *arg)

    {
      int t2sleep;
      
        
      /*
        now let's see how much time to sleep, we know there is at least 1
        element in the queue.
@@ -272,7 +277,7 @@ event_executor_main(void *arg)
      {
        /*
          We sleep t2sleep seconds but we check every second whether this thread
          has been killed, or there is new candidate
          has been killed, or there is a new candidate
        */
        while (t2sleep-- && !thd->killed &&
               evex_queue_num_elements(EVEX_EQ_NAME) &&
@@ -308,10 +313,13 @@ event_executor_main(void *arg)
    {
      pthread_t th;

      printf("[%10s] exec at [%llu]\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
      et->mark_last_executed();
      et->compute_next_execution_time();
      printf("[%10s] next at [%llu]\n\n\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
      et->update_fields(thd);
      DBUG_PRINT("info", ("  Spawning a thread %d", ++iter_num));
//      sql_print_information("  Spawning a thread %d", ++iter_num);
#ifndef DBUG_FAULTY_THR
//      sql_print_information("  Thread is not debuggable!");
      if (pthread_create(&th, NULL, event_executor_worker, (void*)et))
      {
        sql_print_error("Problem while trying to create a thread");
@@ -320,24 +328,15 @@ event_executor_main(void *arg)
#else
      event_executor_worker((void *) et);
#endif
      printf("[%10s] exec at [%llu]\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
      et->mark_last_executed();
      et->compute_next_execution_time();
      printf("[%10s] next at [%llu]\n\n\n", et->name.str,TIME_to_ulonglong_datetime(&et->execute_at));
      et->update_fields(thd);
      if ((et->execute_at.year && !et->expression) ||
           TIME_to_ulonglong_datetime(&et->execute_at) == 0)
         et->flags |= EVENT_EXEC_NO_MORE;
    }

      if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == MYSQL_EVENT_DISABLED)
    {
      if (et->dropped)
        et->drop(thd);
      delete et;
        evex_queue_delete_element(&EVEX_EQ_NAME, 1);// 1 is top
    } else
      else
        evex_queue_first_updated(&EVEX_EQ_NAME);

    }
    VOID(pthread_mutex_unlock(&LOCK_event_arrays));
  }// while

@@ -454,7 +453,8 @@ event_executor_worker(void *event_void)
  strxnmov(thd->security_ctx->priv_host, sizeof(thd->security_ctx->priv_host),
                event->definer_host.str, NullS);  

  thd->security_ctx->user= thd->security_ctx->priv_user= my_strdup(event->definer_user.str, MYF(0));
  thd->security_ctx->user= thd->security_ctx->priv_user=
                             my_strdup(event->definer_user.str, MYF(0));

  thd->db= event->dbname.str;
  if (!check_access(thd, EVENT_ACL, event->dbname.str, 0, 0, 0,
@@ -467,6 +467,13 @@ event_executor_worker(void *event_void)
    sql_print_information("    EVEX EXECUTED event for event %s.%s  [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str,(int) event->expression, ret); 
    DBUG_PRINT("info", ("    EVEX EXECUTED event for event %s.%s  [EXPR:%d]. RetCode=%d", event->dbname.str, event->name.str,(int) event->expression, ret)); 
  }
  if ((event->flags & EVENT_EXEC_NO_MORE) || event->status==MYSQL_EVENT_DISABLED)
  {
    if (event->dropped)
      event->drop(thd);
    delete event;
  }

  thd->db= 0;

err:
+0 −4
Original line number Diff line number Diff line
@@ -59,10 +59,6 @@ evex_open_event_table(THD *thd, enum thr_lock_type lock_type, TABLE **table);
int 
event_timed_compare_q(void *vptr, byte* a, byte *b);

int
evex_time_diff(TIME *a, TIME *b);



#define EXEC_QUEUE_QUEUE_NAME executing_queue
#define EXEC_QUEUE_DARR_NAME evex_executing_queue
+1 −1
Original line number Diff line number Diff line
@@ -5722,7 +5722,7 @@ ER_DROP_PARTITION_WHEN_FK_DEFINED
ER_PLUGIN_IS_NOT_LOADED
	eng "Plugin '%-.64s' is not loaded"
ER_EVENT_ALREADY_EXISTS
        eng "Event %s already exists"
        eng "Event '%-.64s' already exists"
ER_EVENT_STORE_FAILED
        eng "Failed to store event %s. Error code %d from storage engine."
ER_EVENT_DOES_NOT_EXIST
Loading