Loading include/thr_lock.h +37 −4 Original line number Diff line number Diff line Loading @@ -62,17 +62,45 @@ enum thr_lock_type { TL_IGNORE=-1, /* Abort new lock request with an error */ TL_WRITE_ONLY}; enum enum_thr_lock_result { THR_LOCK_SUCCESS= 0, THR_LOCK_ABORTED= 1, THR_LOCK_WAIT_TIMEOUT= 2, THR_LOCK_DEADLOCK= 3 }; extern ulong max_write_lock_count; extern ulong table_lock_wait_timeout; extern my_bool thr_lock_inited; extern enum thr_lock_type thr_upgraded_concurrent_insert_lock; typedef struct st_thr_lock_data { /* A description of the thread which owns the lock. The address of an instance of this structure is used to uniquely identify the thread. */ typedef struct st_thr_lock_info { pthread_t thread; ulong thread_id; ulong n_cursors; } THR_LOCK_INFO; /* Lock owner identifier. Globally identifies the lock owner within the thread and among all the threads. The address of an instance of this structure is used as id. */ typedef struct st_thr_lock_owner { THR_LOCK_INFO *info; } THR_LOCK_OWNER; typedef struct st_thr_lock_data { THR_LOCK_OWNER *owner; struct st_thr_lock_data *next,**prev; struct st_thr_lock *lock; pthread_cond_t *cond; enum thr_lock_type type; ulong thread_id; void *status_param; /* Param to status functions */ void *debug_print_param; } THR_LOCK_DATA; Loading Loading @@ -102,13 +130,18 @@ extern LIST *thr_lock_thread_list; extern pthread_mutex_t THR_LOCK_lock; my_bool init_thr_lock(void); /* Must be called once/thread */ #define thr_lock_owner_init(owner, info_arg) (owner)->info= (info_arg) void thr_lock_info_init(THR_LOCK_INFO *info); void thr_lock_init(THR_LOCK *lock); void thr_lock_delete(THR_LOCK *lock); void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *status_param); int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type); enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type); void thr_unlock(THR_LOCK_DATA *data); int thr_multi_lock(THR_LOCK_DATA **data,uint count); enum enum_thr_lock_result thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_OWNER *owner); void thr_multi_unlock(THR_LOCK_DATA **data,uint count); void thr_abort_locks(THR_LOCK *lock); void thr_abort_locks_for_thread(THR_LOCK *lock, pthread_t thread); Loading mysys/thr_lock.c +131 −53 Original line number Diff line number Diff line Loading @@ -84,6 +84,7 @@ multiple read locks. my_bool thr_lock_inited=0; ulong locks_immediate = 0L, locks_waited = 0L; ulong table_lock_wait_timeout; enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE; /* The following constants are only for debug output */ Loading @@ -109,25 +110,32 @@ my_bool init_thr_lock() return 0; } static inline my_bool thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs) { return rhs == lhs; } #ifdef EXTRA_DEBUG #define MAX_FOUND_ERRORS 10 /* Report 10 first errors */ static uint found_errors=0; static int check_lock(struct st_lock_list *list, const char* lock_type, const char *where, my_bool same_thread, bool no_cond) const char *where, my_bool same_owner, bool no_cond) { THR_LOCK_DATA *data,**prev; uint count=0; pthread_t first_thread; LINT_INIT(first_thread); THR_LOCK_OWNER *first_owner; LINT_INIT(first_owner); prev= &list->data; if (list->data) { enum thr_lock_type last_lock_type=list->data->type; if (same_thread && list->data) first_thread=list->data->thread; if (same_owner && list->data) first_owner= list->data->owner; for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next) { if (data->type != last_lock_type) Loading @@ -139,7 +147,8 @@ static int check_lock(struct st_lock_list *list, const char* lock_type, count, lock_type, where); return 1; } if (same_thread && ! pthread_equal(data->thread,first_thread) && if (same_owner && !thr_lock_owner_equal(data->owner, first_owner) && last_lock_type != TL_WRITE_ALLOW_WRITE) { fprintf(stderr, Loading Loading @@ -255,8 +264,8 @@ static void check_locks(THR_LOCK *lock, const char *where, } if (lock->read.data) { if (!pthread_equal(lock->write.data->thread, lock->read.data->thread) && if (!thr_lock_owner_equal(lock->write.data->owner, lock->read.data->owner) && ((lock->write.data->type > TL_WRITE_DELAYED && lock->write.data->type != TL_WRITE_ONLY) || ((lock->write.data->type == TL_WRITE_CONCURRENT_INSERT || Loading Loading @@ -330,24 +339,32 @@ void thr_lock_delete(THR_LOCK *lock) DBUG_VOID_RETURN; } void thr_lock_info_init(THR_LOCK_INFO *info) { info->thread= pthread_self(); info->thread_id= my_thread_id(); /* for debugging */ info->n_cursors= 0; } /* Initialize a lock instance */ void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param) { data->lock=lock; data->type=TL_UNLOCK; data->thread=pthread_self(); data->thread_id=my_thread_id(); /* for debugging */ data->owner= 0; /* no owner yet */ data->status_param=param; data->cond=0; } static inline my_bool have_old_read_lock(THR_LOCK_DATA *data,pthread_t thread) static inline my_bool have_old_read_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner) { for ( ; data ; data=data->next) { if ((pthread_equal(data->thread,thread))) if (thr_lock_owner_equal(data->owner, owner)) return 1; /* Already locked by thread */ } return 0; Loading @@ -365,12 +382,16 @@ static inline my_bool have_specific_lock(THR_LOCK_DATA *data, } static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, static enum enum_thr_lock_result wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, my_bool in_wait_list) { pthread_cond_t *cond=get_cond(); struct st_my_thread_var *thread_var= my_thread_var; int result; pthread_cond_t *cond= &thread_var->suspend; struct timeval now; struct timespec wait_timeout; enum enum_thr_lock_result result= THR_LOCK_ABORTED; my_bool can_deadlock= test(data->owner->info->n_cursors); if (!in_wait_list) { Loading @@ -382,31 +403,56 @@ static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, /* Set up control struct to allow others to abort locks */ thread_var->current_mutex= &data->lock->mutex; thread_var->current_cond= cond; data->cond= cond; if (can_deadlock) { gettimeofday(&now, 0); wait_timeout.tv_sec= now.tv_sec + table_lock_wait_timeout; wait_timeout.tv_nsec= now.tv_usec * 1000; } while (!thread_var->abort || in_wait_list) { int rc= can_deadlock ? pthread_cond_timedwait(cond, &data->lock->mutex, &wait_timeout) : pthread_cond_wait(cond, &data->lock->mutex); if (data->cond != cond) /* We must break the wait if one of the following occurs: - the connection has been aborted (!thread_var->abort), but this is not a delayed insert thread (in_wait_list). For a delayed insert thread the proper action at shutdown is, apparently, to acquire the lock and complete the insert. - the lock has been granted (data->cond is set to NULL by the granter), or the waiting has been aborted (additionally data->type is set to TL_UNLOCK). - the wait has timed out (rc == ETIMEDOUT) Order of checks below is important to not report about timeout if the predicate is true. */ if (data->cond == 0) break; if (rc == ETIMEDOUT) { result= THR_LOCK_WAIT_TIMEOUT; break; } } if (data->cond || data->type == TL_UNLOCK) { if (data->cond) /* aborted */ if (data->cond) /* aborted or timed out */ { if (((*data->prev)=data->next)) /* remove from wait-list */ data->next->prev= data->prev; else wait->last=data->prev; } data->type= TL_UNLOCK; /* No lock */ result=1; /* Didn't get lock */ } check_locks(data->lock,"failed wait_for_lock",0); } else { result=0; result= THR_LOCK_SUCCESS; statistic_increment(locks_waited, &THR_LOCK_lock); if (data->lock->get_status) (*data->lock->get_status)(data->status_param, 0); Loading @@ -423,20 +469,24 @@ static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, } int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type) { THR_LOCK *lock=data->lock; int result=0; enum enum_thr_lock_result result= THR_LOCK_SUCCESS; struct st_lock_list *wait_queue; THR_LOCK_DATA *lock_owner; DBUG_ENTER("thr_lock"); data->next=0; data->cond=0; /* safety */ data->type=lock_type; data->thread=pthread_self(); /* Must be reset ! */ data->thread_id=my_thread_id(); /* Must be reset ! */ data->owner= owner; /* Must be reset ! */ VOID(pthread_mutex_lock(&lock->mutex)); DBUG_PRINT("lock",("data: 0x%lx thread: %ld lock: 0x%lx type: %d", data,data->thread_id,lock,(int) lock_type)); data, data->owner->info->thread_id, lock, (int) lock_type)); check_locks(lock,(uint) lock_type <= (uint) TL_READ_NO_INSERT ? "enter read_lock" : "enter write_lock",0); if ((int) lock_type <= (int) TL_READ_NO_INSERT) Loading @@ -454,8 +504,8 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) */ DBUG_PRINT("lock",("write locked by thread: %ld", lock->write.data->thread_id)); if (pthread_equal(data->thread,lock->write.data->thread) || lock->write.data->owner->info->thread_id)); if (thr_lock_owner_equal(data->owner, lock->write.data->owner) || (lock->write.data->type <= TL_WRITE_DELAYED && (((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) || (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT && Loading @@ -476,14 +526,14 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) { /* We are not allowed to get a READ lock in this case */ data->type=TL_UNLOCK; result=1; /* Can't wait for this one */ result= THR_LOCK_ABORTED; /* Can't wait for this one */ goto end; } } else if (!lock->write_wait.data || lock->write_wait.data->type <= TL_WRITE_LOW_PRIORITY || lock_type == TL_READ_HIGH_PRIORITY || have_old_read_lock(lock->read.data,data->thread)) have_old_read_lock(lock->read.data, data->owner)) { /* No important write-locks */ (*lock->read.last)=data; /* Add to running FIFO */ data->prev=lock->read.last; Loading @@ -496,8 +546,12 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) statistic_increment(locks_immediate,&THR_LOCK_lock); goto end; } /* Can't get lock yet; Wait for it */ DBUG_RETURN(wait_for_lock(&lock->read_wait,data,0)); /* We're here if there is an active write lock or no write lock but a high priority write waiting in the write_wait queue. In the latter case we should yield the lock to the writer. */ wait_queue= &lock->read_wait; } else /* Request for WRITE lock */ { Loading @@ -506,7 +560,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY) { data->type=TL_UNLOCK; result=1; /* Can't wait for this one */ result= THR_LOCK_ABORTED; /* Can't wait for this one */ goto end; } /* Loading Loading @@ -540,7 +594,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) { /* We are not allowed to get a lock in this case */ data->type=TL_UNLOCK; result=1; /* Can't wait for this one */ result= THR_LOCK_ABORTED; /* Can't wait for this one */ goto end; } Loading @@ -549,7 +603,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) TL_WRITE_ALLOW_WRITE, TL_WRITE_ALLOW_READ or TL_WRITE_DELAYED in the same thread, but this will never happen within MySQL. */ if (pthread_equal(data->thread,lock->write.data->thread) || if (thr_lock_owner_equal(data->owner, lock->write.data->owner) || (lock_type == TL_WRITE_ALLOW_WRITE && !lock->write_wait.data && lock->write.data->type == TL_WRITE_ALLOW_WRITE)) Loading @@ -572,7 +626,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) goto end; } DBUG_PRINT("lock",("write locked by thread: %ld", lock->write.data->thread_id)); lock->write.data->owner->info->thread_id)); } else { Loading Loading @@ -608,10 +662,24 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) } } DBUG_PRINT("lock",("write locked by thread: %ld, type: %ld", lock->read.data->thread_id,data->type)); lock->read.data->owner->info->thread_id, data->type)); } DBUG_RETURN(wait_for_lock(&lock->write_wait,data,0)); wait_queue= &lock->write_wait; } /* Try to detect a trivial deadlock when using cursors: attempt to lock a table that is already locked by an open cursor within the same connection. lock_owner can be zero if we succumbed to a high priority writer in the write_wait queue. */ lock_owner= lock->read.data ? lock->read.data : lock->write.data; if (lock_owner && lock_owner->owner->info == owner->info) { result= THR_LOCK_DEADLOCK; goto end; } /* Can't get lock yet; Wait for it */ DBUG_RETURN(wait_for_lock(wait_queue, data, 0)); end: pthread_mutex_unlock(&lock->mutex); DBUG_RETURN(result); Loading Loading @@ -656,7 +724,7 @@ static inline void free_all_read_locks(THR_LOCK *lock, lock->read_no_write_count++; } DBUG_PRINT("lock",("giving read lock to thread: %ld", data->thread_id)); data->owner->info->thread_id)); data->cond=0; /* Mark thread free */ VOID(pthread_cond_signal(cond)); } while ((data=data->next)); Loading @@ -674,7 +742,7 @@ void thr_unlock(THR_LOCK_DATA *data) enum thr_lock_type lock_type=data->type; DBUG_ENTER("thr_unlock"); DBUG_PRINT("lock",("data: 0x%lx thread: %ld lock: 0x%lx", data,data->thread_id,lock)); data, data->owner->info->thread_id, lock)); pthread_mutex_lock(&lock->mutex); check_locks(lock,"start of release lock",0); Loading Loading @@ -734,7 +802,7 @@ void thr_unlock(THR_LOCK_DATA *data) (*lock->check_status)(data->status_param)) data->type=TL_WRITE; /* Upgrade lock */ DBUG_PRINT("lock",("giving write lock of type %d to thread: %ld", data->type,data->thread_id)); data->type, data->owner->info->thread_id)); { pthread_cond_t *cond=data->cond; data->cond=0; /* Mark thread free */ Loading Loading @@ -842,7 +910,8 @@ static void sort_locks(THR_LOCK_DATA **data,uint count) } int thr_multi_lock(THR_LOCK_DATA **data,uint count) enum enum_thr_lock_result thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_OWNER *owner) { THR_LOCK_DATA **pos,**end; DBUG_ENTER("thr_multi_lock"); Loading @@ -852,10 +921,11 @@ int thr_multi_lock(THR_LOCK_DATA **data,uint count) /* lock everything */ for (pos=data,end=data+count; pos < end ; pos++) { if (thr_lock(*pos,(*pos)->type)) enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type); if (result != THR_LOCK_SUCCESS) { /* Aborted */ thr_multi_unlock(data,(uint) (pos-data)); DBUG_RETURN(1); DBUG_RETURN(result); } #ifdef MAIN printf("Thread: %s Got lock: 0x%lx type: %d\n",my_thread_name(), Loading Loading @@ -909,7 +979,7 @@ int thr_multi_lock(THR_LOCK_DATA **data,uint count) } while (pos != data); } #endif DBUG_RETURN(0); DBUG_RETURN(THR_LOCK_SUCCESS); } /* free all locks */ Loading @@ -932,7 +1002,7 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count) else { DBUG_PRINT("lock",("Free lock: data: 0x%lx thread: %ld lock: 0x%lx", *pos,(*pos)->thread_id,(*pos)->lock)); *pos, (*pos)->owner->info->thread_id, (*pos)->lock)); } } DBUG_VOID_RETURN; Loading @@ -952,6 +1022,7 @@ void thr_abort_locks(THR_LOCK *lock) for (data=lock->read_wait.data; data ; data=data->next) { data->type=TL_UNLOCK; /* Mark killed */ /* It's safe to signal the cond first: we're still holding the mutex. */ pthread_cond_signal(data->cond); data->cond=0; /* Removed from list */ } Loading Loading @@ -985,10 +1056,11 @@ void thr_abort_locks_for_thread(THR_LOCK *lock, pthread_t thread) pthread_mutex_lock(&lock->mutex); for (data= lock->read_wait.data; data ; data= data->next) { if (pthread_equal(thread, data->thread)) if (pthread_equal(thread, data->owner->info->thread)) { DBUG_PRINT("info",("Aborting read-wait lock")); data->type= TL_UNLOCK; /* Mark killed */ /* It's safe to signal the cond first: we're still holding the mutex. */ pthread_cond_signal(data->cond); data->cond= 0; /* Removed from list */ Loading @@ -1000,7 +1072,7 @@ void thr_abort_locks_for_thread(THR_LOCK *lock, pthread_t thread) } for (data= lock->write_wait.data; data ; data= data->next) { if (pthread_equal(thread, data->thread)) if (pthread_equal(thread, data->owner->info->thread)) { DBUG_PRINT("info",("Aborting write-wait lock")); data->type= TL_UNLOCK; Loading Loading @@ -1117,7 +1189,8 @@ static void thr_print_lock(const char* name,struct st_lock_list *list) prev= &list->data; for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next) { printf("0x%lx (%lu:%d); ",(ulong) data,data->thread_id,(int) data->type); printf("0x%lx (%lu:%d); ", (ulong) data, data->owner->info->thread_id, (int) data->type); if (data->prev != prev) printf("\nWarning: prev didn't point at previous lock\n"); prev= &data->next; Loading Loading @@ -1250,11 +1323,16 @@ static void *test_thread(void *arg) { int i,j,param=*((int*) arg); THR_LOCK_DATA data[MAX_LOCK_COUNT]; THR_LOCK_OWNER owner; THR_LOCK_INFO lock_info; THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT]; my_thread_init(); printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout); thr_lock_info_init(&lock_info); thr_lock_owner_init(&owner, &lock_info); for (i=0; i < lock_counts[param] ; i++) thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL); for (j=1 ; j < 10 ; j++) /* try locking 10 times */ Loading @@ -1264,7 +1342,7 @@ static void *test_thread(void *arg) multi_locks[i]= &data[i]; data[i].type= tests[param][i].lock_type; } thr_multi_lock(multi_locks,lock_counts[param]); thr_multi_lock(multi_locks, lock_counts[param], &owner); pthread_mutex_lock(&LOCK_thread_count); { int tmp=rand() & 7; /* Do something from 0-2 sec */ Loading sql/examples/ha_archive.cc +11 −1 Original line number Diff line number Diff line Loading @@ -149,7 +149,8 @@ static handlerton archive_hton = { 0, /* prepare */ 0, /* recover */ 0, /* commit_by_xid */ 0 /* rollback_by_xid */ 0, /* rollback_by_xid */ HTON_NO_FLAGS }; Loading Loading @@ -208,6 +209,15 @@ bool archive_db_end() return FALSE; } ha_archive::ha_archive(TABLE *table_arg) :handler(&archive_hton, table_arg), delayed_insert(0), bulk_insert(0) { /* Set our original buffer from pre-allocated memory */ buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); /* The size of the offset value we will use for position() */ ref_length = sizeof(z_off_t); } /* This method reads the header of a datafile and returns whether or not it was successful. Loading sql/examples/ha_archive.h +1 −8 Original line number Diff line number Diff line Loading @@ -58,14 +58,7 @@ class ha_archive: public handler bool bulk_insert; /* If we are performing a bulk insert */ public: ha_archive(TABLE *table): handler(table), delayed_insert(0), bulk_insert(0) { /* Set our original buffer from pre-allocated memory */ buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); /* The size of the offset value we will use for position() */ ref_length = sizeof(z_off_t); } ha_archive(TABLE *table_arg); ~ha_archive() { } Loading sql/examples/ha_example.cc +22 −0 Original line number Diff line number Diff line Loading @@ -72,6 +72,24 @@ #ifdef HAVE_EXAMPLE_DB #include "ha_example.h" static handlerton example_hton= { "CSV", 0, /* slot */ 0, /* savepoint size. */ 0, /* close_connection */ 0, /* savepoint */ 0, /* rollback to savepoint */ 0, /* release savepoint */ 0, /* commit */ 0, /* rollback */ 0, /* prepare */ 0, /* recover */ 0, /* commit_by_xid */ 0, /* rollback_by_xid */ HTON_NO_FLAGS }; /* Variables for example share methods */ static HASH example_open_tables; // Hash used to track open tables pthread_mutex_t example_mutex; // This is the mutex we use to init the hash Loading Loading @@ -179,6 +197,10 @@ static int free_share(EXAMPLE_SHARE *share) } ha_example::ha_example(TABLE *table_arg) :handler(&example_hton, table_arg) {} /* If frm_error() is called then we will use this to to find out what file extentions exist for the storage engine. This is also used by the default rename_table and Loading Loading
include/thr_lock.h +37 −4 Original line number Diff line number Diff line Loading @@ -62,17 +62,45 @@ enum thr_lock_type { TL_IGNORE=-1, /* Abort new lock request with an error */ TL_WRITE_ONLY}; enum enum_thr_lock_result { THR_LOCK_SUCCESS= 0, THR_LOCK_ABORTED= 1, THR_LOCK_WAIT_TIMEOUT= 2, THR_LOCK_DEADLOCK= 3 }; extern ulong max_write_lock_count; extern ulong table_lock_wait_timeout; extern my_bool thr_lock_inited; extern enum thr_lock_type thr_upgraded_concurrent_insert_lock; typedef struct st_thr_lock_data { /* A description of the thread which owns the lock. The address of an instance of this structure is used to uniquely identify the thread. */ typedef struct st_thr_lock_info { pthread_t thread; ulong thread_id; ulong n_cursors; } THR_LOCK_INFO; /* Lock owner identifier. Globally identifies the lock owner within the thread and among all the threads. The address of an instance of this structure is used as id. */ typedef struct st_thr_lock_owner { THR_LOCK_INFO *info; } THR_LOCK_OWNER; typedef struct st_thr_lock_data { THR_LOCK_OWNER *owner; struct st_thr_lock_data *next,**prev; struct st_thr_lock *lock; pthread_cond_t *cond; enum thr_lock_type type; ulong thread_id; void *status_param; /* Param to status functions */ void *debug_print_param; } THR_LOCK_DATA; Loading Loading @@ -102,13 +130,18 @@ extern LIST *thr_lock_thread_list; extern pthread_mutex_t THR_LOCK_lock; my_bool init_thr_lock(void); /* Must be called once/thread */ #define thr_lock_owner_init(owner, info_arg) (owner)->info= (info_arg) void thr_lock_info_init(THR_LOCK_INFO *info); void thr_lock_init(THR_LOCK *lock); void thr_lock_delete(THR_LOCK *lock); void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *status_param); int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type); enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type); void thr_unlock(THR_LOCK_DATA *data); int thr_multi_lock(THR_LOCK_DATA **data,uint count); enum enum_thr_lock_result thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_OWNER *owner); void thr_multi_unlock(THR_LOCK_DATA **data,uint count); void thr_abort_locks(THR_LOCK *lock); void thr_abort_locks_for_thread(THR_LOCK *lock, pthread_t thread); Loading
mysys/thr_lock.c +131 −53 Original line number Diff line number Diff line Loading @@ -84,6 +84,7 @@ multiple read locks. my_bool thr_lock_inited=0; ulong locks_immediate = 0L, locks_waited = 0L; ulong table_lock_wait_timeout; enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE; /* The following constants are only for debug output */ Loading @@ -109,25 +110,32 @@ my_bool init_thr_lock() return 0; } static inline my_bool thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs) { return rhs == lhs; } #ifdef EXTRA_DEBUG #define MAX_FOUND_ERRORS 10 /* Report 10 first errors */ static uint found_errors=0; static int check_lock(struct st_lock_list *list, const char* lock_type, const char *where, my_bool same_thread, bool no_cond) const char *where, my_bool same_owner, bool no_cond) { THR_LOCK_DATA *data,**prev; uint count=0; pthread_t first_thread; LINT_INIT(first_thread); THR_LOCK_OWNER *first_owner; LINT_INIT(first_owner); prev= &list->data; if (list->data) { enum thr_lock_type last_lock_type=list->data->type; if (same_thread && list->data) first_thread=list->data->thread; if (same_owner && list->data) first_owner= list->data->owner; for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next) { if (data->type != last_lock_type) Loading @@ -139,7 +147,8 @@ static int check_lock(struct st_lock_list *list, const char* lock_type, count, lock_type, where); return 1; } if (same_thread && ! pthread_equal(data->thread,first_thread) && if (same_owner && !thr_lock_owner_equal(data->owner, first_owner) && last_lock_type != TL_WRITE_ALLOW_WRITE) { fprintf(stderr, Loading Loading @@ -255,8 +264,8 @@ static void check_locks(THR_LOCK *lock, const char *where, } if (lock->read.data) { if (!pthread_equal(lock->write.data->thread, lock->read.data->thread) && if (!thr_lock_owner_equal(lock->write.data->owner, lock->read.data->owner) && ((lock->write.data->type > TL_WRITE_DELAYED && lock->write.data->type != TL_WRITE_ONLY) || ((lock->write.data->type == TL_WRITE_CONCURRENT_INSERT || Loading Loading @@ -330,24 +339,32 @@ void thr_lock_delete(THR_LOCK *lock) DBUG_VOID_RETURN; } void thr_lock_info_init(THR_LOCK_INFO *info) { info->thread= pthread_self(); info->thread_id= my_thread_id(); /* for debugging */ info->n_cursors= 0; } /* Initialize a lock instance */ void thr_lock_data_init(THR_LOCK *lock,THR_LOCK_DATA *data, void *param) { data->lock=lock; data->type=TL_UNLOCK; data->thread=pthread_self(); data->thread_id=my_thread_id(); /* for debugging */ data->owner= 0; /* no owner yet */ data->status_param=param; data->cond=0; } static inline my_bool have_old_read_lock(THR_LOCK_DATA *data,pthread_t thread) static inline my_bool have_old_read_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner) { for ( ; data ; data=data->next) { if ((pthread_equal(data->thread,thread))) if (thr_lock_owner_equal(data->owner, owner)) return 1; /* Already locked by thread */ } return 0; Loading @@ -365,12 +382,16 @@ static inline my_bool have_specific_lock(THR_LOCK_DATA *data, } static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, static enum enum_thr_lock_result wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, my_bool in_wait_list) { pthread_cond_t *cond=get_cond(); struct st_my_thread_var *thread_var= my_thread_var; int result; pthread_cond_t *cond= &thread_var->suspend; struct timeval now; struct timespec wait_timeout; enum enum_thr_lock_result result= THR_LOCK_ABORTED; my_bool can_deadlock= test(data->owner->info->n_cursors); if (!in_wait_list) { Loading @@ -382,31 +403,56 @@ static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, /* Set up control struct to allow others to abort locks */ thread_var->current_mutex= &data->lock->mutex; thread_var->current_cond= cond; data->cond= cond; if (can_deadlock) { gettimeofday(&now, 0); wait_timeout.tv_sec= now.tv_sec + table_lock_wait_timeout; wait_timeout.tv_nsec= now.tv_usec * 1000; } while (!thread_var->abort || in_wait_list) { int rc= can_deadlock ? pthread_cond_timedwait(cond, &data->lock->mutex, &wait_timeout) : pthread_cond_wait(cond, &data->lock->mutex); if (data->cond != cond) /* We must break the wait if one of the following occurs: - the connection has been aborted (!thread_var->abort), but this is not a delayed insert thread (in_wait_list). For a delayed insert thread the proper action at shutdown is, apparently, to acquire the lock and complete the insert. - the lock has been granted (data->cond is set to NULL by the granter), or the waiting has been aborted (additionally data->type is set to TL_UNLOCK). - the wait has timed out (rc == ETIMEDOUT) Order of checks below is important to not report about timeout if the predicate is true. */ if (data->cond == 0) break; if (rc == ETIMEDOUT) { result= THR_LOCK_WAIT_TIMEOUT; break; } } if (data->cond || data->type == TL_UNLOCK) { if (data->cond) /* aborted */ if (data->cond) /* aborted or timed out */ { if (((*data->prev)=data->next)) /* remove from wait-list */ data->next->prev= data->prev; else wait->last=data->prev; } data->type= TL_UNLOCK; /* No lock */ result=1; /* Didn't get lock */ } check_locks(data->lock,"failed wait_for_lock",0); } else { result=0; result= THR_LOCK_SUCCESS; statistic_increment(locks_waited, &THR_LOCK_lock); if (data->lock->get_status) (*data->lock->get_status)(data->status_param, 0); Loading @@ -423,20 +469,24 @@ static my_bool wait_for_lock(struct st_lock_list *wait, THR_LOCK_DATA *data, } int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) enum enum_thr_lock_result thr_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner, enum thr_lock_type lock_type) { THR_LOCK *lock=data->lock; int result=0; enum enum_thr_lock_result result= THR_LOCK_SUCCESS; struct st_lock_list *wait_queue; THR_LOCK_DATA *lock_owner; DBUG_ENTER("thr_lock"); data->next=0; data->cond=0; /* safety */ data->type=lock_type; data->thread=pthread_self(); /* Must be reset ! */ data->thread_id=my_thread_id(); /* Must be reset ! */ data->owner= owner; /* Must be reset ! */ VOID(pthread_mutex_lock(&lock->mutex)); DBUG_PRINT("lock",("data: 0x%lx thread: %ld lock: 0x%lx type: %d", data,data->thread_id,lock,(int) lock_type)); data, data->owner->info->thread_id, lock, (int) lock_type)); check_locks(lock,(uint) lock_type <= (uint) TL_READ_NO_INSERT ? "enter read_lock" : "enter write_lock",0); if ((int) lock_type <= (int) TL_READ_NO_INSERT) Loading @@ -454,8 +504,8 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) */ DBUG_PRINT("lock",("write locked by thread: %ld", lock->write.data->thread_id)); if (pthread_equal(data->thread,lock->write.data->thread) || lock->write.data->owner->info->thread_id)); if (thr_lock_owner_equal(data->owner, lock->write.data->owner) || (lock->write.data->type <= TL_WRITE_DELAYED && (((int) lock_type <= (int) TL_READ_HIGH_PRIORITY) || (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT && Loading @@ -476,14 +526,14 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) { /* We are not allowed to get a READ lock in this case */ data->type=TL_UNLOCK; result=1; /* Can't wait for this one */ result= THR_LOCK_ABORTED; /* Can't wait for this one */ goto end; } } else if (!lock->write_wait.data || lock->write_wait.data->type <= TL_WRITE_LOW_PRIORITY || lock_type == TL_READ_HIGH_PRIORITY || have_old_read_lock(lock->read.data,data->thread)) have_old_read_lock(lock->read.data, data->owner)) { /* No important write-locks */ (*lock->read.last)=data; /* Add to running FIFO */ data->prev=lock->read.last; Loading @@ -496,8 +546,12 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) statistic_increment(locks_immediate,&THR_LOCK_lock); goto end; } /* Can't get lock yet; Wait for it */ DBUG_RETURN(wait_for_lock(&lock->read_wait,data,0)); /* We're here if there is an active write lock or no write lock but a high priority write waiting in the write_wait queue. In the latter case we should yield the lock to the writer. */ wait_queue= &lock->read_wait; } else /* Request for WRITE lock */ { Loading @@ -506,7 +560,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) if (lock->write.data && lock->write.data->type == TL_WRITE_ONLY) { data->type=TL_UNLOCK; result=1; /* Can't wait for this one */ result= THR_LOCK_ABORTED; /* Can't wait for this one */ goto end; } /* Loading Loading @@ -540,7 +594,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) { /* We are not allowed to get a lock in this case */ data->type=TL_UNLOCK; result=1; /* Can't wait for this one */ result= THR_LOCK_ABORTED; /* Can't wait for this one */ goto end; } Loading @@ -549,7 +603,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) TL_WRITE_ALLOW_WRITE, TL_WRITE_ALLOW_READ or TL_WRITE_DELAYED in the same thread, but this will never happen within MySQL. */ if (pthread_equal(data->thread,lock->write.data->thread) || if (thr_lock_owner_equal(data->owner, lock->write.data->owner) || (lock_type == TL_WRITE_ALLOW_WRITE && !lock->write_wait.data && lock->write.data->type == TL_WRITE_ALLOW_WRITE)) Loading @@ -572,7 +626,7 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) goto end; } DBUG_PRINT("lock",("write locked by thread: %ld", lock->write.data->thread_id)); lock->write.data->owner->info->thread_id)); } else { Loading Loading @@ -608,10 +662,24 @@ int thr_lock(THR_LOCK_DATA *data,enum thr_lock_type lock_type) } } DBUG_PRINT("lock",("write locked by thread: %ld, type: %ld", lock->read.data->thread_id,data->type)); lock->read.data->owner->info->thread_id, data->type)); } DBUG_RETURN(wait_for_lock(&lock->write_wait,data,0)); wait_queue= &lock->write_wait; } /* Try to detect a trivial deadlock when using cursors: attempt to lock a table that is already locked by an open cursor within the same connection. lock_owner can be zero if we succumbed to a high priority writer in the write_wait queue. */ lock_owner= lock->read.data ? lock->read.data : lock->write.data; if (lock_owner && lock_owner->owner->info == owner->info) { result= THR_LOCK_DEADLOCK; goto end; } /* Can't get lock yet; Wait for it */ DBUG_RETURN(wait_for_lock(wait_queue, data, 0)); end: pthread_mutex_unlock(&lock->mutex); DBUG_RETURN(result); Loading Loading @@ -656,7 +724,7 @@ static inline void free_all_read_locks(THR_LOCK *lock, lock->read_no_write_count++; } DBUG_PRINT("lock",("giving read lock to thread: %ld", data->thread_id)); data->owner->info->thread_id)); data->cond=0; /* Mark thread free */ VOID(pthread_cond_signal(cond)); } while ((data=data->next)); Loading @@ -674,7 +742,7 @@ void thr_unlock(THR_LOCK_DATA *data) enum thr_lock_type lock_type=data->type; DBUG_ENTER("thr_unlock"); DBUG_PRINT("lock",("data: 0x%lx thread: %ld lock: 0x%lx", data,data->thread_id,lock)); data, data->owner->info->thread_id, lock)); pthread_mutex_lock(&lock->mutex); check_locks(lock,"start of release lock",0); Loading Loading @@ -734,7 +802,7 @@ void thr_unlock(THR_LOCK_DATA *data) (*lock->check_status)(data->status_param)) data->type=TL_WRITE; /* Upgrade lock */ DBUG_PRINT("lock",("giving write lock of type %d to thread: %ld", data->type,data->thread_id)); data->type, data->owner->info->thread_id)); { pthread_cond_t *cond=data->cond; data->cond=0; /* Mark thread free */ Loading Loading @@ -842,7 +910,8 @@ static void sort_locks(THR_LOCK_DATA **data,uint count) } int thr_multi_lock(THR_LOCK_DATA **data,uint count) enum enum_thr_lock_result thr_multi_lock(THR_LOCK_DATA **data, uint count, THR_LOCK_OWNER *owner) { THR_LOCK_DATA **pos,**end; DBUG_ENTER("thr_multi_lock"); Loading @@ -852,10 +921,11 @@ int thr_multi_lock(THR_LOCK_DATA **data,uint count) /* lock everything */ for (pos=data,end=data+count; pos < end ; pos++) { if (thr_lock(*pos,(*pos)->type)) enum enum_thr_lock_result result= thr_lock(*pos, owner, (*pos)->type); if (result != THR_LOCK_SUCCESS) { /* Aborted */ thr_multi_unlock(data,(uint) (pos-data)); DBUG_RETURN(1); DBUG_RETURN(result); } #ifdef MAIN printf("Thread: %s Got lock: 0x%lx type: %d\n",my_thread_name(), Loading Loading @@ -909,7 +979,7 @@ int thr_multi_lock(THR_LOCK_DATA **data,uint count) } while (pos != data); } #endif DBUG_RETURN(0); DBUG_RETURN(THR_LOCK_SUCCESS); } /* free all locks */ Loading @@ -932,7 +1002,7 @@ void thr_multi_unlock(THR_LOCK_DATA **data,uint count) else { DBUG_PRINT("lock",("Free lock: data: 0x%lx thread: %ld lock: 0x%lx", *pos,(*pos)->thread_id,(*pos)->lock)); *pos, (*pos)->owner->info->thread_id, (*pos)->lock)); } } DBUG_VOID_RETURN; Loading @@ -952,6 +1022,7 @@ void thr_abort_locks(THR_LOCK *lock) for (data=lock->read_wait.data; data ; data=data->next) { data->type=TL_UNLOCK; /* Mark killed */ /* It's safe to signal the cond first: we're still holding the mutex. */ pthread_cond_signal(data->cond); data->cond=0; /* Removed from list */ } Loading Loading @@ -985,10 +1056,11 @@ void thr_abort_locks_for_thread(THR_LOCK *lock, pthread_t thread) pthread_mutex_lock(&lock->mutex); for (data= lock->read_wait.data; data ; data= data->next) { if (pthread_equal(thread, data->thread)) if (pthread_equal(thread, data->owner->info->thread)) { DBUG_PRINT("info",("Aborting read-wait lock")); data->type= TL_UNLOCK; /* Mark killed */ /* It's safe to signal the cond first: we're still holding the mutex. */ pthread_cond_signal(data->cond); data->cond= 0; /* Removed from list */ Loading @@ -1000,7 +1072,7 @@ void thr_abort_locks_for_thread(THR_LOCK *lock, pthread_t thread) } for (data= lock->write_wait.data; data ; data= data->next) { if (pthread_equal(thread, data->thread)) if (pthread_equal(thread, data->owner->info->thread)) { DBUG_PRINT("info",("Aborting write-wait lock")); data->type= TL_UNLOCK; Loading Loading @@ -1117,7 +1189,8 @@ static void thr_print_lock(const char* name,struct st_lock_list *list) prev= &list->data; for (data=list->data; data && count++ < MAX_LOCKS ; data=data->next) { printf("0x%lx (%lu:%d); ",(ulong) data,data->thread_id,(int) data->type); printf("0x%lx (%lu:%d); ", (ulong) data, data->owner->info->thread_id, (int) data->type); if (data->prev != prev) printf("\nWarning: prev didn't point at previous lock\n"); prev= &data->next; Loading Loading @@ -1250,11 +1323,16 @@ static void *test_thread(void *arg) { int i,j,param=*((int*) arg); THR_LOCK_DATA data[MAX_LOCK_COUNT]; THR_LOCK_OWNER owner; THR_LOCK_INFO lock_info; THR_LOCK_DATA *multi_locks[MAX_LOCK_COUNT]; my_thread_init(); printf("Thread %s (%d) started\n",my_thread_name(),param); fflush(stdout); thr_lock_info_init(&lock_info); thr_lock_owner_init(&owner, &lock_info); for (i=0; i < lock_counts[param] ; i++) thr_lock_data_init(locks+tests[param][i].lock_nr,data+i,NULL); for (j=1 ; j < 10 ; j++) /* try locking 10 times */ Loading @@ -1264,7 +1342,7 @@ static void *test_thread(void *arg) multi_locks[i]= &data[i]; data[i].type= tests[param][i].lock_type; } thr_multi_lock(multi_locks,lock_counts[param]); thr_multi_lock(multi_locks, lock_counts[param], &owner); pthread_mutex_lock(&LOCK_thread_count); { int tmp=rand() & 7; /* Do something from 0-2 sec */ Loading
sql/examples/ha_archive.cc +11 −1 Original line number Diff line number Diff line Loading @@ -149,7 +149,8 @@ static handlerton archive_hton = { 0, /* prepare */ 0, /* recover */ 0, /* commit_by_xid */ 0 /* rollback_by_xid */ 0, /* rollback_by_xid */ HTON_NO_FLAGS }; Loading Loading @@ -208,6 +209,15 @@ bool archive_db_end() return FALSE; } ha_archive::ha_archive(TABLE *table_arg) :handler(&archive_hton, table_arg), delayed_insert(0), bulk_insert(0) { /* Set our original buffer from pre-allocated memory */ buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); /* The size of the offset value we will use for position() */ ref_length = sizeof(z_off_t); } /* This method reads the header of a datafile and returns whether or not it was successful. Loading
sql/examples/ha_archive.h +1 −8 Original line number Diff line number Diff line Loading @@ -58,14 +58,7 @@ class ha_archive: public handler bool bulk_insert; /* If we are performing a bulk insert */ public: ha_archive(TABLE *table): handler(table), delayed_insert(0), bulk_insert(0) { /* Set our original buffer from pre-allocated memory */ buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); /* The size of the offset value we will use for position() */ ref_length = sizeof(z_off_t); } ha_archive(TABLE *table_arg); ~ha_archive() { } Loading
sql/examples/ha_example.cc +22 −0 Original line number Diff line number Diff line Loading @@ -72,6 +72,24 @@ #ifdef HAVE_EXAMPLE_DB #include "ha_example.h" static handlerton example_hton= { "CSV", 0, /* slot */ 0, /* savepoint size. */ 0, /* close_connection */ 0, /* savepoint */ 0, /* rollback to savepoint */ 0, /* release savepoint */ 0, /* commit */ 0, /* rollback */ 0, /* prepare */ 0, /* recover */ 0, /* commit_by_xid */ 0, /* rollback_by_xid */ HTON_NO_FLAGS }; /* Variables for example share methods */ static HASH example_open_tables; // Hash used to track open tables pthread_mutex_t example_mutex; // This is the mutex we use to init the hash Loading Loading @@ -179,6 +197,10 @@ static int free_share(EXAMPLE_SHARE *share) } ha_example::ha_example(TABLE *table_arg) :handler(&example_hton, table_arg) {} /* If frm_error() is called then we will use this to to find out what file extentions exist for the storage engine. This is also used by the default rename_table and Loading