Ruby
2.0.0p247(2013-06-27revision41674)
|
00001 /********************************************************************** 00002 00003 thread.c - 00004 00005 $Author: nagachika $ 00006 00007 Copyright (C) 2004-2007 Koichi Sasada 00008 00009 **********************************************************************/ 00010 00011 /* 00012 YARV Thread Design 00013 00014 model 1: Userlevel Thread 00015 Same as traditional ruby thread. 00016 00017 model 2: Native Thread with Global VM lock 00018 Using pthread (or Windows thread) and Ruby threads run concurrent. 00019 00020 model 3: Native Thread with fine grain lock 00021 Using pthread and Ruby threads run concurrent or parallel. 00022 00023 ------------------------------------------------------------------------ 00024 00025 model 2: 00026 A thread has mutex (GVL: Global VM Lock or Giant VM Lock) can run. 00027 When thread scheduling, running thread release GVL. If running thread 00028 try blocking operation, this thread must release GVL and another 00029 thread can continue this flow. After blocking operation, thread 00030 must check interrupt (RUBY_VM_CHECK_INTS). 00031 00032 Every VM can run parallel. 00033 00034 Ruby threads are scheduled by OS thread scheduler. 00035 00036 ------------------------------------------------------------------------ 00037 00038 model 3: 00039 Every threads run concurrent or parallel and to access shared object 00040 exclusive access control is needed. For example, to access String 00041 object or Array object, fine grain lock must be locked every time. 00042 */ 00043 00044 00045 /* 00046 * FD_SET, FD_CLR and FD_ISSET have a small sanity check when using glibc 00047 * 2.15 or later and set _FORTIFY_SOURCE > 0. 00048 * However, the implementation is wrong. Even though Linux's select(2) 00049 * support large fd size (>FD_SETSIZE), it wrongly assume fd is always 00050 * less than FD_SETSIZE (i.e. 1024). And then when enabling HAVE_RB_FD_INIT, 00051 * it doesn't work correctly and makes program abort. 
Therefore we need to 00052 * disable FORTY_SOURCE until glibc fixes it. 00053 */ 00054 #undef _FORTIFY_SOURCE 00055 #undef __USE_FORTIFY_LEVEL 00056 #define __USE_FORTIFY_LEVEL 0 00057 00058 /* for model 2 */ 00059 00060 #include "eval_intern.h" 00061 #include "gc.h" 00062 #include "internal.h" 00063 #include "ruby/io.h" 00064 #include "ruby/thread.h" 00065 00066 #ifndef USE_NATIVE_THREAD_PRIORITY 00067 #define USE_NATIVE_THREAD_PRIORITY 0 00068 #define RUBY_THREAD_PRIORITY_MAX 3 00069 #define RUBY_THREAD_PRIORITY_MIN -3 00070 #endif 00071 00072 #ifndef THREAD_DEBUG 00073 #define THREAD_DEBUG 0 00074 #endif 00075 00076 #define TIMET_MAX (~(time_t)0 <= 0 ? (time_t)((~(unsigned_time_t)0) >> 1) : (time_t)(~(unsigned_time_t)0)) 00077 #define TIMET_MIN (~(time_t)0 <= 0 ? (time_t)(((unsigned_time_t)1) << (sizeof(time_t) * CHAR_BIT - 1)) : (time_t)0) 00078 00079 VALUE rb_cMutex; 00080 VALUE rb_cThreadShield; 00081 00082 static VALUE sym_immediate; 00083 static VALUE sym_on_blocking; 00084 static VALUE sym_never; 00085 00086 static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check); 00087 static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check); 00088 static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check); 00089 static double timeofday(void); 00090 static int rb_threadptr_dead(rb_thread_t *th); 00091 static void rb_check_deadlock(rb_vm_t *vm); 00092 static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th); 00093 00094 #define eKillSignal INT2FIX(0) 00095 #define eTerminateSignal INT2FIX(1) 00096 static volatile int system_working = 1; 00097 00098 #define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream] 00099 00100 inline static void 00101 st_delete_wrap(st_table *table, st_data_t key) 00102 { 00103 st_delete(table, &key, 0); 00104 } 00105 00106 /********************************************************************************/ 00107 00108 #define 
THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION 00109 00110 struct rb_blocking_region_buffer { 00111 enum rb_thread_status prev_status; 00112 struct rb_unblock_callback oldubf; 00113 }; 00114 00115 static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00116 struct rb_unblock_callback *old, int fail_if_interrupted); 00117 static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old); 00118 00119 static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, 00120 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted); 00121 static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region); 00122 00123 #ifdef __ia64 00124 #define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) \ 00125 do{(th)->machine_register_stack_end = rb_ia64_bsp();}while(0) 00126 #else 00127 #define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) 00128 #endif 00129 #define RB_GC_SAVE_MACHINE_CONTEXT(th) \ 00130 do { \ 00131 FLUSH_REGISTER_WINDOWS; \ 00132 RB_GC_SAVE_MACHINE_REGISTER_STACK(th); \ 00133 setjmp((th)->machine_regs); \ 00134 SET_MACHINE_STACK_END(&(th)->machine_stack_end); \ 00135 } while (0) 00136 00137 #define GVL_UNLOCK_BEGIN() do { \ 00138 rb_thread_t *_th_stored = GET_THREAD(); \ 00139 RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \ 00140 gvl_release(_th_stored->vm); 00141 00142 #define GVL_UNLOCK_END() \ 00143 gvl_acquire(_th_stored->vm, _th_stored); \ 00144 rb_thread_set_current(_th_stored); \ 00145 } while(0) 00146 00147 #ifdef __GNUC__ 00148 #define only_if_constant(expr, notconst) (__builtin_constant_p(expr) ? 
(expr) : (notconst)) 00149 #else 00150 #define only_if_constant(expr, notconst) notconst 00151 #endif 00152 #define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \ 00153 rb_thread_t *__th = GET_THREAD(); \ 00154 struct rb_blocking_region_buffer __region; \ 00155 if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \ 00156 /* always return true unless fail_if_interrupted */ \ 00157 !only_if_constant(fail_if_interrupted, TRUE)) { \ 00158 exec; \ 00159 blocking_region_end(__th, &__region); \ 00160 }; \ 00161 } while(0) 00162 00163 #if THREAD_DEBUG 00164 #ifdef HAVE_VA_ARGS_MACRO 00165 void rb_thread_debug(const char *file, int line, const char *fmt, ...); 00166 #define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__) 00167 #define POSITION_FORMAT "%s:%d:" 00168 #define POSITION_ARGS ,file, line 00169 #else 00170 void rb_thread_debug(const char *fmt, ...); 00171 #define thread_debug rb_thread_debug 00172 #define POSITION_FORMAT 00173 #define POSITION_ARGS 00174 #endif 00175 00176 # if THREAD_DEBUG < 0 00177 static int rb_thread_debug_enabled; 00178 00179 /* 00180 * call-seq: 00181 * Thread.DEBUG -> num 00182 * 00183 * Returns the thread debug level. Available only if compiled with 00184 * THREAD_DEBUG=-1. 00185 */ 00186 00187 static VALUE 00188 rb_thread_s_debug(void) 00189 { 00190 return INT2NUM(rb_thread_debug_enabled); 00191 } 00192 00193 /* 00194 * call-seq: 00195 * Thread.DEBUG = num 00196 * 00197 * Sets the thread debug level. Available only if compiled with 00198 * THREAD_DEBUG=-1. 00199 */ 00200 00201 static VALUE 00202 rb_thread_s_debug_set(VALUE self, VALUE val) 00203 { 00204 rb_thread_debug_enabled = RTEST(val) ? 
NUM2INT(val) : 0; 00205 return val; 00206 } 00207 # else 00208 # define rb_thread_debug_enabled THREAD_DEBUG 00209 # endif 00210 #else 00211 #define thread_debug if(0)printf 00212 #endif 00213 00214 #ifndef __ia64 00215 #define thread_start_func_2(th, st, rst) thread_start_func_2(th, st) 00216 #endif 00217 NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, 00218 VALUE *register_stack_start)); 00219 static void timer_thread_function(void *); 00220 00221 #if defined(_WIN32) 00222 #include "thread_win32.c" 00223 00224 #define DEBUG_OUT() \ 00225 WaitForSingleObject(&debug_mutex, INFINITE); \ 00226 printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \ 00227 fflush(stdout); \ 00228 ReleaseMutex(&debug_mutex); 00229 00230 #elif defined(HAVE_PTHREAD_H) 00231 #include "thread_pthread.c" 00232 00233 #define DEBUG_OUT() \ 00234 pthread_mutex_lock(&debug_mutex); \ 00235 printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \ 00236 fflush(stdout); \ 00237 pthread_mutex_unlock(&debug_mutex); 00238 00239 #else 00240 #error "unsupported thread type" 00241 #endif 00242 00243 #if THREAD_DEBUG 00244 static int debug_mutex_initialized = 1; 00245 static rb_thread_lock_t debug_mutex; 00246 00247 void 00248 rb_thread_debug( 00249 #ifdef HAVE_VA_ARGS_MACRO 00250 const char *file, int line, 00251 #endif 00252 const char *fmt, ...) 
00253 { 00254 va_list args; 00255 char buf[BUFSIZ]; 00256 00257 if (!rb_thread_debug_enabled) return; 00258 00259 if (debug_mutex_initialized == 1) { 00260 debug_mutex_initialized = 0; 00261 native_mutex_initialize(&debug_mutex); 00262 } 00263 00264 va_start(args, fmt); 00265 vsnprintf(buf, BUFSIZ, fmt, args); 00266 va_end(args); 00267 00268 DEBUG_OUT(); 00269 } 00270 #endif 00271 00272 void 00273 rb_vm_gvl_destroy(rb_vm_t *vm) 00274 { 00275 gvl_release(vm); 00276 gvl_destroy(vm); 00277 native_mutex_destroy(&vm->thread_destruct_lock); 00278 } 00279 00280 void 00281 rb_thread_lock_unlock(rb_thread_lock_t *lock) 00282 { 00283 native_mutex_unlock(lock); 00284 } 00285 00286 void 00287 rb_thread_lock_destroy(rb_thread_lock_t *lock) 00288 { 00289 native_mutex_destroy(lock); 00290 } 00291 00292 static int 00293 set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 00294 struct rb_unblock_callback *old, int fail_if_interrupted) 00295 { 00296 check_ints: 00297 if (fail_if_interrupted) { 00298 if (RUBY_VM_INTERRUPTED_ANY(th)) { 00299 return FALSE; 00300 } 00301 } 00302 else { 00303 RUBY_VM_CHECK_INTS(th); 00304 } 00305 00306 native_mutex_lock(&th->interrupt_lock); 00307 if (RUBY_VM_INTERRUPTED_ANY(th)) { 00308 native_mutex_unlock(&th->interrupt_lock); 00309 goto check_ints; 00310 } 00311 else { 00312 if (old) *old = th->unblock; 00313 th->unblock.func = func; 00314 th->unblock.arg = arg; 00315 } 00316 native_mutex_unlock(&th->interrupt_lock); 00317 00318 return TRUE; 00319 } 00320 00321 static void 00322 reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old) 00323 { 00324 native_mutex_lock(&th->interrupt_lock); 00325 th->unblock = *old; 00326 native_mutex_unlock(&th->interrupt_lock); 00327 } 00328 00329 static void 00330 rb_threadptr_interrupt_common(rb_thread_t *th, int trap) 00331 { 00332 native_mutex_lock(&th->interrupt_lock); 00333 if (trap) 00334 RUBY_VM_SET_TRAP_INTERRUPT(th); 00335 else 00336 RUBY_VM_SET_INTERRUPT(th); 
00337 if (th->unblock.func) { 00338 (th->unblock.func)(th->unblock.arg); 00339 } 00340 else { 00341 /* none */ 00342 } 00343 native_mutex_unlock(&th->interrupt_lock); 00344 } 00345 00346 void 00347 rb_threadptr_interrupt(rb_thread_t *th) 00348 { 00349 rb_threadptr_interrupt_common(th, 0); 00350 } 00351 00352 void 00353 rb_threadptr_trap_interrupt(rb_thread_t *th) 00354 { 00355 rb_threadptr_interrupt_common(th, 1); 00356 } 00357 00358 static int 00359 terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread) 00360 { 00361 VALUE thval = key; 00362 rb_thread_t *th; 00363 GetThreadPtr(thval, th); 00364 00365 if (th != main_thread) { 00366 thread_debug("terminate_i: %p\n", (void *)th); 00367 rb_threadptr_pending_interrupt_enque(th, eTerminateSignal); 00368 rb_threadptr_interrupt(th); 00369 } 00370 else { 00371 thread_debug("terminate_i: main thread (%p)\n", (void *)th); 00372 } 00373 return ST_CONTINUE; 00374 } 00375 00376 typedef struct rb_mutex_struct 00377 { 00378 rb_thread_lock_t lock; 00379 rb_thread_cond_t cond; 00380 struct rb_thread_struct volatile *th; 00381 int cond_waiting; 00382 struct rb_mutex_struct *next_mutex; 00383 int allow_trap; 00384 } rb_mutex_t; 00385 00386 static void rb_mutex_abandon_all(rb_mutex_t *mutexes); 00387 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 00388 00389 void 00390 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) 00391 { 00392 const char *err; 00393 rb_mutex_t *mutex; 00394 rb_mutex_t *mutexes = th->keeping_mutexes; 00395 00396 while (mutexes) { 00397 mutex = mutexes; 00398 /* rb_warn("mutex #<%p> remains to be locked by terminated thread", 00399 mutexes); */ 00400 mutexes = mutex->next_mutex; 00401 err = rb_mutex_unlock_th(mutex, th); 00402 if (err) rb_bug("invalid keeping_mutexes: %s", err); 00403 } 00404 } 00405 00406 void 00407 rb_thread_terminate_all(void) 00408 { 00409 rb_thread_t *th = GET_THREAD(); /* main thread */ 00410 rb_vm_t *vm = th->vm; 00411 00412 if 
(vm->main_thread != th) { 00413 rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", 00414 (void *)vm->main_thread, (void *)th); 00415 } 00416 00417 /* unlock all locking mutexes */ 00418 rb_threadptr_unlock_all_locking_mutexes(th); 00419 00420 retry: 00421 thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th); 00422 st_foreach(vm->living_threads, terminate_i, (st_data_t)th); 00423 00424 while (!rb_thread_alone()) { 00425 int state; 00426 00427 TH_PUSH_TAG(th); 00428 if ((state = TH_EXEC_TAG()) == 0) { 00429 native_sleep(th, 0); 00430 RUBY_VM_CHECK_INTS_BLOCKING(th); 00431 } 00432 TH_POP_TAG(); 00433 00434 if (state) { 00435 goto retry; 00436 } 00437 } 00438 } 00439 00440 static void 00441 thread_cleanup_func_before_exec(void *th_ptr) 00442 { 00443 rb_thread_t *th = th_ptr; 00444 th->status = THREAD_KILLED; 00445 th->machine_stack_start = th->machine_stack_end = 0; 00446 #ifdef __ia64 00447 th->machine_register_stack_start = th->machine_register_stack_end = 0; 00448 #endif 00449 } 00450 00451 static void 00452 thread_cleanup_func(void *th_ptr, int atfork) 00453 { 00454 rb_thread_t *th = th_ptr; 00455 00456 th->locking_mutex = Qfalse; 00457 thread_cleanup_func_before_exec(th_ptr); 00458 00459 /* 00460 * Unfortunately, we can't release native threading resource at fork 00461 * because libc may have unstable locking state therefore touching 00462 * a threading resource may cause a deadlock. 
00463 */ 00464 if (atfork) 00465 return; 00466 00467 native_mutex_destroy(&th->interrupt_lock); 00468 native_thread_destroy(th); 00469 } 00470 00471 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); 00472 00473 void 00474 ruby_thread_init_stack(rb_thread_t *th) 00475 { 00476 native_thread_init_stack(th); 00477 } 00478 00479 static int 00480 thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start) 00481 { 00482 int state; 00483 VALUE args = th->first_args; 00484 rb_proc_t *proc; 00485 rb_thread_list_t *join_list; 00486 rb_thread_t *main_th; 00487 VALUE errinfo = Qnil; 00488 # ifdef USE_SIGALTSTACK 00489 void rb_register_sigaltstack(rb_thread_t *th); 00490 00491 rb_register_sigaltstack(th); 00492 # endif 00493 00494 if (th == th->vm->main_thread) 00495 rb_bug("thread_start_func_2 must not used for main thread"); 00496 00497 ruby_thread_set_native(th); 00498 00499 th->machine_stack_start = stack_start; 00500 #ifdef __ia64 00501 th->machine_register_stack_start = register_stack_start; 00502 #endif 00503 thread_debug("thread start: %p\n", (void *)th); 00504 00505 gvl_acquire(th->vm, th); 00506 { 00507 thread_debug("thread start (get lock): %p\n", (void *)th); 00508 rb_thread_set_current(th); 00509 00510 TH_PUSH_TAG(th); 00511 if ((state = EXEC_TAG()) == 0) { 00512 SAVE_ROOT_JMPBUF(th, { 00513 if (!th->first_func) { 00514 GetProcPtr(th->first_proc, proc); 00515 th->errinfo = Qnil; 00516 th->root_lep = rb_vm_ep_local_ep(proc->block.ep); 00517 th->root_svar = Qnil; 00518 EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, Qundef); 00519 th->value = rb_vm_invoke_proc(th, proc, (int)RARRAY_LEN(args), RARRAY_PTR(args), 0); 00520 EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_END, th->self, 0, 0, Qundef); 00521 } 00522 else { 00523 th->value = (*th->first_func)((void *)args); 00524 } 00525 }); 00526 } 00527 else { 00528 errinfo = th->errinfo; 00529 if (state == TAG_FATAL) { 00530 /* fatal error within this thread, need to stop whole 
script */ 00531 } 00532 else if (th->safe_level >= 4) { 00533 /* Ignore it. Main thread shouldn't be harmed from untrusted thread. */ 00534 errinfo = Qnil; 00535 } 00536 else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { 00537 /* exit on main_thread. */ 00538 } 00539 else if (th->vm->thread_abort_on_exception || 00540 th->abort_on_exception || RTEST(ruby_debug)) { 00541 /* exit on main_thread */ 00542 } 00543 else { 00544 errinfo = Qnil; 00545 } 00546 th->value = Qnil; 00547 } 00548 00549 th->status = THREAD_KILLED; 00550 thread_debug("thread end: %p\n", (void *)th); 00551 00552 main_th = th->vm->main_thread; 00553 if (RB_TYPE_P(errinfo, T_OBJECT)) { 00554 /* treat with normal error object */ 00555 rb_threadptr_raise(main_th, 1, &errinfo); 00556 } 00557 TH_POP_TAG(); 00558 00559 /* locking_mutex must be Qfalse */ 00560 if (th->locking_mutex != Qfalse) { 00561 rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", 00562 (void *)th, th->locking_mutex); 00563 } 00564 00565 /* delete self other than main thread from living_threads */ 00566 st_delete_wrap(th->vm->living_threads, th->self); 00567 if (rb_thread_alone()) { 00568 /* I'm last thread. 
wake up main thread from rb_thread_terminate_all */ 00569 rb_threadptr_interrupt(main_th); 00570 } 00571 00572 /* wake up joining threads */ 00573 join_list = th->join_list; 00574 while (join_list) { 00575 rb_threadptr_interrupt(join_list->th); 00576 switch (join_list->th->status) { 00577 case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: 00578 join_list->th->status = THREAD_RUNNABLE; 00579 default: break; 00580 } 00581 join_list = join_list->next; 00582 } 00583 00584 rb_threadptr_unlock_all_locking_mutexes(th); 00585 rb_check_deadlock(th->vm); 00586 00587 if (!th->root_fiber) { 00588 rb_thread_recycle_stack_release(th->stack); 00589 th->stack = 0; 00590 } 00591 } 00592 native_mutex_lock(&th->vm->thread_destruct_lock); 00593 /* make sure vm->running_thread never point me after this point.*/ 00594 th->vm->running_thread = NULL; 00595 native_mutex_unlock(&th->vm->thread_destruct_lock); 00596 thread_cleanup_func(th, FALSE); 00597 gvl_release(th->vm); 00598 00599 return 0; 00600 } 00601 00602 static VALUE 00603 thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) 00604 { 00605 rb_thread_t *th, *current_th = GET_THREAD(); 00606 int err; 00607 00608 if (OBJ_FROZEN(GET_THREAD()->thgroup)) { 00609 rb_raise(rb_eThreadError, 00610 "can't start a new thread (frozen ThreadGroup)"); 00611 } 00612 GetThreadPtr(thval, th); 00613 00614 /* setup thread environment */ 00615 th->first_func = fn; 00616 th->first_proc = fn ? 
Qfalse : rb_block_proc(); 00617 th->first_args = args; /* GC: shouldn't put before above line */ 00618 00619 th->priority = current_th->priority; 00620 th->thgroup = current_th->thgroup; 00621 00622 th->pending_interrupt_queue = rb_ary_tmp_new(0); 00623 th->pending_interrupt_queue_checked = 0; 00624 th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack); 00625 RBASIC(th->pending_interrupt_mask_stack)->klass = 0; 00626 00627 th->interrupt_mask = 0; 00628 00629 native_mutex_initialize(&th->interrupt_lock); 00630 00631 /* kick thread */ 00632 err = native_thread_create(th); 00633 if (err) { 00634 th->status = THREAD_KILLED; 00635 rb_raise(rb_eThreadError, "can't create Thread (%d)", err); 00636 } 00637 st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id); 00638 return thval; 00639 } 00640 00641 /* 00642 * call-seq: 00643 * Thread.new { ... } -> thread 00644 * Thread.new(*args, &proc) -> thread 00645 * Thread.new(*args) { |args| ... } -> thread 00646 * 00647 * Creates a new thread executing the given block. 00648 * 00649 * Any +args+ given to ::new will be passed to the block: 00650 * 00651 * arr = [] 00652 * a, b, c = 1, 2, 3 00653 * Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join 00654 * arr #=> [1, 2, 3] 00655 * 00656 * A ThreadError exception is raised if ::new is called without a block. 00657 * 00658 * If you're going to subclass Thread, be sure to call super in your 00659 * +initialize+ method, otherwise a ThreadError will be raised. 
00660 */ 00661 static VALUE 00662 thread_s_new(int argc, VALUE *argv, VALUE klass) 00663 { 00664 rb_thread_t *th; 00665 VALUE thread = rb_thread_alloc(klass); 00666 00667 if (GET_VM()->main_thread->status == THREAD_KILLED) 00668 rb_raise(rb_eThreadError, "can't alloc thread"); 00669 00670 rb_obj_call_init(thread, argc, argv); 00671 GetThreadPtr(thread, th); 00672 if (!th->first_args) { 00673 rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'", 00674 rb_class2name(klass)); 00675 } 00676 return thread; 00677 } 00678 00679 /* 00680 * call-seq: 00681 * Thread.start([args]*) {|args| block } -> thread 00682 * Thread.fork([args]*) {|args| block } -> thread 00683 * 00684 * Basically the same as ::new. However, if class Thread is subclassed, then 00685 * calling +start+ in that subclass will not invoke the subclass's 00686 * +initialize+ method. 00687 */ 00688 00689 static VALUE 00690 thread_start(VALUE klass, VALUE args) 00691 { 00692 return thread_create_core(rb_thread_alloc(klass), args, 0); 00693 } 00694 00695 /* :nodoc: */ 00696 static VALUE 00697 thread_initialize(VALUE thread, VALUE args) 00698 { 00699 rb_thread_t *th; 00700 if (!rb_block_given_p()) { 00701 rb_raise(rb_eThreadError, "must be called with a block"); 00702 } 00703 GetThreadPtr(thread, th); 00704 if (th->first_args) { 00705 VALUE proc = th->first_proc, line, loc; 00706 const char *file; 00707 if (!proc || !RTEST(loc = rb_proc_location(proc))) { 00708 rb_raise(rb_eThreadError, "already initialized thread"); 00709 } 00710 file = RSTRING_PTR(RARRAY_PTR(loc)[0]); 00711 if (NIL_P(line = RARRAY_PTR(loc)[1])) { 00712 rb_raise(rb_eThreadError, "already initialized thread - %s", 00713 file); 00714 } 00715 rb_raise(rb_eThreadError, "already initialized thread - %s:%d", 00716 file, NUM2INT(line)); 00717 } 00718 return thread_create_core(thread, args, 0); 00719 } 00720 00721 VALUE 00722 rb_thread_create(VALUE (*fn)(ANYARGS), void *arg) 00723 { 00724 return 
thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn); 00725 } 00726 00727 00728 /* +infty, for this purpose */ 00729 #define DELAY_INFTY 1E30 00730 00731 struct join_arg { 00732 rb_thread_t *target, *waiting; 00733 double limit; 00734 int forever; 00735 }; 00736 00737 static VALUE 00738 remove_from_join_list(VALUE arg) 00739 { 00740 struct join_arg *p = (struct join_arg *)arg; 00741 rb_thread_t *target_th = p->target, *th = p->waiting; 00742 00743 if (target_th->status != THREAD_KILLED) { 00744 rb_thread_list_t **p = &target_th->join_list; 00745 00746 while (*p) { 00747 if ((*p)->th == th) { 00748 *p = (*p)->next; 00749 break; 00750 } 00751 p = &(*p)->next; 00752 } 00753 } 00754 00755 return Qnil; 00756 } 00757 00758 static VALUE 00759 thread_join_sleep(VALUE arg) 00760 { 00761 struct join_arg *p = (struct join_arg *)arg; 00762 rb_thread_t *target_th = p->target, *th = p->waiting; 00763 double now, limit = p->limit; 00764 00765 while (target_th->status != THREAD_KILLED) { 00766 if (p->forever) { 00767 sleep_forever(th, 1, 0); 00768 } 00769 else { 00770 now = timeofday(); 00771 if (now > limit) { 00772 thread_debug("thread_join: timeout (thid: %p)\n", 00773 (void *)target_th->thread_id); 00774 return Qfalse; 00775 } 00776 sleep_wait_for_interrupt(th, limit - now, 0); 00777 } 00778 thread_debug("thread_join: interrupted (thid: %p)\n", 00779 (void *)target_th->thread_id); 00780 } 00781 return Qtrue; 00782 } 00783 00784 static VALUE 00785 thread_join(rb_thread_t *target_th, double delay) 00786 { 00787 rb_thread_t *th = GET_THREAD(); 00788 struct join_arg arg; 00789 00790 if (th == target_th) { 00791 rb_raise(rb_eThreadError, "Target thread must not be current thread"); 00792 } 00793 if (GET_VM()->main_thread == target_th) { 00794 rb_raise(rb_eThreadError, "Target thread must not be main thread"); 00795 } 00796 00797 arg.target = target_th; 00798 arg.waiting = th; 00799 arg.limit = timeofday() + delay; 00800 arg.forever = delay == DELAY_INFTY; 00801 00802 
thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id); 00803 00804 if (target_th->status != THREAD_KILLED) { 00805 rb_thread_list_t list; 00806 list.next = target_th->join_list; 00807 list.th = th; 00808 target_th->join_list = &list; 00809 if (!rb_ensure(thread_join_sleep, (VALUE)&arg, 00810 remove_from_join_list, (VALUE)&arg)) { 00811 return Qnil; 00812 } 00813 } 00814 00815 thread_debug("thread_join: success (thid: %p)\n", 00816 (void *)target_th->thread_id); 00817 00818 if (target_th->errinfo != Qnil) { 00819 VALUE err = target_th->errinfo; 00820 00821 if (FIXNUM_P(err)) { 00822 /* */ 00823 } 00824 else if (RB_TYPE_P(target_th->errinfo, T_NODE)) { 00825 rb_exc_raise(rb_vm_make_jump_tag_but_local_jump( 00826 GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err))); 00827 } 00828 else { 00829 /* normal exception */ 00830 rb_exc_raise(err); 00831 } 00832 } 00833 return target_th->self; 00834 } 00835 00836 /* 00837 * call-seq: 00838 * thr.join -> thr 00839 * thr.join(limit) -> thr 00840 * 00841 * The calling thread will suspend execution and run <i>thr</i>. Does not 00842 * return until <i>thr</i> exits or until <i>limit</i> seconds have passed. If 00843 * the time limit expires, <code>nil</code> will be returned, otherwise 00844 * <i>thr</i> is returned. 00845 * 00846 * Any threads not joined will be killed when the main program exits. If 00847 * <i>thr</i> had previously raised an exception and the 00848 * <code>abort_on_exception</code> and <code>$DEBUG</code> flags are not set 00849 * (so the exception has not yet been processed) it will be processed at this 00850 * time. 00851 * 00852 * a = Thread.new { print "a"; sleep(10); print "b"; print "c" } 00853 * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" } 00854 * x.join # Let x thread finish, a will be killed on exit. 00855 * 00856 * <em>produces:</em> 00857 * 00858 * axyz 00859 * 00860 * The following example illustrates the <i>limit</i> parameter. 
00861 * 00862 * y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }} 00863 * puts "Waiting" until y.join(0.15) 00864 * 00865 * <em>produces:</em> 00866 * 00867 * tick... 00868 * Waiting 00869 * tick... 00870 * Waitingtick... 00871 * 00872 * 00873 * tick... 00874 */ 00875 00876 static VALUE 00877 thread_join_m(int argc, VALUE *argv, VALUE self) 00878 { 00879 rb_thread_t *target_th; 00880 double delay = DELAY_INFTY; 00881 VALUE limit; 00882 00883 GetThreadPtr(self, target_th); 00884 00885 rb_scan_args(argc, argv, "01", &limit); 00886 if (!NIL_P(limit)) { 00887 delay = rb_num2dbl(limit); 00888 } 00889 00890 return thread_join(target_th, delay); 00891 } 00892 00893 /* 00894 * call-seq: 00895 * thr.value -> obj 00896 * 00897 * Waits for <i>thr</i> to complete (via <code>Thread#join</code>) and returns 00898 * its value. 00899 * 00900 * a = Thread.new { 2 + 2 } 00901 * a.value #=> 4 00902 */ 00903 00904 static VALUE 00905 thread_value(VALUE self) 00906 { 00907 rb_thread_t *th; 00908 GetThreadPtr(self, th); 00909 thread_join(th, DELAY_INFTY); 00910 return th->value; 00911 } 00912 00913 /* 00914 * Thread Scheduling 00915 */ 00916 00917 static struct timeval 00918 double2timeval(double d) 00919 { 00920 struct timeval time; 00921 00922 if (isinf(d)) { 00923 time.tv_sec = TIMET_MAX; 00924 time.tv_usec = 0; 00925 return time; 00926 } 00927 00928 time.tv_sec = (int)d; 00929 time.tv_usec = (int)((d - (int)d) * 1e6); 00930 if (time.tv_usec < 0) { 00931 time.tv_usec += (int)1e6; 00932 time.tv_sec -= 1; 00933 } 00934 return time; 00935 } 00936 00937 static void 00938 sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check) 00939 { 00940 enum rb_thread_status prev_status = th->status; 00941 enum rb_thread_status status = deadlockable ? 
THREAD_STOPPED_FOREVER : THREAD_STOPPED; 00942 00943 th->status = status; 00944 RUBY_VM_CHECK_INTS_BLOCKING(th); 00945 while (th->status == status) { 00946 if (deadlockable) { 00947 th->vm->sleeper++; 00948 rb_check_deadlock(th->vm); 00949 } 00950 native_sleep(th, 0); 00951 if (deadlockable) { 00952 th->vm->sleeper--; 00953 } 00954 RUBY_VM_CHECK_INTS_BLOCKING(th); 00955 if (!spurious_check) 00956 break; 00957 } 00958 th->status = prev_status; 00959 } 00960 00961 static void 00962 getclockofday(struct timeval *tp) 00963 { 00964 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 00965 struct timespec ts; 00966 00967 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) { 00968 tp->tv_sec = ts.tv_sec; 00969 tp->tv_usec = ts.tv_nsec / 1000; 00970 } else 00971 #endif 00972 { 00973 gettimeofday(tp, NULL); 00974 } 00975 } 00976 00977 static void 00978 sleep_timeval(rb_thread_t *th, struct timeval tv, int spurious_check) 00979 { 00980 struct timeval to, tvn; 00981 enum rb_thread_status prev_status = th->status; 00982 00983 getclockofday(&to); 00984 if (TIMET_MAX - tv.tv_sec < to.tv_sec) 00985 to.tv_sec = TIMET_MAX; 00986 else 00987 to.tv_sec += tv.tv_sec; 00988 if ((to.tv_usec += tv.tv_usec) >= 1000000) { 00989 if (to.tv_sec == TIMET_MAX) 00990 to.tv_usec = 999999; 00991 else { 00992 to.tv_sec++; 00993 to.tv_usec -= 1000000; 00994 } 00995 } 00996 00997 th->status = THREAD_STOPPED; 00998 RUBY_VM_CHECK_INTS_BLOCKING(th); 00999 while (th->status == THREAD_STOPPED) { 01000 native_sleep(th, &tv); 01001 RUBY_VM_CHECK_INTS_BLOCKING(th); 01002 getclockofday(&tvn); 01003 if (to.tv_sec < tvn.tv_sec) break; 01004 if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; 01005 thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n", 01006 (long)to.tv_sec, (long)to.tv_usec, 01007 (long)tvn.tv_sec, (long)tvn.tv_usec); 01008 tv.tv_sec = to.tv_sec - tvn.tv_sec; 01009 if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { 01010 --tv.tv_sec; 01011 tv.tv_usec += 1000000; 01012 } 01013 if 
(!spurious_check) 01014 break; 01015 } 01016 th->status = prev_status; 01017 } 01018 01019 void 01020 rb_thread_sleep_forever(void) 01021 { 01022 thread_debug("rb_thread_sleep_forever\n"); 01023 sleep_forever(GET_THREAD(), 0, 1); 01024 } 01025 01026 static void 01027 rb_thread_sleep_deadly(void) 01028 { 01029 thread_debug("rb_thread_sleep_deadly\n"); 01030 sleep_forever(GET_THREAD(), 1, 1); 01031 } 01032 01033 static double 01034 timeofday(void) 01035 { 01036 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 01037 struct timespec tp; 01038 01039 if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) { 01040 return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9; 01041 } else 01042 #endif 01043 { 01044 struct timeval tv; 01045 gettimeofday(&tv, NULL); 01046 return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6; 01047 } 01048 } 01049 01050 static void 01051 sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check) 01052 { 01053 sleep_timeval(th, double2timeval(sleepsec), spurious_check); 01054 } 01055 01056 static void 01057 sleep_for_polling(rb_thread_t *th) 01058 { 01059 struct timeval time; 01060 time.tv_sec = 0; 01061 time.tv_usec = 100 * 1000; /* 0.1 sec */ 01062 sleep_timeval(th, time, 1); 01063 } 01064 01065 void 01066 rb_thread_wait_for(struct timeval time) 01067 { 01068 rb_thread_t *th = GET_THREAD(); 01069 sleep_timeval(th, time, 1); 01070 } 01071 01072 void 01073 rb_thread_polling(void) 01074 { 01075 if (!rb_thread_alone()) { 01076 rb_thread_t *th = GET_THREAD(); 01077 RUBY_VM_CHECK_INTS_BLOCKING(th); 01078 sleep_for_polling(th); 01079 } 01080 } 01081 01082 /* 01083 * CAUTION: This function causes thread switching. 01084 * rb_thread_check_ints() check ruby's interrupts. 01085 * some interrupt needs thread switching/invoke handlers, 01086 * and so on. 01087 */ 01088 01089 void 01090 rb_thread_check_ints(void) 01091 { 01092 RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD()); 01093 } 01094 01095 /* 01096 * Hidden API for tcl/tk wrapper. 
01097 * There is no guarantee to perpetuate it. 01098 */ 01099 int 01100 rb_thread_check_trap_pending(void) 01101 { 01102 return rb_signal_buff_size() != 0; 01103 } 01104 01105 /* This function can be called in blocking region. */ 01106 int 01107 rb_thread_interrupted(VALUE thval) 01108 { 01109 rb_thread_t *th; 01110 GetThreadPtr(thval, th); 01111 return (int)RUBY_VM_INTERRUPTED(th); 01112 } 01113 01114 void 01115 rb_thread_sleep(int sec) 01116 { 01117 rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); 01118 } 01119 01120 static void 01121 rb_thread_schedule_limits(unsigned long limits_us) 01122 { 01123 thread_debug("rb_thread_schedule\n"); 01124 if (!rb_thread_alone()) { 01125 rb_thread_t *th = GET_THREAD(); 01126 01127 if (th->running_time_us >= limits_us) { 01128 thread_debug("rb_thread_schedule/switch start\n"); 01129 RB_GC_SAVE_MACHINE_CONTEXT(th); 01130 gvl_yield(th->vm, th); 01131 rb_thread_set_current(th); 01132 thread_debug("rb_thread_schedule/switch done\n"); 01133 } 01134 } 01135 } 01136 01137 void 01138 rb_thread_schedule(void) 01139 { 01140 rb_thread_t *cur_th = GET_THREAD(); 01141 rb_thread_schedule_limits(0); 01142 01143 if (UNLIKELY(RUBY_VM_INTERRUPTED_ANY(cur_th))) { 01144 rb_threadptr_execute_interrupts(cur_th, 0); 01145 } 01146 } 01147 01148 /* blocking region */ 01149 01150 static inline int 01151 blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, 01152 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted) 01153 { 01154 region->prev_status = th->status; 01155 if (set_unblock_function(th, ubf, arg, ®ion->oldubf, fail_if_interrupted)) { 01156 th->blocking_region_buffer = region; 01157 th->status = THREAD_STOPPED; 01158 thread_debug("enter blocking region (%p)\n", (void *)th); 01159 RB_GC_SAVE_MACHINE_CONTEXT(th); 01160 gvl_release(th->vm); 01161 return TRUE; 01162 } 01163 else { 01164 return FALSE; 01165 } 01166 } 01167 01168 static inline void 01169 blocking_region_end(rb_thread_t *th, struct 
rb_blocking_region_buffer *region) 01170 { 01171 gvl_acquire(th->vm, th); 01172 rb_thread_set_current(th); 01173 thread_debug("leave blocking region (%p)\n", (void *)th); 01174 remove_signal_thread_list(th); 01175 th->blocking_region_buffer = 0; 01176 reset_unblock_function(th, ®ion->oldubf); 01177 if (th->status == THREAD_STOPPED) { 01178 th->status = region->prev_status; 01179 } 01180 } 01181 01182 struct rb_blocking_region_buffer * 01183 rb_thread_blocking_region_begin(void) 01184 { 01185 rb_thread_t *th = GET_THREAD(); 01186 struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer); 01187 blocking_region_begin(th, region, ubf_select, th, FALSE); 01188 return region; 01189 } 01190 01191 void 01192 rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) 01193 { 01194 int saved_errno = errno; 01195 rb_thread_t *th = ruby_thread_from_native(); 01196 blocking_region_end(th, region); 01197 xfree(region); 01198 RUBY_VM_CHECK_INTS_BLOCKING(th); 01199 errno = saved_errno; 01200 } 01201 01202 static void * 01203 call_without_gvl(void *(*func)(void *), void *data1, 01204 rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted) 01205 { 01206 void *val = 0; 01207 01208 rb_thread_t *th = GET_THREAD(); 01209 int saved_errno = 0; 01210 01211 th->waiting_fd = -1; 01212 if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { 01213 ubf = ubf_select; 01214 data2 = th; 01215 } 01216 01217 BLOCKING_REGION({ 01218 val = func(data1); 01219 saved_errno = errno; 01220 }, ubf, data2, fail_if_interrupted); 01221 01222 if (!fail_if_interrupted) { 01223 RUBY_VM_CHECK_INTS_BLOCKING(th); 01224 } 01225 01226 errno = saved_errno; 01227 01228 return val; 01229 } 01230 01231 /* 01232 * rb_thread_call_without_gvl - permit concurrent/parallel execution. 01233 * rb_thread_call_without_gvl2 - permit concurrent/parallel execution 01234 * without interrupt proceess. 01235 * 01236 * rb_thread_call_without_gvl() does: 01237 * (1) Check interrupts. 
 *   (2) release GVL.
 *       Other Ruby threads may run in parallel.
 *   (3) call func with data1
 *   (4) acquire GVL.
 *       Other Ruby threads can not run in parallel any more.
 *   (5) Check interrupts.
 *
 * rb_thread_call_without_gvl2() does:
 *   (1) Check interrupt and return if interrupted.
 *   (2) release GVL.
 *   (3) call func with data1.
 *   (4) acquire GVL.
 *
 * If another thread interrupts this thread (Thread#kill, signal delivery,
 * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means
 * "un-blocking function").  `ubf()' should interrupt `func()' execution by
 * toggling a cancellation flag, canceling the invocation of a call inside
 * `func()' or similar.  Note that `ubf()' may not be called with the GVL.
 *
 * There are built-in ubfs and you can specify these ubfs:
 *
 * * RUBY_UBF_IO: ubf for IO operation
 * * RUBY_UBF_PROCESS: ubf for process operation
 *
 * However, we can not guarantee our built-in ubfs interrupt your `func()'
 * correctly. Be careful to use rb_thread_call_without_gvl(). If you don't
 * provide proper ubf(), your program will not stop for Control+C or other
 * shutdown events.
 *
 * "Check interrupts" in the lists above means checking for asynchronous
 * interrupt events (such as Thread#kill, signal delivery, VM-shutdown
 * request, and so on) and calling the corresponding procedures
 * (such as `trap' for signals, raising an exception for Thread#raise).
 * If `func()' has already finished when an interrupt is received, you may
 * want to skip interrupt checking.  For example, assume the following
 * func(), which reads data from a file.
 *
 *   read_func(...)
 *   {
 *                   // (a) before read
 *     read(buffer); // (b) reading
 *                   // (c) after read
 *   }
 *
 * If an interrupt occurs at (a) or (b), then `ubf()' cancels this
 * `read_func()' and interrupts are checked. However, if an interrupt occurs
 * at (c), after *read* operation is completed, checking interrupts is harmful
 * because it causes irrevocable side-effect, the read data will vanish.  To
 * avoid such problem, the `read_func()' should be used with
 * `rb_thread_call_without_gvl2()'.
 *
 * If `rb_thread_call_without_gvl2()' detects interrupt, return its execution
 * immediately. This function does not show when the execution was interrupted.
 * For example, there are 4 possible timings: (a), (b), (c) and before calling
 * read_func(). You need to record progress of a read_func() and check
 * the progress after `rb_thread_call_without_gvl2()'. You may need to call
 * `rb_thread_check_ints()' correctly or your program can not process proper
 * process such as `trap' and so on.
 *
 * NOTE: You can not execute most of Ruby C API and touch Ruby
 *       objects in `func()' and `ubf()', including raising an
 *       exception, because current thread doesn't acquire GVL
 *       (it causes synchronization problems).  If you need to
 *       call ruby functions either use rb_thread_call_with_gvl()
 *       or read source code of C APIs and confirm safety by
 *       yourself.
 *
 * NOTE: In short, this API is difficult to use safely.  I recommend you
 *       use other ways if you have.  We lack experiences to use this API.
 *       Please report your problem related on it.
 *
 * NOTE: Releasing GVL and re-acquiring GVL may be expensive operations
 *       for a short running `func()'. Be sure to benchmark and use this
 *       mechanism when `func()' consumes enough time.
01310 * 01311 * Safe C API: 01312 * * rb_thread_interrupted() - check interrupt flag 01313 * * ruby_xmalloc(), ruby_xrealloc(), ruby_xfree() - 01314 * they will work without GVL, and may acquire GVL when GC is needed. 01315 */ 01316 void * 01317 rb_thread_call_without_gvl2(void *(*func)(void *), void *data1, 01318 rb_unblock_function_t *ubf, void *data2) 01319 { 01320 return call_without_gvl(func, data1, ubf, data2, TRUE); 01321 } 01322 01323 void * 01324 rb_thread_call_without_gvl(void *(*func)(void *data), void *data1, 01325 rb_unblock_function_t *ubf, void *data2) 01326 { 01327 return call_without_gvl(func, data1, ubf, data2, FALSE); 01328 } 01329 01330 VALUE 01331 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) 01332 { 01333 VALUE val = Qundef; /* shouldn't be used */ 01334 rb_thread_t *th = GET_THREAD(); 01335 int saved_errno = 0; 01336 int state; 01337 01338 th->waiting_fd = fd; 01339 01340 TH_PUSH_TAG(th); 01341 if ((state = EXEC_TAG()) == 0) { 01342 BLOCKING_REGION({ 01343 val = func(data1); 01344 saved_errno = errno; 01345 }, ubf_select, th, FALSE); 01346 } 01347 TH_POP_TAG(); 01348 01349 /* clear waitinf_fd anytime */ 01350 th->waiting_fd = -1; 01351 01352 if (state) { 01353 JUMP_TAG(state); 01354 } 01355 /* TODO: check func() */ 01356 RUBY_VM_CHECK_INTS_BLOCKING(th); 01357 01358 errno = saved_errno; 01359 01360 return val; 01361 } 01362 01363 VALUE 01364 rb_thread_blocking_region( 01365 rb_blocking_function_t *func, void *data1, 01366 rb_unblock_function_t *ubf, void *data2) 01367 { 01368 void *(*f)(void*) = (void *(*)(void*))func; 01369 return (VALUE)rb_thread_call_without_gvl(f, data1, ubf, data2); 01370 } 01371 01372 /* 01373 * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release. 01374 * 01375 * After releasing GVL using rb_thread_blocking_region() or 01376 * rb_thread_call_without_gvl() you can not access Ruby values or invoke 01377 * methods. 
If you need to access Ruby you must use this function 01378 * rb_thread_call_with_gvl(). 01379 * 01380 * This function rb_thread_call_with_gvl() does: 01381 * (1) acquire GVL. 01382 * (2) call passed function `func'. 01383 * (3) release GVL. 01384 * (4) return a value which is returned at (2). 01385 * 01386 * NOTE: You should not return Ruby object at (2) because such Object 01387 * will not marked. 01388 * 01389 * NOTE: If an exception is raised in `func', this function DOES NOT 01390 * protect (catch) the exception. If you have any resources 01391 * which should free before throwing exception, you need use 01392 * rb_protect() in `func' and return a value which represents 01393 * exception is raised. 01394 * 01395 * NOTE: This function should not be called by a thread which was not 01396 * created as Ruby thread (created by Thread.new or so). In other 01397 * words, this function *DOES NOT* associate or convert a NON-Ruby 01398 * thread to a Ruby thread. 01399 */ 01400 void * 01401 rb_thread_call_with_gvl(void *(*func)(void *), void *data1) 01402 { 01403 rb_thread_t *th = ruby_thread_from_native(); 01404 struct rb_blocking_region_buffer *brb; 01405 struct rb_unblock_callback prev_unblock; 01406 void *r; 01407 01408 if (th == 0) { 01409 /* Error is occurred, but we can't use rb_bug() 01410 * because this thread is not Ruby's thread. 01411 * What should we do? 01412 */ 01413 01414 fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); 01415 exit(EXIT_FAILURE); 01416 } 01417 01418 brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer; 01419 prev_unblock = th->unblock; 01420 01421 if (brb == 0) { 01422 rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); 01423 } 01424 01425 blocking_region_end(th, brb); 01426 /* enter to Ruby world: You can access Ruby values, methods and so on. */ 01427 r = (*func)(data1); 01428 /* leave from Ruby world: You can not access Ruby values, etc. 
*/ 01429 blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); 01430 return r; 01431 } 01432 01433 /* 01434 * ruby_thread_has_gvl_p - check if current native thread has GVL. 01435 * 01436 *** 01437 *** This API is EXPERIMENTAL! 01438 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 01439 *** 01440 */ 01441 01442 int 01443 ruby_thread_has_gvl_p(void) 01444 { 01445 rb_thread_t *th = ruby_thread_from_native(); 01446 01447 if (th && th->blocking_region_buffer == 0) { 01448 return 1; 01449 } 01450 else { 01451 return 0; 01452 } 01453 } 01454 01455 /* 01456 * call-seq: 01457 * Thread.pass -> nil 01458 * 01459 * Give the thread scheduler a hint to pass execution to another thread. 01460 * A running thread may or may not switch, it depends on OS and processor. 01461 */ 01462 01463 static VALUE 01464 thread_s_pass(VALUE klass) 01465 { 01466 rb_thread_schedule(); 01467 return Qnil; 01468 } 01469 01470 /*****************************************************/ 01471 01472 /* 01473 * rb_threadptr_pending_interrupt_* - manage asynchronous error queue 01474 * 01475 * Async events such as an exception throwed by Thread#raise, 01476 * Thread#kill and thread termination (after main thread termination) 01477 * will be queued to th->pending_interrupt_queue. 01478 * - clear: clear the queue. 01479 * - enque: enque err object into queue. 01480 * - deque: deque err object from queue. 01481 * - active_p: return 1 if the queue should be checked. 01482 * 01483 * All rb_threadptr_pending_interrupt_* functions are called by 01484 * a GVL acquired thread, of course. 01485 * Note that all "rb_" prefix APIs need GVL to call. 
01486 */ 01487 01488 void 01489 rb_threadptr_pending_interrupt_clear(rb_thread_t *th) 01490 { 01491 rb_ary_clear(th->pending_interrupt_queue); 01492 } 01493 01494 void 01495 rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v) 01496 { 01497 rb_ary_push(th->pending_interrupt_queue, v); 01498 th->pending_interrupt_queue_checked = 0; 01499 } 01500 01501 enum handle_interrupt_timing { 01502 INTERRUPT_NONE, 01503 INTERRUPT_IMMEDIATE, 01504 INTERRUPT_ON_BLOCKING, 01505 INTERRUPT_NEVER 01506 }; 01507 01508 static enum handle_interrupt_timing 01509 rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err) 01510 { 01511 VALUE mask; 01512 long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack); 01513 VALUE *mask_stack = RARRAY_PTR(th->pending_interrupt_mask_stack); 01514 VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */ 01515 long ancestors_len = RARRAY_LEN(ancestors); 01516 VALUE *ancestors_ptr = RARRAY_PTR(ancestors); 01517 int i, j; 01518 01519 for (i=0; i<mask_stack_len; i++) { 01520 mask = mask_stack[mask_stack_len-(i+1)]; 01521 01522 for (j=0; j<ancestors_len; j++) { 01523 VALUE klass = ancestors_ptr[j]; 01524 VALUE sym; 01525 01526 /* TODO: remove rb_intern() */ 01527 if ((sym = rb_hash_aref(mask, klass)) != Qnil) { 01528 if (sym == sym_immediate) { 01529 return INTERRUPT_IMMEDIATE; 01530 } 01531 else if (sym == sym_on_blocking) { 01532 return INTERRUPT_ON_BLOCKING; 01533 } 01534 else if (sym == sym_never) { 01535 return INTERRUPT_NEVER; 01536 } 01537 else { 01538 rb_raise(rb_eThreadError, "unknown mask signature"); 01539 } 01540 } 01541 } 01542 /* try to next mask */ 01543 } 01544 return INTERRUPT_NONE; 01545 } 01546 01547 static int 01548 rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th) 01549 { 01550 return RARRAY_LEN(th->pending_interrupt_queue) == 0; 01551 } 01552 01553 static int 01554 rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err) 01555 { 01556 int i; 01557 for (i=0; 
i<RARRAY_LEN(th->pending_interrupt_queue); i++) { 01558 VALUE e = RARRAY_PTR(th->pending_interrupt_queue)[i]; 01559 if (rb_class_inherited_p(e, err)) { 01560 return TRUE; 01561 } 01562 } 01563 return FALSE; 01564 } 01565 01566 static VALUE 01567 rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing) 01568 { 01569 #if 1 /* 1 to enable Thread#handle_interrupt, 0 to ignore it */ 01570 int i; 01571 01572 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) { 01573 VALUE err = RARRAY_PTR(th->pending_interrupt_queue)[i]; 01574 01575 enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err)); 01576 01577 switch (mask_timing) { 01578 case INTERRUPT_ON_BLOCKING: 01579 if (timing != INTERRUPT_ON_BLOCKING) { 01580 break; 01581 } 01582 /* fall through */ 01583 case INTERRUPT_NONE: /* default: IMMEDIATE */ 01584 case INTERRUPT_IMMEDIATE: 01585 rb_ary_delete_at(th->pending_interrupt_queue, i); 01586 return err; 01587 case INTERRUPT_NEVER: 01588 break; 01589 } 01590 } 01591 01592 th->pending_interrupt_queue_checked = 1; 01593 return Qundef; 01594 #else 01595 VALUE err = rb_ary_shift(th->pending_interrupt_queue); 01596 if (rb_threadptr_pending_interrupt_empty_p(th)) { 01597 th->pending_interrupt_queue_checked = 1; 01598 } 01599 return err; 01600 #endif 01601 } 01602 01603 int 01604 rb_threadptr_pending_interrupt_active_p(rb_thread_t *th) 01605 { 01606 /* 01607 * For optimization, we don't check async errinfo queue 01608 * if it nor a thread interrupt mask were not changed 01609 * since last check. 
01610 */ 01611 if (th->pending_interrupt_queue_checked) { 01612 return 0; 01613 } 01614 01615 if (rb_threadptr_pending_interrupt_empty_p(th)) { 01616 return 0; 01617 } 01618 01619 return 1; 01620 } 01621 01622 static int 01623 handle_interrupt_arg_check_i(VALUE key, VALUE val) 01624 { 01625 if (val != sym_immediate && val != sym_on_blocking && val != sym_never) { 01626 rb_raise(rb_eArgError, "unknown mask signature"); 01627 } 01628 01629 return ST_CONTINUE; 01630 } 01631 01632 /* 01633 * call-seq: 01634 * Thread.handle_interrupt(hash) { ... } -> result of the block 01635 * 01636 * Changes asynchronous interrupt timing. 01637 * 01638 * _interrupt_ means asynchronous event and corresponding procedure 01639 * by Thread#raise, Thread#kill, signal trap (not supported yet) 01640 * and main thread termination (if main thread terminates, then all 01641 * other thread will be killed). 01642 * 01643 * The given +hash+ has pairs like <code>ExceptionClass => 01644 * :TimingSymbol</code>. Where the ExceptionClass is the interrupt handled by 01645 * the given block. The TimingSymbol can be one of the following symbols: 01646 * 01647 * [+:immediate+] Invoke interrupts immediately. 01648 * [+:on_blocking+] Invoke interrupts while _BlockingOperation_. 01649 * [+:never+] Never invoke all interrupts. 01650 * 01651 * _BlockingOperation_ means that the operation will block the calling thread, 01652 * such as read and write. On CRuby implementation, _BlockingOperation_ is any 01653 * operation executed without GVL. 01654 * 01655 * Masked asynchronous interrupts are delayed until they are enabled. 01656 * This method is similar to sigprocmask(3). 01657 * 01658 * === NOTE 01659 * 01660 * Asynchronous interrupts are difficult to use. 01661 * 01662 * If you need to communicate between threads, please consider to use another way such as Queue. 01663 * 01664 * Or use them with deep understanding about this method. 
 *
 * === Usage
 *
 * In this example, we can guard from Thread#raise exceptions.
 *
 * Using the +:never+ TimingSymbol the RuntimeError exception will always be
 * ignored in the first block of the main thread. In the second
 * ::handle_interrupt block we can purposefully handle RuntimeError exceptions.
 *
 *   th = Thread.new do
 *     Thread.handle_interrupt(RuntimeError => :never) {
 *       begin
 *         # You can write resource allocation code safely.
 *         Thread.handle_interrupt(RuntimeError => :immediate) {
 *           # ...
 *         }
 *       ensure
 *         # You can write resource deallocation code safely.
 *       end
 *     }
 *   end
 *   Thread.pass
 *   # ...
 *   th.raise "stop"
 *
 * While we are ignoring the RuntimeError exception, it's safe to write our
 * resource allocation code. Then, the ensure block is where we can safely
 * deallocate your resources.
 *
 * ==== Guarding from TimeoutError
 *
 * In the next example, we will guard from the TimeoutError exception. This
 * will help prevent from leaking resources when TimeoutError exceptions occur
 * during normal ensure clause. For this example we use the help of the
 * standard library Timeout, from lib/timeout.rb
 *
 *   require 'timeout'
 *   Thread.handle_interrupt(TimeoutError => :never) {
 *     timeout(10){
 *       # TimeoutError doesn't occur here
 *       Thread.handle_interrupt(TimeoutError => :on_blocking) {
 *         # possible to be killed by TimeoutError
 *         # while blocking operation
 *       }
 *       # TimeoutError doesn't occur here
 *     }
 *   }
 *
 * In the first part of the +timeout+ block, we can rely on TimeoutError being
 * ignored. Then in the <code>TimeoutError => :on_blocking</code> block, any
 * operation that will block the calling thread is susceptible to a
 * TimeoutError exception being raised.
01717 * 01718 * ==== Stack control settings 01719 * 01720 * It's possible to stack multiple levels of ::handle_interrupt blocks in order 01721 * to control more than one ExceptionClass and TimingSymbol at a time. 01722 * 01723 * Thread.handle_interrupt(FooError => :never) { 01724 * Thread.handle_interrupt(BarError => :never) { 01725 * # FooError and BarError are prohibited. 01726 * } 01727 * } 01728 * 01729 * ==== Inheritance with ExceptionClass 01730 * 01731 * All exceptions inherited from the ExceptionClass parameter will be considered. 01732 * 01733 * Thread.handle_interrupt(Exception => :never) { 01734 * # all exceptions inherited from Exception are prohibited. 01735 * } 01736 * 01737 */ 01738 static VALUE 01739 rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg) 01740 { 01741 VALUE mask; 01742 rb_thread_t *th = GET_THREAD(); 01743 VALUE r = Qnil; 01744 int state; 01745 01746 if (!rb_block_given_p()) { 01747 rb_raise(rb_eArgError, "block is needed."); 01748 } 01749 01750 mask = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"); 01751 rb_hash_foreach(mask, handle_interrupt_arg_check_i, 0); 01752 rb_ary_push(th->pending_interrupt_mask_stack, mask); 01753 if (!rb_threadptr_pending_interrupt_empty_p(th)) { 01754 th->pending_interrupt_queue_checked = 0; 01755 RUBY_VM_SET_INTERRUPT(th); 01756 } 01757 01758 TH_PUSH_TAG(th); 01759 if ((state = EXEC_TAG()) == 0) { 01760 r = rb_yield(Qnil); 01761 } 01762 TH_POP_TAG(); 01763 01764 rb_ary_pop(th->pending_interrupt_mask_stack); 01765 if (!rb_threadptr_pending_interrupt_empty_p(th)) { 01766 th->pending_interrupt_queue_checked = 0; 01767 RUBY_VM_SET_INTERRUPT(th); 01768 } 01769 01770 RUBY_VM_CHECK_INTS(th); 01771 01772 if (state) { 01773 JUMP_TAG(state); 01774 } 01775 01776 return r; 01777 } 01778 01779 /* 01780 * call-seq: 01781 * target_thread.pending_interrupt?(error = nil) -> true/false 01782 * 01783 * Returns whether or not the asychronous queue is empty for the target thread. 
01784 * 01785 * If +error+ is given, then check only for +error+ type deferred events. 01786 * 01787 * See ::pending_interrupt? for more information. 01788 */ 01789 static VALUE 01790 rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread) 01791 { 01792 rb_thread_t *target_th; 01793 01794 GetThreadPtr(target_thread, target_th); 01795 01796 if (rb_threadptr_pending_interrupt_empty_p(target_th)) { 01797 return Qfalse; 01798 } 01799 else { 01800 if (argc == 1) { 01801 VALUE err; 01802 rb_scan_args(argc, argv, "01", &err); 01803 if (!rb_obj_is_kind_of(err, rb_cModule)) { 01804 rb_raise(rb_eTypeError, "class or module required for rescue clause"); 01805 } 01806 if (rb_threadptr_pending_interrupt_include_p(target_th, err)) { 01807 return Qtrue; 01808 } 01809 else { 01810 return Qfalse; 01811 } 01812 } 01813 return Qtrue; 01814 } 01815 } 01816 01817 /* 01818 * call-seq: 01819 * Thread.pending_interrupt?(error = nil) -> true/false 01820 * 01821 * Returns whether or not the asynchronous queue is empty. 01822 * 01823 * Since Thread::handle_interrupt can be used to defer asynchronous events. 01824 * This method can be used to determine if there are any deferred events. 01825 * 01826 * If you find this method returns true, then you may finish +:never+ blocks. 01827 * 01828 * For example, the following method processes deferred asynchronous events 01829 * immediately. 01830 * 01831 * def Thread.kick_interrupt_immediately 01832 * Thread.handle_interrupt(Object => :immediate) { 01833 * Thread.pass 01834 * } 01835 * end 01836 * 01837 * If +error+ is given, then check only for +error+ type deferred events. 01838 * 01839 * === Usage 01840 * 01841 * th = Thread.new{ 01842 * Thread.handle_interrupt(RuntimeError => :on_blocking){ 01843 * while true 01844 * ... 01845 * # reach safe point to invoke interrupt 01846 * if Thread.pending_interrupt? 01847 * Thread.handle_interrupt(Object => :immediate){} 01848 * end 01849 * ... 01850 * end 01851 * } 01852 * } 01853 * ... 
01854 * th.raise # stop thread 01855 * 01856 * This example can also be written as the following, which you should use to 01857 * avoid asynchronous interrupts. 01858 * 01859 * flag = true 01860 * th = Thread.new{ 01861 * Thread.handle_interrupt(RuntimeError => :on_blocking){ 01862 * while true 01863 * ... 01864 * # reach safe point to invoke interrupt 01865 * break if flag == false 01866 * ... 01867 * end 01868 * } 01869 * } 01870 * ... 01871 * flag = false # stop thread 01872 */ 01873 01874 static VALUE 01875 rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self) 01876 { 01877 return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self); 01878 } 01879 01880 static void 01881 rb_threadptr_to_kill(rb_thread_t *th) 01882 { 01883 rb_threadptr_pending_interrupt_clear(th); 01884 th->status = THREAD_RUNNABLE; 01885 th->to_kill = 1; 01886 th->errinfo = INT2FIX(TAG_FATAL); 01887 TH_JUMP_TAG(th, TAG_FATAL); 01888 } 01889 01890 void 01891 rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) 01892 { 01893 if (th->raised_flag) return; 01894 01895 while (1) { 01896 rb_atomic_t interrupt; 01897 rb_atomic_t old; 01898 int sig; 01899 int timer_interrupt; 01900 int pending_interrupt; 01901 int finalizer_interrupt; 01902 int trap_interrupt; 01903 01904 do { 01905 interrupt = th->interrupt_flag; 01906 old = ATOMIC_CAS(th->interrupt_flag, interrupt, interrupt & th->interrupt_mask); 01907 } while (old != interrupt); 01908 01909 interrupt &= (rb_atomic_t)~th->interrupt_mask; 01910 if (!interrupt) 01911 return; 01912 01913 timer_interrupt = interrupt & TIMER_INTERRUPT_MASK; 01914 pending_interrupt = interrupt & PENDING_INTERRUPT_MASK; 01915 finalizer_interrupt = interrupt & FINALIZER_INTERRUPT_MASK; 01916 trap_interrupt = interrupt & TRAP_INTERRUPT_MASK; 01917 01918 /* signal handling */ 01919 if (trap_interrupt && (th == th->vm->main_thread)) { 01920 enum rb_thread_status prev_status = th->status; 01921 th->status = THREAD_RUNNABLE; 01922 while 
((sig = rb_get_next_signal()) != 0) { 01923 rb_signal_exec(th, sig); 01924 } 01925 th->status = prev_status; 01926 } 01927 01928 /* exception from another thread */ 01929 if (pending_interrupt && rb_threadptr_pending_interrupt_active_p(th)) { 01930 VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); 01931 thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); 01932 01933 if (err == Qundef) { 01934 /* no error */ 01935 } 01936 else if (err == eKillSignal /* Thread#kill receieved */ || 01937 err == eTerminateSignal /* Terminate thread */ || 01938 err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) { 01939 rb_threadptr_to_kill(th); 01940 } 01941 else { 01942 /* set runnable if th was slept. */ 01943 if (th->status == THREAD_STOPPED || 01944 th->status == THREAD_STOPPED_FOREVER) 01945 th->status = THREAD_RUNNABLE; 01946 rb_exc_raise(err); 01947 } 01948 } 01949 01950 if (finalizer_interrupt) { 01951 rb_gc_finalize_deferred(); 01952 } 01953 01954 if (timer_interrupt) { 01955 unsigned long limits_us = TIME_QUANTUM_USEC; 01956 01957 if (th->priority > 0) 01958 limits_us <<= th->priority; 01959 else 01960 limits_us >>= -th->priority; 01961 01962 if (th->status == THREAD_RUNNABLE) 01963 th->running_time_us += TIME_QUANTUM_USEC; 01964 01965 EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0, Qundef); 01966 01967 rb_thread_schedule_limits(limits_us); 01968 } 01969 } 01970 } 01971 01972 void 01973 rb_thread_execute_interrupts(VALUE thval) 01974 { 01975 rb_thread_t *th; 01976 GetThreadPtr(thval, th); 01977 rb_threadptr_execute_interrupts(th, 1); 01978 } 01979 01980 static void 01981 rb_threadptr_ready(rb_thread_t *th) 01982 { 01983 rb_threadptr_interrupt(th); 01984 } 01985 01986 static VALUE 01987 rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) 01988 { 01989 VALUE exc; 01990 01991 if (rb_threadptr_dead(th)) { 01992 return Qnil; 01993 } 01994 01995 if (argc == 0) { 01996 exc = 
rb_exc_new(rb_eRuntimeError, 0, 0); 01997 } 01998 else { 01999 exc = rb_make_exception(argc, argv); 02000 } 02001 rb_threadptr_pending_interrupt_enque(th, exc); 02002 rb_threadptr_interrupt(th); 02003 return Qnil; 02004 } 02005 02006 void 02007 rb_threadptr_signal_raise(rb_thread_t *th, int sig) 02008 { 02009 VALUE argv[2]; 02010 02011 argv[0] = rb_eSignal; 02012 argv[1] = INT2FIX(sig); 02013 rb_threadptr_raise(th->vm->main_thread, 2, argv); 02014 } 02015 02016 void 02017 rb_threadptr_signal_exit(rb_thread_t *th) 02018 { 02019 VALUE argv[2]; 02020 02021 argv[0] = rb_eSystemExit; 02022 argv[1] = rb_str_new2("exit"); 02023 rb_threadptr_raise(th->vm->main_thread, 2, argv); 02024 } 02025 02026 #if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK) 02027 #define USE_SIGALTSTACK 02028 #endif 02029 02030 void 02031 ruby_thread_stack_overflow(rb_thread_t *th) 02032 { 02033 th->raised_flag = 0; 02034 #ifdef USE_SIGALTSTACK 02035 rb_exc_raise(sysstack_error); 02036 #else 02037 th->errinfo = sysstack_error; 02038 TH_JUMP_TAG(th, TAG_RAISE); 02039 #endif 02040 } 02041 02042 int 02043 rb_threadptr_set_raised(rb_thread_t *th) 02044 { 02045 if (th->raised_flag & RAISED_EXCEPTION) { 02046 return 1; 02047 } 02048 th->raised_flag |= RAISED_EXCEPTION; 02049 return 0; 02050 } 02051 02052 int 02053 rb_threadptr_reset_raised(rb_thread_t *th) 02054 { 02055 if (!(th->raised_flag & RAISED_EXCEPTION)) { 02056 return 0; 02057 } 02058 th->raised_flag &= ~RAISED_EXCEPTION; 02059 return 1; 02060 } 02061 02062 static int 02063 thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) 02064 { 02065 int fd = (int)data; 02066 rb_thread_t *th; 02067 GetThreadPtr((VALUE)key, th); 02068 02069 if (th->waiting_fd == fd) { 02070 VALUE err = th->vm->special_exceptions[ruby_error_closed_stream]; 02071 rb_threadptr_pending_interrupt_enque(th, err); 02072 rb_threadptr_interrupt(th); 02073 } 02074 return ST_CONTINUE; 02075 } 02076 02077 void 02078 rb_thread_fd_close(int fd) 02079 { 
02080 st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd); 02081 } 02082 02083 /* 02084 * call-seq: 02085 * thr.raise 02086 * thr.raise(string) 02087 * thr.raise(exception [, string [, array]]) 02088 * 02089 * Raises an exception (see <code>Kernel::raise</code>) from <i>thr</i>. The 02090 * caller does not have to be <i>thr</i>. 02091 * 02092 * Thread.abort_on_exception = true 02093 * a = Thread.new { sleep(200) } 02094 * a.raise("Gotcha") 02095 * 02096 * <em>produces:</em> 02097 * 02098 * prog.rb:3: Gotcha (RuntimeError) 02099 * from prog.rb:2:in `initialize' 02100 * from prog.rb:2:in `new' 02101 * from prog.rb:2 02102 */ 02103 02104 static VALUE 02105 thread_raise_m(int argc, VALUE *argv, VALUE self) 02106 { 02107 rb_thread_t *target_th; 02108 rb_thread_t *th = GET_THREAD(); 02109 GetThreadPtr(self, target_th); 02110 rb_threadptr_raise(target_th, argc, argv); 02111 02112 /* To perform Thread.current.raise as Kernel.raise */ 02113 if (th == target_th) { 02114 RUBY_VM_CHECK_INTS(th); 02115 } 02116 return Qnil; 02117 } 02118 02119 02120 /* 02121 * call-seq: 02122 * thr.exit -> thr or nil 02123 * thr.kill -> thr or nil 02124 * thr.terminate -> thr or nil 02125 * 02126 * Terminates <i>thr</i> and schedules another thread to be run. If this thread 02127 * is already marked to be killed, <code>exit</code> returns the 02128 * <code>Thread</code>. If this is the main thread, or the last thread, exits 02129 * the process. 
02130 */ 02131 02132 VALUE 02133 rb_thread_kill(VALUE thread) 02134 { 02135 rb_thread_t *th; 02136 02137 GetThreadPtr(thread, th); 02138 02139 if (th != GET_THREAD() && th->safe_level < 4) { 02140 rb_secure(4); 02141 } 02142 if (th->to_kill || th->status == THREAD_KILLED) { 02143 return thread; 02144 } 02145 if (th == th->vm->main_thread) { 02146 rb_exit(EXIT_SUCCESS); 02147 } 02148 02149 thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); 02150 02151 if (th == GET_THREAD()) { 02152 /* kill myself immediately */ 02153 rb_threadptr_to_kill(th); 02154 } 02155 else { 02156 rb_threadptr_pending_interrupt_enque(th, eKillSignal); 02157 rb_threadptr_interrupt(th); 02158 } 02159 return thread; 02160 } 02161 02162 02163 /* 02164 * call-seq: 02165 * Thread.kill(thread) -> thread 02166 * 02167 * Causes the given <em>thread</em> to exit (see <code>Thread::exit</code>). 02168 * 02169 * count = 0 02170 * a = Thread.new { loop { count += 1 } } 02171 * sleep(0.1) #=> 0 02172 * Thread.kill(a) #=> #<Thread:0x401b3d30 dead> 02173 * count #=> 93947 02174 * a.alive? #=> false 02175 */ 02176 02177 static VALUE 02178 rb_thread_s_kill(VALUE obj, VALUE th) 02179 { 02180 return rb_thread_kill(th); 02181 } 02182 02183 02184 /* 02185 * call-seq: 02186 * Thread.exit -> thread 02187 * 02188 * Terminates the currently running thread and schedules another thread to be 02189 * run. If this thread is already marked to be killed, <code>exit</code> 02190 * returns the <code>Thread</code>. If this is the main thread, or the last 02191 * thread, exit the process. 02192 */ 02193 02194 static VALUE 02195 rb_thread_exit(void) 02196 { 02197 rb_thread_t *th = GET_THREAD(); 02198 return rb_thread_kill(th->self); 02199 } 02200 02201 02202 /* 02203 * call-seq: 02204 * thr.wakeup -> thr 02205 * 02206 * Marks <i>thr</i> as eligible for scheduling (it may still remain blocked on 02207 * I/O, however). Does not invoke the scheduler (see <code>Thread#run</code>). 
02208 * 02209 * c = Thread.new { Thread.stop; puts "hey!" } 02210 * sleep 0.1 while c.status!='sleep' 02211 * c.wakeup 02212 * c.join 02213 * 02214 * <em>produces:</em> 02215 * 02216 * hey! 02217 */ 02218 02219 VALUE 02220 rb_thread_wakeup(VALUE thread) 02221 { 02222 if (!RTEST(rb_thread_wakeup_alive(thread))) { 02223 rb_raise(rb_eThreadError, "killed thread"); 02224 } 02225 return thread; 02226 } 02227 02228 VALUE 02229 rb_thread_wakeup_alive(VALUE thread) 02230 { 02231 rb_thread_t *th; 02232 GetThreadPtr(thread, th); 02233 02234 if (th->status == THREAD_KILLED) { 02235 return Qnil; 02236 } 02237 rb_threadptr_ready(th); 02238 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) 02239 th->status = THREAD_RUNNABLE; 02240 return thread; 02241 } 02242 02243 02244 /* 02245 * call-seq: 02246 * thr.run -> thr 02247 * 02248 * Wakes up <i>thr</i>, making it eligible for scheduling. 02249 * 02250 * a = Thread.new { puts "a"; Thread.stop; puts "c" } 02251 * sleep 0.1 while a.status!='sleep' 02252 * puts "Got here" 02253 * a.run 02254 * a.join 02255 * 02256 * <em>produces:</em> 02257 * 02258 * a 02259 * Got here 02260 * c 02261 */ 02262 02263 VALUE 02264 rb_thread_run(VALUE thread) 02265 { 02266 rb_thread_wakeup(thread); 02267 rb_thread_schedule(); 02268 return thread; 02269 } 02270 02271 02272 /* 02273 * call-seq: 02274 * Thread.stop -> nil 02275 * 02276 * Stops execution of the current thread, putting it into a ``sleep'' state, 02277 * and schedules execution of another thread. 
02278 * 02279 * a = Thread.new { print "a"; Thread.stop; print "c" } 02280 * sleep 0.1 while a.status!='sleep' 02281 * print "b" 02282 * a.run 02283 * a.join 02284 * 02285 * <em>produces:</em> 02286 * 02287 * abc 02288 */ 02289 02290 VALUE 02291 rb_thread_stop(void) 02292 { 02293 if (rb_thread_alone()) { 02294 rb_raise(rb_eThreadError, 02295 "stopping only thread\n\tnote: use sleep to stop forever"); 02296 } 02297 rb_thread_sleep_deadly(); 02298 return Qnil; 02299 } 02300 02301 static int 02302 thread_list_i(st_data_t key, st_data_t val, void *data) 02303 { 02304 VALUE ary = (VALUE)data; 02305 rb_thread_t *th; 02306 GetThreadPtr((VALUE)key, th); 02307 02308 switch (th->status) { 02309 case THREAD_RUNNABLE: 02310 case THREAD_STOPPED: 02311 case THREAD_STOPPED_FOREVER: 02312 rb_ary_push(ary, th->self); 02313 default: 02314 break; 02315 } 02316 return ST_CONTINUE; 02317 } 02318 02319 /********************************************************************/ 02320 02321 /* 02322 * call-seq: 02323 * Thread.list -> array 02324 * 02325 * Returns an array of <code>Thread</code> objects for all threads that are 02326 * either runnable or stopped. 02327 * 02328 * Thread.new { sleep(200) } 02329 * Thread.new { 1000000.times {|i| i*i } } 02330 * Thread.new { Thread.stop } 02331 * Thread.list.each {|t| p t} 02332 * 02333 * <em>produces:</em> 02334 * 02335 * #<Thread:0x401b3e84 sleep> 02336 * #<Thread:0x401b3f38 run> 02337 * #<Thread:0x401b3fb0 sleep> 02338 * #<Thread:0x401bdf4c run> 02339 */ 02340 02341 VALUE 02342 rb_thread_list(void) 02343 { 02344 VALUE ary = rb_ary_new(); 02345 st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary); 02346 return ary; 02347 } 02348 02349 VALUE 02350 rb_thread_current(void) 02351 { 02352 return GET_THREAD()->self; 02353 } 02354 02355 /* 02356 * call-seq: 02357 * Thread.current -> thread 02358 * 02359 * Returns the currently executing thread. 
02360 * 02361 * Thread.current #=> #<Thread:0x401bdf4c run> 02362 */ 02363 02364 static VALUE 02365 thread_s_current(VALUE klass) 02366 { 02367 return rb_thread_current(); 02368 } 02369 02370 VALUE 02371 rb_thread_main(void) 02372 { 02373 return GET_THREAD()->vm->main_thread->self; 02374 } 02375 02376 /* 02377 * call-seq: 02378 * Thread.main -> thread 02379 * 02380 * Returns the main thread. 02381 */ 02382 02383 static VALUE 02384 rb_thread_s_main(VALUE klass) 02385 { 02386 return rb_thread_main(); 02387 } 02388 02389 02390 /* 02391 * call-seq: 02392 * Thread.abort_on_exception -> true or false 02393 * 02394 * Returns the status of the global ``abort on exception'' condition. The 02395 * default is <code>false</code>. When set to <code>true</code>, or if the 02396 * global <code>$DEBUG</code> flag is <code>true</code> (perhaps because the 02397 * command line option <code>-d</code> was specified) all threads will abort 02398 * (the process will <code>exit(0)</code>) if an exception is raised in any 02399 * thread. See also <code>Thread::abort_on_exception=</code>. 02400 */ 02401 02402 static VALUE 02403 rb_thread_s_abort_exc(void) 02404 { 02405 return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse; 02406 } 02407 02408 02409 /* 02410 * call-seq: 02411 * Thread.abort_on_exception= boolean -> true or false 02412 * 02413 * When set to <code>true</code>, all threads will abort if an exception is 02414 * raised. Returns the new state. 
02415 * 02416 * Thread.abort_on_exception = true 02417 * t1 = Thread.new do 02418 * puts "In new thread" 02419 * raise "Exception from thread" 02420 * end 02421 * sleep(1) 02422 * puts "not reached" 02423 * 02424 * <em>produces:</em> 02425 * 02426 * In new thread 02427 * prog.rb:4: Exception from thread (RuntimeError) 02428 * from prog.rb:2:in `initialize' 02429 * from prog.rb:2:in `new' 02430 * from prog.rb:2 02431 */ 02432 02433 static VALUE 02434 rb_thread_s_abort_exc_set(VALUE self, VALUE val) 02435 { 02436 rb_secure(4); 02437 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val); 02438 return val; 02439 } 02440 02441 02442 /* 02443 * call-seq: 02444 * thr.abort_on_exception -> true or false 02445 * 02446 * Returns the status of the thread-local ``abort on exception'' condition for 02447 * <i>thr</i>. The default is <code>false</code>. See also 02448 * <code>Thread::abort_on_exception=</code>. 02449 */ 02450 02451 static VALUE 02452 rb_thread_abort_exc(VALUE thread) 02453 { 02454 rb_thread_t *th; 02455 GetThreadPtr(thread, th); 02456 return th->abort_on_exception ? Qtrue : Qfalse; 02457 } 02458 02459 02460 /* 02461 * call-seq: 02462 * thr.abort_on_exception= boolean -> true or false 02463 * 02464 * When set to <code>true</code>, causes all threads (including the main 02465 * program) to abort if an exception is raised in <i>thr</i>. The process will 02466 * effectively <code>exit(0)</code>. 02467 */ 02468 02469 static VALUE 02470 rb_thread_abort_exc_set(VALUE thread, VALUE val) 02471 { 02472 rb_thread_t *th; 02473 rb_secure(4); 02474 02475 GetThreadPtr(thread, th); 02476 th->abort_on_exception = RTEST(val); 02477 return val; 02478 } 02479 02480 02481 /* 02482 * call-seq: 02483 * thr.group -> thgrp or nil 02484 * 02485 * Returns the <code>ThreadGroup</code> which contains <i>thr</i>, or nil if 02486 * the thread is not a member of any group. 
02487 * 02488 * Thread.main.group #=> #<ThreadGroup:0x4029d914> 02489 */ 02490 02491 VALUE 02492 rb_thread_group(VALUE thread) 02493 { 02494 rb_thread_t *th; 02495 VALUE group; 02496 GetThreadPtr(thread, th); 02497 group = th->thgroup; 02498 02499 if (!group) { 02500 group = Qnil; 02501 } 02502 return group; 02503 } 02504 02505 static const char * 02506 thread_status_name(rb_thread_t *th) 02507 { 02508 switch (th->status) { 02509 case THREAD_RUNNABLE: 02510 if (th->to_kill) 02511 return "aborting"; 02512 else 02513 return "run"; 02514 case THREAD_STOPPED: 02515 case THREAD_STOPPED_FOREVER: 02516 return "sleep"; 02517 case THREAD_KILLED: 02518 return "dead"; 02519 default: 02520 return "unknown"; 02521 } 02522 } 02523 02524 static int 02525 rb_threadptr_dead(rb_thread_t *th) 02526 { 02527 return th->status == THREAD_KILLED; 02528 } 02529 02530 02531 /* 02532 * call-seq: 02533 * thr.status -> string, false or nil 02534 * 02535 * Returns the status of <i>thr</i>: ``<code>sleep</code>'' if <i>thr</i> is 02536 * sleeping or waiting on I/O, ``<code>run</code>'' if <i>thr</i> is executing, 02537 * ``<code>aborting</code>'' if <i>thr</i> is aborting, <code>false</code> if 02538 * <i>thr</i> terminated normally, and <code>nil</code> if <i>thr</i> 02539 * terminated with an exception. 
02540 * 02541 * a = Thread.new { raise("die now") } 02542 * b = Thread.new { Thread.stop } 02543 * c = Thread.new { Thread.exit } 02544 * d = Thread.new { sleep } 02545 * d.kill #=> #<Thread:0x401b3678 aborting> 02546 * a.status #=> nil 02547 * b.status #=> "sleep" 02548 * c.status #=> false 02549 * d.status #=> "aborting" 02550 * Thread.current.status #=> "run" 02551 */ 02552 02553 static VALUE 02554 rb_thread_status(VALUE thread) 02555 { 02556 rb_thread_t *th; 02557 GetThreadPtr(thread, th); 02558 02559 if (rb_threadptr_dead(th)) { 02560 if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo) 02561 /* TODO */ ) { 02562 return Qnil; 02563 } 02564 return Qfalse; 02565 } 02566 return rb_str_new2(thread_status_name(th)); 02567 } 02568 02569 02570 /* 02571 * call-seq: 02572 * thr.alive? -> true or false 02573 * 02574 * Returns <code>true</code> if <i>thr</i> is running or sleeping. 02575 * 02576 * thr = Thread.new { } 02577 * thr.join #=> #<Thread:0x401b3fb0 dead> 02578 * Thread.current.alive? #=> true 02579 * thr.alive? #=> false 02580 */ 02581 02582 static VALUE 02583 rb_thread_alive_p(VALUE thread) 02584 { 02585 rb_thread_t *th; 02586 GetThreadPtr(thread, th); 02587 02588 if (rb_threadptr_dead(th)) 02589 return Qfalse; 02590 return Qtrue; 02591 } 02592 02593 /* 02594 * call-seq: 02595 * thr.stop? -> true or false 02596 * 02597 * Returns <code>true</code> if <i>thr</i> is dead or sleeping. 02598 * 02599 * a = Thread.new { Thread.stop } 02600 * b = Thread.current 02601 * a.stop? #=> true 02602 * b.stop? #=> false 02603 */ 02604 02605 static VALUE 02606 rb_thread_stop_p(VALUE thread) 02607 { 02608 rb_thread_t *th; 02609 GetThreadPtr(thread, th); 02610 02611 if (rb_threadptr_dead(th)) 02612 return Qtrue; 02613 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) 02614 return Qtrue; 02615 return Qfalse; 02616 } 02617 02618 /* 02619 * call-seq: 02620 * thr.safe_level -> integer 02621 * 02622 * Returns the safe level in effect for <i>thr</i>. 
Setting thread-local safe 02623 * levels can help when implementing sandboxes which run insecure code. 02624 * 02625 * thr = Thread.new { $SAFE = 3; sleep } 02626 * Thread.current.safe_level #=> 0 02627 * thr.safe_level #=> 3 02628 */ 02629 02630 static VALUE 02631 rb_thread_safe_level(VALUE thread) 02632 { 02633 rb_thread_t *th; 02634 GetThreadPtr(thread, th); 02635 02636 return INT2NUM(th->safe_level); 02637 } 02638 02639 /* 02640 * call-seq: 02641 * thr.inspect -> string 02642 * 02643 * Dump the name, id, and status of _thr_ to a string. 02644 */ 02645 02646 static VALUE 02647 rb_thread_inspect(VALUE thread) 02648 { 02649 const char *cname = rb_obj_classname(thread); 02650 rb_thread_t *th; 02651 const char *status; 02652 VALUE str; 02653 02654 GetThreadPtr(thread, th); 02655 status = thread_status_name(th); 02656 str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status); 02657 OBJ_INFECT(str, thread); 02658 02659 return str; 02660 } 02661 02662 VALUE 02663 rb_thread_local_aref(VALUE thread, ID id) 02664 { 02665 rb_thread_t *th; 02666 st_data_t val; 02667 02668 GetThreadPtr(thread, th); 02669 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02670 rb_raise(rb_eSecurityError, "Insecure: thread locals"); 02671 } 02672 if (!th->local_storage) { 02673 return Qnil; 02674 } 02675 if (st_lookup(th->local_storage, id, &val)) { 02676 return (VALUE)val; 02677 } 02678 return Qnil; 02679 } 02680 02681 /* 02682 * call-seq: 02683 * thr[sym] -> obj or nil 02684 * 02685 * Attribute Reference---Returns the value of a fiber-local variable (current thread's root fiber 02686 * if not explicitely inside a Fiber), using either a symbol or a string name. 02687 * If the specified variable does not exist, returns <code>nil</code>. 
 *
 *     [
 *       Thread.new { Thread.current["name"] = "A" },
 *       Thread.new { Thread.current[:name] = "B" },
 *       Thread.new { Thread.current["name"] = "C" }
 *     ].each do |th|
 *       th.join
 *       puts "#{th.inspect}: #{th[:name]}"
 *     end
 *
 *  <em>produces:</em>
 *
 *     #<Thread:0x00000002a54220 dead>: A
 *     #<Thread:0x00000002a541a8 dead>: B
 *     #<Thread:0x00000002a54130 dead>: C
 *
 *  Thread#[] and Thread#[]= are not thread-local but fiber-local.
 *  This confusion did not exist in Ruby 1.8 because
 *  fibers were only available since Ruby 1.9.
 *  Ruby 1.9 chose to make these methods fiber-local in order to preserve
 *  the following idiom for dynamic scope.
 *
 *     def meth(newvalue)
 *       begin
 *         oldvalue = Thread.current[:name]
 *         Thread.current[:name] = newvalue
 *         yield
 *       ensure
 *         Thread.current[:name] = oldvalue
 *       end
 *     end
 *
 *  The idiom may not work as dynamic scope if the methods are thread-local
 *  and a given block switches fiber.
 *
 *     f = Fiber.new {
 *       meth(1) {
 *         Fiber.yield
 *       }
 *     }
 *     meth(2) {
 *       f.resume
 *     }
 *     f.resume
 *     p Thread.current[:name]
 *     #=> nil if fiber-local
 *     #=> 2 if thread-local (The value 2 is leaked to outside of meth method.)
 *
 *  For thread-local variables, please see <code>Thread#thread_variable_get</code>
 *  and <code>Thread#thread_variable_set</code>.
02738 * 02739 */ 02740 02741 static VALUE 02742 rb_thread_aref(VALUE thread, VALUE id) 02743 { 02744 return rb_thread_local_aref(thread, rb_to_id(id)); 02745 } 02746 02747 VALUE 02748 rb_thread_local_aset(VALUE thread, ID id, VALUE val) 02749 { 02750 rb_thread_t *th; 02751 GetThreadPtr(thread, th); 02752 02753 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02754 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 02755 } 02756 if (OBJ_FROZEN(thread)) { 02757 rb_error_frozen("thread locals"); 02758 } 02759 if (!th->local_storage) { 02760 th->local_storage = st_init_numtable(); 02761 } 02762 if (NIL_P(val)) { 02763 st_delete_wrap(th->local_storage, id); 02764 return Qnil; 02765 } 02766 st_insert(th->local_storage, id, val); 02767 return val; 02768 } 02769 02770 /* 02771 * call-seq: 02772 * thr[sym] = obj -> obj 02773 * 02774 * Attribute Assignment---Sets or creates the value of a fiber-local variable, 02775 * using either a symbol or a string. See also <code>Thread#[]</code>. For 02776 * thread-local variables, please see <code>Thread#thread_variable_set</code> 02777 * and <code>Thread#thread_variable_get</code>. 02778 */ 02779 02780 static VALUE 02781 rb_thread_aset(VALUE self, VALUE id, VALUE val) 02782 { 02783 return rb_thread_local_aset(self, rb_to_id(id), val); 02784 } 02785 02786 /* 02787 * call-seq: 02788 * thr.thread_variable_get(key) -> obj or nil 02789 * 02790 * Returns the value of a thread local variable that has been set. Note that 02791 * these are different than fiber local values. For fiber local values, 02792 * please see Thread#[] and Thread#[]=. 02793 * 02794 * Thread local values are carried along with threads, and do not respect 02795 * fibers. 
For example: 02796 * 02797 * Thread.new { 02798 * Thread.current.thread_variable_set("foo", "bar") # set a thread local 02799 * Thread.current["foo"] = "bar" # set a fiber local 02800 * 02801 * Fiber.new { 02802 * Fiber.yield [ 02803 * Thread.current.thread_variable_get("foo"), # get the thread local 02804 * Thread.current["foo"], # get the fiber local 02805 * ] 02806 * }.resume 02807 * }.join.value # => ['bar', nil] 02808 * 02809 * The value "bar" is returned for the thread local, where nil is returned 02810 * for the fiber local. The fiber is executed in the same thread, so the 02811 * thread local values are available. 02812 * 02813 * See also Thread#[] 02814 */ 02815 02816 static VALUE 02817 rb_thread_variable_get(VALUE thread, VALUE id) 02818 { 02819 VALUE locals; 02820 rb_thread_t *th; 02821 02822 GetThreadPtr(thread, th); 02823 02824 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02825 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 02826 } 02827 02828 locals = rb_iv_get(thread, "locals"); 02829 return rb_hash_aref(locals, ID2SYM(rb_to_id(id))); 02830 } 02831 02832 /* 02833 * call-seq: 02834 * thr.thread_variable_set(key, value) 02835 * 02836 * Sets a thread local with +key+ to +value+. Note that these are local to 02837 * threads, and not to fibers. Please see Thread#thread_variable_get and 02838 * Thread#[] for more information. 
02839 */ 02840 02841 static VALUE 02842 rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) 02843 { 02844 VALUE locals; 02845 rb_thread_t *th; 02846 02847 GetThreadPtr(thread, th); 02848 02849 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 02850 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 02851 } 02852 if (OBJ_FROZEN(thread)) { 02853 rb_error_frozen("thread locals"); 02854 } 02855 02856 locals = rb_iv_get(thread, "locals"); 02857 return rb_hash_aset(locals, ID2SYM(rb_to_id(id)), val); 02858 } 02859 02860 /* 02861 * call-seq: 02862 * thr.key?(sym) -> true or false 02863 * 02864 * Returns <code>true</code> if the given string (or symbol) exists as a 02865 * fiber-local variable. 02866 * 02867 * me = Thread.current 02868 * me[:oliver] = "a" 02869 * me.key?(:oliver) #=> true 02870 * me.key?(:stanley) #=> false 02871 */ 02872 02873 static VALUE 02874 rb_thread_key_p(VALUE self, VALUE key) 02875 { 02876 rb_thread_t *th; 02877 ID id = rb_to_id(key); 02878 02879 GetThreadPtr(self, th); 02880 02881 if (!th->local_storage) { 02882 return Qfalse; 02883 } 02884 if (st_lookup(th->local_storage, id, 0)) { 02885 return Qtrue; 02886 } 02887 return Qfalse; 02888 } 02889 02890 static int 02891 thread_keys_i(ID key, VALUE value, VALUE ary) 02892 { 02893 rb_ary_push(ary, ID2SYM(key)); 02894 return ST_CONTINUE; 02895 } 02896 02897 static int 02898 vm_living_thread_num(rb_vm_t *vm) 02899 { 02900 return (int)vm->living_threads->num_entries; 02901 } 02902 02903 int 02904 rb_thread_alone(void) 02905 { 02906 int num = 1; 02907 if (GET_THREAD()->vm->living_threads) { 02908 num = vm_living_thread_num(GET_THREAD()->vm); 02909 thread_debug("rb_thread_alone: %d\n", num); 02910 } 02911 return num == 1; 02912 } 02913 02914 /* 02915 * call-seq: 02916 * thr.keys -> array 02917 * 02918 * Returns an an array of the names of the fiber-local variables (as Symbols). 
02919 * 02920 * thr = Thread.new do 02921 * Thread.current[:cat] = 'meow' 02922 * Thread.current["dog"] = 'woof' 02923 * end 02924 * thr.join #=> #<Thread:0x401b3f10 dead> 02925 * thr.keys #=> [:dog, :cat] 02926 */ 02927 02928 static VALUE 02929 rb_thread_keys(VALUE self) 02930 { 02931 rb_thread_t *th; 02932 VALUE ary = rb_ary_new(); 02933 GetThreadPtr(self, th); 02934 02935 if (th->local_storage) { 02936 st_foreach(th->local_storage, thread_keys_i, ary); 02937 } 02938 return ary; 02939 } 02940 02941 static int 02942 keys_i(VALUE key, VALUE value, VALUE ary) 02943 { 02944 rb_ary_push(ary, key); 02945 return ST_CONTINUE; 02946 } 02947 02948 /* 02949 * call-seq: 02950 * thr.thread_variables -> array 02951 * 02952 * Returns an an array of the names of the thread-local variables (as Symbols). 02953 * 02954 * thr = Thread.new do 02955 * Thread.current.thread_variable_set(:cat, 'meow') 02956 * Thread.current.thread_variable_set("dog", 'woof') 02957 * end 02958 * thr.join #=> #<Thread:0x401b3f10 dead> 02959 * thr.thread_variables #=> [:dog, :cat] 02960 * 02961 * Note that these are not fiber local variables. Please see Thread#[] and 02962 * Thread#thread_variable_get for more details. 02963 */ 02964 02965 static VALUE 02966 rb_thread_variables(VALUE thread) 02967 { 02968 VALUE locals; 02969 VALUE ary; 02970 02971 locals = rb_iv_get(thread, "locals"); 02972 ary = rb_ary_new(); 02973 rb_hash_foreach(locals, keys_i, ary); 02974 02975 return ary; 02976 } 02977 02978 /* 02979 * call-seq: 02980 * thr.thread_variable?(key) -> true or false 02981 * 02982 * Returns <code>true</code> if the given string (or symbol) exists as a 02983 * thread-local variable. 02984 * 02985 * me = Thread.current 02986 * me.thread_variable_set(:oliver, "a") 02987 * me.thread_variable?(:oliver) #=> true 02988 * me.thread_variable?(:stanley) #=> false 02989 * 02990 * Note that these are not fiber local variables. Please see Thread#[] and 02991 * Thread#thread_variable_get for more details. 
02992 */ 02993 02994 static VALUE 02995 rb_thread_variable_p(VALUE thread, VALUE key) 02996 { 02997 VALUE locals; 02998 02999 locals = rb_iv_get(thread, "locals"); 03000 03001 if (!RHASH(locals)->ntbl) 03002 return Qfalse; 03003 03004 if (st_lookup(RHASH(locals)->ntbl, ID2SYM(rb_to_id(key)), 0)) { 03005 return Qtrue; 03006 } 03007 03008 return Qfalse; 03009 } 03010 03011 /* 03012 * call-seq: 03013 * thr.priority -> integer 03014 * 03015 * Returns the priority of <i>thr</i>. Default is inherited from the 03016 * current thread which creating the new thread, or zero for the 03017 * initial main thread; higher-priority thread will run more frequently 03018 * than lower-priority threads (but lower-priority threads can also run). 03019 * 03020 * This is just hint for Ruby thread scheduler. It may be ignored on some 03021 * platform. 03022 * 03023 * Thread.current.priority #=> 0 03024 */ 03025 03026 static VALUE 03027 rb_thread_priority(VALUE thread) 03028 { 03029 rb_thread_t *th; 03030 GetThreadPtr(thread, th); 03031 return INT2NUM(th->priority); 03032 } 03033 03034 03035 /* 03036 * call-seq: 03037 * thr.priority= integer -> thr 03038 * 03039 * Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads 03040 * will run more frequently than lower-priority threads (but lower-priority 03041 * threads can also run). 03042 * 03043 * This is just hint for Ruby thread scheduler. It may be ignored on some 03044 * platform. 
03045 * 03046 * count1 = count2 = 0 03047 * a = Thread.new do 03048 * loop { count1 += 1 } 03049 * end 03050 * a.priority = -1 03051 * 03052 * b = Thread.new do 03053 * loop { count2 += 1 } 03054 * end 03055 * b.priority = -2 03056 * sleep 1 #=> 1 03057 * count1 #=> 622504 03058 * count2 #=> 5832 03059 */ 03060 03061 static VALUE 03062 rb_thread_priority_set(VALUE thread, VALUE prio) 03063 { 03064 rb_thread_t *th; 03065 int priority; 03066 GetThreadPtr(thread, th); 03067 03068 rb_secure(4); 03069 03070 #if USE_NATIVE_THREAD_PRIORITY 03071 th->priority = NUM2INT(prio); 03072 native_thread_apply_priority(th); 03073 #else 03074 priority = NUM2INT(prio); 03075 if (priority > RUBY_THREAD_PRIORITY_MAX) { 03076 priority = RUBY_THREAD_PRIORITY_MAX; 03077 } 03078 else if (priority < RUBY_THREAD_PRIORITY_MIN) { 03079 priority = RUBY_THREAD_PRIORITY_MIN; 03080 } 03081 th->priority = priority; 03082 #endif 03083 return INT2NUM(th->priority); 03084 } 03085 03086 /* for IO */ 03087 03088 #if defined(NFDBITS) && defined(HAVE_RB_FD_INIT) 03089 03090 /* 03091 * several Unix platforms support file descriptors bigger than FD_SETSIZE 03092 * in select(2) system call. 03093 * 03094 * - Linux 2.2.12 (?) 03095 * - NetBSD 1.2 (src/sys/kern/sys_generic.c:1.25) 03096 * select(2) documents how to allocate fd_set dynamically. 03097 * http://netbsd.gw.com/cgi-bin/man-cgi?select++NetBSD-4.0 03098 * - FreeBSD 2.2 (src/sys/kern/sys_generic.c:1.19) 03099 * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4) 03100 * select(2) documents how to allocate fd_set dynamically. 03101 * http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4 03102 * - HP-UX documents how to allocate fd_set dynamically. 03103 * http://docs.hp.com/en/B2355-60105/select.2.html 03104 * - Solaris 8 has select_large_fdset 03105 * - Mac OS X 10.7 (Lion) 03106 * select(2) returns EINVAL if nfds is greater than FD_SET_SIZE and 03107 * _DARWIN_UNLIMITED_SELECT (or _DARWIN_C_SOURCE) isn't defined. 
03108 * http://developer.apple.com/library/mac/#releasenotes/Darwin/SymbolVariantsRelNotes/_index.html 03109 * 03110 * When fd_set is not big enough to hold big file descriptors, 03111 * it should be allocated dynamically. 03112 * Note that this assumes fd_set is structured as bitmap. 03113 * 03114 * rb_fd_init allocates the memory. 03115 * rb_fd_term free the memory. 03116 * rb_fd_set may re-allocates bitmap. 03117 * 03118 * So rb_fd_set doesn't reject file descriptors bigger than FD_SETSIZE. 03119 */ 03120 03121 void 03122 rb_fd_init(rb_fdset_t *fds) 03123 { 03124 fds->maxfd = 0; 03125 fds->fdset = ALLOC(fd_set); 03126 FD_ZERO(fds->fdset); 03127 } 03128 03129 void 03130 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 03131 { 03132 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 03133 03134 if (size < sizeof(fd_set)) 03135 size = sizeof(fd_set); 03136 dst->maxfd = src->maxfd; 03137 dst->fdset = xmalloc(size); 03138 memcpy(dst->fdset, src->fdset, size); 03139 } 03140 03141 void 03142 rb_fd_term(rb_fdset_t *fds) 03143 { 03144 if (fds->fdset) xfree(fds->fdset); 03145 fds->maxfd = 0; 03146 fds->fdset = 0; 03147 } 03148 03149 void 03150 rb_fd_zero(rb_fdset_t *fds) 03151 { 03152 if (fds->fdset) 03153 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS)); 03154 } 03155 03156 static void 03157 rb_fd_resize(int n, rb_fdset_t *fds) 03158 { 03159 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask); 03160 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask); 03161 03162 if (m < sizeof(fd_set)) m = sizeof(fd_set); 03163 if (o < sizeof(fd_set)) o = sizeof(fd_set); 03164 03165 if (m > o) { 03166 fds->fdset = xrealloc(fds->fdset, m); 03167 memset((char *)fds->fdset + o, 0, m - o); 03168 } 03169 if (n >= fds->maxfd) fds->maxfd = n + 1; 03170 } 03171 03172 void 03173 rb_fd_set(int n, rb_fdset_t *fds) 03174 { 03175 rb_fd_resize(n, fds); 03176 FD_SET(n, fds->fdset); 03177 } 03178 03179 void 03180 rb_fd_clr(int n, rb_fdset_t *fds) 03181 { 03182 if (n >= 
fds->maxfd) return; 03183 FD_CLR(n, fds->fdset); 03184 } 03185 03186 int 03187 rb_fd_isset(int n, const rb_fdset_t *fds) 03188 { 03189 if (n >= fds->maxfd) return 0; 03190 return FD_ISSET(n, fds->fdset) != 0; /* "!= 0" avoids FreeBSD PR 91421 */ 03191 } 03192 03193 void 03194 rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) 03195 { 03196 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask); 03197 03198 if (size < sizeof(fd_set)) size = sizeof(fd_set); 03199 dst->maxfd = max; 03200 dst->fdset = xrealloc(dst->fdset, size); 03201 memcpy(dst->fdset, src, size); 03202 } 03203 03204 static void 03205 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 03206 { 03207 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 03208 03209 if (size > sizeof(fd_set)) { 03210 rb_raise(rb_eArgError, "too large fdsets"); 03211 } 03212 memcpy(dst, rb_fd_ptr(src), sizeof(fd_set)); 03213 } 03214 03215 void 03216 rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) 03217 { 03218 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 03219 03220 if (size < sizeof(fd_set)) 03221 size = sizeof(fd_set); 03222 dst->maxfd = src->maxfd; 03223 dst->fdset = xrealloc(dst->fdset, size); 03224 memcpy(dst->fdset, src->fdset, size); 03225 } 03226 03227 #ifdef __native_client__ 03228 int select(int nfds, fd_set *readfds, fd_set *writefds, 03229 fd_set *exceptfds, struct timeval *timeout); 03230 #endif 03231 03232 int 03233 rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout) 03234 { 03235 fd_set *r = NULL, *w = NULL, *e = NULL; 03236 if (readfds) { 03237 rb_fd_resize(n - 1, readfds); 03238 r = rb_fd_ptr(readfds); 03239 } 03240 if (writefds) { 03241 rb_fd_resize(n - 1, writefds); 03242 w = rb_fd_ptr(writefds); 03243 } 03244 if (exceptfds) { 03245 rb_fd_resize(n - 1, exceptfds); 03246 e = rb_fd_ptr(exceptfds); 03247 } 03248 return select(n, r, w, e, timeout); 03249 } 03250 03251 #undef FD_ZERO 03252 #undef FD_SET 03253 #undef 
FD_CLR 03254 #undef FD_ISSET 03255 03256 #define FD_ZERO(f) rb_fd_zero(f) 03257 #define FD_SET(i, f) rb_fd_set((i), (f)) 03258 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 03259 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 03260 03261 #elif defined(_WIN32) 03262 03263 void 03264 rb_fd_init(rb_fdset_t *set) 03265 { 03266 set->capa = FD_SETSIZE; 03267 set->fdset = ALLOC(fd_set); 03268 FD_ZERO(set->fdset); 03269 } 03270 03271 void 03272 rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 03273 { 03274 rb_fd_init(dst); 03275 rb_fd_dup(dst, src); 03276 } 03277 03278 static void 03279 rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 03280 { 03281 int max = rb_fd_max(src); 03282 03283 /* we assume src is the result of select() with dst, so dst should be 03284 * larger or equal than src. */ 03285 if (max > FD_SETSIZE || (UINT)max > dst->fd_count) { 03286 rb_raise(rb_eArgError, "too large fdsets"); 03287 } 03288 03289 memcpy(dst->fd_array, src->fdset->fd_array, max); 03290 dst->fd_count = max; 03291 } 03292 03293 void 03294 rb_fd_term(rb_fdset_t *set) 03295 { 03296 xfree(set->fdset); 03297 set->fdset = NULL; 03298 set->capa = 0; 03299 } 03300 03301 void 03302 rb_fd_set(int fd, rb_fdset_t *set) 03303 { 03304 unsigned int i; 03305 SOCKET s = rb_w32_get_osfhandle(fd); 03306 03307 for (i = 0; i < set->fdset->fd_count; i++) { 03308 if (set->fdset->fd_array[i] == s) { 03309 return; 03310 } 03311 } 03312 if (set->fdset->fd_count >= (unsigned)set->capa) { 03313 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; 03314 set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa); 03315 } 03316 set->fdset->fd_array[set->fdset->fd_count++] = s; 03317 } 03318 03319 #undef FD_ZERO 03320 #undef FD_SET 03321 #undef FD_CLR 03322 #undef FD_ISSET 03323 03324 #define FD_ZERO(f) rb_fd_zero(f) 03325 #define FD_SET(i, f) rb_fd_set((i), (f)) 03326 #define FD_CLR(i, f) rb_fd_clr((i), (f)) 03327 #define FD_ISSET(i, f) rb_fd_isset((i), (f)) 03328 03329 #else 03330 #define 
rb_fd_rcopy(d, s) (*(d) = *(s)) 03331 #endif 03332 03333 static int 03334 do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, 03335 struct timeval *timeout) 03336 { 03337 int UNINITIALIZED_VAR(result); 03338 int lerrno; 03339 rb_fdset_t UNINITIALIZED_VAR(orig_read); 03340 rb_fdset_t UNINITIALIZED_VAR(orig_write); 03341 rb_fdset_t UNINITIALIZED_VAR(orig_except); 03342 double limit = 0; 03343 struct timeval wait_rest; 03344 rb_thread_t *th = GET_THREAD(); 03345 03346 if (timeout) { 03347 limit = timeofday(); 03348 limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; 03349 wait_rest = *timeout; 03350 timeout = &wait_rest; 03351 } 03352 03353 if (read) 03354 rb_fd_init_copy(&orig_read, read); 03355 if (write) 03356 rb_fd_init_copy(&orig_write, write); 03357 if (except) 03358 rb_fd_init_copy(&orig_except, except); 03359 03360 retry: 03361 lerrno = 0; 03362 03363 BLOCKING_REGION({ 03364 result = native_fd_select(n, read, write, except, timeout, th); 03365 if (result < 0) lerrno = errno; 03366 }, ubf_select, th, FALSE); 03367 03368 RUBY_VM_CHECK_INTS_BLOCKING(th); 03369 03370 errno = lerrno; 03371 03372 if (result < 0) { 03373 switch (errno) { 03374 case EINTR: 03375 #ifdef ERESTART 03376 case ERESTART: 03377 #endif 03378 if (read) 03379 rb_fd_dup(read, &orig_read); 03380 if (write) 03381 rb_fd_dup(write, &orig_write); 03382 if (except) 03383 rb_fd_dup(except, &orig_except); 03384 03385 if (timeout) { 03386 double d = limit - timeofday(); 03387 03388 wait_rest.tv_sec = (time_t)d; 03389 wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6); 03390 if (wait_rest.tv_sec < 0) wait_rest.tv_sec = 0; 03391 if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0; 03392 } 03393 03394 goto retry; 03395 default: 03396 break; 03397 } 03398 } 03399 03400 if (read) 03401 rb_fd_term(&orig_read); 03402 if (write) 03403 rb_fd_term(&orig_write); 03404 if (except) 03405 rb_fd_term(&orig_except); 03406 03407 return result; 03408 } 03409 03410 static void 03411 
rb_thread_wait_fd_rw(int fd, int read) 03412 { 03413 int result = 0; 03414 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT; 03415 03416 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write"); 03417 03418 if (fd < 0) { 03419 rb_raise(rb_eIOError, "closed stream"); 03420 } 03421 03422 result = rb_wait_for_single_fd(fd, events, NULL); 03423 if (result < 0) { 03424 rb_sys_fail(0); 03425 } 03426 03427 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write"); 03428 } 03429 03430 void 03431 rb_thread_wait_fd(int fd) 03432 { 03433 rb_thread_wait_fd_rw(fd, 1); 03434 } 03435 03436 int 03437 rb_thread_fd_writable(int fd) 03438 { 03439 rb_thread_wait_fd_rw(fd, 0); 03440 return TRUE; 03441 } 03442 03443 int 03444 rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except, 03445 struct timeval *timeout) 03446 { 03447 rb_fdset_t fdsets[3]; 03448 rb_fdset_t *rfds = NULL; 03449 rb_fdset_t *wfds = NULL; 03450 rb_fdset_t *efds = NULL; 03451 int retval; 03452 03453 if (read) { 03454 rfds = &fdsets[0]; 03455 rb_fd_init(rfds); 03456 rb_fd_copy(rfds, read, max); 03457 } 03458 if (write) { 03459 wfds = &fdsets[1]; 03460 rb_fd_init(wfds); 03461 rb_fd_copy(wfds, write, max); 03462 } 03463 if (except) { 03464 efds = &fdsets[2]; 03465 rb_fd_init(efds); 03466 rb_fd_copy(efds, except, max); 03467 } 03468 03469 retval = rb_thread_fd_select(max, rfds, wfds, efds, timeout); 03470 03471 if (rfds) { 03472 rb_fd_rcopy(read, rfds); 03473 rb_fd_term(rfds); 03474 } 03475 if (wfds) { 03476 rb_fd_rcopy(write, wfds); 03477 rb_fd_term(wfds); 03478 } 03479 if (efds) { 03480 rb_fd_rcopy(except, efds); 03481 rb_fd_term(efds); 03482 } 03483 03484 return retval; 03485 } 03486 03487 int 03488 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, 03489 struct timeval *timeout) 03490 { 03491 if (!read && !write && !except) { 03492 if (!timeout) { 03493 rb_thread_sleep_forever(); 03494 return 0; 03495 } 03496 
rb_thread_wait_for(*timeout); 03497 return 0; 03498 } 03499 03500 if (read) { 03501 rb_fd_resize(max - 1, read); 03502 } 03503 if (write) { 03504 rb_fd_resize(max - 1, write); 03505 } 03506 if (except) { 03507 rb_fd_resize(max - 1, except); 03508 } 03509 return do_select(max, read, write, except, timeout); 03510 } 03511 03512 /* 03513 * poll() is supported by many OSes, but so far Linux is the only 03514 * one we know of that supports using poll() in all places select() 03515 * would work. 03516 */ 03517 #if defined(HAVE_POLL) && defined(__linux__) 03518 # define USE_POLL 03519 #endif 03520 03521 #ifdef USE_POLL 03522 03523 /* The same with linux kernel. TODO: make platform independent definition. */ 03524 #define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR) 03525 #define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR) 03526 #define POLLEX_SET (POLLPRI) 03527 03528 #ifndef HAVE_PPOLL 03529 /* TODO: don't ignore sigmask */ 03530 int 03531 ppoll(struct pollfd *fds, nfds_t nfds, 03532 const struct timespec *ts, const sigset_t *sigmask) 03533 { 03534 int timeout_ms; 03535 03536 if (ts) { 03537 int tmp, tmp2; 03538 03539 if (ts->tv_sec > TIMET_MAX/1000) 03540 timeout_ms = -1; 03541 else { 03542 tmp = ts->tv_sec * 1000; 03543 tmp2 = ts->tv_nsec / (1000 * 1000); 03544 if (TIMET_MAX - tmp < tmp2) 03545 timeout_ms = -1; 03546 else 03547 timeout_ms = tmp + tmp2; 03548 } 03549 } 03550 else 03551 timeout_ms = -1; 03552 03553 return poll(fds, nfds, timeout_ms); 03554 } 03555 #endif 03556 03557 /* 03558 * returns a mask of events 03559 */ 03560 int 03561 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 03562 { 03563 struct pollfd fds; 03564 int result = 0, lerrno; 03565 double limit = 0; 03566 struct timespec ts; 03567 struct timespec *timeout = NULL; 03568 rb_thread_t *th = GET_THREAD(); 03569 03570 if (tv) { 03571 ts.tv_sec = tv->tv_sec; 03572 ts.tv_nsec = tv->tv_usec * 1000; 03573 limit = timeofday(); 03574 limit += 
(double)tv->tv_sec + (double)tv->tv_usec * 1e-6; 03575 timeout = &ts; 03576 } 03577 03578 fds.fd = fd; 03579 fds.events = (short)events; 03580 03581 retry: 03582 lerrno = 0; 03583 BLOCKING_REGION({ 03584 result = ppoll(&fds, 1, timeout, NULL); 03585 if (result < 0) lerrno = errno; 03586 }, ubf_select, th, FALSE); 03587 03588 RUBY_VM_CHECK_INTS_BLOCKING(th); 03589 03590 if (result < 0) { 03591 errno = lerrno; 03592 switch (errno) { 03593 case EINTR: 03594 #ifdef ERESTART 03595 case ERESTART: 03596 #endif 03597 if (timeout) { 03598 double d = limit - timeofday(); 03599 03600 ts.tv_sec = (long)d; 03601 ts.tv_nsec = (long)((d - (double)ts.tv_sec) * 1e9); 03602 if (ts.tv_sec < 0) 03603 ts.tv_sec = 0; 03604 if (ts.tv_nsec < 0) 03605 ts.tv_nsec = 0; 03606 } 03607 goto retry; 03608 } 03609 return -1; 03610 } 03611 03612 if (fds.revents & POLLNVAL) { 03613 errno = EBADF; 03614 return -1; 03615 } 03616 03617 /* 03618 * POLLIN, POLLOUT have a different meanings from select(2)'s read/write bit. 03619 * Therefore we need fix it up. 03620 */ 03621 result = 0; 03622 if (fds.revents & POLLIN_SET) 03623 result |= RB_WAITFD_IN; 03624 if (fds.revents & POLLOUT_SET) 03625 result |= RB_WAITFD_OUT; 03626 if (fds.revents & POLLEX_SET) 03627 result |= RB_WAITFD_PRI; 03628 03629 return result; 03630 } 03631 #else /* ! 
USE_POLL - implement rb_io_poll_fd() using select() */ 03632 static rb_fdset_t * 03633 init_set_fd(int fd, rb_fdset_t *fds) 03634 { 03635 rb_fd_init(fds); 03636 rb_fd_set(fd, fds); 03637 03638 return fds; 03639 } 03640 03641 struct select_args { 03642 union { 03643 int fd; 03644 int error; 03645 } as; 03646 rb_fdset_t *read; 03647 rb_fdset_t *write; 03648 rb_fdset_t *except; 03649 struct timeval *tv; 03650 }; 03651 03652 static VALUE 03653 select_single(VALUE ptr) 03654 { 03655 struct select_args *args = (struct select_args *)ptr; 03656 int r; 03657 03658 r = rb_thread_fd_select(args->as.fd + 1, 03659 args->read, args->write, args->except, args->tv); 03660 if (r == -1) 03661 args->as.error = errno; 03662 if (r > 0) { 03663 r = 0; 03664 if (args->read && rb_fd_isset(args->as.fd, args->read)) 03665 r |= RB_WAITFD_IN; 03666 if (args->write && rb_fd_isset(args->as.fd, args->write)) 03667 r |= RB_WAITFD_OUT; 03668 if (args->except && rb_fd_isset(args->as.fd, args->except)) 03669 r |= RB_WAITFD_PRI; 03670 } 03671 return (VALUE)r; 03672 } 03673 03674 static VALUE 03675 select_single_cleanup(VALUE ptr) 03676 { 03677 struct select_args *args = (struct select_args *)ptr; 03678 03679 if (args->read) rb_fd_term(args->read); 03680 if (args->write) rb_fd_term(args->write); 03681 if (args->except) rb_fd_term(args->except); 03682 03683 return (VALUE)-1; 03684 } 03685 03686 int 03687 rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 03688 { 03689 rb_fdset_t rfds, wfds, efds; 03690 struct select_args args; 03691 int r; 03692 VALUE ptr = (VALUE)&args; 03693 03694 args.as.fd = fd; 03695 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; 03696 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; 03697 args.except = (events & RB_WAITFD_PRI) ? 
init_set_fd(fd, &efds) : NULL; 03698 args.tv = tv; 03699 03700 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); 03701 if (r == -1) 03702 errno = args.as.error; 03703 03704 return r; 03705 } 03706 #endif /* ! USE_POLL */ 03707 03708 /* 03709 * for GC 03710 */ 03711 03712 #ifdef USE_CONSERVATIVE_STACK_END 03713 void 03714 rb_gc_set_stack_end(VALUE **stack_end_p) 03715 { 03716 VALUE stack_end; 03717 *stack_end_p = &stack_end; 03718 } 03719 #endif 03720 03721 03722 /* 03723 * 03724 */ 03725 03726 void 03727 rb_threadptr_check_signal(rb_thread_t *mth) 03728 { 03729 /* mth must be main_thread */ 03730 if (rb_signal_buff_size() > 0) { 03731 /* wakeup main thread */ 03732 rb_threadptr_trap_interrupt(mth); 03733 } 03734 } 03735 03736 static void 03737 timer_thread_function(void *arg) 03738 { 03739 rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ 03740 03741 /* 03742 * Tricky: thread_destruct_lock doesn't close a race against 03743 * vm->running_thread switch. however it guarantee th->running_thread 03744 * point to valid pointer or NULL. 
03745 */ 03746 native_mutex_lock(&vm->thread_destruct_lock); 03747 /* for time slice */ 03748 if (vm->running_thread) 03749 RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread); 03750 native_mutex_unlock(&vm->thread_destruct_lock); 03751 03752 /* check signal */ 03753 rb_threadptr_check_signal(vm->main_thread); 03754 03755 #if 0 03756 /* prove profiler */ 03757 if (vm->prove_profile.enable) { 03758 rb_thread_t *th = vm->running_thread; 03759 03760 if (vm->during_gc) { 03761 /* GC prove profiling */ 03762 } 03763 } 03764 #endif 03765 } 03766 03767 void 03768 rb_thread_stop_timer_thread(int close_anyway) 03769 { 03770 if (timer_thread_id && native_stop_timer_thread(close_anyway)) { 03771 native_reset_timer_thread(); 03772 } 03773 } 03774 03775 void 03776 rb_thread_reset_timer_thread(void) 03777 { 03778 native_reset_timer_thread(); 03779 } 03780 03781 void 03782 rb_thread_start_timer_thread(void) 03783 { 03784 system_working = 1; 03785 rb_thread_create_timer_thread(); 03786 } 03787 03788 static int 03789 clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) 03790 { 03791 int i; 03792 VALUE lines = (VALUE)val; 03793 03794 for (i = 0; i < RARRAY_LEN(lines); i++) { 03795 if (RARRAY_PTR(lines)[i] != Qnil) { 03796 RARRAY_PTR(lines)[i] = INT2FIX(0); 03797 } 03798 } 03799 return ST_CONTINUE; 03800 } 03801 03802 static void 03803 clear_coverage(void) 03804 { 03805 VALUE coverages = rb_get_coverages(); 03806 if (RTEST(coverages)) { 03807 st_foreach(RHASH_TBL(coverages), clear_coverage_i, 0); 03808 } 03809 } 03810 03811 static void 03812 rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t)) 03813 { 03814 rb_thread_t *th = GET_THREAD(); 03815 rb_vm_t *vm = th->vm; 03816 VALUE thval = th->self; 03817 vm->main_thread = th; 03818 03819 gvl_atfork(th->vm); 03820 st_foreach(vm->living_threads, atfork, (st_data_t)th); 03821 st_clear(vm->living_threads); 03822 st_insert(vm->living_threads, thval, (st_data_t)th->thread_id); 03823 vm->sleeper = 0; 03824 
clear_coverage(); 03825 } 03826 03827 static int 03828 terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th) 03829 { 03830 VALUE thval = key; 03831 rb_thread_t *th; 03832 GetThreadPtr(thval, th); 03833 03834 if (th != (rb_thread_t *)current_th) { 03835 if (th->keeping_mutexes) { 03836 rb_mutex_abandon_all(th->keeping_mutexes); 03837 } 03838 th->keeping_mutexes = NULL; 03839 thread_cleanup_func(th, TRUE); 03840 } 03841 return ST_CONTINUE; 03842 } 03843 03844 void 03845 rb_thread_atfork(void) 03846 { 03847 rb_thread_atfork_internal(terminate_atfork_i); 03848 GET_THREAD()->join_list = NULL; 03849 03850 /* We don't want reproduce CVE-2003-0900. */ 03851 rb_reset_random_seed(); 03852 } 03853 03854 static int 03855 terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th) 03856 { 03857 VALUE thval = key; 03858 rb_thread_t *th; 03859 GetThreadPtr(thval, th); 03860 03861 if (th != (rb_thread_t *)current_th) { 03862 thread_cleanup_func_before_exec(th); 03863 } 03864 return ST_CONTINUE; 03865 } 03866 03867 void 03868 rb_thread_atfork_before_exec(void) 03869 { 03870 rb_thread_atfork_internal(terminate_atfork_before_exec_i); 03871 } 03872 03873 struct thgroup { 03874 int enclosed; 03875 VALUE group; 03876 }; 03877 03878 static size_t 03879 thgroup_memsize(const void *ptr) 03880 { 03881 return ptr ? sizeof(struct thgroup) : 0; 03882 } 03883 03884 static const rb_data_type_t thgroup_data_type = { 03885 "thgroup", 03886 {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, 03887 }; 03888 03889 /* 03890 * Document-class: ThreadGroup 03891 * 03892 * <code>ThreadGroup</code> provides a means of keeping track of a number of 03893 * threads as a group. A <code>Thread</code> can belong to only one 03894 * <code>ThreadGroup</code> at a time; adding a thread to a new group will 03895 * remove it from any previous group. 03896 * 03897 * Newly created threads belong to the same group as the thread from which they 03898 * were created. 
03899 */ 03900 03901 /* 03902 * Document-const: Default 03903 * 03904 * The default ThreadGroup created when Ruby starts; all Threads belong to it 03905 * by default. 03906 */ 03907 static VALUE 03908 thgroup_s_alloc(VALUE klass) 03909 { 03910 VALUE group; 03911 struct thgroup *data; 03912 03913 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data); 03914 data->enclosed = 0; 03915 data->group = group; 03916 03917 return group; 03918 } 03919 03920 struct thgroup_list_params { 03921 VALUE ary; 03922 VALUE group; 03923 }; 03924 03925 static int 03926 thgroup_list_i(st_data_t key, st_data_t val, st_data_t data) 03927 { 03928 VALUE thread = (VALUE)key; 03929 VALUE ary = ((struct thgroup_list_params *)data)->ary; 03930 VALUE group = ((struct thgroup_list_params *)data)->group; 03931 rb_thread_t *th; 03932 GetThreadPtr(thread, th); 03933 03934 if (th->thgroup == group) { 03935 rb_ary_push(ary, thread); 03936 } 03937 return ST_CONTINUE; 03938 } 03939 03940 /* 03941 * call-seq: 03942 * thgrp.list -> array 03943 * 03944 * Returns an array of all existing <code>Thread</code> objects that belong to 03945 * this group. 03946 * 03947 * ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>] 03948 */ 03949 03950 static VALUE 03951 thgroup_list(VALUE group) 03952 { 03953 VALUE ary = rb_ary_new(); 03954 struct thgroup_list_params param; 03955 03956 param.ary = ary; 03957 param.group = group; 03958 st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param); 03959 return ary; 03960 } 03961 03962 03963 /* 03964 * call-seq: 03965 * thgrp.enclose -> thgrp 03966 * 03967 * Prevents threads from being added to or removed from the receiving 03968 * <code>ThreadGroup</code>. New threads can still be started in an enclosed 03969 * <code>ThreadGroup</code>. 
03970 * 03971 * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914> 03972 * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep> 03973 * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4> 03974 * tg.add thr 03975 * 03976 * <em>produces:</em> 03977 * 03978 * ThreadError: can't move from the enclosed thread group 03979 */ 03980 03981 static VALUE 03982 thgroup_enclose(VALUE group) 03983 { 03984 struct thgroup *data; 03985 03986 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 03987 data->enclosed = 1; 03988 03989 return group; 03990 } 03991 03992 03993 /* 03994 * call-seq: 03995 * thgrp.enclosed? -> true or false 03996 * 03997 * Returns <code>true</code> if <em>thgrp</em> is enclosed. See also 03998 * ThreadGroup#enclose. 03999 */ 04000 04001 static VALUE 04002 thgroup_enclosed_p(VALUE group) 04003 { 04004 struct thgroup *data; 04005 04006 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 04007 if (data->enclosed) 04008 return Qtrue; 04009 return Qfalse; 04010 } 04011 04012 04013 /* 04014 * call-seq: 04015 * thgrp.add(thread) -> thgrp 04016 * 04017 * Adds the given <em>thread</em> to this group, removing it from any other 04018 * group to which it may have previously belonged. 
04019 * 04020 * puts "Initial group is #{ThreadGroup::Default.list}" 04021 * tg = ThreadGroup.new 04022 * t1 = Thread.new { sleep } 04023 * t2 = Thread.new { sleep } 04024 * puts "t1 is #{t1}" 04025 * puts "t2 is #{t2}" 04026 * tg.add(t1) 04027 * puts "Initial group now #{ThreadGroup::Default.list}" 04028 * puts "tg group now #{tg.list}" 04029 * 04030 * <em>produces:</em> 04031 * 04032 * Initial group is #<Thread:0x401bdf4c> 04033 * t1 is #<Thread:0x401b3c90> 04034 * t2 is #<Thread:0x401b3c18> 04035 * Initial group now #<Thread:0x401b3c18>#<Thread:0x401bdf4c> 04036 * tg group now #<Thread:0x401b3c90> 04037 */ 04038 04039 static VALUE 04040 thgroup_add(VALUE group, VALUE thread) 04041 { 04042 rb_thread_t *th; 04043 struct thgroup *data; 04044 04045 rb_secure(4); 04046 GetThreadPtr(thread, th); 04047 04048 if (OBJ_FROZEN(group)) { 04049 rb_raise(rb_eThreadError, "can't move to the frozen thread group"); 04050 } 04051 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 04052 if (data->enclosed) { 04053 rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); 04054 } 04055 04056 if (!th->thgroup) { 04057 return Qnil; 04058 } 04059 04060 if (OBJ_FROZEN(th->thgroup)) { 04061 rb_raise(rb_eThreadError, "can't move from the frozen thread group"); 04062 } 04063 TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data); 04064 if (data->enclosed) { 04065 rb_raise(rb_eThreadError, 04066 "can't move from the enclosed thread group"); 04067 } 04068 04069 th->thgroup = group; 04070 return group; 04071 } 04072 04073 04074 /* 04075 * Document-class: Mutex 04076 * 04077 * Mutex implements a simple semaphore that can be used to coordinate access to 04078 * shared data from multiple concurrent threads. 
04079 * 04080 * Example: 04081 * 04082 * require 'thread' 04083 * semaphore = Mutex.new 04084 * 04085 * a = Thread.new { 04086 * semaphore.synchronize { 04087 * # access shared resource 04088 * } 04089 * } 04090 * 04091 * b = Thread.new { 04092 * semaphore.synchronize { 04093 * # access shared resource 04094 * } 04095 * } 04096 * 04097 */ 04098 04099 #define GetMutexPtr(obj, tobj) \ 04100 TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) 04101 04102 static const char *rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 04103 04104 #define mutex_mark NULL 04105 04106 static void 04107 mutex_free(void *ptr) 04108 { 04109 if (ptr) { 04110 rb_mutex_t *mutex = ptr; 04111 if (mutex->th) { 04112 /* rb_warn("free locked mutex"); */ 04113 const char *err = rb_mutex_unlock_th(mutex, mutex->th); 04114 if (err) rb_bug("%s", err); 04115 } 04116 native_mutex_destroy(&mutex->lock); 04117 native_cond_destroy(&mutex->cond); 04118 } 04119 ruby_xfree(ptr); 04120 } 04121 04122 static size_t 04123 mutex_memsize(const void *ptr) 04124 { 04125 return ptr ? 
sizeof(rb_mutex_t) : 0; 04126 } 04127 04128 static const rb_data_type_t mutex_data_type = { 04129 "mutex", 04130 {mutex_mark, mutex_free, mutex_memsize,}, 04131 }; 04132 04133 VALUE 04134 rb_obj_is_mutex(VALUE obj) 04135 { 04136 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) { 04137 return Qtrue; 04138 } 04139 else { 04140 return Qfalse; 04141 } 04142 } 04143 04144 static VALUE 04145 mutex_alloc(VALUE klass) 04146 { 04147 VALUE volatile obj; 04148 rb_mutex_t *mutex; 04149 04150 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); 04151 native_mutex_initialize(&mutex->lock); 04152 native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); 04153 return obj; 04154 } 04155 04156 /* 04157 * call-seq: 04158 * Mutex.new -> mutex 04159 * 04160 * Creates a new Mutex 04161 */ 04162 static VALUE 04163 mutex_initialize(VALUE self) 04164 { 04165 return self; 04166 } 04167 04168 VALUE 04169 rb_mutex_new(void) 04170 { 04171 return mutex_alloc(rb_cMutex); 04172 } 04173 04174 /* 04175 * call-seq: 04176 * mutex.locked? -> true or false 04177 * 04178 * Returns +true+ if this lock is currently held by some thread. 04179 */ 04180 VALUE 04181 rb_mutex_locked_p(VALUE self) 04182 { 04183 rb_mutex_t *mutex; 04184 GetMutexPtr(self, mutex); 04185 return mutex->th ? Qtrue : Qfalse; 04186 } 04187 04188 static void 04189 mutex_locked(rb_thread_t *th, VALUE self) 04190 { 04191 rb_mutex_t *mutex; 04192 GetMutexPtr(self, mutex); 04193 04194 if (th->keeping_mutexes) { 04195 mutex->next_mutex = th->keeping_mutexes; 04196 } 04197 th->keeping_mutexes = mutex; 04198 } 04199 04200 /* 04201 * call-seq: 04202 * mutex.try_lock -> true or false 04203 * 04204 * Attempts to obtain the lock and returns immediately. Returns +true+ if the 04205 * lock was granted. 
04206 */ 04207 VALUE 04208 rb_mutex_trylock(VALUE self) 04209 { 04210 rb_mutex_t *mutex; 04211 VALUE locked = Qfalse; 04212 GetMutexPtr(self, mutex); 04213 04214 native_mutex_lock(&mutex->lock); 04215 if (mutex->th == 0) { 04216 mutex->th = GET_THREAD(); 04217 locked = Qtrue; 04218 04219 mutex_locked(GET_THREAD(), self); 04220 } 04221 native_mutex_unlock(&mutex->lock); 04222 04223 return locked; 04224 } 04225 04226 static int 04227 lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms) 04228 { 04229 int interrupted = 0; 04230 int err = 0; 04231 04232 mutex->cond_waiting++; 04233 for (;;) { 04234 if (!mutex->th) { 04235 mutex->th = th; 04236 break; 04237 } 04238 if (RUBY_VM_INTERRUPTED(th)) { 04239 interrupted = 1; 04240 break; 04241 } 04242 if (err == ETIMEDOUT) { 04243 interrupted = 2; 04244 break; 04245 } 04246 04247 if (timeout_ms) { 04248 struct timespec timeout_rel; 04249 struct timespec timeout; 04250 04251 timeout_rel.tv_sec = 0; 04252 timeout_rel.tv_nsec = timeout_ms * 1000 * 1000; 04253 timeout = native_cond_timeout(&mutex->cond, timeout_rel); 04254 err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout); 04255 } 04256 else { 04257 native_cond_wait(&mutex->cond, &mutex->lock); 04258 err = 0; 04259 } 04260 } 04261 mutex->cond_waiting--; 04262 04263 return interrupted; 04264 } 04265 04266 static void 04267 lock_interrupt(void *ptr) 04268 { 04269 rb_mutex_t *mutex = (rb_mutex_t *)ptr; 04270 native_mutex_lock(&mutex->lock); 04271 if (mutex->cond_waiting > 0) 04272 native_cond_broadcast(&mutex->cond); 04273 native_mutex_unlock(&mutex->lock); 04274 } 04275 04276 /* 04277 * At maximum, only one thread can use cond_timedwait and watch deadlock 04278 * periodically. Multiple polling thread (i.e. concurrent deadlock check) 04279 * introduces new race conditions. 
[Bug #6278] [ruby-core:44275] 04280 */ 04281 static const rb_thread_t *patrol_thread = NULL; 04282 04283 /* 04284 * call-seq: 04285 * mutex.lock -> self 04286 * 04287 * Attempts to grab the lock and waits if it isn't available. 04288 * Raises +ThreadError+ if +mutex+ was locked by the current thread. 04289 */ 04290 VALUE 04291 rb_mutex_lock(VALUE self) 04292 { 04293 rb_thread_t *th = GET_THREAD(); 04294 rb_mutex_t *mutex; 04295 GetMutexPtr(self, mutex); 04296 04297 /* When running trap handler */ 04298 if (!mutex->allow_trap && th->interrupt_mask & TRAP_INTERRUPT_MASK) { 04299 rb_raise(rb_eThreadError, "can't be called from trap context"); 04300 } 04301 04302 if (rb_mutex_trylock(self) == Qfalse) { 04303 if (mutex->th == GET_THREAD()) { 04304 rb_raise(rb_eThreadError, "deadlock; recursive locking"); 04305 } 04306 04307 while (mutex->th != th) { 04308 int interrupted; 04309 enum rb_thread_status prev_status = th->status; 04310 volatile int timeout_ms = 0; 04311 struct rb_unblock_callback oldubf; 04312 04313 set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE); 04314 th->status = THREAD_STOPPED_FOREVER; 04315 th->locking_mutex = self; 04316 04317 native_mutex_lock(&mutex->lock); 04318 th->vm->sleeper++; 04319 /* 04320 * Carefully! while some contended threads are in lock_func(), 04321 * vm->sleepr is unstable value. we have to avoid both deadlock 04322 * and busy loop. 
04323 */ 04324 if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && 04325 !patrol_thread) { 04326 timeout_ms = 100; 04327 patrol_thread = th; 04328 } 04329 04330 GVL_UNLOCK_BEGIN(); 04331 interrupted = lock_func(th, mutex, (int)timeout_ms); 04332 native_mutex_unlock(&mutex->lock); 04333 GVL_UNLOCK_END(); 04334 04335 if (patrol_thread == th) 04336 patrol_thread = NULL; 04337 04338 reset_unblock_function(th, &oldubf); 04339 04340 th->locking_mutex = Qfalse; 04341 if (mutex->th && interrupted == 2) { 04342 rb_check_deadlock(th->vm); 04343 } 04344 if (th->status == THREAD_STOPPED_FOREVER) { 04345 th->status = prev_status; 04346 } 04347 th->vm->sleeper--; 04348 04349 if (mutex->th == th) mutex_locked(th, self); 04350 04351 if (interrupted) { 04352 RUBY_VM_CHECK_INTS_BLOCKING(th); 04353 } 04354 } 04355 } 04356 return self; 04357 } 04358 04359 /* 04360 * call-seq: 04361 * mutex.owned? -> true or false 04362 * 04363 * Returns +true+ if this lock is currently held by current thread. 04364 * <em>This API is experimental, and subject to change.</em> 04365 */ 04366 VALUE 04367 rb_mutex_owned_p(VALUE self) 04368 { 04369 VALUE owned = Qfalse; 04370 rb_thread_t *th = GET_THREAD(); 04371 rb_mutex_t *mutex; 04372 04373 GetMutexPtr(self, mutex); 04374 04375 if (mutex->th == th) 04376 owned = Qtrue; 04377 04378 return owned; 04379 } 04380 04381 static const char * 04382 rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th) 04383 { 04384 const char *err = NULL; 04385 04386 native_mutex_lock(&mutex->lock); 04387 04388 if (mutex->th == 0) { 04389 err = "Attempt to unlock a mutex which is not locked"; 04390 } 04391 else if (mutex->th != th) { 04392 err = "Attempt to unlock a mutex which is locked by another thread"; 04393 } 04394 else { 04395 mutex->th = 0; 04396 if (mutex->cond_waiting > 0) 04397 native_cond_signal(&mutex->cond); 04398 } 04399 04400 native_mutex_unlock(&mutex->lock); 04401 04402 if (!err) { 04403 rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes; 04404 
while (*th_mutex != mutex) { 04405 th_mutex = &(*th_mutex)->next_mutex; 04406 } 04407 *th_mutex = mutex->next_mutex; 04408 mutex->next_mutex = NULL; 04409 } 04410 04411 return err; 04412 } 04413 04414 /* 04415 * call-seq: 04416 * mutex.unlock -> self 04417 * 04418 * Releases the lock. 04419 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. 04420 */ 04421 VALUE 04422 rb_mutex_unlock(VALUE self) 04423 { 04424 const char *err; 04425 rb_mutex_t *mutex; 04426 GetMutexPtr(self, mutex); 04427 04428 /* When running trap handler */ 04429 if (!mutex->allow_trap && GET_THREAD()->interrupt_mask & TRAP_INTERRUPT_MASK) { 04430 rb_raise(rb_eThreadError, "can't be called from trap context"); 04431 } 04432 04433 err = rb_mutex_unlock_th(mutex, GET_THREAD()); 04434 if (err) rb_raise(rb_eThreadError, "%s", err); 04435 04436 return self; 04437 } 04438 04439 static void 04440 rb_mutex_abandon_all(rb_mutex_t *mutexes) 04441 { 04442 rb_mutex_t *mutex; 04443 04444 while (mutexes) { 04445 mutex = mutexes; 04446 mutexes = mutex->next_mutex; 04447 mutex->th = 0; 04448 mutex->next_mutex = 0; 04449 } 04450 } 04451 04452 static VALUE 04453 rb_mutex_sleep_forever(VALUE time) 04454 { 04455 sleep_forever(GET_THREAD(), 1, 0); /* permit spurious check */ 04456 return Qnil; 04457 } 04458 04459 static VALUE 04460 rb_mutex_wait_for(VALUE time) 04461 { 04462 struct timeval *t = (struct timeval *)time; 04463 sleep_timeval(GET_THREAD(), *t, 0); /* permit spurious check */ 04464 return Qnil; 04465 } 04466 04467 VALUE 04468 rb_mutex_sleep(VALUE self, VALUE timeout) 04469 { 04470 time_t beg, end; 04471 struct timeval t; 04472 04473 if (!NIL_P(timeout)) { 04474 t = rb_time_interval(timeout); 04475 } 04476 rb_mutex_unlock(self); 04477 beg = time(0); 04478 if (NIL_P(timeout)) { 04479 rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self); 04480 } 04481 else { 04482 rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self); 04483 } 04484 end = time(0) - beg; 04485 return 
INT2FIX(end); 04486 } 04487 04488 /* 04489 * call-seq: 04490 * mutex.sleep(timeout = nil) -> number 04491 * 04492 * Releases the lock and sleeps +timeout+ seconds if it is given and 04493 * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by 04494 * the current thread. 04495 * 04496 * Note that this method can wakeup without explicit Thread#wakeup call. 04497 * For example, receiving signal and so on. 04498 */ 04499 static VALUE 04500 mutex_sleep(int argc, VALUE *argv, VALUE self) 04501 { 04502 VALUE timeout; 04503 04504 rb_scan_args(argc, argv, "01", &timeout); 04505 return rb_mutex_sleep(self, timeout); 04506 } 04507 04508 /* 04509 * call-seq: 04510 * mutex.synchronize { ... } -> result of the block 04511 * 04512 * Obtains a lock, runs the block, and releases the lock when the block 04513 * completes. See the example under +Mutex+. 04514 */ 04515 04516 VALUE 04517 rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg) 04518 { 04519 rb_mutex_lock(mutex); 04520 return rb_ensure(func, arg, rb_mutex_unlock, mutex); 04521 } 04522 04523 /* 04524 * call-seq: 04525 * mutex.synchronize { ... } -> result of the block 04526 * 04527 * Obtains a lock, runs the block, and releases the lock when the block 04528 * completes. See the example under +Mutex+. 
04529 */ 04530 static VALUE 04531 rb_mutex_synchronize_m(VALUE self, VALUE args) 04532 { 04533 if (!rb_block_given_p()) { 04534 rb_raise(rb_eThreadError, "must be called with a block"); 04535 } 04536 04537 return rb_mutex_synchronize(self, rb_yield, Qundef); 04538 } 04539 04540 void rb_mutex_allow_trap(VALUE self, int val) 04541 { 04542 rb_mutex_t *m; 04543 GetMutexPtr(self, m); 04544 04545 m->allow_trap = val; 04546 } 04547 04548 /* 04549 * Document-class: ThreadShield 04550 */ 04551 static void 04552 thread_shield_mark(void *ptr) 04553 { 04554 rb_gc_mark((VALUE)ptr); 04555 } 04556 04557 static const rb_data_type_t thread_shield_data_type = { 04558 "thread_shield", 04559 {thread_shield_mark, 0, 0,}, 04560 }; 04561 04562 static VALUE 04563 thread_shield_alloc(VALUE klass) 04564 { 04565 return TypedData_Wrap_Struct(klass, &thread_shield_data_type, (void *)mutex_alloc(0)); 04566 } 04567 04568 #define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type)) 04569 #define THREAD_SHIELD_WAITING_MASK (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4|FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9|FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14|FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19) 04570 #define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT) 04571 #define rb_thread_shield_waiting(b) (int)((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT) 04572 04573 static inline void 04574 rb_thread_shield_waiting_inc(VALUE b) 04575 { 04576 unsigned int w = rb_thread_shield_waiting(b); 04577 w++; 04578 if (w > (unsigned int)(THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT)) 04579 rb_raise(rb_eRuntimeError, "waiting count overflow"); 04580 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK; 04581 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT); 04582 } 04583 04584 static inline void 04585 rb_thread_shield_waiting_dec(VALUE b) 04586 { 04587 unsigned int w = rb_thread_shield_waiting(b); 04588 if (!w) 
rb_raise(rb_eRuntimeError, "waiting count underflow"); 04589 w--; 04590 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK; 04591 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT); 04592 } 04593 04594 VALUE 04595 rb_thread_shield_new(void) 04596 { 04597 VALUE thread_shield = thread_shield_alloc(rb_cThreadShield); 04598 rb_mutex_lock((VALUE)DATA_PTR(thread_shield)); 04599 return thread_shield; 04600 } 04601 04602 /* 04603 * Wait a thread shield. 04604 * 04605 * Returns 04606 * true: acquired the thread shield 04607 * false: the thread shield was destroyed and no other threads waiting 04608 * nil: the thread shield was destroyed but still in use 04609 */ 04610 VALUE 04611 rb_thread_shield_wait(VALUE self) 04612 { 04613 VALUE mutex = GetThreadShieldPtr(self); 04614 rb_mutex_t *m; 04615 04616 if (!mutex) return Qfalse; 04617 GetMutexPtr(mutex, m); 04618 if (m->th == GET_THREAD()) return Qnil; 04619 rb_thread_shield_waiting_inc(self); 04620 rb_mutex_lock(mutex); 04621 rb_thread_shield_waiting_dec(self); 04622 if (DATA_PTR(self)) return Qtrue; 04623 rb_mutex_unlock(mutex); 04624 return rb_thread_shield_waiting(self) > 0 ? Qnil : Qfalse; 04625 } 04626 04627 /* 04628 * Release a thread shield, and return true if it has waiting threads. 04629 */ 04630 VALUE 04631 rb_thread_shield_release(VALUE self) 04632 { 04633 VALUE mutex = GetThreadShieldPtr(self); 04634 rb_mutex_unlock(mutex); 04635 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse; 04636 } 04637 04638 /* 04639 * Release and destroy a thread shield, and return true if it has waiting threads. 04640 */ 04641 VALUE 04642 rb_thread_shield_destroy(VALUE self) 04643 { 04644 VALUE mutex = GetThreadShieldPtr(self); 04645 DATA_PTR(self) = 0; 04646 rb_mutex_unlock(mutex); 04647 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse; 04648 } 04649 04650 /* variables for recursive traversals */ 04651 static ID recursive_key; 04652 04653 /* 04654 * Returns the current "recursive list" used to detect recursion. 
04655 * This list is a hash table, unique for the current thread and for 04656 * the current __callee__. 04657 */ 04658 04659 static VALUE 04660 recursive_list_access(void) 04661 { 04662 volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key); 04663 VALUE sym = ID2SYM(rb_frame_this_func()); 04664 VALUE list; 04665 if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) { 04666 hash = rb_hash_new(); 04667 OBJ_UNTRUST(hash); 04668 rb_thread_local_aset(rb_thread_current(), recursive_key, hash); 04669 list = Qnil; 04670 } 04671 else { 04672 list = rb_hash_aref(hash, sym); 04673 } 04674 if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) { 04675 list = rb_hash_new(); 04676 OBJ_UNTRUST(list); 04677 rb_hash_aset(hash, sym, list); 04678 } 04679 return list; 04680 } 04681 04682 /* 04683 * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already 04684 * in the recursion list. 04685 * Assumes the recursion list is valid. 04686 */ 04687 04688 static VALUE 04689 recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) 04690 { 04691 #if SIZEOF_LONG == SIZEOF_VOIDP 04692 #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other)) 04693 #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP 04694 #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \ 04695 rb_big_eql((obj_id), (other)) : ((obj_id) == (other))) 04696 #endif 04697 04698 VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef); 04699 if (pair_list == Qundef) 04700 return Qfalse; 04701 if (paired_obj_id) { 04702 if (!RB_TYPE_P(pair_list, T_HASH)) { 04703 if (!OBJ_ID_EQL(paired_obj_id, pair_list)) 04704 return Qfalse; 04705 } 04706 else { 04707 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id))) 04708 return Qfalse; 04709 } 04710 } 04711 return Qtrue; 04712 } 04713 04714 /* 04715 * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list. 04716 * For a single obj_id, it sets list[obj_id] to Qtrue. 
04717 * For a pair, it sets list[obj_id] to paired_obj_id if possible, 04718 * otherwise list[obj_id] becomes a hash like: 04719 * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... } 04720 * Assumes the recursion list is valid. 04721 */ 04722 04723 static void 04724 recursive_push(VALUE list, VALUE obj, VALUE paired_obj) 04725 { 04726 VALUE pair_list; 04727 04728 if (!paired_obj) { 04729 rb_hash_aset(list, obj, Qtrue); 04730 } 04731 else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) { 04732 rb_hash_aset(list, obj, paired_obj); 04733 } 04734 else { 04735 if (!RB_TYPE_P(pair_list, T_HASH)){ 04736 VALUE other_paired_obj = pair_list; 04737 pair_list = rb_hash_new(); 04738 OBJ_UNTRUST(pair_list); 04739 rb_hash_aset(pair_list, other_paired_obj, Qtrue); 04740 rb_hash_aset(list, obj, pair_list); 04741 } 04742 rb_hash_aset(pair_list, paired_obj, Qtrue); 04743 } 04744 } 04745 04746 /* 04747 * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list. 04748 * For a pair, if list[obj_id] is a hash, then paired_obj_id is 04749 * removed from the hash and no attempt is made to simplify 04750 * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id 04751 * Assumes the recursion list is valid. 
04752 */ 04753 04754 static void 04755 recursive_pop(VALUE list, VALUE obj, VALUE paired_obj) 04756 { 04757 if (paired_obj) { 04758 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); 04759 if (pair_list == Qundef) { 04760 VALUE symname = rb_inspect(ID2SYM(rb_frame_this_func())); 04761 VALUE thrname = rb_inspect(rb_thread_current()); 04762 rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list for %s in %s", 04763 StringValuePtr(symname), StringValuePtr(thrname)); 04764 } 04765 if (RB_TYPE_P(pair_list, T_HASH)) { 04766 rb_hash_delete(pair_list, paired_obj); 04767 if (!RHASH_EMPTY_P(pair_list)) { 04768 return; /* keep hash until is empty */ 04769 } 04770 } 04771 } 04772 rb_hash_delete(list, obj); 04773 } 04774 04775 struct exec_recursive_params { 04776 VALUE (*func) (VALUE, VALUE, int); 04777 VALUE list; 04778 VALUE obj; 04779 VALUE objid; 04780 VALUE pairid; 04781 VALUE arg; 04782 }; 04783 04784 static VALUE 04785 exec_recursive_i(VALUE tag, struct exec_recursive_params *p) 04786 { 04787 VALUE result = Qundef; 04788 int state; 04789 04790 recursive_push(p->list, p->objid, p->pairid); 04791 PUSH_TAG(); 04792 if ((state = EXEC_TAG()) == 0) { 04793 result = (*p->func)(p->obj, p->arg, FALSE); 04794 } 04795 POP_TAG(); 04796 recursive_pop(p->list, p->objid, p->pairid); 04797 if (state) 04798 JUMP_TAG(state); 04799 return result; 04800 } 04801 04802 /* 04803 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04804 * current method is called recursively on obj, or on the pair <obj, pairid> 04805 * If outer is 0, then the innermost func will be called with recursive set 04806 * to Qtrue, otherwise the outermost func will be called. In the latter case, 04807 * all inner func are short-circuited by throw. 04808 * Implementation details: the value thrown is the recursive list which is 04809 * proper to the current method and unlikely to be catched anywhere else. 04810 * list[recursive_key] is used as a flag for the outermost call. 
04811 */ 04812 04813 static VALUE 04814 exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer) 04815 { 04816 VALUE result = Qundef; 04817 struct exec_recursive_params p; 04818 int outermost; 04819 p.list = recursive_list_access(); 04820 p.objid = rb_obj_id(obj); 04821 p.obj = obj; 04822 p.pairid = pairid; 04823 p.arg = arg; 04824 outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0); 04825 04826 if (recursive_check(p.list, p.objid, pairid)) { 04827 if (outer && !outermost) { 04828 rb_throw_obj(p.list, p.list); 04829 } 04830 return (*func)(obj, arg, TRUE); 04831 } 04832 else { 04833 p.func = func; 04834 04835 if (outermost) { 04836 recursive_push(p.list, ID2SYM(recursive_key), 0); 04837 result = rb_catch_obj(p.list, exec_recursive_i, (VALUE)&p); 04838 recursive_pop(p.list, ID2SYM(recursive_key), 0); 04839 if (result == p.list) { 04840 result = (*func)(obj, arg, TRUE); 04841 } 04842 } 04843 else { 04844 result = exec_recursive_i(0, &p); 04845 } 04846 } 04847 *(volatile struct exec_recursive_params *)&p; 04848 return result; 04849 } 04850 04851 /* 04852 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04853 * current method is called recursively on obj 04854 */ 04855 04856 VALUE 04857 rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 04858 { 04859 return exec_recursive(func, obj, 0, arg, 0); 04860 } 04861 04862 /* 04863 * Calls func(obj, arg, recursive), where recursive is non-zero if the 04864 * current method is called recursively on the ordered pair <obj, paired_obj> 04865 */ 04866 04867 VALUE 04868 rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) 04869 { 04870 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0); 04871 } 04872 04873 /* 04874 * If recursion is detected on the current method and obj, the outermost 04875 * func will be called with (obj, arg, Qtrue). 
All inner func will be 04876 * short-circuited using throw. 04877 */ 04878 04879 VALUE 04880 rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 04881 { 04882 return exec_recursive(func, obj, 0, arg, 1); 04883 } 04884 04885 /* 04886 * call-seq: 04887 * thr.backtrace -> array 04888 * 04889 * Returns the current backtrace of the target thread. 04890 * 04891 */ 04892 04893 static VALUE 04894 rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval) 04895 { 04896 return vm_thread_backtrace(argc, argv, thval); 04897 } 04898 04899 /* call-seq: 04900 * thr.backtrace_locations(*args) -> array or nil 04901 * 04902 * Returns the execution stack for the target thread---an array containing 04903 * backtrace location objects. 04904 * 04905 * See Thread::Backtrace::Location for more information. 04906 * 04907 * This method behaves similarly to Kernel#caller_locations except it applies 04908 * to a specific thread. 04909 */ 04910 static VALUE 04911 rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval) 04912 { 04913 return vm_thread_backtrace_locations(argc, argv, thval); 04914 } 04915 04916 /* 04917 * Document-class: ThreadError 04918 * 04919 * Raised when an invalid operation is attempted on a thread. 04920 * 04921 * For example, when no other thread has been started: 04922 * 04923 * Thread.stop 04924 * 04925 * <em>raises the exception:</em> 04926 * 04927 * ThreadError: stopping only thread 04928 */ 04929 04930 /* 04931 * +Thread+ encapsulates the behavior of a thread of 04932 * execution, including the main thread of the Ruby script. 04933 * 04934 * In the descriptions of the methods in this class, the parameter _sym_ 04935 * refers to a symbol, which is either a quoted string or a 04936 * +Symbol+ (such as <code>:name</code>). 
04937 */ 04938 04939 void 04940 Init_Thread(void) 04941 { 04942 #undef rb_intern 04943 #define rb_intern(str) rb_intern_const(str) 04944 04945 VALUE cThGroup; 04946 rb_thread_t *th = GET_THREAD(); 04947 04948 sym_never = ID2SYM(rb_intern("never")); 04949 sym_immediate = ID2SYM(rb_intern("immediate")); 04950 sym_on_blocking = ID2SYM(rb_intern("on_blocking")); 04951 04952 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); 04953 rb_define_singleton_method(rb_cThread, "start", thread_start, -2); 04954 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); 04955 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0); 04956 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0); 04957 rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0); 04958 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1); 04959 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0); 04960 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0); 04961 rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0); 04962 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0); 04963 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1); 04964 #if THREAD_DEBUG < 0 04965 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); 04966 rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); 04967 #endif 04968 rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1); 04969 rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1); 04970 rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1); 04971 04972 rb_define_method(rb_cThread, "initialize", thread_initialize, -2); 04973 rb_define_method(rb_cThread, "raise", thread_raise_m, -1); 04974 rb_define_method(rb_cThread, "join", thread_join_m, -1); 
04975 rb_define_method(rb_cThread, "value", thread_value, 0); 04976 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0); 04977 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0); 04978 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0); 04979 rb_define_method(rb_cThread, "run", rb_thread_run, 0); 04980 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0); 04981 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1); 04982 rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2); 04983 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1); 04984 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); 04985 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); 04986 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1); 04987 rb_define_method(rb_cThread, "status", rb_thread_status, 0); 04988 rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1); 04989 rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2); 04990 rb_define_method(rb_cThread, "thread_variables", rb_thread_variables, 0); 04991 rb_define_method(rb_cThread, "thread_variable?", rb_thread_variable_p, 1); 04992 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0); 04993 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0); 04994 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0); 04995 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1); 04996 rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0); 04997 rb_define_method(rb_cThread, "group", rb_thread_group, 0); 04998 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1); 04999 rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1); 05000 05001 rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0); 05002 05003 closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed"); 05004 OBJ_TAINT(closed_stream_error); 05005 
OBJ_FREEZE(closed_stream_error); 05006 05007 cThGroup = rb_define_class("ThreadGroup", rb_cObject); 05008 rb_define_alloc_func(cThGroup, thgroup_s_alloc); 05009 rb_define_method(cThGroup, "list", thgroup_list, 0); 05010 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0); 05011 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0); 05012 rb_define_method(cThGroup, "add", thgroup_add, 1); 05013 05014 { 05015 th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup); 05016 rb_define_const(cThGroup, "Default", th->thgroup); 05017 } 05018 05019 rb_cMutex = rb_define_class("Mutex", rb_cObject); 05020 rb_define_alloc_func(rb_cMutex, mutex_alloc); 05021 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0); 05022 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); 05023 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0); 05024 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); 05025 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); 05026 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1); 05027 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0); 05028 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0); 05029 05030 recursive_key = rb_intern("__recursive_key__"); 05031 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); 05032 05033 /* init thread core */ 05034 { 05035 /* main thread setting */ 05036 { 05037 /* acquire global vm lock */ 05038 gvl_init(th->vm); 05039 gvl_acquire(th->vm, th); 05040 native_mutex_initialize(&th->vm->thread_destruct_lock); 05041 native_mutex_initialize(&th->interrupt_lock); 05042 05043 th->pending_interrupt_queue = rb_ary_tmp_new(0); 05044 th->pending_interrupt_queue_checked = 0; 05045 th->pending_interrupt_mask_stack = rb_ary_tmp_new(0); 05046 05047 th->interrupt_mask = 0; 05048 } 05049 } 05050 05051 rb_thread_create_timer_thread(); 05052 05053 /* suppress warnings on cygwin, mingw and mswin.*/ 05054 (void)native_mutex_trylock; 05055 } 
05056 05057 int 05058 ruby_native_thread_p(void) 05059 { 05060 rb_thread_t *th = ruby_thread_from_native(); 05061 05062 return th != 0; 05063 } 05064 05065 static int 05066 check_deadlock_i(st_data_t key, st_data_t val, int *found) 05067 { 05068 VALUE thval = key; 05069 rb_thread_t *th; 05070 GetThreadPtr(thval, th); 05071 05072 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) { 05073 *found = 1; 05074 } 05075 else if (th->locking_mutex) { 05076 rb_mutex_t *mutex; 05077 GetMutexPtr(th->locking_mutex, mutex); 05078 05079 native_mutex_lock(&mutex->lock); 05080 if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) { 05081 *found = 1; 05082 } 05083 native_mutex_unlock(&mutex->lock); 05084 } 05085 05086 return (*found) ? ST_STOP : ST_CONTINUE; 05087 } 05088 05089 #ifdef DEBUG_DEADLOCK_CHECK 05090 static int 05091 debug_i(st_data_t key, st_data_t val, int *found) 05092 { 05093 VALUE thval = key; 05094 rb_thread_t *th; 05095 GetThreadPtr(thval, th); 05096 05097 printf("th:%p %d %d", th, th->status, th->interrupt_flag); 05098 if (th->locking_mutex) { 05099 rb_mutex_t *mutex; 05100 GetMutexPtr(th->locking_mutex, mutex); 05101 05102 native_mutex_lock(&mutex->lock); 05103 printf(" %p %d\n", mutex->th, mutex->cond_waiting); 05104 native_mutex_unlock(&mutex->lock); 05105 } 05106 else 05107 puts(""); 05108 05109 return ST_CONTINUE; 05110 } 05111 #endif 05112 05113 static void 05114 rb_check_deadlock(rb_vm_t *vm) 05115 { 05116 int found = 0; 05117 05118 if (vm_living_thread_num(vm) > vm->sleeper) return; 05119 if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); 05120 if (patrol_thread && patrol_thread != GET_THREAD()) return; 05121 05122 st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found); 05123 05124 if (!found) { 05125 VALUE argv[2]; 05126 argv[0] = rb_eFatal; 05127 argv[1] = rb_str_new2("No live threads left. 
Deadlock?"); 05128 #ifdef DEBUG_DEADLOCK_CHECK 05129 printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread); 05130 st_foreach(vm->living_threads, debug_i, (st_data_t)0); 05131 #endif 05132 vm->sleeper--; 05133 rb_threadptr_raise(vm->main_thread, 2, argv); 05134 } 05135 } 05136 05137 static void 05138 update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) 05139 { 05140 VALUE coverage = GET_THREAD()->cfp->iseq->coverage; 05141 if (coverage && RBASIC(coverage)->klass == 0) { 05142 long line = rb_sourceline() - 1; 05143 long count; 05144 if (RARRAY_PTR(coverage)[line] == Qnil) { 05145 return; 05146 } 05147 count = FIX2LONG(RARRAY_PTR(coverage)[line]) + 1; 05148 if (POSFIXABLE(count)) { 05149 RARRAY_PTR(coverage)[line] = LONG2FIX(count); 05150 } 05151 } 05152 } 05153 05154 VALUE 05155 rb_get_coverages(void) 05156 { 05157 return GET_VM()->coverages; 05158 } 05159 05160 void 05161 rb_set_coverages(VALUE coverages) 05162 { 05163 GET_VM()->coverages = coverages; 05164 rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil); 05165 } 05166 05167 void 05168 rb_reset_coverages(void) 05169 { 05170 GET_VM()->coverages = Qfalse; 05171 rb_remove_event_hook(update_coverage); 05172 } 05173 05174 VALUE 05175 rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) 05176 { 05177 VALUE interrupt_mask = rb_hash_new(); 05178 rb_thread_t *cur_th = GET_THREAD(); 05179 05180 rb_hash_aset(interrupt_mask, rb_cObject, sym_never); 05181 rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask); 05182 05183 return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); 05184 } 05185