Loading .bzrignore +2 −0 Original line number Diff line number Diff line Loading @@ -424,3 +424,5 @@ vio/test-ssl vio/test-sslclient vio/test-sslserver vio/viotest-ssl mysys/#mf_iocache.c# mysys/test_io_cache include/my_sys.h +18 −2 Original line number Diff line number Diff line Loading @@ -293,6 +293,16 @@ typedef struct st_dynamic_string { struct st_io_cache; typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); #ifdef THREAD #define lock_append_buffer(info) \ pthread_mutex_lock(&(info)->append_buffer_lock) #define unlock_append_buffer(info) \ pthread_mutex_unlock(&(info)->append_buffer_lock) #else #define lock_append_buffer(info) #define unlock_append_buffer(info) #endif typedef struct st_io_cache /* Used when cacheing files */ { my_off_t pos_in_file,end_of_file; Loading @@ -301,7 +311,7 @@ typedef struct st_io_cache /* Used when cacheing files */ that will use a buffer allocated somewhere else */ byte *append_buffer, *append_pos, *append_end; byte *append_buffer, *append_read_pos, *append_write_pos, *append_end; /* for append buffer used in READ_APPEND cache */ #ifdef THREAD pthread_mutex_t append_buffer_lock; Loading Loading @@ -348,10 +358,15 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); _my_b_get(info)) #define my_b_write(info,Buffer,Count) \ ((info)->type != SEQ_READ_APPEND) ? (\ ((info)->rc_pos + (Count) <= (info)->rc_end ?\ (memcpy((info)->rc_pos,Buffer,(size_t) (Count)), \ ((info)->rc_pos+=(Count)),0) :\ _my_b_write(info,Buffer,Count)) _my_b_write(info,Buffer,Count))) : \ ((info)->append_write_pos + (Count) <= (info)->append_end ?\ (memcpy((info)->append_write_pos,Buffer,(size_t)Count), \ ((info)->append_write_pos+=(Count),0)) : \ _my_b_append(info,Buffer,Count)) /* my_b_write_byte dosn't have any err-check */ #define my_b_write_byte(info,chr) \ Loading Loading @@ -564,6 +579,7 @@ extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_get(IO_CACHE *info); extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count); extern int _my_b_append(IO_CACHE *info,const byte *Buffer,uint Count); extern int my_block_write(IO_CACHE *info, const byte *Buffer, uint Count, my_off_t pos); extern int flush_io_cache(IO_CACHE *info); Loading mysys/Makefile.am +4 −0 Original line number Diff line number Diff line Loading @@ -95,6 +95,10 @@ test_vsnprintf: my_vsnprintf.c $(LIBRARIES) $(CP) $(srcdir)/my_vsnprintf.c test_vsnprintf.c $(LINK) $(FLAGS) -DMAIN ./test_vsnprintf.c $(LDADD) $(LIBS) $(RM) -f test_vsnprintf.* test_io_cache: mf_iocache.c $(LIBRARIES) $(CP) $(srcdir)/mf_iocache.c test_io_cache.c $(LINK) $(FLAGS) -DMAIN ./test_io_cache.c $(LDADD) $(LIBS) $(RM) -f test_io_cache.* test_dir: test_dir.c $(LIBRARIES) $(LINK) $(FLAGS) -DMAIN $(srcdir)/test_dir.c $(LDADD) $(LIBS) Loading mysys/mf_iocache.c +195 −22 Original line number Diff line number Diff line Loading @@ -41,6 +41,10 @@ static void my_aiowait(my_aio_result *result); #include <assert.h> #include <errno.h> #ifdef MAIN #include <my_dir.h> #endif static void init_read_function(IO_CACHE* info, enum cache_type type); static void init_read_function(IO_CACHE* info, enum cache_type type) Loading Loading @@ -152,7 +156,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->rc_request_pos=info->rc_pos=info->buffer; if (type == SEQ_READ_APPEND) { info->append_pos = info->append_buffer; info->append_read_pos = info->append_write_pos = info->append_buffer; info->append_end = info->append_buffer + info->buffer_length; #ifdef THREAD pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); Loading Loading @@ -277,6 +281,10 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ~(my_off_t) 0); } } if (info->type == SEQ_READ_APPEND) { info->append_read_pos = info->append_write_pos = info->append_buffer; } info->type=type; info->error=0; init_read_function(info,type); Loading @@ -294,7 +302,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, info->inited=0; #endif DBUG_RETURN(0); } /* init_io_cache */ } /* reinit_io_cache */ Loading Loading @@ -377,11 +385,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) return 0; } /* Do sequential read from the SEQ_READ_APPEND cache we do this in three stages: - first read from info->buffer - then if there are still data to read, try the file descriptor - afterwards, if there are still data to read, try append buffer */ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) { uint length,diff_length,left_length; uint length,diff_length,left_length,save_count; my_off_t max_length, pos_in_file; save_count=Count; /* first, read the regular buffer */ if ((left_length=(uint) (info->rc_end-info->rc_pos))) { dbug_assert(Count >= left_length); /* User is not using my_b_read() */ Loading @@ -390,30 +406,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) Count-=left_length; } /* pos_in_file always point on where info->buffer was read */ pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >= info->end_of_file) { info->pos_in_file=pos_in_file; goto read_append_buffer; } /* no need to seek since the read is guaranteed to be sequential */ diff_length=(uint) (pos_in_file & (IO_SIZE-1)); #ifdef THREAD pthread_mutex_lock(&info->append_buffer_lock); #endif #ifdef THREAD pthread_mutex_unlock(&info->append_buffer_lock); #endif /* now the second stage begins - read from file descriptor */ if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) { /* Fill first intern buffer */ uint read_length; if (info->end_of_file == pos_in_file) { /* End of file */ info->error=(int) left_length; return 1; goto read_append_buffer; } length=(Count & (uint) ~(IO_SIZE-1))-diff_length; if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) != (uint) length) { info->error= read_length == (uint) -1 ? -1 : (int) (read_length+left_length); return 1; if (read_length != (uint)-1) { Count -= read_length; Buffer += read_length; } goto read_append_buffer; } Count-=length; Buffer+=length; Loading @@ -422,15 +441,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) diff_length=0; } max_length=info->read_length-diff_length; if (info->type != READ_FIFO && (info->end_of_file - pos_in_file) < max_length) if ((info->end_of_file - pos_in_file) < max_length) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) { info->error= left_length; /* We only got this many char */ return 1; goto read_append_buffer; } length=0; /* Didn't read any chars */ } Loading @@ -439,15 +456,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) length == (uint) -1) { if (length != (uint) -1) { memcpy(Buffer,info->buffer,(size_t) length); info->error= length == (uint) -1 ? -1 : (int) (length+left_length); return 1; Count -= length; Buffer += length; } goto read_append_buffer; } info->rc_pos=info->buffer+Count; info->rc_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); return 0; read_append_buffer: lock_append_buffer(info); if (!Count) return 0; { uint copy_len = (uint)(info->append_read_pos - info->append_write_pos); dbug_assert(info->append_read_pos <= info->append_write_pos); if (copy_len > Count) copy_len = Count; memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; Count -= copy_len; if (Count) info->error = save_count - Count; } unlock_append_buffer(info); return Count ? 1 : 0; } #ifdef HAVE_AIOWAIT Loading Loading @@ -672,6 +710,31 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) return 0; } int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) { uint rest_length,length; rest_length=(uint) (info->append_end - info->append_write_pos); memcpy(info->append_write_pos,Buffer,(size_t) rest_length); Buffer+=rest_length; Count-=rest_length; info->append_write_pos+=rest_length; if (flush_io_cache(info)) return 1; if (Count >= IO_SIZE) { /* Fill first intern buffer */ length=Count & (uint) ~(IO_SIZE-1); if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) return info->error= -1; Count-=length; Buffer+=length; } memcpy(info->append_write_pos,Buffer,(size_t) Count); info->append_write_pos+=Count; return 0; } /* Write a block to disk where part of the data may be inside the record Loading Loading @@ -756,6 +819,30 @@ int flush_io_cache(IO_CACHE *info) DBUG_RETURN(0); } } else if (info->type == SEQ_READ_APPEND) { if (info->file == -1) { if (real_open_cached_file(info)) DBUG_RETURN((info->error= -1)); } lock_append_buffer(info); if (info->append_write_pos != info->append_buffer) { length=(uint) (info->append_write_pos - info->append_buffer); info->append_read_pos=info->append_write_pos=info->append_buffer; info->append_end=(info->append_buffer+info->buffer_length- (info->pos_in_file & (IO_SIZE-1))); if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) { unlock_append_buffer(info); DBUG_RETURN((info->error= -1)); } unlock_append_buffer(info); DBUG_RETURN(0); } unlock_append_buffer(info); } #ifdef HAVE_AIOWAIT else if (info->type != READ_NET) { Loading Loading @@ -784,3 +871,89 @@ int end_io_cache(IO_CACHE *info) DBUG_RETURN(error); } /* end_io_cache */ #ifdef MAIN void die(const char* fmt, ...) { va_list va_args; va_start(va_args,fmt); fprintf(stderr,"Error:"); vfprintf(stderr, fmt,va_args); fprintf(stderr,", errno=%d\n", errno); exit(1); } int open_file(const char* fname, IO_CACHE* info, int cache_size) { int fd; if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0) die("Could not open %s", fname); if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME))) die("failed in init_io_cache()"); return fd; } void close_file(IO_CACHE* info) { end_io_cache(info); my_close(info->file, MYF(MY_WME)); } int main(int argc, char** argv) { IO_CACHE sra_cache; /* SEQ_READ_APPEND */ MY_STAT status; const char* fname="/tmp/iocache.test"; int cache_size=16384; char llstr_buf[22]; int max_block,total_bytes=0; int i,num_loops=100,error=0; char *p; char* block, *block_end; MY_INIT(argv[0]); max_block = cache_size*3; if (!(block=(char*)my_malloc(max_block,MYF(MY_WME)))) die("Not enough memory to allocate test block"); block_end = block + max_block; for (p = block,i=0; p < block_end;i++) { *p++ = (char)i; } if (my_stat(fname,&status, MYF(0)) && my_delete(fname,MYF(MY_WME))) { die("Delete of %s failed, aborting", fname); } open_file(fname,&sra_cache, cache_size); for (i = 0; i < num_loops; i++) { char buf[4]; int block_size = abs(rand() % max_block); int4store(buf, block_size); if (my_b_write(&sra_cache,buf,4) || my_b_write(&sra_cache, block, block_size)) die("write failed"); total_bytes += 4+block_size; } close_file(&sra_cache); my_free(block,MYF(MY_WME)); if (!my_stat(fname,&status,MYF(MY_WME))) die("%s failed to stat, but I had just closed it,\ wonder how that happened"); printf("Final size of %s is %s, wrote %d bytes\n",fname, llstr(status.st_size,llstr_buf), total_bytes); my_delete(fname, MYF(MY_WME)); /* check correctness of tests */ if (total_bytes != status.st_size) { fprintf(stderr,"Not the same number of bytes acutally in file as bytes \ supposedly written\n"); error=1; } exit(error); return 0; } #endif sql/mysqld.cc +60 −11 Original line number Diff line number Diff line Loading @@ -45,6 +45,11 @@ char pstack_file_name[80]; #endif /* __linux__ */ #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) #define HAVE_CLOSE_SERVER_SOCK 1 void close_server_sock(); #endif extern "C" { // Because of SCO 3.2V4.2 #include <errno.h> #include <sys/stat.h> Loading Loading @@ -453,16 +458,7 @@ static void close_connections(void) sql_print_error("Got error %d from pthread_cond_timedwait",error); #endif #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) if (ip_sock != INVALID_SOCKET) { DBUG_PRINT("error",("closing TCP/IP and socket files")); VOID(shutdown(ip_sock,2)); VOID(closesocket(ip_sock)); VOID(shutdown(unix_sock,2)); VOID(closesocket(unix_sock)); VOID(unlink(mysql_unix_port)); ip_sock=unix_sock= INVALID_SOCKET; } close_server_sock(); #endif } (void) pthread_mutex_unlock(&LOCK_thread_count); Loading Loading @@ -577,10 +573,37 @@ static void close_connections(void) DBUG_VOID_RETURN; } #ifdef HAVE_CLOSE_SERVER_SOCK void close_server_sock() { DBUG_ENTER("close_server_sock"); if (ip_sock != INVALID_SOCKET) { DBUG_PRINT("info",("closing TCP/IP socket")); VOID(shutdown(ip_sock,2)); VOID(closesocket(ip_sock)); ip_sock=INVALID_SOCKET; } if (unix_sock != INVALID_SOCKET) { DBUG_PRINT("info",("closing Unix socket")); VOID(shutdown(unix_sock,2)); VOID(closesocket(unix_sock)); VOID(unlink(mysql_unix_port)); unix_sock=INVALID_SOCKET; } DBUG_VOID_RETURN; } #endif void kill_mysql(void) { DBUG_ENTER("kill_mysql"); #ifdef SIGNALS_DONT_BREAK_READ close_server_sock(); /* force accept to wake up */ #endif #if defined(__WIN__) { if (!SetEvent(hEventShutdown)) Loading @@ -604,6 +627,7 @@ void kill_mysql(void) #endif DBUG_PRINT("quit",("After pthread_kill")); shutdown_in_progress=1; // Safety if kill didn't work abort_loop=1; DBUG_VOID_RETURN; } Loading Loading @@ -2023,6 +2047,7 @@ The server will not act as a slave."); sql_print_error("Before Lock_thread_count"); #endif (void) pthread_mutex_lock(&LOCK_thread_count); DBUG_PRINT("quit", ("Got thread_count mutex")); select_thread_in_use=0; // For close_connections (void) pthread_cond_broadcast(&COND_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count); Loading Loading @@ -2054,10 +2079,14 @@ The server will not act as a slave."); #endif /* HAVE_OPENSSL */ /* Wait until cleanup is done */ (void) pthread_mutex_lock(&LOCK_thread_count); DBUG_PRINT("quit", ("Got thread_count mutex for clean up wait")); while (!ready_to_exit) { DBUG_PRINT("quit", ("not yet ready to exit")); pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); } DBUG_PRINT("quit", ("ready to exit")); (void) pthread_mutex_unlock(&LOCK_thread_count); my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); exit(0); Loading Loading @@ -2253,6 +2282,20 @@ static void create_new_thread(THD *thd) DBUG_VOID_RETURN; } #ifdef SIGNALS_DONT_BREAK_READ inline void kill_broken_server() { /* hack to get around signals ignored in syscalls for problem OS's */ if (unix_sock == INVALID_SOCKET || ip_sock ==INVALID_SOCKET) { select_thread_in_use = 0; kill_server((void*)MYSQL_KILL_SIGNAL); /* never returns */ } } #define MAYBE_BROKEN_SYSCALL kill_broken_server(); #else #define MAYBE_BROKEN_SYSCALL #endif /* Handle new connections and spawn new process to handle them */ Loading Loading @@ -2288,6 +2331,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) #endif DBUG_PRINT("general",("Waiting for connections.")); MAYBE_BROKEN_SYSCALL; while (!abort_loop) { readFDs=clientFDs; Loading @@ -2302,12 +2346,15 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (!select_errors++ && !abort_loop) /* purecov: inspected */ sql_print_error("mysqld: Got error %d from select",socket_errno); /* purecov: inspected */ } MAYBE_BROKEN_SYSCALL continue; } #endif /* HPUX */ if (abort_loop) { MAYBE_BROKEN_SYSCALL; break; } /* ** Is this a new connection request */ Loading Loading @@ -2343,6 +2390,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (new_sock != INVALID_SOCKET || (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN)) break; MAYBE_BROKEN_SYSCALL; #if !defined(NO_FCNTL_NONBLOCK) if (!(test_flags & TEST_BLOCKING)) { Loading @@ -2359,6 +2407,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) { if ((error_count++ & 255) == 0) // This can happen often sql_perror("Error in accept"); MAYBE_BROKEN_SYSCALL; if (socket_errno == SOCKET_ENFILE || socket_errno == SOCKET_EMFILE) sleep(1); // Give other threads some time continue; Loading Loading
.bzrignore +2 −0 Original line number Diff line number Diff line Loading @@ -424,3 +424,5 @@ vio/test-ssl vio/test-sslclient vio/test-sslserver vio/viotest-ssl mysys/#mf_iocache.c# mysys/test_io_cache
include/my_sys.h +18 −2 Original line number Diff line number Diff line Loading @@ -293,6 +293,16 @@ typedef struct st_dynamic_string { struct st_io_cache; typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*); #ifdef THREAD #define lock_append_buffer(info) \ pthread_mutex_lock(&(info)->append_buffer_lock) #define unlock_append_buffer(info) \ pthread_mutex_unlock(&(info)->append_buffer_lock) #else #define lock_append_buffer(info) #define unlock_append_buffer(info) #endif typedef struct st_io_cache /* Used when cacheing files */ { my_off_t pos_in_file,end_of_file; Loading @@ -301,7 +311,7 @@ typedef struct st_io_cache /* Used when cacheing files */ that will use a buffer allocated somewhere else */ byte *append_buffer, *append_pos, *append_end; byte *append_buffer, *append_read_pos, *append_write_pos, *append_end; /* for append buffer used in READ_APPEND cache */ #ifdef THREAD pthread_mutex_t append_buffer_lock; Loading Loading @@ -348,10 +358,15 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *); _my_b_get(info)) #define my_b_write(info,Buffer,Count) \ ((info)->type != SEQ_READ_APPEND) ? (\ ((info)->rc_pos + (Count) <= (info)->rc_end ?\ (memcpy((info)->rc_pos,Buffer,(size_t) (Count)), \ ((info)->rc_pos+=(Count)),0) :\ _my_b_write(info,Buffer,Count)) _my_b_write(info,Buffer,Count))) : \ ((info)->append_write_pos + (Count) <= (info)->append_end ?\ (memcpy((info)->append_write_pos,Buffer,(size_t)Count), \ ((info)->append_write_pos+=(Count),0)) : \ _my_b_append(info,Buffer,Count)) /* my_b_write_byte dosn't have any err-check */ #define my_b_write_byte(info,chr) \ Loading Loading @@ -564,6 +579,7 @@ extern int _my_b_net_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_get(IO_CACHE *info); extern int _my_b_async_read(IO_CACHE *info,byte *Buffer,uint Count); extern int _my_b_write(IO_CACHE *info,const byte *Buffer,uint Count); extern int _my_b_append(IO_CACHE *info,const byte *Buffer,uint Count); extern int my_block_write(IO_CACHE *info, const byte *Buffer, uint Count, my_off_t pos); extern int flush_io_cache(IO_CACHE *info); Loading
mysys/Makefile.am +4 −0 Original line number Diff line number Diff line Loading @@ -95,6 +95,10 @@ test_vsnprintf: my_vsnprintf.c $(LIBRARIES) $(CP) $(srcdir)/my_vsnprintf.c test_vsnprintf.c $(LINK) $(FLAGS) -DMAIN ./test_vsnprintf.c $(LDADD) $(LIBS) $(RM) -f test_vsnprintf.* test_io_cache: mf_iocache.c $(LIBRARIES) $(CP) $(srcdir)/mf_iocache.c test_io_cache.c $(LINK) $(FLAGS) -DMAIN ./test_io_cache.c $(LDADD) $(LIBS) $(RM) -f test_io_cache.* test_dir: test_dir.c $(LIBRARIES) $(LINK) $(FLAGS) -DMAIN $(srcdir)/test_dir.c $(LDADD) $(LIBS) Loading
mysys/mf_iocache.c +195 −22 Original line number Diff line number Diff line Loading @@ -41,6 +41,10 @@ static void my_aiowait(my_aio_result *result); #include <assert.h> #include <errno.h> #ifdef MAIN #include <my_dir.h> #endif static void init_read_function(IO_CACHE* info, enum cache_type type); static void init_read_function(IO_CACHE* info, enum cache_type type) Loading Loading @@ -152,7 +156,7 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, info->rc_request_pos=info->rc_pos=info->buffer; if (type == SEQ_READ_APPEND) { info->append_pos = info->append_buffer; info->append_read_pos = info->append_write_pos = info->append_buffer; info->append_end = info->append_buffer + info->buffer_length; #ifdef THREAD pthread_mutex_init(&info->append_buffer_lock,MY_MUTEX_INIT_FAST); Loading Loading @@ -277,6 +281,10 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ~(my_off_t) 0); } } if (info->type == SEQ_READ_APPEND) { info->append_read_pos = info->append_write_pos = info->append_buffer; } info->type=type; info->error=0; init_read_function(info,type); Loading @@ -294,7 +302,7 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, info->inited=0; #endif DBUG_RETURN(0); } /* init_io_cache */ } /* reinit_io_cache */ Loading Loading @@ -377,11 +385,19 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) return 0; } /* Do sequential read from the SEQ_READ_APPEND cache we do this in three stages: - first read from info->buffer - then if there are still data to read, try the file descriptor - afterwards, if there are still data to read, try append buffer */ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) { uint length,diff_length,left_length; uint length,diff_length,left_length,save_count; my_off_t max_length, pos_in_file; save_count=Count; /* first, read the regular buffer */ if ((left_length=(uint) (info->rc_end-info->rc_pos))) { dbug_assert(Count >= left_length); /* User is not using my_b_read() */ Loading @@ -390,30 +406,33 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) Count-=left_length; } /* pos_in_file always point on where info->buffer was read */ pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer); if ((pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer)) >= info->end_of_file) { info->pos_in_file=pos_in_file; goto read_append_buffer; } /* no need to seek since the read is guaranteed to be sequential */ diff_length=(uint) (pos_in_file & (IO_SIZE-1)); #ifdef THREAD pthread_mutex_lock(&info->append_buffer_lock); #endif #ifdef THREAD pthread_mutex_unlock(&info->append_buffer_lock); #endif /* now the second stage begins - read from file descriptor */ if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length))) { /* Fill first intern buffer */ uint read_length; if (info->end_of_file == pos_in_file) { /* End of file */ info->error=(int) left_length; return 1; goto read_append_buffer; } length=(Count & (uint) ~(IO_SIZE-1))-diff_length; if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags)) != (uint) length) { info->error= read_length == (uint) -1 ? -1 : (int) (read_length+left_length); return 1; if (read_length != (uint)-1) { Count -= read_length; Buffer += read_length; } goto read_append_buffer; } Count-=length; Buffer+=length; Loading @@ -422,15 +441,13 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) diff_length=0; } max_length=info->read_length-diff_length; if (info->type != READ_FIFO && (info->end_of_file - pos_in_file) < max_length) if ((info->end_of_file - pos_in_file) < max_length) max_length = info->end_of_file - pos_in_file; if (!max_length) { if (Count) { info->error= left_length; /* We only got this many char */ return 1; goto read_append_buffer; } length=0; /* Didn't read any chars */ } Loading @@ -439,15 +456,36 @@ int _my_b_seq_read(register IO_CACHE *info, byte *Buffer, uint Count) length == (uint) -1) { if (length != (uint) -1) { memcpy(Buffer,info->buffer,(size_t) length); info->error= length == (uint) -1 ? -1 : (int) (length+left_length); return 1; Count -= length; Buffer += length; } goto read_append_buffer; } info->rc_pos=info->buffer+Count; info->rc_end=info->buffer+length; info->pos_in_file=pos_in_file; memcpy(Buffer,info->buffer,(size_t) Count); return 0; read_append_buffer: lock_append_buffer(info); if (!Count) return 0; { uint copy_len = (uint)(info->append_read_pos - info->append_write_pos); dbug_assert(info->append_read_pos <= info->append_write_pos); if (copy_len > Count) copy_len = Count; memcpy(Buffer, info->append_read_pos, copy_len); info->append_read_pos += copy_len; Count -= copy_len; if (Count) info->error = save_count - Count; } unlock_append_buffer(info); return Count ? 1 : 0; } #ifdef HAVE_AIOWAIT Loading Loading @@ -672,6 +710,31 @@ int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count) return 0; } int _my_b_append(register IO_CACHE *info, const byte *Buffer, uint Count) { uint rest_length,length; rest_length=(uint) (info->append_end - info->append_write_pos); memcpy(info->append_write_pos,Buffer,(size_t) rest_length); Buffer+=rest_length; Count-=rest_length; info->append_write_pos+=rest_length; if (flush_io_cache(info)) return 1; if (Count >= IO_SIZE) { /* Fill first intern buffer */ length=Count & (uint) ~(IO_SIZE-1); if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP)) return info->error= -1; Count-=length; Buffer+=length; } memcpy(info->append_write_pos,Buffer,(size_t) Count); info->append_write_pos+=Count; return 0; } /* Write a block to disk where part of the data may be inside the record Loading Loading @@ -756,6 +819,30 @@ int flush_io_cache(IO_CACHE *info) DBUG_RETURN(0); } } else if (info->type == SEQ_READ_APPEND) { if (info->file == -1) { if (real_open_cached_file(info)) DBUG_RETURN((info->error= -1)); } lock_append_buffer(info); if (info->append_write_pos != info->append_buffer) { length=(uint) (info->append_write_pos - info->append_buffer); info->append_read_pos=info->append_write_pos=info->append_buffer; info->append_end=(info->append_buffer+info->buffer_length- (info->pos_in_file & (IO_SIZE-1))); if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP)) { unlock_append_buffer(info); DBUG_RETURN((info->error= -1)); } unlock_append_buffer(info); DBUG_RETURN(0); } unlock_append_buffer(info); } #ifdef HAVE_AIOWAIT else if (info->type != READ_NET) { Loading Loading @@ -784,3 +871,89 @@ int end_io_cache(IO_CACHE *info) DBUG_RETURN(error); } /* end_io_cache */ #ifdef MAIN void die(const char* fmt, ...) { va_list va_args; va_start(va_args,fmt); fprintf(stderr,"Error:"); vfprintf(stderr, fmt,va_args); fprintf(stderr,", errno=%d\n", errno); exit(1); } int open_file(const char* fname, IO_CACHE* info, int cache_size) { int fd; if ((fd=my_open(fname,O_CREAT|O_APPEND|O_RDWR,MYF(MY_WME))) < 0) die("Could not open %s", fname); if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME))) die("failed in init_io_cache()"); return fd; } void close_file(IO_CACHE* info) { end_io_cache(info); my_close(info->file, MYF(MY_WME)); } int main(int argc, char** argv) { IO_CACHE sra_cache; /* SEQ_READ_APPEND */ MY_STAT status; const char* fname="/tmp/iocache.test"; int cache_size=16384; char llstr_buf[22]; int max_block,total_bytes=0; int i,num_loops=100,error=0; char *p; char* block, *block_end; MY_INIT(argv[0]); max_block = cache_size*3; if (!(block=(char*)my_malloc(max_block,MYF(MY_WME)))) die("Not enough memory to allocate test block"); block_end = block + max_block; for (p = block,i=0; p < block_end;i++) { *p++ = (char)i; } if (my_stat(fname,&status, MYF(0)) && my_delete(fname,MYF(MY_WME))) { die("Delete of %s failed, aborting", fname); } open_file(fname,&sra_cache, cache_size); for (i = 0; i < num_loops; i++) { char buf[4]; int block_size = abs(rand() % max_block); int4store(buf, block_size); if (my_b_write(&sra_cache,buf,4) || my_b_write(&sra_cache, block, block_size)) die("write failed"); total_bytes += 4+block_size; } close_file(&sra_cache); my_free(block,MYF(MY_WME)); if (!my_stat(fname,&status,MYF(MY_WME))) die("%s failed to stat, but I had just closed it,\ wonder how that happened"); printf("Final size of %s is %s, wrote %d bytes\n",fname, llstr(status.st_size,llstr_buf), total_bytes); my_delete(fname, MYF(MY_WME)); /* check correctness of tests */ if (total_bytes != status.st_size) { fprintf(stderr,"Not the same number of bytes acutally in file as bytes \ supposedly written\n"); error=1; } exit(error); return 0; } #endif
sql/mysqld.cc +60 −11 Original line number Diff line number Diff line Loading @@ -45,6 +45,11 @@ char pstack_file_name[80]; #endif /* __linux__ */ #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) #define HAVE_CLOSE_SERVER_SOCK 1 void close_server_sock(); #endif extern "C" { // Because of SCO 3.2V4.2 #include <errno.h> #include <sys/stat.h> Loading Loading @@ -453,16 +458,7 @@ static void close_connections(void) sql_print_error("Got error %d from pthread_cond_timedwait",error); #endif #if defined(HAVE_DEC_3_2_THREADS) || defined(SIGNALS_DONT_BREAK_READ) if (ip_sock != INVALID_SOCKET) { DBUG_PRINT("error",("closing TCP/IP and socket files")); VOID(shutdown(ip_sock,2)); VOID(closesocket(ip_sock)); VOID(shutdown(unix_sock,2)); VOID(closesocket(unix_sock)); VOID(unlink(mysql_unix_port)); ip_sock=unix_sock= INVALID_SOCKET; } close_server_sock(); #endif } (void) pthread_mutex_unlock(&LOCK_thread_count); Loading Loading @@ -577,10 +573,37 @@ static void close_connections(void) DBUG_VOID_RETURN; } #ifdef HAVE_CLOSE_SERVER_SOCK void close_server_sock() { DBUG_ENTER("close_server_sock"); if (ip_sock != INVALID_SOCKET) { DBUG_PRINT("info",("closing TCP/IP socket")); VOID(shutdown(ip_sock,2)); VOID(closesocket(ip_sock)); ip_sock=INVALID_SOCKET; } if (unix_sock != INVALID_SOCKET) { DBUG_PRINT("info",("closing Unix socket")); VOID(shutdown(unix_sock,2)); VOID(closesocket(unix_sock)); VOID(unlink(mysql_unix_port)); unix_sock=INVALID_SOCKET; } DBUG_VOID_RETURN; } #endif void kill_mysql(void) { DBUG_ENTER("kill_mysql"); #ifdef SIGNALS_DONT_BREAK_READ close_server_sock(); /* force accept to wake up */ #endif #if defined(__WIN__) { if (!SetEvent(hEventShutdown)) Loading @@ -604,6 +627,7 @@ void kill_mysql(void) #endif DBUG_PRINT("quit",("After pthread_kill")); shutdown_in_progress=1; // Safety if kill didn't work abort_loop=1; DBUG_VOID_RETURN; } Loading Loading @@ -2023,6 +2047,7 @@ The server will not act as a slave."); sql_print_error("Before Lock_thread_count"); #endif (void) pthread_mutex_lock(&LOCK_thread_count); DBUG_PRINT("quit", ("Got thread_count mutex")); select_thread_in_use=0; // For close_connections (void) pthread_cond_broadcast(&COND_thread_count); (void) pthread_mutex_unlock(&LOCK_thread_count); Loading Loading @@ -2054,10 +2079,14 @@ The server will not act as a slave."); #endif /* HAVE_OPENSSL */ /* Wait until cleanup is done */ (void) pthread_mutex_lock(&LOCK_thread_count); DBUG_PRINT("quit", ("Got thread_count mutex for clean up wait")); while (!ready_to_exit) { DBUG_PRINT("quit", ("not yet ready to exit")); pthread_cond_wait(&COND_thread_count,&LOCK_thread_count); } DBUG_PRINT("quit", ("ready to exit")); (void) pthread_mutex_unlock(&LOCK_thread_count); my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0); exit(0); Loading Loading @@ -2253,6 +2282,20 @@ static void create_new_thread(THD *thd) DBUG_VOID_RETURN; } #ifdef SIGNALS_DONT_BREAK_READ inline void kill_broken_server() { /* hack to get around signals ignored in syscalls for problem OS's */ if (unix_sock == INVALID_SOCKET || ip_sock ==INVALID_SOCKET) { select_thread_in_use = 0; kill_server((void*)MYSQL_KILL_SIGNAL); /* never returns */ } } #define MAYBE_BROKEN_SYSCALL kill_broken_server(); #else #define MAYBE_BROKEN_SYSCALL #endif /* Handle new connections and spawn new process to handle them */ Loading Loading @@ -2288,6 +2331,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) #endif DBUG_PRINT("general",("Waiting for connections.")); MAYBE_BROKEN_SYSCALL; while (!abort_loop) { readFDs=clientFDs; Loading @@ -2302,12 +2346,15 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (!select_errors++ && !abort_loop) /* purecov: inspected */ sql_print_error("mysqld: Got error %d from select",socket_errno); /* purecov: inspected */ } MAYBE_BROKEN_SYSCALL continue; } #endif /* HPUX */ if (abort_loop) { MAYBE_BROKEN_SYSCALL; break; } /* ** Is this a new connection request */ Loading Loading @@ -2343,6 +2390,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) if (new_sock != INVALID_SOCKET || (socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN)) break; MAYBE_BROKEN_SYSCALL; #if !defined(NO_FCNTL_NONBLOCK) if (!(test_flags & TEST_BLOCKING)) { Loading @@ -2359,6 +2407,7 @@ pthread_handler_decl(handle_connections_sockets,arg __attribute__((unused))) { if ((error_count++ & 255) == 0) // This can happen often sql_perror("Error in accept"); MAYBE_BROKEN_SYSCALL; if (socket_errno == SOCKET_ENFILE || socket_errno == SOCKET_EMFILE) sleep(1); // Give other threads some time continue; Loading