thread, io, & cs: place startup, wait, break, & kill

Fill in more of place creation and termination, which exposed
additional places that the thread layer needs place-specific
variables.
This commit is contained in:
Matthew Flatt 2018-09-03 16:47:00 -06:00
parent 35af40d850
commit 416447e842
24 changed files with 428 additions and 114 deletions

View File

@ -186,6 +186,40 @@
(sync (system-idle-evt))
(check 2 trying)) ; second thread should have completed
(let* ([place-symbols (make-hasheq)]
[register-place-symbol!
(lambda (sym proc)
(hash-set! place-symbols sym proc))])
(install-start-place!
(lambda (mod sym in out err)
(lambda (finish)
(finish #f #f #f)
((hash-ref place-symbols sym)))))
(register-place-symbol! 'nothing void)
(let ([pl1 (dynamic-place 'dummy 'nothing #f #f #f)])
(check #t (place? pl1))
(check 0 (place-wait pl1)))
(register-place-symbol! 'exit1 (lambda () (exit 1)))
(let ([pl2 (dynamic-place 'dummy 'exit1 #f #f #f)])
(check #t (place? pl2))
(check 1 (place-wait pl2)))
(register-place-symbol! 'loop (lambda () (let loop () (loop))))
(let ([pl3 (dynamic-place 'dummy 'loop #f #f #f)])
(check #t (place? pl3))
(place-break pl3)
(check 1 (place-wait pl3))
(printf "[That break was from a place, and it's expected]\n"))
(let ([pl4 (dynamic-place 'dummy 'loop #f #f #f)])
(check #f (sync/timeout 0.01 (place-dead-evt pl4)))
(place-kill pl4)
(check 1 (place-wait pl4))
(check #t (evt? (sync (place-dead-evt pl4))))
(check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4))))))
;; Measure thread quantum:
#;
(let ([t1 (thread (lambda () (let loop () (loop))))]

View File

@ -5,7 +5,7 @@
;; first index is reserved for Rumble:
(meta chez:define thread-register-start 1)
(meta chez:define thread-register-count 12)
(meta chez:define thread-register-count 14)
(meta chez:define io-register-start (+ thread-register-start thread-register-count))
(meta chez:define io-register-count 16)
@ -14,7 +14,7 @@
(meta chez:define regexp-register-count 3)
(meta chez:define expander-register-start (+ regexp-register-start regexp-register-count))
(meta chez:define expander-register-count 32)
(meta chez:define expander-register-count 30)
;; ----------------------------------------

View File

@ -578,6 +578,7 @@
place-local-register-ref ; not exported to Racket
place-local-register-set! ; not exported to Racket
place-local-register-init! ; not exported to Racket
place-exit ; not exported to Racket
_bool _bytes _short_bytes _double _double* _fixint _fixnum _float _fpointer _gcpointer
_int16 _int32 _int64 _int8 _longdouble _pointer _scheme _stdbool _void

View File

@ -19,7 +19,7 @@
(keyboard-interrupt-handler))
(define (engine-exit v)
(chez:exit v))
(place-exit v))
(define (set-engine-exit-handler! proc)
(set! engine-exit proc))

View File

@ -687,6 +687,10 @@
(current-exception-state (create-exception-state))
(base-exception-handler
(lambda (v)
#;
(#%printf "~s ~s\n"
(exn->string v)
'(continuation-mark-set-traces (current-continuation-marks)))
(cond
[(and (warning? v)
(not (non-continuable-violation? v)))

View File

@ -1528,6 +1528,9 @@
(define-virtual-register place-thread-category PLACE-KNOWN-THREAD)
(define (register-as-place-main!)
(place-thread-category PLACE-MAIN-THREAD)
(foreign-place-init!))
(define (foreign-place-init!)
(current-async-callback-queue (make-async-callback-queue (make-mutex)
(make-condition)
'())))

View File

@ -58,4 +58,4 @@
(define (internal-error who s)
(CHECK-uninterrupted
(chez:fprintf (current-error-port) "~a: ~a\n" who s)
(chez:exit 1)))
(#%exit 1)))

View File

@ -54,17 +54,26 @@
;; ----------------------------------------
(define-thread-local place-esc-box (box #f))
(meta-cond
[(threaded?)
(define (place-enabled?) #f) ;; FIXME
(define (fork-place thunk)
(define (fork-place thunk finish-proc)
(fork-thread (lambda ()
(init-virtual-registers)
(place-registers (vector-copy place-register-inits))
(thunk))))]
(init-place-locals!)
(foreign-place-init!)
(let ([result (call/cc
(lambda (esc)
(set-box! place-esc-box esc)
(thunk)
0))])
(finish-proc result)))))]
[else
(define (place-enabled?) #f)
(define (fork-place thunk) #f)])
(define (fork-place thunk finish-proc) #f)])
(define do-start-place void)
(define (install-start-place! proc)
@ -72,3 +81,9 @@
(define (start-place path sym in out err)
(do-start-place path sym in out err))
(define (place-exit v)
(let ([esc (unbox place-esc-box)])
(if esc
(esc v)
(#%exit v))))

View File

@ -2,10 +2,6 @@
(export)
(import (rename (chezpart)
[define chez:define])
(rename (only (chezscheme)
sleep
printf)
[sleep chez:sleep])
(rename (rumble)
[rumble:break-enabled-key break-enabled-key]
;; Remapped to place-local register operations:
@ -55,14 +51,35 @@
;; Chain to place-register handling:
[(_ . rest) #'(place:define . rest)]))
(define (exit n)
(chez:exit n))
;; This implementation of `sleep`, `get-wakeup-handle`, and `wakeup` is relevant
;; only for running the places part of the thread demo. The relevant callbacks get
;; replaced by the "io" layer to use rktio-based functions.
(define sleep-interrupted (rumble:unsafe-make-place-local #f))
(define (sleep secs)
(define isecs (inexact->exact (floor secs)))
(chez:sleep (make-time 'time-duration
(inexact->exact (floor (* (- secs isecs) 1e9)))
isecs)))
(let ([isecs (inexact->exact (floor secs))]
[zero-secs (make-time 'time-duration 0 0)]
[pause-secs (make-time 'time-duration 100000 0)])
(let loop ([all-secs (make-time 'time-duration
(inexact->exact (floor (* (- secs isecs) 1e9)))
isecs)])
(unless (or (time<=? all-secs zero-secs)
(let ([b (rumble:unsafe-place-local-ref sleep-interrupted)])
(and b (unbox b))))
(#%sleep pause-secs)
(loop (subtract-duration all-secs pause-secs))))
(let ([b (rumble:unsafe-place-local-ref sleep-interrupted)])
(when b
(set-box! b #f)))))
(define (get-wakeup-handle)
(let ([b (rumble:unsafe-place-local-ref sleep-interrupted)])
(or b
(begin
;; There's a race condition here.. Avoid triggering it
;; in the thread demo.
(rumble:unsafe-place-local-set! sleep-interrupted (box #f))
(get-wakeup-handle)))))
(define (wakeup b)
(set-box! b #t))
(define (primitive-table key)
(case key
@ -105,9 +122,13 @@
'poll-async-callbacks poll-async-callbacks
'disable-interrupts disable-interrupts
'enable-interrupts enable-interrupts
'sleep sleep
'get-wakeup-handle get-wakeup-handle
'wakeup wakeup
'fork-place rumble:fork-place
'start-place rumble:start-place
'fork-pthread rumble:fork-thread
'exit place-exit
'pthread? rumble:thread?
'get-thread-id rumble:get-thread-id
'make-condition rumble:make-condition

View File

@ -104,6 +104,14 @@
(check-signals)))
((sandman-do-poll timeout-sandman) mode wakeup))
;; get-wakeup
(lambda ()
(rktio_get_signal_handle rktio))
;; wakeup
(lambda (h)
(rktio_signal_received_at h))
;; any-sleepers?
(lambda ()
((sandman-do-any-sleepers? timeout-sandman)))

View File

@ -2,5 +2,6 @@
(require "bootstrap.rkt" ; must be before "main.rkt"
"main.rkt")
(provide (all-from-out "main.rkt"))
(provide (all-from-out "main.rkt")
;; From "bootstrap.rkt":
register-place-symbol!)

View File

@ -10,6 +10,8 @@
;; with `break-enabled-key`, and it does not support using an
;; exception handler in an engine.
(provide register-place-symbol!)
(define (make-engine thunk init-break-enabled-cell empty-config?)
(define ready-s (make-semaphore))
(define s (make-semaphore))
@ -135,32 +137,68 @@
(struct exn:break:terminate/non-engine exn:break/non-engine ())
(define (make-pthread-parameter v)
(define x v)
(define l (unsafe-make-place-local v))
(case-lambda
[() x]
[(v) (set! x v)]))
[() (unsafe-place-local-ref l)]
[(v) (unsafe-place-local-set! l v)]))
(define place-local-table (make-thread-cell (make-hasheq)))
(define initial-place-local-table (make-hasheq))
(define place-local-table (make-parameter initial-place-local-table))
(define (unsafe-make-place-local v)
(define key (vector v 'place-locale))
(hash-set! (thread-cell-ref place-local-table) key v)
(hash-set! (place-local-table) key v)
key)
(define (unsafe-place-local-ref key)
(hash-ref (thread-cell-ref place-local-table) key (vector-ref key 0)))
(hash-ref (place-local-table) key (vector-ref key 0)))
(define (unsafe-place-local-set! key val)
(hash-set! (thread-cell-ref place-local-table) key val))
(hash-set! (place-local-table) key val))
(define (fork-place thunk)
(thread (lambda ()
(thread-cell-set! place-local-table (make-hasheq))
(thunk))))
(define wakeables (make-weak-hasheq))
(define (wakeable-sleep msecs)
(define s (make-semaphore))
(hash-set! wakeables (place-local-table) s)
(sync/timeout msecs s)
(void))
(define (get-wakeup-handle)
(place-local-table))
(define (wakeup t)
(define s (hash-ref wakeables t #f))
(when s (semaphore-post s)))
(define place-done-prompt (make-continuation-prompt-tag 'place-done))
(define (fork-place thunk finish)
(parameterize ([place-local-table (make-hasheq)])
(thread (lambda ()
(define v
(call-with-continuation-prompt
thunk
place-done-prompt))
(finish v)))))
(define place-symbols (make-hasheq))
(define (start-place mod sym in out err)
(lambda (finish)
(void)))
(finish #f #f #f)
((hash-ref place-symbols sym))))
;; For use in "demo.rkt"
(define (register-place-symbol! sym proc)
(hash-set! place-symbols sym proc))
(define (place-exit v)
(if (eq? initial-place-local-table (place-local-table))
(exit v)
(abort-current-continuation
place-done-prompt
(lambda () v))))
(primitive-table '#%pthread
(hash
@ -193,29 +231,33 @@
'poll-async-callbacks (lambda () null)
'disable-interrupts void
'enable-interrupts void
'sleep wakeable-sleep
'get-wakeup-handle get-wakeup-handle
'wakeup wakeup
'fork-place fork-place
'start-place start-place
'exit place-exit
'fork-pthread (lambda args
(error "fork-pthread: not ready"))
'pthread? (lambda args
(error "thread?: not ready"))
'get-thread-id (lambda args
(error "get-pthread-id: not ready"))
'make-condition (lambda () 'condition)
'condition-wait (lambda args
(error "condition-wait: not ready"))
'condition-signal (lambda args
(error "condition-signal: not ready"))
'make-condition (lambda () (make-semaphore))
'condition-wait (lambda (c s)
(semaphore-post s)
(semaphore-wait c)
(semaphore-wait s))
'condition-signal (lambda (c)
(semaphore-post c))
'condition-broadcast (lambda args
(error "condition-broadcast: not ready"))
'threaded? (lambda () #f)
'current-engine-state (lambda args
(error "current-engine state: not ready"))
'make-mutex (lambda () 'mutex)
'mutex-acquire (lambda args
(error "mutex-acquire: not ready"))
'mutex-release (lambda args
(error "mutex-release: not ready"))))
'make-mutex (lambda () (make-semaphore 1))
'mutex-acquire (lambda (s) (semaphore-wait s))
'mutex-release (lambda (s) (semaphore-post s))))
;; add dummy definitions that implement pthreads and conditions etc.
;; dummy definitions that error

View File

@ -28,6 +28,11 @@
raise-custodian-is-shut-down
set-post-shutdown-action!)
(module+ scheduling
(provide do-custodian-shutdown-all
set-root-custodian!
create-custodian))
(struct custodian (children ; weakly maps maps object to callback
[shut-down? #:mutable]
[shutdown-sema #:mutable]
@ -59,7 +64,7 @@
#f ; shutdown semaphore
#f))
(define root-custodian (create-custodian))
(define-place-local root-custodian (create-custodian))
(define/who current-custodian
(make-parameter root-custodian
@ -67,6 +72,11 @@
(check who custodian? v)
v)))
;; To initialize a new place:
(define (set-root-custodian! c)
(set! root-custodian c)
(current-custodian c))
(define/who (make-custodian [parent (current-custodian)])
(check who custodian? parent)
(define c (create-custodian))

View File

@ -501,10 +501,30 @@
(check #t (symbol? (will-execute we)))
;; Check places, where the various export symbols passed to
;; `dynamic-place` are built into "bootstrap.rkt"
(define pl (dynamic-place 'dummy 'no-op #f #f #f))
(check #t (place? pl))
(sleep 1)
;; `dynamic-place` are registered via `register-place-symbol!`
(register-place-symbol! 'nothing void)
(define pl1 (dynamic-place 'dummy 'nothing #f #f #f))
(check #t (place? pl1))
(check 0 (place-wait pl1))
(register-place-symbol! 'exit1 (lambda () (exit 1)))
(define pl2 (dynamic-place 'dummy 'exit1 #f #f #f))
(check #t (place? pl2))
(check 1 (place-wait pl2))
(register-place-symbol! 'loop (lambda () (let loop () (loop))))
(define pl3 (dynamic-place 'dummy 'loop #f #f #f))
(check #t (place? pl3))
(place-break pl3)
(check 1 (place-wait pl3))
(printf "[That break was from a place, and it's expected]\n")
(define pl4 (dynamic-place 'dummy 'loop #f #f #f))
(check #f (sync/timeout 0.01 (place-dead-evt pl4)))
(place-kill pl4)
(check 1 (place-wait pl4))
(check #t (evt? (sync (place-dead-evt pl4))))
(check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4))))
(set! done? #t)))

View File

@ -1,7 +1,6 @@
#lang racket/base
(require (only-in racket/base
[exit host:exit])
"../common/check.rkt"
(require "../common/check.rkt"
"host.rkt"
"plumber.rkt")
(provide exit
@ -17,6 +16,7 @@
(check who (procedure-arity-includes/c 1) p)
p)))
;; In a non-main place, must be called only in the scheduler:
(define (force-exit v)
(cond
[(byte? v)

View File

@ -253,7 +253,7 @@
(let loop ([id 1])
(cond
[(< id (+ 1 THREAD-COUNT))
(cons (worker id (make-lock) (chez:make-mutex) (chez:make-condition) (make-queue) #t)
(cons (worker id (make-lock) (host:make-mutex) (host:make-condition) (make-queue) #t)
(loop (+ id 1)))]
[else
'()])))
@ -277,10 +277,10 @@
(let ([w (pick-worker)])
(with-lock ((worker-lock w) (get-caller))
(chez:mutex-acquire (worker-mutex w))
(host:mutex-acquire (worker-mutex w))
(queue-add! (worker-queue w) f)
(chez:condition-signal (worker-cond w))
(chez:mutex-release (worker-mutex w)))))
(host:condition-signal (worker-cond w))
(host:mutex-release (worker-mutex w)))))
(define (pick-worker)
(define workers (scheduler-workers global-scheduler))
@ -304,9 +304,9 @@
(cond
[(not (queue-empty? (worker-queue w))) ;; got work in meantime
(void)]
[(chez:mutex-acquire m #f) ;; cannot acquire lock while worker is being given work.
(chez:condition-wait (worker-cond w) m)
(chez:mutex-release m)]
[(host:mutex-acquire m #f) ;; cannot acquire lock while worker is being given work.
(host:condition-wait (worker-cond w) m)
(host:mutex-release m)]
[else ;; try to get lock again.
(try)])))
@ -343,9 +343,9 @@
((future*-engine future) TICKS void complete (expire future worker))]
[(not (future*-cont future)) ;; don't want to reschedule future with a saved continuation
(with-lock ((worker-lock worker) (get-caller))
(chez:mutex-acquire (worker-mutex worker))
(host:mutex-acquire (worker-mutex worker))
(queue-add! (worker-queue worker) future)
(chez:mutex-release (worker-mutex worker)))]
(host:mutex-release (worker-mutex worker)))]
[else
(with-lock ((future*-lock future) (get-caller))
(future:condition-signal (future*-cond future)))])))

View File

@ -66,17 +66,24 @@
[disable-interrupts host:disable-interrupts]
[enable-interrupts host:enable-interrupts]
;; Support for the thre scheduler and interrupts
;; across places
[sleep host:sleep]
[get-wakeup-handle host:get-wakeup-handle]
[wakeup host:wakeup]
[fork-place host:fork-place]
[start-place host:start-place]
[exit host:exit]
fork-pthread
pthread?
[get-thread-id get-pthread-id]
[make-condition chez:make-condition]
[condition-wait chez:condition-wait]
[condition-signal chez:condition-signal]
[condition-broadcast chez:condition-broadcast]
[make-mutex chez:make-mutex]
[mutex-acquire chez:mutex-acquire]
[mutex-release chez:mutex-release]
[make-condition host:make-condition]
[condition-wait host:condition-wait]
[condition-signal host:condition-signal]
[condition-broadcast host:condition-broadcast]
[make-mutex host:make-mutex]
[mutex-acquire host:mutex-acquire]
[mutex-release host:mutex-release]
threaded?)

View File

@ -24,30 +24,30 @@
(define/who (unsafe-make-os-semaphore)
(unless threaded? (raise-unsupported who))
(os-semaphore 0 (chez:make-mutex) (chez:make-condition)))
(os-semaphore 0 (host:make-mutex) (host:make-condition)))
(define/who (unsafe-os-semaphore-post s)
(check who os-semaphore? s)
(chez:mutex-acquire (os-semaphore-mutex s))
(host:mutex-acquire (os-semaphore-mutex s))
(when (zero? (os-semaphore-count s))
(chez:condition-signal (os-semaphore-condition s)))
(host:condition-signal (os-semaphore-condition s)))
(set-os-semaphore-count! s (add1 (os-semaphore-count s)))
(chez:mutex-release (os-semaphore-mutex s)))
(host:mutex-release (os-semaphore-mutex s)))
;; interrupts must be enabled when waiting on a semaphore; otherwise,
;; the wait will block GCs, likely deadlocking this thread and another
;; thread that is working toward posting the semaphore
(define/who (unsafe-os-semaphore-wait s)
(check who os-semaphore? s)
(chez:mutex-acquire (os-semaphore-mutex s))
(host:mutex-acquire (os-semaphore-mutex s))
(let loop ()
(cond
[(zero? (os-semaphore-count s))
(chez:condition-wait (os-semaphore-condition s) (os-semaphore-mutex s))
(host:condition-wait (os-semaphore-condition s) (os-semaphore-mutex s))
(loop)]
[else
(set-os-semaphore-count! s (sub1 (os-semaphore-count s)))]))
(chez:mutex-release (os-semaphore-mutex s)))
(host:mutex-release (os-semaphore-mutex s)))
(define (raise-unsupported who)
(raise

View File

@ -6,11 +6,14 @@
"schedule.rkt"
"atomic.rkt"
"thread.rkt"
(submod "thread.rkt" for-place)
"custodian.rkt"
(submod "custodian.rkt" scheduling)
"plumber.rkt"
"exit.rkt"
"sync.rkt"
"evt.rkt")
"evt.rkt"
"sandman.rkt")
(provide dynamic-place
place?
@ -32,67 +35,158 @@
;; ----------------------------------------
(struct place ([result #:mutable]
(struct place (lock
[result #:mutable] ; byte or #f, where #f means "not done"
[queued-result #:mutable] ; non-#f triggers a place exit
custodian
[post-shutdown #:mutable]
pumper-threads))
[post-shutdown #:mutable] ; list of callbacks
pumper-threads ; vector of up to three pumper threads
[pending-break #:mutable] ; #f, 'break, 'hangup, or 'terminate
done-waiting ; hash table of places to ping when this one ends
[wakeup-handle #:mutable]))
(define-place-local current-place #f)
(define place-prompt-tag (make-continuation-prompt-tag 'place))
(define (dynamic-place path sym in out err)
(define c (make-custodian))
(define new-place (place #f ; result
(define c (create-custodian))
(define lock (host:make-mutex))
(define started (host:make-condition))
(define done-waiting (make-hasheq))
(define new-place (place lock
#f ; result
#f ; queued-result
c
'() ; post-shutdown
(make-vector 3 #f))) ; pumper-threads
(make-vector 3 #f) ; pumper-threads
#f ; pending-break
done-waiting
#f))
(define orig-plumber (make-plumber))
(define (default-exit v)
(plumber-flush-all orig-plumber)
(unsafe-abort-current-continuation/no-wind
place-prompt-tag
(if (byte? v) v 0)))
(host:mutex-acquire lock)
(set-place-queued-result! new-place (if (byte? v) v 0))
(host:mutex-release lock)
;; Switch to scheduler, so it can exit:
(engine-block))
;; Start the new place
(host:fork-place
(lambda ()
(define finish (host:start-place path sym in out err))
(call-in-main-thread
(call-in-another-main-thread
c
(lambda ()
(define result
(call-with-continuation-prompt
(lambda ()
(set! current-place new-place)
(current-custodian c)
(current-plumber orig-plumber)
(exit-handler default-exit)
(finish
(lambda (in-th out-th err-th)
(vector-set! (place-pumper-threads place) 0 in-th)
(vector-set! (place-pumper-threads place) 1 out-th)
(vector-set! (place-pumper-threads place) 2 err-th)))
(default-exit 0))
place-prompt-tag
(lambda (v) v)))
(set-place-result! new-place result)))))
(set! current-place new-place)
(current-plumber orig-plumber)
(exit-handler default-exit)
;; The finish function reports some I/O related
;; information to store in the place, and when that
;; callback returns, it starts loading the specified
;; module
(call-with-continuation-prompt
(lambda ()
(finish
(lambda (in-th out-th err-th)
(vector-set! (place-pumper-threads new-place) 0 in-th)
(vector-set! (place-pumper-threads new-place) 1 out-th)
(vector-set! (place-pumper-threads new-place) 2 err-th)
(host:mutex-acquire lock)
(set-place-wakeup-handle! new-place (sandman-get-wakeup-handle))
(host:condition-signal started) ; place is sufficiently started
(host:mutex-release lock))))
(default-continuation-prompt-tag)
(lambda (thunk)
;; Thread ended with escape => exit with status 1
(call-with-continuation-prompt thunk)
(default-exit 1)))
(default-exit 0))))
(lambda (result)
;; Place is done, so save the result and alert anyone waiting on
;; the place
(do-custodian-shutdown-all c)
(host:mutex-acquire lock)
(set-place-result! new-place result)
(host:mutex-release lock)
(for ([k (in-hash-keys done-waiting)])
(cond
[(place? k)
(host:mutex-acquire (place-lock k))
(unless (place-result k)
(sandman-wakeup (place-wakeup-handle k)))
(host:mutex-release (place-lock k))]
[else (sandman-wakeup k)]))
(hash-clear! done-waiting)))
;; Wait for the place to start, then return the place object
(host:mutex-acquire lock)
(host:condition-wait started lock)
(host:mutex-release lock)
new-place)
(define/who (place-break p [kind #f])
(check who place? p)
(unless (or (not kind) (eq? kind 'hangup) (eq? kind 'terminate))
(raise-argument-error who "(or/c #f 'hangup 'terminate)" kind))
(void))
(host:mutex-acquire (place-lock p))
(define pending-break (place-pending-break p))
(when (or (not pending-break)
(break>? (or kind 'break) pending-break))
(set-place-pending-break! p (or kind 'break))
(sandman-wakeup (place-wakeup-handle p)))
(host:mutex-release (place-lock p)))
(void
(set-check-place-break!
;; Called in atomic mode by scheduler
(lambda ()
(define p current-place)
(when p
(host:mutex-acquire (place-lock p))
(define queued-result (place-queued-result p))
(define break (place-pending-break p))
(when break
(set-place-pending-break! p #f))
(host:mutex-release (place-lock p))
(when queued-result
(force-exit queued-result))
(when break
(do-break-thread root-thread break #f))))))
(define/who (place-kill p)
(check who place? p)
(host:mutex-acquire (place-lock p))
(unless (or (place-result p)
(place-queued-result p))
(set-place-queued-result! p 1))
(host:mutex-release (place-lock p))
(place-wait p)
(void))
(define/who (place-wait p)
(check who place? p)
(sync never-evt))
(sync (place-done-evt p #t)))
(struct place-done-evt (p get-result?)
#:property prop:evt (poller (lambda (self poll-ctx)
(assert-atomic-mode)
(define p (place-done-evt-p self))
(host:mutex-acquire (place-lock p))
(define result (place-result p))
(unless result
(hash-set! (place-done-waiting p)
(or current-place
(sandman-get-wakeup-handle))
#t))
(host:mutex-release (place-lock p))
(if result
(if (place-done-evt-get-result? self)
(values (list result) #f)
(values (list self) #f))
(values #f self))))
#:reflection-name 'place-dead-evt)
(define/who (place-dead-evt p)
(check who place? p)
never-evt)
(place-done-evt p #f))
(define/who (place-sleep msecs)
(void))
@ -124,7 +218,8 @@
#f)
(define (unsafe-add-post-custodian-shutdown proc)
(atomically
(set-place-post-shutdown! current-place
(cons proc
(place-post-shutdown current-place)))))
(when current-place
(atomically
(set-place-post-shutdown! current-place
(cons proc
(place-post-shutdown current-place))))))

View File

@ -15,6 +15,9 @@
do-poll ; <mode> (thread -> any) -> (void), calls function on any thread to wake up
; where <mode> is 'fast or 'slow
do-get-wakeup ; -> <wakeup-handle>, identifies current place
do-wakeup ; <wakeup-handle> -> (void), wakes up `do-sleep` call
do-any-sleepers? ; -> boolean
do-sleepers-external-events ; -> <ext-event-set> for sleepers

View File

@ -3,7 +3,8 @@
"check.rkt"
"tree.rkt"
"internal-error.rkt"
"sandman-struct.rkt")
"sandman-struct.rkt"
"host.rkt")
;; A "sandman" manages the set of all sleeping threads that may need
;; to be awoken in response to an external event, and it implements
@ -35,6 +36,8 @@
sandman-remove-sleeping-thread!
sandman-poll
sandman-sleep
sandman-get-wakeup-handle
sandman-wakeup
sandman-any-sleepers?
sandman-sleepers-external-events
sandman-condition-wait
@ -67,6 +70,14 @@
(define (sandman-sleep exts)
((sandman-do-sleep the-sandman) exts))
;; potentially in atomic mode
(define (sandman-get-wakeup-handle)
((sandman-do-get-wakeup the-sandman)))
;; potentially in atomic mode
(define (sandman-wakeup h)
((sandman-do-wakeup the-sandman) h))
;; in atomic mode
(define (sandman-any-sleepers?)
((sandman-do-any-sleepers? the-sandman)))
@ -129,7 +140,7 @@
(sandman
;; sleep
(lambda (timeout-at)
(sleep (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0)))
(host:sleep (/ (- (or timeout-at (distant-future)) (current-inexact-milliseconds)) 1000.0)))
;; poll
(lambda (mode wakeup)
@ -141,6 +152,14 @@
(for ([t (in-hash-keys threads)])
(wakeup t))))))
;; get-wakeup-handle
(lambda ()
(host:get-wakeup-handle))
;; wakeup
(lambda (h)
(host:wakeup h))
;; any-sleepers?
(lambda ()
(not (tree-empty? sleeping-threads)))

View File

@ -10,14 +10,18 @@
(submod "thread.rkt" scheduling)
"system-idle-evt.rkt"
"exit.rkt"
"future.rkt")
"future.rkt"
"custodian.rkt"
(submod "custodian.rkt" scheduling))
;; Many scheduler details are implemented in "thread.rkt", but this
;; module handles the thread selection, thread swapping, and
;; process sleeping.
(provide call-in-main-thread
set-atomic-timeout-callback!)
call-in-another-main-thread
set-atomic-timeout-callback!
set-check-place-break!)
(define TICKS 100000)
@ -28,6 +32,12 @@
(make-initial-thread thunk)
(select-thread!))
;; Initializes the thread system in a new place:
(define (call-in-another-main-thread c thunk)
(make-another-initial-thread-group)
(set-root-custodian! c)
(call-in-main-thread thunk))
;; ----------------------------------------
(define (select-thread! [pending-callbacks null])
@ -37,6 +47,7 @@
pending-callbacks))
(host:poll-will-executors)
(check-external-events 'fast)
(check-place-break)
(when (and (null? callbacks)
(all-threads-poll-done?)
(waiting-on-external-or-idle?))
@ -219,3 +230,9 @@
(begin0
atomic-timeout-callback
(set! atomic-timeout-callback cb)))
;; ----------------------------------------
(define check-place-break void)
(define (set-check-place-break! proc)
(set! check-place-break proc))

View File

@ -9,6 +9,7 @@
thread-group?
make-thread-group
current-thread-group
make-another-initial-thread-group
;; Used by scheduler
root-thread-group
@ -32,7 +33,10 @@
[chain #:mutable] ; children remaining to be scheduled round-robin
[chain-end #:mutable]))
(define root-thread-group (thread-group 'none 'none #f #f #f #f))
(define (make-root-thread-group)
(thread-group 'none 'none #f #f #f #f))
(define-place-local root-thread-group (make-root-thread-group))
(define-place-local num-threads-in-groups 0)
@ -42,6 +46,10 @@
(check who thread-group? v)
v)))
(define (make-another-initial-thread-group)
(set! root-thread-group (make-root-thread-group))
(current-thread-group root-thread-group))
(define/who (make-thread-group [parent (current-thread-group)])
(check who thread-group? parent)
(define tg (thread-group 'none 'none parent #f #f #f))

View File

@ -84,6 +84,12 @@
break-max))
;; Exports needed by "place.rkt":
(module* for-place #f
(provide root-thread
do-break-thread
break>?))
;; ----------------------------------------
(struct thread node (name