Loading sql/event_data_objects.cc +47 −29 Original line number Diff line number Diff line Loading @@ -20,6 +20,8 @@ #include "event_db_repository.h" #include "sp_head.h" /* That's a provisional solution */ extern Event_db_repository events_event_db_repository; #define EVEX_MAX_INTERVAL_VALUE 1000000000L Loading @@ -30,6 +32,47 @@ event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, static void event_restore_security_context(THD *thd, Security_context *backup); /* Initiliazes dbname and name of an Event_queue_element_for_exec object SYNOPSIS Event_queue_element_for_exec::init() RETURN VALUE FALSE OK TRUE Error (OOM) */ bool Event_queue_element_for_exec::init(LEX_STRING db, LEX_STRING n) { if (!(dbname.str= my_strndup(db.str, dbname.length= db.length, MYF(MY_WME)))) return TRUE; if (!(name.str= my_strndup(n.str, name.length= n.length, MYF(MY_WME)))) { my_free((gptr) dbname.str, MYF(0)); return TRUE; } return FALSE; } /* Destructor SYNOPSIS Event_queue_element_for_exec::~Event_queue_element_for_exec() */ Event_queue_element_for_exec::~Event_queue_element_for_exec() { my_free((gptr) dbname.str, MYF(0)); my_free((gptr) name.str, MYF(0)); } /* Returns a new instance Loading Loading @@ -743,7 +786,7 @@ Event_timed::~Event_timed() */ Event_job_data::Event_job_data() :thd(NULL), sphead(NULL), sql_mode(0) :sphead(NULL), sql_mode(0) { } Loading Loading @@ -1239,6 +1282,7 @@ Event_queue_element::compute_next_execution_time() DBUG_PRINT("info", ("Dropped: %d", dropped)); status= Event_queue_element::DISABLED; status_changed= TRUE; dropped= TRUE; goto ret; } Loading Loading @@ -1446,32 +1490,6 @@ Event_queue_element::mark_last_executed(THD *thd) } /* Drops the event SYNOPSIS Event_queue_element::drop() thd thread context RETURN VALUE 0 OK -1 Cannot open mysql.event -2 Cannot find the event in mysql.event (already deleted?) others return code from SE in case deletion of the event row failed. */ int Event_queue_element::drop(THD *thd) { DBUG_ENTER("Event_queue_element::drop"); DBUG_RETURN(Events::get_instance()-> drop_event(thd, dbname, name, FALSE, TRUE)); } /* Saves status and last_executed_at to the disk if changed. Loading Loading @@ -1503,13 +1521,13 @@ Event_queue_element::update_timing_fields(THD *thd) thd->reset_n_backup_open_tables_state(&backup); if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table)) if (events_event_db_repository.open_event_table(thd, TL_WRITE, &table)) { ret= TRUE; goto done; } fields= table->field; if ((ret= Events::get_instance()->db_repository-> if ((ret= events_event_db_repository. find_named_event(thd, dbname, name, table))) goto done; Loading sql/event_data_objects.h +21 −4 Original line number Diff line number Diff line Loading @@ -27,6 +27,27 @@ class sp_head; class Sql_alloc; class Event_queue_element_for_exec { public: Event_queue_element_for_exec(){}; ~Event_queue_element_for_exec(); bool init(LEX_STRING dbname, LEX_STRING name); LEX_STRING dbname; LEX_STRING name; bool dropped; THD *thd; private: /* Prevent use of these */ Event_queue_element_for_exec(const Event_queue_element_for_exec &); void operator=(Event_queue_element_for_exec &); }; class Event_basic { protected: Loading Loading @@ -96,9 +117,6 @@ class Event_queue_element : public Event_basic bool compute_next_execution_time(); int drop(THD *thd); void mark_last_executed(THD *thd); Loading Loading @@ -160,7 +178,6 @@ class Event_timed : public Event_queue_element class Event_job_data : public Event_basic { public: THD *thd; sp_head *sphead; LEX_STRING body; Loading sql/event_queue.cc +34 −213 Original line number Diff line number Diff line Loading @@ -16,7 +16,6 @@ #include "mysql_priv.h" #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" #define EVENT_QUEUE_INITIAL_SIZE 30 Loading Loading @@ -136,16 +135,14 @@ Event_queue::deinit_mutexes() */ bool Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) Event_queue::init_queue(THD *thd) { bool res; struct event_queue_param *event_queue_param_value= NULL; DBUG_ENTER("Event_queue::init_queue"); DBUG_PRINT("enter", ("this: 0x%lx", (long) this)); LOCK_QUEUE_DATA(); db_repository= db_repo; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, 0 /*max_on_top*/, event_queue_element_compare_q, Loading @@ -162,12 +159,8 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) goto err; } res= load_events_from_db(thd); UNLOCK_QUEUE_DATA(); if (res) deinit_queue(); DBUG_RETURN(res); DBUG_RETURN(FALSE); err: UNLOCK_QUEUE_DATA(); Loading Loading @@ -204,37 +197,29 @@ Event_queue::deinit_queue() Event_queue::create_event() dbname The schema of the new event name The name of the new event RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk */ int Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) void Event_queue::create_event(THD *thd, Event_queue_element *new_element) { int res; Event_queue_element *new_element; DBUG_ENTER("Event_queue::create_event"); DBUG_PRINT("enter", ("thd: 0x%lx et=%s.%s", (long) thd, dbname.str, name.str)); DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, new_element->dbname.str, new_element->name.str)); new_element= new Event_queue_element(); res= db_repository->load_named_event(thd, dbname, name, new_element); if (res || new_element->status == Event_queue_element::DISABLED) if (new_element->status == Event_queue_element::DISABLED) delete new_element; else { new_element->compute_next_execution_time(); DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); LOCK_QUEUE_DATA(); DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element)); queue_insert_safe(&queue, (byte *) new_element); dbug_dump_queue(thd->query_start()); pthread_cond_broadcast(&COND_queue_state); UNLOCK_QUEUE_DATA(); } DBUG_RETURN(res); DBUG_VOID_RETURN; } Loading @@ -248,32 +233,16 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) name Name of the event new_schema New schema, in case of RENAME TO, otherwise NULL new_name New name, in case of RENAME TO, otherwise NULL RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk */ int void Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, LEX_STRING *new_schema, LEX_STRING *new_name) Event_queue_element *new_element) { int res; Event_queue_element *new_element; DBUG_ENTER("Event_queue::update_event"); DBUG_PRINT("enter", ("thd: 0x%lx et=[%s.%s]", (long) thd, dbname.str, name.str)); new_element= new Event_queue_element(); res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname, new_name ? *new_name:name, new_element); if (res) { delete new_element; goto end; } else if (new_element->status == Event_queue_element::DISABLED) if (new_element->status == Event_queue_element::DISABLED) { DBUG_PRINT("info", ("The event is disabled.")); /* Loading @@ -300,9 +269,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); DBUG_VOID_RETURN; } Loading Loading @@ -453,133 +420,6 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name) } /* Loads all ENABLED events from mysql.event into the prioritized queue. Called during scheduler main thread initialization. Compiles the events. Creates Event_queue_element instances for every ENABLED event from mysql.event. SYNOPSIS Event_queue::load_events_from_db() thd - Thread context. Used for memory allocation in some cases. RETURN VALUE 0 OK !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, EVEX_COMPILE_ERROR) - in all these cases mysql.event was tampered. NOTES Reports the error to the console */ int Event_queue::load_events_from_db(THD *thd) { TABLE *table; READ_RECORD read_record_info; int ret= -1; uint count= 0; bool clean_the_queue= TRUE; DBUG_ENTER("Event_queue::load_events_from_db"); DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd)); if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) { sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open"); DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); } init_read_record(&read_record_info, thd, table ,NULL,1,0); while (!(read_record_info.read_record(&read_record_info))) { Event_queue_element *et; if (!(et= new Event_queue_element)) { DBUG_PRINT("info", ("Out of memory")); break; } DBUG_PRINT("info", ("Loading event from row.")); if ((ret= et->load_from_row(table))) { sql_print_error("SCHEDULER: Error while loading from mysql.event. " "Table probably corrupted"); break; } if (et->status != Event_queue_element::ENABLED) { DBUG_PRINT("info",("%s is disabled",et->name.str)); delete et; continue; } /* let's find when to be executed */ if (et->compute_next_execution_time()) { sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." " Skipping", et->dbname.str, et->name.str); continue; } { Event_job_data temp_job_data; DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); temp_job_data.load_from_row(table); /* We load only on scheduler root just to check whether the body compiles. */ switch (ret= temp_job_data.compile(thd, thd->mem_root)) { case EVEX_MICROSECOND_UNSUP: sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " "supported but found in mysql.event"); break; case EVEX_COMPILE_ERROR: sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load", et->dbname.str, et->name.str); break; default: break; } thd->end_statement(); thd->cleanup_after_query(); } if (ret) { delete et; goto end; } queue_insert_safe(&queue, (byte *) et); count++; } clean_the_queue= FALSE; end: end_read_record(&read_record_info); if (clean_the_queue) { empty_queue(); ret= -1; } else { ret= 0; sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); } close_thread_tables(thd); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); } /* Recalculates activation times in the queue. There is one reason for that. Because the values (execute_at) by which the queue is ordered are Loading Loading @@ -629,7 +469,7 @@ Event_queue::empty_queue() { uint i; DBUG_ENTER("Event_queue::empty_queue"); DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements)); DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements)); sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements); /* empty the queue */ for (i= 0; i < queue.elements; ++i) Loading Loading @@ -691,30 +531,26 @@ static const char *queue_wait_msg= "Waiting for next activation"; SYNOPSIS Event_queue::get_top_for_execution_if_time() thd [in] Thread job_data [out] The object to execute event_name [out] The object to execute RETURN VALUE FALSE No error. If *job_data==NULL then top not elligible for execution. Could be that there is no top. TRUE Error FALSE No error. event_name != NULL TRUE Serious error */ bool Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) Event_queue::get_top_for_execution_if_time(THD *thd, Event_queue_element_for_exec **event_name) { bool ret= FALSE; struct timespec top_time; Event_queue_element *top= NULL; bool to_free= FALSE; bool to_drop= FALSE; *job_data= NULL; *event_name= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); LOCK_QUEUE_DATA(); for (;;) { int res; Event_queue_element *top= NULL; /* Break loop if thd has been killed */ if (thd->killed) Loading Loading @@ -753,39 +589,30 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) continue; } DBUG_PRINT("info", ("Ready for execution")); if (!(*job_data= new Event_job_data())) { ret= TRUE; break; } if ((res= db_repository->load_named_event(thd, top->dbname, top->name, *job_data))) if (!(*event_name= new Event_queue_element_for_exec()) || (*event_name)->init(top->dbname, top->name)) { DBUG_PRINT("error", ("Got %d from load_named_event", res)); delete *job_data; *job_data= NULL; ret= TRUE; break; } DBUG_PRINT("info", ("Ready for execution")); top->mark_last_executed(thd); if (top->compute_next_execution_time()) top->status= Event_queue_element::DISABLED; DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status)); (*job_data)->execution_count= top->execution_count; top->execution_count++; (*event_name)->dropped= top->dropped; top->update_timing_fields(thd); if (((top->execute_at.year && !top->expression) || top->execute_at_null) || (top->status == Event_queue_element::DISABLED)) if (top->status == Event_queue_element::DISABLED) { DBUG_PRINT("info", ("removing from the queue")); sql_print_information("SCHEDULER: Last execution of %s.%s. %s", top->dbname.str, top->name.str, top->dropped? "Dropping.":""); to_free= TRUE; to_drop= top->dropped; delete top; queue_remove(&queue, 0); } else Loading @@ -796,19 +623,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) } end: UNLOCK_QUEUE_DATA(); if (to_drop) { DBUG_PRINT("info", ("Dropping from disk")); top->drop(thd); } if (to_free) delete top; DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *job_data)); DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *event_name)); if (*job_data) DBUG_PRINT("info", ("db: %s name: %s definer=%s", (*job_data)->dbname.str, (*job_data)->name.str, (*job_data)->definer.str)); if (*event_name) DBUG_PRINT("info", ("db: %s name: %s", (*event_name)->dbname.str, (*event_name)->name.str)); DBUG_RETURN(ret); } Loading sql/event_queue.h +12 −19 Original line number Diff line number Diff line Loading @@ -16,12 +16,10 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ class Event_basic; class Event_db_repository; class Event_job_data; class Event_queue_element; class Event_queue_element_for_exec; class THD; class Event_scheduler; class Event_queue { Loading @@ -35,19 +33,19 @@ class Event_queue deinit_mutexes(); bool init_queue(THD *thd, Event_db_repository *db_repo); init_queue(THD *thd); void deinit_queue(); /* Methods for queue management follow */ int create_event(THD *thd, LEX_STRING dbname, LEX_STRING name); void create_event(THD *thd, Event_queue_element *new_element); int void update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, LEX_STRING *new_schema, LEX_STRING *new_name); Event_queue_element *new_element); void drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name); Loading @@ -59,14 +57,15 @@ class Event_queue recalculate_activation_times(THD *thd); bool get_top_for_execution_if_time(THD *thd, Event_job_data **job_data); get_top_for_execution_if_time(THD *thd, Event_queue_element_for_exec **event_name); void dump_internal_status(); int load_events_from_db(THD *thd); void empty_queue(); protected: void find_n_remove_event(LEX_STRING db, LEX_STRING name); Loading @@ -76,8 +75,6 @@ class Event_queue drop_matching_events(THD *thd, LEX_STRING pattern, bool (*)(LEX_STRING, Event_basic *)); void empty_queue(); void dbug_dump_queue(time_t now); Loading @@ -86,11 +83,7 @@ class Event_queue pthread_mutex_t LOCK_event_queue; pthread_cond_t COND_queue_state; Event_db_repository *db_repository; Event_scheduler *scheduler; /* The sorted queue with the Event_job_data objects */ /* The sorted queue with the Event_queue_element objects */ QUEUE queue; TIME next_activation_at; Loading sql/event_scheduler.cc +101 −46 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include "event_data_objects.h" #include "event_scheduler.h" #include "event_queue.h" #include "event_db_repository.h" #ifdef __GNUC__ #if __GNUC__ >= 2 Loading @@ -34,6 +35,11 @@ extern pthread_attr_t connection_attrib; Event_db_repository *Event_worker_thread::db_repository; Events *Event_worker_thread::events_facade; static const LEX_STRING scheduler_states_names[] = { Loading @@ -60,8 +66,8 @@ struct scheduler_param { et The event itself */ static void evex_print_warnings(THD *thd, Event_job_data *et) void Event_worker_thread::print_warnings(THD *thd, Event_job_data *et) { MYSQL_ERROR *err; DBUG_ENTER("evex_print_warnings"); Loading Loading @@ -253,49 +259,97 @@ event_worker_thread(void *arg) { /* needs to be first for thread_stack */ THD *thd; Event_job_data *event= (Event_job_data *)arg; int ret; Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg; thd= event->thd; thd->thread_stack= (char *) &thd; // remember where our stack is DBUG_ENTER("event_worker_thread"); if (!post_init_event_thread(thd)) Event_worker_thread worker_thread; worker_thread.run(thd, (Event_queue_element_for_exec *)arg); deinit_event_thread(thd); return 0; // Can't return anything here } /* Function that executes an event in a child thread. Setups the environment for the event execution and cleans after that. SYNOPSIS Event_worker_thread::run() thd Thread context event The Event_queue_element_for_exec object to be processed */ void Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event) { DBUG_PRINT("info", ("Baikonur, time is %ld, BURAN reporting and operational." "THD: 0x%lx", (long) time(NULL), (long) thd)); int ret; Event_job_data *job_data= NULL; DBUG_ENTER("Event_worker_thread::run"); DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." "THD=0x%lx", time(NULL), thd)); if (post_init_event_thread(thd)) goto end; if (!(job_data= new Event_job_data())) goto end; else if ((ret= db_repository-> load_named_event(thd, event->dbname, event->name, job_data))) { DBUG_PRINT("error", ("Got %d from load_named_event", ret)); goto end; } sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. " "Execution %u", event->dbname.str, event->name.str, event->definer.str, thd->thread_id, event->execution_count); sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. ", job_data->dbname.str, job_data->name.str, job_data->definer.str, thd->thread_id); thd->enable_slow_log= TRUE; ret= event->execute(thd); ret= job_data->execute(thd); evex_print_warnings(thd, event); print_warnings(thd, job_data); sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. " "RetCode=%d", event->dbname.str, event->name.str, event->definer.str, thd->thread_id, ret); "RetCode=%d", job_data->dbname.str, job_data->name.str, job_data->definer.str, thd->thread_id, ret); if (ret == EVEX_COMPILE_ERROR) sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", event->dbname.str, event->name.str, event->definer.str); job_data->dbname.str, job_data->name.str, job_data->definer.str); else if (ret == EVEX_MICROSECOND_UNSUP) sql_print_information("SCHEDULER: MICROSECOND is not supported"); end: delete job_data; if (event->dropped) { sql_print_information("SCHEDULER: Dropping %s.%s", event->dbname.str, event->name.str); /* Using db_repository can lead to a race condition because we access the table without holding LOCK_metadata. Scenario: 1. CREATE EVENT xyz AT ... (conn thread) 2. execute xyz (worker) 3. CREATE EVENT XYZ EVERY ... (conn thread) 4. drop xyz (worker) 5. XYZ was just created on disk but `drop xyz` of the worker dropped it. A consequent load to create Event_queue_element will fail. If all operations are performed under LOCK_metadata there is no such problem. However, this comes at the price of introduction bi-directional association between class Events and class Event_worker_thread. */ events_facade->drop_event(thd, event->dbname, event->name, FALSE); } DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str, event->name.str)); delete event; deinit_event_thread(thd); DBUG_RETURN(0); // Can't return anything here delete event; } Loading Loading @@ -441,7 +495,6 @@ bool Event_scheduler::run(THD *thd) { int res= FALSE; Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); sql_print_information("SCHEDULER: Manager thread started with id %lu", Loading @@ -454,18 +507,20 @@ Event_scheduler::run(THD *thd) while (is_running()) { Event_queue_element_for_exec *event_name; /* Gets a minimized version */ if (queue->get_top_for_execution_if_time(thd, &job_data)) if (queue->get_top_for_execution_if_time(thd, &event_name)) { sql_print_information("SCHEDULER: Serious error during getting next " "event to execute. Stopping"); break; } DBUG_PRINT("info", ("get_top returned job_data: 0x%lx", (long) job_data)); if (job_data) DBUG_PRINT("info", ("get_top returned job_data=0x%lx", event_name)); if (event_name) { if ((res= execute_top(thd, job_data))) if ((res= execute_top(thd, event_name))) break; } else Loading Loading @@ -499,7 +554,7 @@ Event_scheduler::run(THD *thd) */ bool Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) Event_scheduler::execute_top(THD *thd, Event_queue_element_for_exec *event_name) { THD *new_thd; pthread_t th; Loading @@ -510,13 +565,13 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) pre_init_event_thread(new_thd); new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER; job_data->thd= new_thd; event_name->thd= new_thd; DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition", job_data->dbname.str, job_data->name.str)); event_name->dbname.str, event_name->name.str)); /* Major failure */ if ((res= pthread_create(&th, &connection_attrib, event_worker_thread, job_data))) event_name))) goto error; ++started_events; Loading @@ -537,7 +592,7 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) delete new_thd; pthread_mutex_unlock(&LOCK_thread_count); } delete job_data; delete event_name; DBUG_RETURN(TRUE); } Loading Loading
sql/event_data_objects.cc +47 −29 Original line number Diff line number Diff line Loading @@ -20,6 +20,8 @@ #include "event_db_repository.h" #include "sp_head.h" /* That's a provisional solution */ extern Event_db_repository events_event_db_repository; #define EVEX_MAX_INTERVAL_VALUE 1000000000L Loading @@ -30,6 +32,47 @@ event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, static void event_restore_security_context(THD *thd, Security_context *backup); /* Initiliazes dbname and name of an Event_queue_element_for_exec object SYNOPSIS Event_queue_element_for_exec::init() RETURN VALUE FALSE OK TRUE Error (OOM) */ bool Event_queue_element_for_exec::init(LEX_STRING db, LEX_STRING n) { if (!(dbname.str= my_strndup(db.str, dbname.length= db.length, MYF(MY_WME)))) return TRUE; if (!(name.str= my_strndup(n.str, name.length= n.length, MYF(MY_WME)))) { my_free((gptr) dbname.str, MYF(0)); return TRUE; } return FALSE; } /* Destructor SYNOPSIS Event_queue_element_for_exec::~Event_queue_element_for_exec() */ Event_queue_element_for_exec::~Event_queue_element_for_exec() { my_free((gptr) dbname.str, MYF(0)); my_free((gptr) name.str, MYF(0)); } /* Returns a new instance Loading Loading @@ -743,7 +786,7 @@ Event_timed::~Event_timed() */ Event_job_data::Event_job_data() :thd(NULL), sphead(NULL), sql_mode(0) :sphead(NULL), sql_mode(0) { } Loading Loading @@ -1239,6 +1282,7 @@ Event_queue_element::compute_next_execution_time() DBUG_PRINT("info", ("Dropped: %d", dropped)); status= Event_queue_element::DISABLED; status_changed= TRUE; dropped= TRUE; goto ret; } Loading Loading @@ -1446,32 +1490,6 @@ Event_queue_element::mark_last_executed(THD *thd) } /* Drops the event SYNOPSIS Event_queue_element::drop() thd thread context RETURN VALUE 0 OK -1 Cannot open mysql.event -2 Cannot find the event in mysql.event (already deleted?) others return code from SE in case deletion of the event row failed. */ int Event_queue_element::drop(THD *thd) { DBUG_ENTER("Event_queue_element::drop"); DBUG_RETURN(Events::get_instance()-> drop_event(thd, dbname, name, FALSE, TRUE)); } /* Saves status and last_executed_at to the disk if changed. Loading Loading @@ -1503,13 +1521,13 @@ Event_queue_element::update_timing_fields(THD *thd) thd->reset_n_backup_open_tables_state(&backup); if (Events::get_instance()->open_event_table(thd, TL_WRITE, &table)) if (events_event_db_repository.open_event_table(thd, TL_WRITE, &table)) { ret= TRUE; goto done; } fields= table->field; if ((ret= Events::get_instance()->db_repository-> if ((ret= events_event_db_repository. find_named_event(thd, dbname, name, table))) goto done; Loading
sql/event_data_objects.h +21 −4 Original line number Diff line number Diff line Loading @@ -27,6 +27,27 @@ class sp_head; class Sql_alloc; class Event_queue_element_for_exec { public: Event_queue_element_for_exec(){}; ~Event_queue_element_for_exec(); bool init(LEX_STRING dbname, LEX_STRING name); LEX_STRING dbname; LEX_STRING name; bool dropped; THD *thd; private: /* Prevent use of these */ Event_queue_element_for_exec(const Event_queue_element_for_exec &); void operator=(Event_queue_element_for_exec &); }; class Event_basic { protected: Loading Loading @@ -96,9 +117,6 @@ class Event_queue_element : public Event_basic bool compute_next_execution_time(); int drop(THD *thd); void mark_last_executed(THD *thd); Loading Loading @@ -160,7 +178,6 @@ class Event_timed : public Event_queue_element class Event_job_data : public Event_basic { public: THD *thd; sp_head *sphead; LEX_STRING body; Loading
sql/event_queue.cc +34 −213 Original line number Diff line number Diff line Loading @@ -16,7 +16,6 @@ #include "mysql_priv.h" #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" #define EVENT_QUEUE_INITIAL_SIZE 30 Loading Loading @@ -136,16 +135,14 @@ Event_queue::deinit_mutexes() */ bool Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) Event_queue::init_queue(THD *thd) { bool res; struct event_queue_param *event_queue_param_value= NULL; DBUG_ENTER("Event_queue::init_queue"); DBUG_PRINT("enter", ("this: 0x%lx", (long) this)); LOCK_QUEUE_DATA(); db_repository= db_repo; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, 0 /*max_on_top*/, event_queue_element_compare_q, Loading @@ -162,12 +159,8 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) goto err; } res= load_events_from_db(thd); UNLOCK_QUEUE_DATA(); if (res) deinit_queue(); DBUG_RETURN(res); DBUG_RETURN(FALSE); err: UNLOCK_QUEUE_DATA(); Loading Loading @@ -204,37 +197,29 @@ Event_queue::deinit_queue() Event_queue::create_event() dbname The schema of the new event name The name of the new event RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk */ int Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) void Event_queue::create_event(THD *thd, Event_queue_element *new_element) { int res; Event_queue_element *new_element; DBUG_ENTER("Event_queue::create_event"); DBUG_PRINT("enter", ("thd: 0x%lx et=%s.%s", (long) thd, dbname.str, name.str)); DBUG_PRINT("enter", ("thd=0x%lx et=%s.%s",thd, new_element->dbname.str, new_element->name.str)); new_element= new Event_queue_element(); res= db_repository->load_named_event(thd, dbname, name, new_element); if (res || new_element->status == Event_queue_element::DISABLED) if (new_element->status == Event_queue_element::DISABLED) delete new_element; else { new_element->compute_next_execution_time(); DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); LOCK_QUEUE_DATA(); DBUG_PRINT("info", ("new event in the queue: 0x%lx", (long) new_element)); queue_insert_safe(&queue, (byte *) new_element); dbug_dump_queue(thd->query_start()); pthread_cond_broadcast(&COND_queue_state); UNLOCK_QUEUE_DATA(); } DBUG_RETURN(res); DBUG_VOID_RETURN; } Loading @@ -248,32 +233,16 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) name Name of the event new_schema New schema, in case of RENAME TO, otherwise NULL new_name New name, in case of RENAME TO, otherwise NULL RETURN VALUE OP_OK OK or scheduler not working OP_LOAD_ERROR Error during loading from disk */ int void Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, LEX_STRING *new_schema, LEX_STRING *new_name) Event_queue_element *new_element) { int res; Event_queue_element *new_element; DBUG_ENTER("Event_queue::update_event"); DBUG_PRINT("enter", ("thd: 0x%lx et=[%s.%s]", (long) thd, dbname.str, name.str)); new_element= new Event_queue_element(); res= db_repository->load_named_event(thd, new_schema ? *new_schema:dbname, new_name ? *new_name:name, new_element); if (res) { delete new_element; goto end; } else if (new_element->status == Event_queue_element::DISABLED) if (new_element->status == Event_queue_element::DISABLED) { DBUG_PRINT("info", ("The event is disabled.")); /* Loading @@ -300,9 +269,7 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); DBUG_VOID_RETURN; } Loading Loading @@ -453,133 +420,6 @@ Event_queue::find_n_remove_event(LEX_STRING db, LEX_STRING name) } /* Loads all ENABLED events from mysql.event into the prioritized queue. Called during scheduler main thread initialization. Compiles the events. Creates Event_queue_element instances for every ENABLED event from mysql.event. SYNOPSIS Event_queue::load_events_from_db() thd - Thread context. Used for memory allocation in some cases. RETURN VALUE 0 OK !0 Error (EVEX_OPEN_TABLE_FAILED, EVEX_MICROSECOND_UNSUP, EVEX_COMPILE_ERROR) - in all these cases mysql.event was tampered. NOTES Reports the error to the console */ int Event_queue::load_events_from_db(THD *thd) { TABLE *table; READ_RECORD read_record_info; int ret= -1; uint count= 0; bool clean_the_queue= TRUE; DBUG_ENTER("Event_queue::load_events_from_db"); DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd)); if ((ret= db_repository->open_event_table(thd, TL_READ, &table))) { sql_print_error("SCHEDULER: Table mysql.event is damaged. Can not open"); DBUG_RETURN(EVEX_OPEN_TABLE_FAILED); } init_read_record(&read_record_info, thd, table ,NULL,1,0); while (!(read_record_info.read_record(&read_record_info))) { Event_queue_element *et; if (!(et= new Event_queue_element)) { DBUG_PRINT("info", ("Out of memory")); break; } DBUG_PRINT("info", ("Loading event from row.")); if ((ret= et->load_from_row(table))) { sql_print_error("SCHEDULER: Error while loading from mysql.event. " "Table probably corrupted"); break; } if (et->status != Event_queue_element::ENABLED) { DBUG_PRINT("info",("%s is disabled",et->name.str)); delete et; continue; } /* let's find when to be executed */ if (et->compute_next_execution_time()) { sql_print_error("SCHEDULER: Error while computing execution time of %s.%s." " Skipping", et->dbname.str, et->name.str); continue; } { Event_job_data temp_job_data; DBUG_PRINT("info", ("Event %s loaded from row. ", et->name.str)); temp_job_data.load_from_row(table); /* We load only on scheduler root just to check whether the body compiles. */ switch (ret= temp_job_data.compile(thd, thd->mem_root)) { case EVEX_MICROSECOND_UNSUP: sql_print_error("SCHEDULER: mysql.event is tampered. MICROSECOND is not " "supported but found in mysql.event"); break; case EVEX_COMPILE_ERROR: sql_print_error("SCHEDULER: Error while compiling %s.%s. Aborting load", et->dbname.str, et->name.str); break; default: break; } thd->end_statement(); thd->cleanup_after_query(); } if (ret) { delete et; goto end; } queue_insert_safe(&queue, (byte *) et); count++; } clean_the_queue= FALSE; end: end_read_record(&read_record_info); if (clean_the_queue) { empty_queue(); ret= -1; } else { ret= 0; sql_print_information("SCHEDULER: Loaded %d event%s", count, (count == 1)?"":"s"); } close_thread_tables(thd); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_RETURN(ret); } /* Recalculates activation times in the queue. There is one reason for that. Because the values (execute_at) by which the queue is ordered are Loading Loading @@ -629,7 +469,7 @@ Event_queue::empty_queue() { uint i; DBUG_ENTER("Event_queue::empty_queue"); DBUG_PRINT("enter", ("Purging the queue. %d element(s)", queue.elements)); DBUG_PRINT("enter", ("Purging the queue. %u element(s)", queue.elements)); sql_print_information("SCHEDULER: Purging queue. %u events", queue.elements); /* empty the queue */ for (i= 0; i < queue.elements; ++i) Loading Loading @@ -691,30 +531,26 @@ static const char *queue_wait_msg= "Waiting for next activation"; SYNOPSIS Event_queue::get_top_for_execution_if_time() thd [in] Thread job_data [out] The object to execute event_name [out] The object to execute RETURN VALUE FALSE No error. If *job_data==NULL then top not elligible for execution. Could be that there is no top. TRUE Error FALSE No error. event_name != NULL TRUE Serious error */ bool Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) Event_queue::get_top_for_execution_if_time(THD *thd, Event_queue_element_for_exec **event_name) { bool ret= FALSE; struct timespec top_time; Event_queue_element *top= NULL; bool to_free= FALSE; bool to_drop= FALSE; *job_data= NULL; *event_name= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); LOCK_QUEUE_DATA(); for (;;) { int res; Event_queue_element *top= NULL; /* Break loop if thd has been killed */ if (thd->killed) Loading Loading @@ -753,39 +589,30 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) continue; } DBUG_PRINT("info", ("Ready for execution")); if (!(*job_data= new Event_job_data())) { ret= TRUE; break; } if ((res= db_repository->load_named_event(thd, top->dbname, top->name, *job_data))) if (!(*event_name= new Event_queue_element_for_exec()) || (*event_name)->init(top->dbname, top->name)) { DBUG_PRINT("error", ("Got %d from load_named_event", res)); delete *job_data; *job_data= NULL; ret= TRUE; break; } DBUG_PRINT("info", ("Ready for execution")); top->mark_last_executed(thd); if (top->compute_next_execution_time()) top->status= Event_queue_element::DISABLED; DBUG_PRINT("info", ("event %s status is %d", top->name.str, top->status)); (*job_data)->execution_count= top->execution_count; top->execution_count++; (*event_name)->dropped= top->dropped; top->update_timing_fields(thd); if (((top->execute_at.year && !top->expression) || top->execute_at_null) || (top->status == Event_queue_element::DISABLED)) if (top->status == Event_queue_element::DISABLED) { DBUG_PRINT("info", ("removing from the queue")); sql_print_information("SCHEDULER: Last execution of %s.%s. %s", top->dbname.str, top->name.str, top->dropped? "Dropping.":""); to_free= TRUE; to_drop= top->dropped; delete top; queue_remove(&queue, 0); } else Loading @@ -796,19 +623,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) } end: UNLOCK_QUEUE_DATA(); if (to_drop) { DBUG_PRINT("info", ("Dropping from disk")); top->drop(thd); } if (to_free) delete top; DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *job_data)); DBUG_PRINT("info", ("returning %d et_new: 0x%lx ", ret, (long) *event_name)); if (*job_data) DBUG_PRINT("info", ("db: %s name: %s definer=%s", (*job_data)->dbname.str, (*job_data)->name.str, (*job_data)->definer.str)); if (*event_name) DBUG_PRINT("info", ("db: %s name: %s", (*event_name)->dbname.str, (*event_name)->name.str)); DBUG_RETURN(ret); } Loading
sql/event_queue.h +12 −19 Original line number Diff line number Diff line Loading @@ -16,12 +16,10 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ class Event_basic; class Event_db_repository; class Event_job_data; class Event_queue_element; class Event_queue_element_for_exec; class THD; class Event_scheduler; class Event_queue { Loading @@ -35,19 +33,19 @@ class Event_queue deinit_mutexes(); bool init_queue(THD *thd, Event_db_repository *db_repo); init_queue(THD *thd); void deinit_queue(); /* Methods for queue management follow */ int create_event(THD *thd, LEX_STRING dbname, LEX_STRING name); void create_event(THD *thd, Event_queue_element *new_element); int void update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, LEX_STRING *new_schema, LEX_STRING *new_name); Event_queue_element *new_element); void drop_event(THD *thd, LEX_STRING dbname, LEX_STRING name); Loading @@ -59,14 +57,15 @@ class Event_queue recalculate_activation_times(THD *thd); bool get_top_for_execution_if_time(THD *thd, Event_job_data **job_data); get_top_for_execution_if_time(THD *thd, Event_queue_element_for_exec **event_name); void dump_internal_status(); int load_events_from_db(THD *thd); void empty_queue(); protected: void find_n_remove_event(LEX_STRING db, LEX_STRING name); Loading @@ -76,8 +75,6 @@ class Event_queue drop_matching_events(THD *thd, LEX_STRING pattern, bool (*)(LEX_STRING, Event_basic *)); void empty_queue(); void dbug_dump_queue(time_t now); Loading @@ -86,11 +83,7 @@ class Event_queue pthread_mutex_t LOCK_event_queue; pthread_cond_t COND_queue_state; Event_db_repository *db_repository; Event_scheduler *scheduler; /* The sorted queue with the Event_job_data objects */ /* The sorted queue with the Event_queue_element objects */ QUEUE queue; TIME next_activation_at; Loading
sql/event_scheduler.cc +101 −46 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include "event_data_objects.h" #include "event_scheduler.h" #include "event_queue.h" #include "event_db_repository.h" #ifdef __GNUC__ #if __GNUC__ >= 2 Loading @@ -34,6 +35,11 @@ extern pthread_attr_t connection_attrib; Event_db_repository *Event_worker_thread::db_repository; Events *Event_worker_thread::events_facade; static const LEX_STRING scheduler_states_names[] = { Loading @@ -60,8 +66,8 @@ struct scheduler_param { et The event itself */ static void evex_print_warnings(THD *thd, Event_job_data *et) void Event_worker_thread::print_warnings(THD *thd, Event_job_data *et) { MYSQL_ERROR *err; DBUG_ENTER("evex_print_warnings"); Loading Loading @@ -253,49 +259,97 @@ event_worker_thread(void *arg) { /* needs to be first for thread_stack */ THD *thd; Event_job_data *event= (Event_job_data *)arg; int ret; Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg; thd= event->thd; thd->thread_stack= (char *) &thd; // remember where our stack is DBUG_ENTER("event_worker_thread"); if (!post_init_event_thread(thd)) Event_worker_thread worker_thread; worker_thread.run(thd, (Event_queue_element_for_exec *)arg); deinit_event_thread(thd); return 0; // Can't return anything here } /* Function that executes an event in a child thread. Setups the environment for the event execution and cleans after that. SYNOPSIS Event_worker_thread::run() thd Thread context event The Event_queue_element_for_exec object to be processed */ void Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event) { DBUG_PRINT("info", ("Baikonur, time is %ld, BURAN reporting and operational." "THD: 0x%lx", (long) time(NULL), (long) thd)); int ret; Event_job_data *job_data= NULL; DBUG_ENTER("Event_worker_thread::run"); DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." "THD=0x%lx", time(NULL), thd)); if (post_init_event_thread(thd)) goto end; if (!(job_data= new Event_job_data())) goto end; else if ((ret= db_repository-> load_named_event(thd, event->dbname, event->name, job_data))) { DBUG_PRINT("error", ("Got %d from load_named_event", ret)); goto end; } sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. " "Execution %u", event->dbname.str, event->name.str, event->definer.str, thd->thread_id, event->execution_count); sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu. ", job_data->dbname.str, job_data->name.str, job_data->definer.str, thd->thread_id); thd->enable_slow_log= TRUE; ret= event->execute(thd); ret= job_data->execute(thd); evex_print_warnings(thd, event); print_warnings(thd, job_data); sql_print_information("SCHEDULER: [%s.%s of %s] executed in thread %lu. " "RetCode=%d", event->dbname.str, event->name.str, event->definer.str, thd->thread_id, ret); "RetCode=%d", job_data->dbname.str, job_data->name.str, job_data->definer.str, thd->thread_id, ret); if (ret == EVEX_COMPILE_ERROR) sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s", event->dbname.str, event->name.str, event->definer.str); job_data->dbname.str, job_data->name.str, job_data->definer.str); else if (ret == EVEX_MICROSECOND_UNSUP) sql_print_information("SCHEDULER: MICROSECOND is not supported"); end: delete job_data; if (event->dropped) { sql_print_information("SCHEDULER: Dropping %s.%s", event->dbname.str, event->name.str); /* Using db_repository can lead to a race condition because we access the table without holding LOCK_metadata. Scenario: 1. CREATE EVENT xyz AT ... (conn thread) 2. execute xyz (worker) 3. CREATE EVENT XYZ EVERY ... (conn thread) 4. drop xyz (worker) 5. XYZ was just created on disk but `drop xyz` of the worker dropped it. A consequent load to create Event_queue_element will fail. If all operations are performed under LOCK_metadata there is no such problem. However, this comes at the price of introduction bi-directional association between class Events and class Event_worker_thread. */ events_facade->drop_event(thd, event->dbname, event->name, FALSE); } DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str, event->name.str)); delete event; deinit_event_thread(thd); DBUG_RETURN(0); // Can't return anything here delete event; } Loading Loading @@ -441,7 +495,6 @@ bool Event_scheduler::run(THD *thd) { int res= FALSE; Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); sql_print_information("SCHEDULER: Manager thread started with id %lu", Loading @@ -454,18 +507,20 @@ Event_scheduler::run(THD *thd) while (is_running()) { Event_queue_element_for_exec *event_name; /* Gets a minimized version */ if (queue->get_top_for_execution_if_time(thd, &job_data)) if (queue->get_top_for_execution_if_time(thd, &event_name)) { sql_print_information("SCHEDULER: Serious error during getting next " "event to execute. Stopping"); break; } DBUG_PRINT("info", ("get_top returned job_data: 0x%lx", (long) job_data)); if (job_data) DBUG_PRINT("info", ("get_top returned job_data=0x%lx", event_name)); if (event_name) { if ((res= execute_top(thd, job_data))) if ((res= execute_top(thd, event_name))) break; } else Loading Loading @@ -499,7 +554,7 @@ Event_scheduler::run(THD *thd) */ bool Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) Event_scheduler::execute_top(THD *thd, Event_queue_element_for_exec *event_name) { THD *new_thd; pthread_t th; Loading @@ -510,13 +565,13 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) pre_init_event_thread(new_thd); new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER; job_data->thd= new_thd; event_name->thd= new_thd; DBUG_PRINT("info", ("BURAN %s@%s ready for start t-3..2..1..0..ignition", job_data->dbname.str, job_data->name.str)); event_name->dbname.str, event_name->name.str)); /* Major failure */ if ((res= pthread_create(&th, &connection_attrib, event_worker_thread, job_data))) event_name))) goto error; ++started_events; Loading @@ -537,7 +592,7 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) delete new_thd; pthread_mutex_unlock(&LOCK_thread_count); } delete job_data; delete event_name; DBUG_RETURN(TRUE); } Loading