Loading sql/event_queue.cc +124 −49 Original line number Diff line number Diff line Loading @@ -18,7 +18,6 @@ #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" #include "event_scheduler.h" #define EVENT_QUEUE_INITIAL_SIZE 30 Loading Loading @@ -87,6 +86,7 @@ Event_queue::Event_queue() { mutex_last_unlocked_in_func= mutex_last_locked_in_func= mutex_last_attempted_lock_in_func= ""; set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); } Loading Loading @@ -135,8 +135,7 @@ Event_queue::deinit_mutexes() */ bool Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, Event_scheduler *sched) Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) { pthread_t th; bool res; Loading @@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, LOCK_QUEUE_DATA(); db_repository= db_repo; scheduler= sched; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, 0 /*max_on_top*/, event_queue_element_compare_q, Loading Loading @@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); dbug_dump_queue(thd->query_start()); pthread_cond_broadcast(&COND_queue_state); UNLOCK_QUEUE_DATA(); notify_observers(); } DBUG_RETURN(res); Loading Loading @@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, { DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); pthread_cond_broadcast(&COND_queue_state); } dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); if (new_element) notify_observers(); end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); Loading Loading @@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, i++; } /* We don't call notify_observers() . If we remove the top event: We don't call pthread_cond_broadcast(&COND_queue_state); If we remove the top event: 1. The queue is empty. The scheduler will wake up at some time and realize that the queue is empty. If create_event() comes inbetween it will signal the scheduler Loading Loading @@ -421,24 +418,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) } /* Signals the observers (the main scheduler thread) that the state of the queue has been changed. SYNOPSIS Event_queue::notify_observers() */ void Event_queue::notify_observers() { DBUG_ENTER("Event_queue::notify_observers"); DBUG_PRINT("info", ("Signalling change of the queue")); scheduler->queue_changed(); DBUG_VOID_RETURN; } /* Searches for an event in the queue Loading Loading @@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now) #endif } static const char *queue_empty_msg= "Waiting on empty queue"; static const char *queue_wait_msg= "Waiting for next activation"; /* Checks whether the top of the queue is elligible for execution and Loading @@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now) */ bool Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, Event_job_data **job_data, struct timespec *abstime) Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) { bool ret= FALSE; struct timespec top_time; struct timespec *abstime; *job_data= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now)); abstime->tv_nsec= 0; top_time.tv_nsec= 0; LOCK_QUEUE_DATA(); do { int res; if (!queue.elements) for (;;) { abstime->tv_sec= 0; break; } int res; Event_queue_element *top= NULL; Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0)); thd->end_time(); time_t now= thd->query_start(); abstime= NULL; if (queue.elements) { top= ((Event_queue_element*) queue_element(&queue, 0)); top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at); if (top_time.tv_sec > now) abstime= &top_time; } if (!abstime || abstime->tv_sec > now) { abstime->tv_sec= top_time.tv_sec; DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, abstime->tv_sec)); break; const char *msg; if (abstime) { next_activation_at= top->execute_at; msg= queue_wait_msg; } else { set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); msg= queue_wait_msg; } cond_wait(thd, abstime, msg, SCHED_FUNC, __LINE__); if (thd->killed) { DBUG_PRINT("info", ("thd->killed=%d", thd->killed)); goto end; } /* The queue could have been emptied. Therefore it's safe to start from the beginning. Moreover, this way we will get also the new top, if the element at the top has been changed. */ continue; } DBUG_PRINT("info", ("Ready for execution")); abstime->tv_sec= 0; if (!(*job_data= new Event_job_data())) { ret= TRUE; Loading @@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, if ((res= db_repository->load_named_event(thd, top->dbname, top->name, *job_data))) { DBUG_PRINT("error", ("Got %d from load_named_event", res)); delete *job_data; *job_data= NULL; ret= TRUE; Loading Loading @@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, queue_replaced(&queue); dbug_dump_queue(now); } while (0); break; } end: UNLOCK_QUEUE_DATA(); DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ", ret, *job_data, abstime->tv_sec)); ret, *job_data, abstime? abstime->tv_sec:0)); if (*job_data) DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str, Loading Loading @@ -864,6 +871,52 @@ Event_queue::unlock_data(const char *func, uint line) } /* Wrapper for pthread_cond_wait/timedwait SYNOPSIS Event_queue::cond_wait() thd Thread (Could be NULL during shutdown procedure) msg Message for thd->proc_info abstime If not null then call pthread_cond_timedwait() func Which function is requesting cond_wait line On which line cond_wait is requested */ void Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg, const char *func, uint line) { DBUG_ENTER("Event_queue::cond_wait"); waiting_on_cond= TRUE; mutex_last_unlocked_at_line= line; mutex_queue_data_locked= FALSE; mutex_last_unlocked_in_func= func; thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg); DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":"")); if (!abstime) pthread_cond_wait(&COND_queue_state, &LOCK_event_queue); else pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime); mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; mutex_queue_data_locked= TRUE; waiting_on_cond= FALSE; /* This will free the lock so we need to relock. Not the best thing to do but we need to obey cond_wait() */ thd->exit_cond(""); LOCK_QUEUE_DATA(); DBUG_VOID_RETURN; } /* Dumps the internal status of the queue Loading Loading @@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd) protocol->store(&tmp_string); ret= protocol->write(); /* waiting on */ protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("queue waiting on condition"), scs); int_string.set((longlong) waiting_on_cond, scs); protocol->store(&int_string); ret= protocol->write(); protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("next activation at"), scs); tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), tmp_string.alloced_length(), "%4d-%02d-%02d %02d:%02d:%02d", next_activation_at.year, next_activation_at.month, next_activation_at.day, next_activation_at.hour, next_activation_at.minute, next_activation_at.second )); protocol->store(&tmp_string); ret= protocol->write(); #endif DBUG_RETURN(FALSE); } sql/event_queue.h +10 −6 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class Event_queue deinit_mutexes(); bool init_queue(THD *thd, Event_db_repository *db_repo, Event_scheduler *sched); init_queue(THD *thd, Event_db_repository *db_repo); void deinit_queue(); Loading @@ -60,8 +60,7 @@ class Event_queue recalculate_activation_times(THD *thd); bool get_top_for_execution_if_time(THD *thd, time_t now, Event_job_data **job_data, struct timespec *abstime); get_top_for_execution_if_time(THD *thd, Event_job_data **job_data); bool dump_internal_status(THD *thd); Loading @@ -80,14 +79,12 @@ class Event_queue void empty_queue(); void notify_observers(); void dbug_dump_queue(time_t now); /* LOCK_event_queue is the mutex which protects the access to the queue. */ pthread_mutex_t LOCK_event_queue; pthread_cond_t COND_queue_state; Event_db_repository *db_repository; Loading @@ -96,6 +93,8 @@ class Event_queue /* The sorted queue with the Event_job_data objects */ QUEUE queue; TIME next_activation_at; uint mutex_last_locked_at_line; uint mutex_last_unlocked_at_line; uint mutex_last_attempted_lock_at_line; Loading @@ -104,6 +103,7 @@ class Event_queue const char* mutex_last_attempted_lock_in_func; bool mutex_queue_data_locked; bool mutex_queue_data_attempting_lock; bool waiting_on_cond; /* helper functions for working with mutexes & conditionals */ void Loading @@ -111,6 +111,10 @@ class Event_queue void unlock_data(const char *func, uint line); void cond_wait(THD *thd, struct timespec *abstime, const char* msg, const char *func, uint line); }; #endif /* _EVENT_QUEUE_H_ */ sql/event_scheduler.cc +79 −109 Original line number Diff line number Diff line Loading @@ -313,7 +313,7 @@ Event_scheduler::init_scheduler(Event_queue *q) LOCK_DATA(); queue= q; started_events= 0; thread_id= 0; scheduler_thd= NULL; state= INITIALIZED; UNLOCK_DATA(); } Loading Loading @@ -397,22 +397,18 @@ Event_scheduler::start() scheduler_param_value->thd= new_thd; scheduler_param_value->scheduler= this; scheduler_thd= new_thd; DBUG_PRINT("info", ("Setting state go RUNNING")); state= RUNNING; DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); if (pthread_create(&th, &connection_attrib, event_scheduler_thread, (void*)scheduler_param_value)) { DBUG_PRINT("error", ("cannot create a new thread")); state= INITIALIZED; scheduler_thd= NULL; ret= TRUE; } DBUG_PRINT("info", ("Setting state go RUNNING")); state= RUNNING; end: UNLOCK_DATA(); if (ret && new_thd) { DBUG_PRINT("info", ("There was an error during THD creation. Clean up")); new_thd->proc_info= "Clearing"; DBUG_ASSERT(new_thd->net.buff != 0); net_end(&new_thd->net); Loading @@ -422,6 +418,9 @@ Event_scheduler::start() delete new_thd; pthread_mutex_unlock(&LOCK_thread_count); } end: UNLOCK_DATA(); DBUG_RETURN(ret); } Loading @@ -446,66 +445,41 @@ Event_scheduler::run(THD *thd) Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); LOCK_DATA(); thread_id= thd->thread_id; sql_print_information("SCHEDULER: Manager thread started with id %lu", thread_id); thd->thread_id); /* Recalculate the values in the queue because there could have been stops in executions of the scheduler and some times could have passed by. */ queue->recalculate_activation_times(thd); while (state == RUNNING) while (is_running()) { thd->end_time(); /* Gets a minimized version */ if (queue->get_top_for_execution_if_time(thd, thd->query_start(), &job_data, &abstime)) if (queue->get_top_for_execution_if_time(thd, &job_data)) { sql_print_information("SCHEDULER: Serious error during getting next " "event to execute. Stopping"); break; } DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d " "abs_time.tv_sec=%d", job_data, thd->query_start(), abstime.tv_sec)); if (!job_data && !abstime.tv_sec) { DBUG_PRINT("info", ("The queue is empty. Going to sleep")); COND_STATE_WAIT(thd, NULL, "Waiting on empty queue"); DBUG_PRINT("info", ("Woke up. Got COND_state")); } else if (abstime.tv_sec) DBUG_PRINT("info", ("get_top returned job_data=0x%lx", job_data)); if (job_data) { DBUG_PRINT("info", ("Have to sleep some time %u s. till %u", abstime.tv_sec - thd->query_start(), abstime.tv_sec)); COND_STATE_WAIT(thd, &abstime, "Waiting for next activation"); /* If we get signal we should recalculate the whether it's the right time because there could be : 1. Spurious wake-up 2. The top of the queue was changed (new one becase of create/update) */ DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); if ((res= execute_top(thd, job_data))) break; } else { UNLOCK_DATA(); res= execute_top(thd, job_data); LOCK_DATA(); if (res) break; ++started_events; DBUG_ASSERT(thd->killed); DBUG_PRINT("info", ("job_data is NULL, the thread was killed")); } DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str)); } LOCK_DATA(); DBUG_PRINT("info", ("Signalling back to the stopper COND_state")); pthread_cond_signal(&COND_state); error: state= INITIALIZED; pthread_cond_signal(&COND_state); UNLOCK_DATA(); sql_print_information("SCHEDULER: Stopped"); Loading Loading @@ -546,6 +520,8 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) job_data))) goto error; ++started_events; DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd)); DBUG_RETURN(FALSE); Loading @@ -567,6 +543,27 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) } /* Checkes whether the state of the scheduler is RUNNING SYNOPSIS Event_scheduler::is_running() RETURN VALUE TRUE RUNNING FALSE Not RUNNING */ inline bool Event_scheduler::is_running() { LOCK_DATA(); bool ret= (state == RUNNING); UNLOCK_DATA(); return ret; } /* Stops the scheduler (again). Waits for acknowledgement from the scheduler that it has stopped - synchronous stopping. Loading @@ -591,26 +588,48 @@ Event_scheduler::stop() if (state != RUNNING) goto end; state= STOPPING; DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); pthread_cond_signal(&COND_state); /* Guarantee we don't catch spurious signals */ sql_print_information("SCHEDULER: Waiting the manager thread to reply"); do { DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " "thread. Current value of state is %s . " "workers count=%d", scheduler_states_names[state].str, workers_count())); /* NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON threads. In addition, kill_one_thread() requires THD but during shutdown current_thd is NULL. Hence, if kill_one_thread should be used it has to be modified to kill also daemons, by adding a flag, and also we have to create artificial THD here. To save all this work, we just do what kill_one_thread() does to kill a thread. See also sql_repl.cc for similar usage. */ state= STOPPING; DBUG_PRINT("info", ("Manager thread has id %d", scheduler_thd->thread_id)); /* Lock from delete */ pthread_mutex_lock(&scheduler_thd->LOCK_delete); /* This will wake up the thread if it waits on Queue's conditional */ sql_print_information("SCHEDULER: Killing manager thread %lu", scheduler_thd->thread_id); scheduler_thd->awake(THD::KILL_CONNECTION); pthread_mutex_unlock(&scheduler_thd->LOCK_delete); /* thd could be 0x0, when shutting down */ sql_print_information("SCHEDULER: Waiting the manager thread to reply"); COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop"); } while (state == STOPPING); DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); thread_id= 0; /* The rationale behind setting it to NULL here but not destructing it beforehand is because the THD will be deinited in event_scheduler_thread(). It's more clear when the post_init and the deinit is done in one function. Here we just mark that the scheduler doesn't have a THD anymore. Though for milliseconds the old thread could exist we can't use it anymore. When we unlock the mutex in this function a little later the state will be INITIALIZED. Therefore, a connection thread could enter the critical section and will create a new THD object. */ scheduler_thd= NULL; end: UNLOCK_DATA(); DBUG_RETURN(FALSE); Loading @@ -634,37 +653,14 @@ Event_scheduler::workers_count() pthread_mutex_lock(&LOCK_thread_count); // For unlink from list I_List_iterator<THD> it(threads); while ((tmp=it++)) { if (tmp->command == COM_DAEMON) continue; if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) ++count; } pthread_mutex_unlock(&LOCK_thread_count); DBUG_PRINT("exit", ("%d", count)); DBUG_RETURN(count); } /* Signals the main scheduler thread that the queue has changed its state. SYNOPSIS Event_scheduler::queue_changed() */ void Event_scheduler::queue_changed() { DBUG_ENTER("Event_scheduler::queue_changed"); DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ", scheduler_states_names[state].str)); pthread_cond_signal(&COND_state); DBUG_VOID_RETURN; } /* Auxiliary function for locking LOCK_scheduler_state. Used by the LOCK_DATA macro. Loading Loading @@ -718,6 +714,7 @@ Event_scheduler::unlock_data(const char *func, uint line) Event_scheduler::cond_wait() thd Thread (Could be NULL during shutdown procedure) abstime If not null then call pthread_cond_timedwait() msg Message for thd->proc_info func Which function is requesting cond_wait line On which line cond_wait is requested */ Loading Loading @@ -756,33 +753,6 @@ Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg, } /* Returns the current state of the scheduler SYNOPSIS Event_scheduler::get_state() RETURN VALUE The state of the scheduler (INITIALIZED | RUNNING | STOPPING) */ enum Event_scheduler::enum_state Event_scheduler::get_state() { enum Event_scheduler::enum_state ret; DBUG_ENTER("Event_scheduler::get_state"); LOCK_DATA(); ret= state; UNLOCK_DATA(); DBUG_RETURN(ret); } /* REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF Returns whether the scheduler was initialized. */ /* Dumps the internal status of the scheduler Loading Loading @@ -826,7 +796,7 @@ Event_scheduler::dump_internal_status(THD *thd) protocol->store(STRING_WITH_LEN("thread_id"), scs); if (thread_id) { int_string.set((longlong) thread_id, scs); int_string.set((longlong) scheduler_thd->thread_id, scs); protocol->store(&int_string); } else Loading sql/event_scheduler.h +12 −21 Original line number Diff line number Diff line Loading @@ -34,14 +34,6 @@ class Event_scheduler Event_scheduler():state(UNINITIALIZED){} ~Event_scheduler(){} enum enum_state { UNINITIALIZED = 0, INITIALIZED, RUNNING, STOPPING }; /* State changing methods follow */ bool Loading Loading @@ -70,12 +62,8 @@ class Event_scheduler deinit_mutexes(); /* Information retrieving methods follow */ enum enum_state get_state(); void queue_changed(); bool is_running(); bool dump_internal_status(THD *thd); Loading @@ -84,6 +72,7 @@ class Event_scheduler uint workers_count(); /* helper functions */ bool execute_top(THD *thd, Event_job_data *job_data); Loading @@ -101,16 +90,18 @@ class Event_scheduler pthread_mutex_t LOCK_scheduler_state; enum enum_state { UNINITIALIZED = 0, INITIALIZED, RUNNING, STOPPING }; /* This is the current status of the life-cycle of the scheduler. */ enum enum_state state; /* Holds the thread id of the executor thread or 0 if the scheduler is not running. It is used by ::shutdown() to know which thread to kill with kill_one_thread(). The latter wake ups a thread if it is waiting on a conditional variable and sets thd->killed to non-zero. */ ulong thread_id; THD *scheduler_thd; pthread_cond_t COND_state; Loading sql/events.cc +2 −2 Original line number Diff line number Diff line Loading @@ -630,7 +630,7 @@ Events::init() } check_system_tables_error= FALSE; if (event_queue->init_queue(thd, db_repository, scheduler)) if (event_queue->init_queue(thd, db_repository)) { sql_print_error("SCHEDULER: Error while loading from disk."); goto end; Loading Loading @@ -820,7 +820,7 @@ Events::is_execution_of_events_started() my_error(ER_EVENTS_DB_ERROR, MYF(0)); DBUG_RETURN(FALSE); } DBUG_RETURN(scheduler->get_state() == Event_scheduler::RUNNING); DBUG_RETURN(scheduler->is_running()); } Loading Loading
sql/event_queue.cc +124 −49 Original line number Diff line number Diff line Loading @@ -18,7 +18,6 @@ #include "event_queue.h" #include "event_data_objects.h" #include "event_db_repository.h" #include "event_scheduler.h" #define EVENT_QUEUE_INITIAL_SIZE 30 Loading Loading @@ -87,6 +86,7 @@ Event_queue::Event_queue() { mutex_last_unlocked_in_func= mutex_last_locked_in_func= mutex_last_attempted_lock_in_func= ""; set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); } Loading Loading @@ -135,8 +135,7 @@ Event_queue::deinit_mutexes() */ bool Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, Event_scheduler *sched) Event_queue::init_queue(THD *thd, Event_db_repository *db_repo) { pthread_t th; bool res; Loading @@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo, LOCK_QUEUE_DATA(); db_repository= db_repo; scheduler= sched; if (init_queue_ex(&queue, EVENT_QUEUE_INITIAL_SIZE , 0 /*offset*/, 0 /*max_on_top*/, event_queue_element_compare_q, Loading Loading @@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name) DBUG_PRINT("info", ("new event in the queue 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); dbug_dump_queue(thd->query_start()); pthread_cond_broadcast(&COND_queue_state); UNLOCK_QUEUE_DATA(); notify_observers(); } DBUG_RETURN(res); Loading Loading @@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name, { DBUG_PRINT("info", ("new event in the Q 0x%lx", new_element)); queue_insert_safe(&queue, (byte *) new_element); pthread_cond_broadcast(&COND_queue_state); } dbug_dump_queue(thd->query_start()); UNLOCK_QUEUE_DATA(); if (new_element) notify_observers(); end: DBUG_PRINT("info", ("res=%d", res)); DBUG_RETURN(res); Loading Loading @@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, i++; } /* We don't call notify_observers() . If we remove the top event: We don't call pthread_cond_broadcast(&COND_queue_state); If we remove the top event: 1. The queue is empty. The scheduler will wake up at some time and realize that the queue is empty. If create_event() comes inbetween it will signal the scheduler Loading Loading @@ -421,24 +418,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema) } /* Signals the observers (the main scheduler thread) that the state of the queue has been changed. SYNOPSIS Event_queue::notify_observers() */ void Event_queue::notify_observers() { DBUG_ENTER("Event_queue::notify_observers"); DBUG_PRINT("info", ("Signalling change of the queue")); scheduler->queue_changed(); DBUG_VOID_RETURN; } /* Searches for an event in the queue Loading Loading @@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now) #endif } static const char *queue_empty_msg= "Waiting on empty queue"; static const char *queue_wait_msg= "Waiting for next activation"; /* Checks whether the top of the queue is elligible for execution and Loading @@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now) */ bool Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, Event_job_data **job_data, struct timespec *abstime) Event_queue::get_top_for_execution_if_time(THD *thd, Event_job_data **job_data) { bool ret= FALSE; struct timespec top_time; struct timespec *abstime; *job_data= NULL; DBUG_ENTER("Event_queue::get_top_for_execution_if_time"); DBUG_PRINT("enter", ("thd=0x%lx now=%d", thd, now)); abstime->tv_nsec= 0; top_time.tv_nsec= 0; LOCK_QUEUE_DATA(); do { int res; if (!queue.elements) for (;;) { abstime->tv_sec= 0; break; } int res; Event_queue_element *top= NULL; Event_queue_element *top= ((Event_queue_element*) queue_element(&queue, 0)); thd->end_time(); time_t now= thd->query_start(); abstime= NULL; if (queue.elements) { top= ((Event_queue_element*) queue_element(&queue, 0)); top_time.tv_sec= sec_since_epoch_TIME(&top->execute_at); if (top_time.tv_sec > now) abstime= &top_time; } if (!abstime || abstime->tv_sec > now) { abstime->tv_sec= top_time.tv_sec; DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now, abstime->tv_sec)); break; const char *msg; if (abstime) { next_activation_at= top->execute_at; msg= queue_wait_msg; } else { set_zero_time(&next_activation_at, MYSQL_TIMESTAMP_DATETIME); msg= queue_wait_msg; } cond_wait(thd, abstime, msg, SCHED_FUNC, __LINE__); if (thd->killed) { DBUG_PRINT("info", ("thd->killed=%d", thd->killed)); goto end; } /* The queue could have been emptied. Therefore it's safe to start from the beginning. Moreover, this way we will get also the new top, if the element at the top has been changed. */ continue; } DBUG_PRINT("info", ("Ready for execution")); abstime->tv_sec= 0; if (!(*job_data= new Event_job_data())) { ret= TRUE; Loading @@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, if ((res= db_repository->load_named_event(thd, top->dbname, top->name, *job_data))) { DBUG_PRINT("error", ("Got %d from load_named_event", res)); delete *job_data; *job_data= NULL; ret= TRUE; Loading Loading @@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now, queue_replaced(&queue); dbug_dump_queue(now); } while (0); break; } end: UNLOCK_QUEUE_DATA(); DBUG_PRINT("info", ("returning %d. et_new=0x%lx abstime.tv_sec=%d ", ret, *job_data, abstime->tv_sec)); ret, *job_data, abstime? abstime->tv_sec:0)); if (*job_data) DBUG_PRINT("info", ("db=%s name=%s definer=%s", (*job_data)->dbname.str, Loading Loading @@ -864,6 +871,52 @@ Event_queue::unlock_data(const char *func, uint line) } /* Wrapper for pthread_cond_wait/timedwait SYNOPSIS Event_queue::cond_wait() thd Thread (Could be NULL during shutdown procedure) msg Message for thd->proc_info abstime If not null then call pthread_cond_timedwait() func Which function is requesting cond_wait line On which line cond_wait is requested */ void Event_queue::cond_wait(THD *thd, struct timespec *abstime, const char* msg, const char *func, uint line) { DBUG_ENTER("Event_queue::cond_wait"); waiting_on_cond= TRUE; mutex_last_unlocked_at_line= line; mutex_queue_data_locked= FALSE; mutex_last_unlocked_in_func= func; thd->enter_cond(&COND_queue_state, &LOCK_event_queue, msg); DBUG_PRINT("info", ("pthread_cond_%swait", abstime? "timed":"")); if (!abstime) pthread_cond_wait(&COND_queue_state, &LOCK_event_queue); else pthread_cond_timedwait(&COND_queue_state, &LOCK_event_queue, abstime); mutex_last_locked_in_func= func; mutex_last_locked_at_line= line; mutex_queue_data_locked= TRUE; waiting_on_cond= FALSE; /* This will free the lock so we need to relock. Not the best thing to do but we need to obey cond_wait() */ thd->exit_cond(""); LOCK_QUEUE_DATA(); DBUG_VOID_RETURN; } /* Dumps the internal status of the queue Loading Loading @@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd) protocol->store(&tmp_string); ret= protocol->write(); /* waiting on */ protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("queue waiting on condition"), scs); int_string.set((longlong) waiting_on_cond, scs); protocol->store(&int_string); ret= protocol->write(); protocol->prepare_for_resend(); protocol->store(STRING_WITH_LEN("next activation at"), scs); tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), tmp_string.alloced_length(), "%4d-%02d-%02d %02d:%02d:%02d", next_activation_at.year, next_activation_at.month, next_activation_at.day, next_activation_at.hour, next_activation_at.minute, next_activation_at.second )); protocol->store(&tmp_string); ret= protocol->write(); #endif DBUG_RETURN(FALSE); }
sql/event_queue.h +10 −6 Original line number Diff line number Diff line Loading @@ -36,7 +36,7 @@ class Event_queue deinit_mutexes(); bool init_queue(THD *thd, Event_db_repository *db_repo, Event_scheduler *sched); init_queue(THD *thd, Event_db_repository *db_repo); void deinit_queue(); Loading @@ -60,8 +60,7 @@ class Event_queue recalculate_activation_times(THD *thd); bool get_top_for_execution_if_time(THD *thd, time_t now, Event_job_data **job_data, struct timespec *abstime); get_top_for_execution_if_time(THD *thd, Event_job_data **job_data); bool dump_internal_status(THD *thd); Loading @@ -80,14 +79,12 @@ class Event_queue void empty_queue(); void notify_observers(); void dbug_dump_queue(time_t now); /* LOCK_event_queue is the mutex which protects the access to the queue. */ pthread_mutex_t LOCK_event_queue; pthread_cond_t COND_queue_state; Event_db_repository *db_repository; Loading @@ -96,6 +93,8 @@ class Event_queue /* The sorted queue with the Event_job_data objects */ QUEUE queue; TIME next_activation_at; uint mutex_last_locked_at_line; uint mutex_last_unlocked_at_line; uint mutex_last_attempted_lock_at_line; Loading @@ -104,6 +103,7 @@ class Event_queue const char* mutex_last_attempted_lock_in_func; bool mutex_queue_data_locked; bool mutex_queue_data_attempting_lock; bool waiting_on_cond; /* helper functions for working with mutexes & conditionals */ void Loading @@ -111,6 +111,10 @@ class Event_queue void unlock_data(const char *func, uint line); void cond_wait(THD *thd, struct timespec *abstime, const char* msg, const char *func, uint line); }; #endif /* _EVENT_QUEUE_H_ */
sql/event_scheduler.cc +79 −109 Original line number Diff line number Diff line Loading @@ -313,7 +313,7 @@ Event_scheduler::init_scheduler(Event_queue *q) LOCK_DATA(); queue= q; started_events= 0; thread_id= 0; scheduler_thd= NULL; state= INITIALIZED; UNLOCK_DATA(); } Loading Loading @@ -397,22 +397,18 @@ Event_scheduler::start() scheduler_param_value->thd= new_thd; scheduler_param_value->scheduler= this; scheduler_thd= new_thd; DBUG_PRINT("info", ("Setting state go RUNNING")); state= RUNNING; DBUG_PRINT("info", ("Forking new thread for scheduduler. THD=0x%lx", new_thd)); if (pthread_create(&th, &connection_attrib, event_scheduler_thread, (void*)scheduler_param_value)) { DBUG_PRINT("error", ("cannot create a new thread")); state= INITIALIZED; scheduler_thd= NULL; ret= TRUE; } DBUG_PRINT("info", ("Setting state go RUNNING")); state= RUNNING; end: UNLOCK_DATA(); if (ret && new_thd) { DBUG_PRINT("info", ("There was an error during THD creation. Clean up")); new_thd->proc_info= "Clearing"; DBUG_ASSERT(new_thd->net.buff != 0); net_end(&new_thd->net); Loading @@ -422,6 +418,9 @@ Event_scheduler::start() delete new_thd; pthread_mutex_unlock(&LOCK_thread_count); } end: UNLOCK_DATA(); DBUG_RETURN(ret); } Loading @@ -446,66 +445,41 @@ Event_scheduler::run(THD *thd) Event_job_data *job_data; DBUG_ENTER("Event_scheduler::run"); LOCK_DATA(); thread_id= thd->thread_id; sql_print_information("SCHEDULER: Manager thread started with id %lu", thread_id); thd->thread_id); /* Recalculate the values in the queue because there could have been stops in executions of the scheduler and some times could have passed by. */ queue->recalculate_activation_times(thd); while (state == RUNNING) while (is_running()) { thd->end_time(); /* Gets a minimized version */ if (queue->get_top_for_execution_if_time(thd, thd->query_start(), &job_data, &abstime)) if (queue->get_top_for_execution_if_time(thd, &job_data)) { sql_print_information("SCHEDULER: Serious error during getting next " "event to execute. Stopping"); break; } DBUG_PRINT("info", ("get_top returned job_data=0x%lx now=%d " "abs_time.tv_sec=%d", job_data, thd->query_start(), abstime.tv_sec)); if (!job_data && !abstime.tv_sec) { DBUG_PRINT("info", ("The queue is empty. Going to sleep")); COND_STATE_WAIT(thd, NULL, "Waiting on empty queue"); DBUG_PRINT("info", ("Woke up. Got COND_state")); } else if (abstime.tv_sec) DBUG_PRINT("info", ("get_top returned job_data=0x%lx", job_data)); if (job_data) { DBUG_PRINT("info", ("Have to sleep some time %u s. till %u", abstime.tv_sec - thd->query_start(), abstime.tv_sec)); COND_STATE_WAIT(thd, &abstime, "Waiting for next activation"); /* If we get signal we should recalculate the whether it's the right time because there could be : 1. Spurious wake-up 2. The top of the queue was changed (new one becase of create/update) */ DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); if ((res= execute_top(thd, job_data))) break; } else { UNLOCK_DATA(); res= execute_top(thd, job_data); LOCK_DATA(); if (res) break; ++started_events; DBUG_ASSERT(thd->killed); DBUG_PRINT("info", ("job_data is NULL, the thread was killed")); } DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str)); } LOCK_DATA(); DBUG_PRINT("info", ("Signalling back to the stopper COND_state")); pthread_cond_signal(&COND_state); error: state= INITIALIZED; pthread_cond_signal(&COND_state); UNLOCK_DATA(); sql_print_information("SCHEDULER: Stopped"); Loading Loading @@ -546,6 +520,8 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) job_data))) goto error; ++started_events; DBUG_PRINT("info", ("Launch succeeded. BURAN is in THD=0x%lx", new_thd)); DBUG_RETURN(FALSE); Loading @@ -567,6 +543,27 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data) } /* Checkes whether the state of the scheduler is RUNNING SYNOPSIS Event_scheduler::is_running() RETURN VALUE TRUE RUNNING FALSE Not RUNNING */ inline bool Event_scheduler::is_running() { LOCK_DATA(); bool ret= (state == RUNNING); UNLOCK_DATA(); return ret; } /* Stops the scheduler (again). Waits for acknowledgement from the scheduler that it has stopped - synchronous stopping. Loading @@ -591,26 +588,48 @@ Event_scheduler::stop() if (state != RUNNING) goto end; state= STOPPING; DBUG_PRINT("info", ("Manager thread has id %d", thread_id)); sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id); pthread_cond_signal(&COND_state); /* Guarantee we don't catch spurious signals */ sql_print_information("SCHEDULER: Waiting the manager thread to reply"); do { DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager " "thread. Current value of state is %s . " "workers count=%d", scheduler_states_names[state].str, workers_count())); /* NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON threads. In addition, kill_one_thread() requires THD but during shutdown current_thd is NULL. Hence, if kill_one_thread should be used it has to be modified to kill also daemons, by adding a flag, and also we have to create artificial THD here. To save all this work, we just do what kill_one_thread() does to kill a thread. See also sql_repl.cc for similar usage. */ state= STOPPING; DBUG_PRINT("info", ("Manager thread has id %d", scheduler_thd->thread_id)); /* Lock from delete */ pthread_mutex_lock(&scheduler_thd->LOCK_delete); /* This will wake up the thread if it waits on Queue's conditional */ sql_print_information("SCHEDULER: Killing manager thread %lu", scheduler_thd->thread_id); scheduler_thd->awake(THD::KILL_CONNECTION); pthread_mutex_unlock(&scheduler_thd->LOCK_delete); /* thd could be 0x0, when shutting down */ sql_print_information("SCHEDULER: Waiting the manager thread to reply"); COND_STATE_WAIT(thd, NULL, "Waiting scheduler to stop"); } while (state == STOPPING); DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT")); thread_id= 0; /* The rationale behind setting it to NULL here but not destructing it beforehand is because the THD will be deinited in event_scheduler_thread(). It's more clear when the post_init and the deinit is done in one function. Here we just mark that the scheduler doesn't have a THD anymore. Though for milliseconds the old thread could exist we can't use it anymore. When we unlock the mutex in this function a little later the state will be INITIALIZED. Therefore, a connection thread could enter the critical section and will create a new THD object. */ scheduler_thd= NULL; end: UNLOCK_DATA(); DBUG_RETURN(FALSE); Loading @@ -634,37 +653,14 @@ Event_scheduler::workers_count() pthread_mutex_lock(&LOCK_thread_count); // For unlink from list I_List_iterator<THD> it(threads); while ((tmp=it++)) { if (tmp->command == COM_DAEMON) continue; if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER) ++count; } pthread_mutex_unlock(&LOCK_thread_count); DBUG_PRINT("exit", ("%d", count)); DBUG_RETURN(count); } /* Signals the main scheduler thread that the queue has changed its state. SYNOPSIS Event_scheduler::queue_changed() */ void Event_scheduler::queue_changed() { DBUG_ENTER("Event_scheduler::queue_changed"); DBUG_PRINT("info", ("Sending COND_state. state (read wo lock)=%s ", scheduler_states_names[state].str)); pthread_cond_signal(&COND_state); DBUG_VOID_RETURN; } /* Auxiliary function for locking LOCK_scheduler_state. Used by the LOCK_DATA macro. Loading Loading @@ -718,6 +714,7 @@ Event_scheduler::unlock_data(const char *func, uint line) Event_scheduler::cond_wait() thd Thread (Could be NULL during shutdown procedure) abstime If not null then call pthread_cond_timedwait() msg Message for thd->proc_info func Which function is requesting cond_wait line On which line cond_wait is requested */ Loading Loading @@ -756,33 +753,6 @@ Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg, } /* Returns the current state of the scheduler SYNOPSIS Event_scheduler::get_state() RETURN VALUE The state of the scheduler (INITIALIZED | RUNNING | STOPPING) */ enum Event_scheduler::enum_state Event_scheduler::get_state() { enum Event_scheduler::enum_state ret; DBUG_ENTER("Event_scheduler::get_state"); LOCK_DATA(); ret= state; UNLOCK_DATA(); DBUG_RETURN(ret); } /* REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF Returns whether the scheduler was initialized. */ /* Dumps the internal status of the scheduler Loading Loading @@ -826,7 +796,7 @@ Event_scheduler::dump_internal_status(THD *thd) protocol->store(STRING_WITH_LEN("thread_id"), scs); if (thread_id) { int_string.set((longlong) thread_id, scs); int_string.set((longlong) scheduler_thd->thread_id, scs); protocol->store(&int_string); } else Loading
sql/event_scheduler.h +12 −21 Original line number Diff line number Diff line Loading @@ -34,14 +34,6 @@ class Event_scheduler Event_scheduler():state(UNINITIALIZED){} ~Event_scheduler(){} enum enum_state { UNINITIALIZED = 0, INITIALIZED, RUNNING, STOPPING }; /* State changing methods follow */ bool Loading Loading @@ -70,12 +62,8 @@ class Event_scheduler deinit_mutexes(); /* Information retrieving methods follow */ enum enum_state get_state(); void queue_changed(); bool is_running(); bool dump_internal_status(THD *thd); Loading @@ -84,6 +72,7 @@ class Event_scheduler uint workers_count(); /* helper functions */ bool execute_top(THD *thd, Event_job_data *job_data); Loading @@ -101,16 +90,18 @@ class Event_scheduler pthread_mutex_t LOCK_scheduler_state; enum enum_state { UNINITIALIZED = 0, INITIALIZED, RUNNING, STOPPING }; /* This is the current status of the life-cycle of the scheduler. */ enum enum_state state; /* Holds the thread id of the executor thread or 0 if the scheduler is not running. It is used by ::shutdown() to know which thread to kill with kill_one_thread(). The latter wake ups a thread if it is waiting on a conditional variable and sets thd->killed to non-zero. */ ulong thread_id; THD *scheduler_thd; pthread_cond_t COND_state; Loading
sql/events.cc +2 −2 Original line number Diff line number Diff line Loading @@ -630,7 +630,7 @@ Events::init() } check_system_tables_error= FALSE; if (event_queue->init_queue(thd, db_repository, scheduler)) if (event_queue->init_queue(thd, db_repository)) { sql_print_error("SCHEDULER: Error while loading from disk."); goto end; Loading Loading @@ -820,7 +820,7 @@ Events::is_execution_of_events_started() my_error(ER_EVENTS_DB_ERROR, MYF(0)); DBUG_RETURN(FALSE); } DBUG_RETURN(scheduler->get_state() == Event_scheduler::RUNNING); DBUG_RETURN(scheduler->is_running()); } Loading