cs & thread: refactor and finish futures implementation

Complete the implementation of futures, fsemaphores, future logging,
and their cooperation with threads, places, and custodians.
This commit is contained in:
Matthew Flatt 2019-06-18 11:15:44 -06:00
parent eb4ec000b0
commit ba8d442e75
49 changed files with 1260 additions and 769 deletions

View File

@ -101,10 +101,10 @@ with @racket['result], @racket['abort], or @racket['suspend]; and
In process 0, some event pairs can be nested within other event pairs:
@racket['sync], @racket['block], or @racket['touch] with
@racket['result] or @racket['abort]; and @racket['touch-pause] with
@racket['touch-resume].
@racket['result] or @racket['abort]; @racket['touch-pause] with
@racket['touch-resume]; and @racket['start-work] with @racket['end-work].
An @racket['block] in process 0 is generated when an unsafe operation
A @racket['block] in process 0 is generated when an unsafe operation
is handled. This type of event will contain a symbol in the
@racket[unsafe-op-name] field that is the name of the operation. In all
other cases, this field contains @racket[#f].
@ -123,10 +123,10 @@ values depending on both the @racket[action] and @racket[prim-name] fields:
@item{@racket['touch] on process 0: contains the integer ID of the future
being touched.}
@item{@racket['sync] and @racket[prim-name] = @racket[|allocate memory|]:
@item{@racket['sync] and @racket[prim-name] is @racket['|allocate memory|]:
The size (in bytes) of the requested allocation.}
@item{@racket['sync] and @racket[prim-name] = @racket[|jit_on_demand|]:
@item{@racket['sync] and @racket[prim-name] is @racket['|jit_on_demand|]:
The runtime thread is performing a JIT compilation on behalf of the
future @racket[future-id]. The field contains the name of the function
being JIT compiled (as a symbol).}

View File

@ -1,8 +1,10 @@
#lang scheme/base
(require scheme/future
scheme/list
rackunit)
(require scheme/future
scheme/list
(rename-in rackunit
[check-equal? real:check-equal?])
(for-syntax racket/base))
#|Need to add expressions which raise exceptions inside a
future thunk which can be caught at the touch site
@ -12,7 +14,14 @@ Both future and touch should be called from within a future thunk.
We should also test deep continuations.
|#
|#
(define-syntax (check-equal? stx)
(syntax-case stx ()
[(_ e ...)
#`(begin
(writeln `(check-equal? e ...))
#,(syntax/loc stx (real:check-equal? e ...)))]))
;Tests specific to would-be-future
(define-struct future-event (future-id process-id what time prim-name target-fid)
@ -53,8 +62,17 @@ We should also test deep continuations.
(printf "hello3")))])
(touch f)
(let ([log (raw-log-output)])
(check-equal? 3 (length (get-blocks log)))
(check-equal? 3 (length (get-blocks-on 'printf log)))
;; Racket CS would-be futures behave slightly differently
;; --- reflecting the way that futures always suspend
;; when they hit a blocking operation
(case (system-type 'vm)
[(chez-scheme)
(check-equal? 1 (length (get-blocks log)))
;; `printf` is ok up to the point that it tries to get the currrent output port:
(check-equal? 1 (length (get-blocks-on 'continuation-mark-set-first log)))]
[else
(check-equal? 3 (length (get-blocks log)))
(check-equal? 3 (length (get-blocks-on 'printf log)))])
(check-equal? 0 (length (get-touch-blocks log)))))
(let ([f1 (would-be-future
@ -68,14 +86,21 @@ We should also test deep continuations.
42))))))])
(touch f1)
(let ([log (raw-log-output)])
(check-equal? 5 (length (get-blocks log)))
(check-equal? 1 (length (get-touch-blocks log)))
(check-equal? 4 (length (get-blocks-on 'printf log)))
(check-equal? 1 (length (get-blocks-on 'would-be-future log))))))
(case (system-type 'vm)
[(chez-scheme)
(check-equal? 2 (length (get-blocks log)))
(check-equal? 0 (length (get-touch-blocks log)))
(check-equal? 2 (length (get-blocks-on 'continuation-mark-set-first log)))
(check-equal? 0 (length (get-blocks-on 'would-be-future log)))]
[else
(check-equal? 5 (length (get-blocks log)))
(check-equal? 1 (length (get-touch-blocks log)))
(check-equal? 4 (length (get-blocks-on 'printf log)))
(check-equal? 1 (length (get-blocks-on 'would-be-future log)))]))))
;; ----------------------------------------
(define (run-tests func)
(define (run-tests func)
(check-equal?
'yes
(let/ec k
@ -854,7 +879,8 @@ We should also test deep continuations.
(define f (func (lambda () (loop i))))
(sleep 0.1)
(with-handlers ([exn:fail? (lambda (exn)
(unless (regexp-match #rx"expected number of values not received" (exn-message exn))
(unless (or (regexp-match #rx"expected number of values not received" (exn-message exn))
(regexp-match #rx"returned two values to single value return context" (exn-message exn)))
(raise exn)))])
(touch f))))

View File

@ -433,11 +433,11 @@
(define engine-tag (default-continuation-prompt-tag))
(define e (make-engine (lambda () 'done) engine-tag #f #f))
(define e (make-engine (lambda () 'done) engine-tag #f #f #f))
(check (cdr (e 100 void list vector))
'(done))
(define e-forever (make-engine (lambda () (let loop () (loop))) engine-tag #f #f))
(define e-forever (make-engine (lambda () (let loop () (loop))) engine-tag #f #f #f))
(check (vector? (e-forever 10 void list vector))
#t)
@ -450,7 +450,7 @@
[else
(engine-block)
(loop (sub1 n))])))
engine-tag
engine-tag #f
#f #f))
(check (let ([started 0])
(let loop ([e e-10] [n 0])
@ -475,7 +475,7 @@
(lambda () (set! pre (add1 pre)))
(lambda () (loop (sub1 n)))
(lambda () (set! post (add1 post))))])))
engine-tag
engine-tag #f
#f #f)])
(check (let loop ([e e-10/dw] [n 0])
(e 200
@ -497,10 +497,10 @@
(thread-cell-set! pt (add1 p-old))
(list u-old
p-old
(make-engine gen engine-tag #f #f)
(make-engine gen engine-tag #f #f #f)
(thread-cell-ref ut)
(thread-cell-ref pt)))
(define l1 ((make-engine gen engine-tag #f #f)
(define l1 ((make-engine gen engine-tag #f #f #f)
100
void
(lambda (remain l) l)
@ -526,7 +526,7 @@
(check (procedure? my-param) #t)
(let ([e (with-continuation-mark parameterization-key
(extend-parameterization (continuation-mark-set-first #f parameterization-key) my-param 'set)
(make-engine (lambda () (|#%app| my-param)) engine-tag #f #f))])
(make-engine (lambda () (|#%app| my-param)) engine-tag #f #f #f))])
(check (|#%app| my-param) 'init)
(check (e 1000 void (lambda (remain v) v) (lambda (e timeout?) (error 'engine "oops"))) 'set))
@ -622,7 +622,7 @@
(loop (sub1 n)))))
(lambda ()
(set! post (add1 post))))))))
engine-tag
engine-tag #f
#f #f))
(check (let ([prefixes 0])

View File

@ -242,6 +242,28 @@
(check #t (evt? (sync (place-dead-evt pl4))))
(check #t (evt? (sync/timeout 0.01 (place-dead-evt pl4))))))
(let ()
(check 'ok (touch (future (lambda () 'ok))))
(check 'ok (touch (would-be-future (lambda () 'ok))))
(check 'ok (touch (would-be-future (lambda () (touch (would-be-future (lambda () 'ok))))))))
(let ()
(define fts (let loop ([i 0])
(if (= i 50)
'()
(cons
(future (lambda ()
(let loop ([i i])
(if (zero? i)
i
(add1 (loop (sub1 i)))))))
(loop (add1 i))))))
(check (let loop ([i 0])
(if (= i 50)
'()
(cons i (loop (add1 i)))))
(map touch fts)))
;; Measure thread quantum:
#;
(let ([t1 (thread (lambda () (let loop () (loop))))]

View File

@ -478,4 +478,6 @@
(apply 1/fprintf (|#%app| 1/current-error-port) fmt args)))
(set-ffi-get-lib-and-obj! ffi-get-lib ffi-get-obj ptr->address)
(set-make-async-callback-poll-wakeup! unsafe-make-signal-received)
(set-get-machine-info! get-machine-info))
(set-get-machine-info! get-machine-info)
(set-processor-count! (1/processor-count))
(install-future-logging-procs! logging-future-events? log-future-event))

View File

@ -224,7 +224,9 @@
(for-each
(lambda (table)
(hash-for-each table (lambda (k v) (hash-set! primitives k v))))
tables))
tables)
;; prropagate table to the rumble layer
(install-primitives-table! primitives))
(define (outer-eval s paths format)
(if (eq? format 'interpret)

View File

@ -697,7 +697,14 @@
mutex-acquire
mutex-release
threaded?
set-future-callbacks!)
set-future-callbacks!
install-primitives-table!
continuation-current-primitive
;; compile-time use in "thread.sls"
current-atomic-virtual-register
end-atomic-virtual-register
current-future-virtual-register)
(import (rename (chezpart)
[define define/no-lift])
(rename (only (chezscheme) sleep)

View File

@ -156,24 +156,26 @@
[else (loop (cdr mc))])))))
(define/who (maybe-future-barricade tag)
(when (future? (current-future)) ;; running in a future
(check who continuation-prompt-tag? tag)
(when (current-future)
(let ([fp (strip-impersonator (current-future-prompt))]
[tag (strip-impersonator tag)])
(cond
[(eq? fp tag)
;; shortcut: boundary is the future prompt
(void)]
[(eq? tag the-root-continuation-prompt-tag)
(block)]
(block-future)]
[else
(let loop ([mc (current-metacontinuation)])
(cond
[(null? mc)
;; Won't happen normally, since every thread starts with a explicit prompt
(block)]
(block-future)]
[(eq? tag (strip-impersonator (metacontinuation-frame-tag (car mc))))
(void)]
[(eq? (metacontinuation-frame-tag (car mc)) fp)
;; tag must be above future prompt
(block)]
(block-future)]
[else
(loop (cdr mc))]))]))))
@ -1031,7 +1033,7 @@
[(marks key none-v orig-prompt-tag)
(check who continuation-mark-set? :or-false marks)
(check who continuation-prompt-tag? orig-prompt-tag)
(maybe-future-barricade orig-prompt-tag)
(unless marks (maybe-future-barricade orig-prompt-tag))
(let ([prompt-tag (strip-impersonator orig-prompt-tag)])
(let-values ([(key wrapper) (extract-continuation-mark-key-and-wrapper 'continuation-mark-set-first key)])
(let* ([v0 (if marks

View File

@ -24,7 +24,7 @@
(define (set-engine-exit-handler! proc)
(set! engine-exit proc))
(define (make-engine thunk prompt-tag init-break-enabled-cell empty-config?)
(define (make-engine thunk prompt-tag abort-handler init-break-enabled-cell empty-config?)
(let ([paramz (if empty-config?
empty-parameterization
(current-parameterization))])
@ -44,7 +44,8 @@
(with-continuation-mark
parameterization-key paramz
(|#%app| thunk)))
prompt-tag))
prompt-tag
abort-handler))
engine-return))))
(if empty-config?
(make-empty-thread-cell-values)

View File

@ -533,44 +533,33 @@
(lambda (slow-k l)
l)))
(define (continuation->trace* k)
(call-with-values
(lambda ()
(let loop ([k k] [slow-k k] [move? #f])
(cond
[(or (not (#%$continuation? k))
(eq? k #%$null-continuation))
(values slow-k '())]
(define primitive-names #f)
(define (install-primitives-table! primitives)
(set! primitive-names primitives))
;; Simplified variant of `continuation->trace` that can be called to
;; get a likely primitive to blame for a blocking future.
(define (continuation-current-primitive k exclusions)
(let loop ([k (if (full-continuation? k) (full-continuation-k k) k)])
(cond
[(or (not (#%$continuation? k))
(eq? k #%$null-continuation))
#f]
[else
(let* ([name (or (let ([n #f])
(and n
(string->symbol (format "body of ~a" n))))
(let* ([c (#%$continuation-return-code k)]
[n (#%$code-name c)])
(and n (string->symbol n))))])
(cond
[(and name
(hash-ref primitive-names name #f)
(not (#%memq name exclusions)))
name]
[else
(let* ([name (or (let ([n #f])
(and n
(string->symbol (format "body of ~a" n))))
(let* ([c (#%$continuation-return-code k)]
[n (#%$code-name c)])
n))]
[desc
(let* ([ci (#%$code-info (#%$continuation-return-code k))]
[src (and
(code-info? ci)
(or
;; when per-expression inspector info is available:
(find-rpi (#%$continuation-return-offset k) ci)
;; when only per-function source location is available:
(code-info-src ci)))])
(and (or name src)
(cons name src)))])
(#%$split-continuation k 0)
(call-with-values
(lambda () (loop (#%$continuation-link k) (if move? (#%$continuation-link slow-k) slow-k) (not move?)))
(lambda (slow-k l)
(let ([l (if desc
(cons desc l)
l)])
(when (eq? k slow-k)
(hashtable-set! cached-traces k l))
(values slow-k l)))))])))
(lambda (slow-k l)
l)))
(#%$split-continuation k 0)
(loop (#%$continuation-link k))]))])))
(define (traces->context ls)
(let loop ([l '()] [ls ls])

View File

@ -1850,8 +1850,8 @@
(cond
[(eqv? (place-thread-category) PLACE-MAIN-THREAD)
;; In the main thread of a place. We must have gotten here by a
;; foreign call that called back, so interrupts are currently
;; disabled.
;; foreign call that called back, so interrupts are currently
;; disabled.
(cond
[(not atomic?)
;; reenable interrupts
@ -1876,7 +1876,7 @@
[q async-callback-queue]
[m (async-callback-queue-lock q)]
[need-interrupts?
;; If we created this therad by `fork-pthread`, we must
;; If we created this thread by `fork-pthread`, we must
;; have gotten here by a foreign call, so interrupts are
;; currently disabled
(eqv? (place-thread-category) PLACE-KNOWN-THREAD)])

View File

@ -1,14 +1,14 @@
;; Futures API
;; We need a little support for futures, because they interact with
;; continuation operations that may need to block the future.
(define future? (lambda (f) #f))
(define current-future (lambda () #f))
(define block (lambda () (void)))
(define-syntax (current-future stx)
(syntax-case stx ()
[(_) (with-syntax ([pos current-future-virtual-register])
#'(virtual-register pos))]))
(define block-future (lambda () (void)))
(define current-future-prompt (lambda () (void)))
(define future-wait (lambda () (void)))
(define (set-future-callbacks! _future? _current-future _block wait cfp)
(set! future? _future?)
(set! current-future _current-future)
(set! block _block)
(set! future-wait wait)
(set! current-future-prompt cfp))
(define (set-future-callbacks! block current-prompt)
(set! block-future block)
(set! current-future-prompt current-prompt))

View File

@ -2,7 +2,10 @@
;; pthread-specific bindings.
;; The last few virtual registers are reserved for use by the thread system
(meta define num-reserved-virtual-registers 2)
(meta define num-reserved-virtual-registers 3)
(meta define current-atomic-virtual-register (- (virtual-register-count) 1))
(meta define end-atomic-virtual-register (- (virtual-register-count) 2))
(meta define current-future-virtual-register (- (virtual-register-count) 3))
(meta define virtual-register-initial-values '())
@ -30,5 +33,8 @@
[else (cons (with-syntax ([pos (datum->syntax #'here pos)]
[init (car l)])
#'(set-virtual-register! pos init))
(loop (cdr l) (add1 pos)))]))])
#'(define (id) init ...))]))
(loop (cdr l) (add1 pos)))]))]
[future-pos current-future-virtual-register])
#'(define (id)
init ...
(set-virtual-register! future-pos #f)))]))

View File

@ -32,35 +32,50 @@
[set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!]
[set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback!]
[set-custodian-memory-use-proc! rumble:set-custodian-memory-use-proc!]
[set-immediate-allocation-check-proc! rumble:set-immediate-allocation-check-proc!]))
[set-immediate-allocation-check-proc! rumble:set-immediate-allocation-check-proc!]
[continuation-current-primitive rumble:continuation-current-primitive]))
(include "place-register.ss")
(define-place-register-define place:define thread-register-start thread-register-count)
;; Special handling of `current-atomic`: use the last virtual register;
;; we rely on the fact that the register's default value is 0.
;; Special handling of `current-atomic` to use the last virtual register, and
;; similarr for other. We rely on the fact that the register's default value is 0
;; or the rumble layer installs a suitable default. Also, force inline a few
;; functions and handle other special cases.
(define-syntax (define stx)
(syntax-case stx (current-atomic end-atomic-callback make-pthread-parameter unsafe-make-place-local)
;; Recognize definition of `current-atomic`:
[(_ current-atomic (make-pthread-parameter 0))
(with-syntax ([(_ id _) stx]
[n (datum->syntax #'here (- (virtual-register-count) 1))])
#'(define-syntax id
(syntax-rules ()
[(_) (virtual-register n)]
[(_ v) (set-virtual-register! n v)])))]
;; Recognize definition of `end-atomic-callback`:
[(_ end-atomic-callback (make-pthread-parameter 0))
(with-syntax ([(_ id _) stx]
[n (datum->syntax #'here (- (virtual-register-count) 2))])
#'(define-syntax id
(syntax-rules ()
[(_) (virtual-register n)]
[(_ v) (set-virtual-register! n v)])))]
;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread:
[(_ alias-id unsafe-make-place-local) #'(begin)]
;; Chain to place-register handling:
[(_ . rest) #'(place:define . rest)]))
(let ([define-as-virtual-register
(lambda (stx n)
(with-syntax ([(_ id _) stx]
[n (datum->syntax #'here n)])
#'(define-syntax id
(syntax-rules ()
[(_) (virtual-register n)]
[(_ v) (set-virtual-register! n v)]))))])
(syntax-case stx (current-atomic end-atomic-callback current-future$1
lambda make-pthread-parameter unsafe-make-place-local)
;; Recognize definition of `current-atomic`:
[(_ current-atomic (make-pthread-parameter 0))
(define-as-virtual-register stx current-atomic-virtual-register)]
;; Recognize definition of `end-atomic-callback`:
[(_ end-atomic-callback (make-pthread-parameter 0))
(define-as-virtual-register stx end-atomic-virtual-register)]
;; Recognize definition of `current-future`:
[(_ current-future$1 (make-pthread-parameter #f))
(define-as-virtual-register stx current-future-virtual-register)]
;; Force-inline `start-atomic`, `end-atomic`, and `future-barrier`,
;; at least within the core layers:
[(_ id (lambda () expr ...))
(#%memq (syntax->datum #'id) '(start-atomic end-atomic future-barrier))
#'(begin
(define proc (let ([id (lambda () expr ...)]) id))
(define-syntax (id stx)
(syntax-case stx ()
[(_) #'(let () expr ...)]
[_ #'proc])))]
;; Workaround for redirected access of `unsafe-make-place-local` from #%pthread:
[(_ alias-id unsafe-make-place-local) #'(begin)]
;; Chain to place-register handling:
[(_ . rest) #'(place:define . rest)])))
;; This implementation of `sleep`, `get-wakeup-handle`, and `wakeup` is relevant
;; only for running the places part of the thread demo. The relevant callbacks get
@ -99,7 +114,7 @@
;; directly in "compiled/thread.scm". To make that work, the
;; entries need to be registered as built-in names with the
;; expander, and they need to be listed in
;; "primitives/internal.ss".
;; "primitive/internal.ss".
(hasheq
'make-pthread-parameter make-pthread-parameter
'unsafe-root-continuation-prompt-tag unsafe-root-continuation-prompt-tag
@ -155,7 +170,8 @@
'make-mutex rumble:make-mutex
'mutex-acquire rumble:mutex-acquire
'mutex-release rumble:mutex-release
'threaded? rumble:threaded?)]
'threaded? rumble:threaded?
'continuation-current-primitive rumble:continuation-current-primitive)]
[else #f]))
;; Tie knots:
@ -178,5 +194,4 @@
(lambda ()
(current-atomic (fx- (current-atomic) 1))))
(set-future-callbacks! 1/future? 1/current-future
future-block future-wait current-future-prompt))
(set-future-callbacks! future-block current-future-prompt))

View File

@ -1,6 +1,7 @@
#lang racket/base
(require racket/list
racket/match
"../common/set.rkt"
"../run/status.rkt"
"../compile/side-effect.rkt"
"../compile/known.rkt")
@ -8,11 +9,15 @@
(struct struct-shape (num-fields num-parent-fields op-types))
(define (add-defn-known! seen-defns syms rhs)
(define (add-defn-known! seen-defns all-mutated-vars syms rhs)
(for ([s (in-list syms)])
(unless (hash-ref seen-defns s #f)
(hash-set! seen-defns s (known-defined))))
(cond
(cond
[(for/or ([s (in-list syms)])
(set-member? all-mutated-vars s))
;; Don't record anything more specific for a mutated definition
(void)]
;; Recognize known-arity `lambda` and `case-lambda`
[(and (= 1 (length syms)) (lambda-arity rhs))
=>

View File

@ -104,6 +104,7 @@
(for ([sym (in-list (defn-syms defn))])
(hash-set! seen-defns sym (known-defined)))
(add-defn-known! seen-defns
(seteq)
(defn-syms defn)
(defn-rhs defn)))))
(for ([sym (in-list (defn-syms defn))])

View File

@ -151,11 +151,11 @@
(hash-ref seen-defns (car (defn-syms d)) #f))
#:break (not (safe-defn-or-expr? d))
#:when (defn? d))
(add-defn-known! seen-defns (defn-syms d) (defn-rhs d)))
(add-defn-known! seen-defns all-mutated-vars (defn-syms d) (defn-rhs d)))
(define e (car body))
(define new-defn
(list 'define-values (defn-syms e) (simplify-expr (defn-rhs e) all-mutated-vars safe-ref? seen-defns)))
(add-defn-known! seen-defns (defn-syms e) (defn-rhs e))
(add-defn-known! seen-defns all-mutated-vars (defn-syms e) (defn-rhs e))
(cons new-defn (loop (cdr body)))]
[else
(define e

View File

@ -23,7 +23,9 @@
add-stderr-log-receiver!
add-stdout-log-receiver!
add-syslog-log-receiver!
logger-init!)
logger-init!
logging-future-events?
log-future-event)
(define (make-root-logger)
(create-logger #:topic #f #:parent #f #:propagate-filters 'none))
@ -62,6 +64,10 @@
(atomically/no-interrupts/no-wind
(log-level?* logger level topic)))
(define (logging-future-events?)
(atomically/no-interrupts/no-wind
(log-level?* root-logger 'debug 'future)))
;; In atomic mode with interrupts disabled
(define/who (log-level?* logger level topic)
(level>=? (logger-wanted-level logger topic) level))
@ -129,6 +135,10 @@
(atomically/no-interrupts/no-wind
(log-message* logger level topic message data prefix? #f)))
(define (log-future-event message data)
(atomically/no-interrupts/no-wind
(log-message* root-logger 'debug 'future message data #t #f)))
;; In atomic mode with interrupts disabled
;; Can be called in any host Scheme thread and in interrupt handler,
;; like `log-level?*`

View File

@ -59,7 +59,6 @@
get-original-error-port)
(define (io-place-init! in-fd out-fd err-fd cust plumber)
(sandman-place-init!)
(rktio-place-init!)
(logger-init!)
(shared-ltps-place-init!)

View File

@ -1,20 +0,0 @@
#lang racket/base
(require "../common/internal-error.rkt")
;; Simple lock for sandman
(provide make-lock
lock-acquire
lock-release)
(define (make-lock)
(box 0))
(define (lock-acquire box)
(let loop ()
(unless (and (= 0 (unbox box)) (box-cas! box 0 1))
(loop))))
(define (lock-release box)
(unless (box-cas! box 1 0)
(internal-error "failed to release lock")))

View File

@ -4,7 +4,6 @@
"../common/internal-error.rkt"
"../host/thread.rkt"
"../host/rktio.rkt"
"lock.rkt"
"ltps.rkt")
;; Create an extended sandman that can sleep with a rktio poll set. An
@ -22,8 +21,7 @@
sandman-poll-ctx-add-poll-set-adder!
sandman-poll-ctx-merge-timeout
sandman-set-background-sleep!
sandman-poll-ctx-poll?
sandman-place-init!)
sandman-poll-ctx-poll?)
(struct exts (timeout-at fd-adders))
@ -57,13 +55,6 @@
(set! background-sleep sleep)
(set! background-sleep-fd fd))
(define-place-local lock (make-lock))
(define-place-local waiting-threads '())
(define-place-local awoken-threads '())
(define (sandman-place-init!)
(set! lock (make-lock)))
(void
(current-sandman
(let ([timeout-sandman (current-sandman)])
@ -165,37 +156,4 @@
;; extract-timeout
(lambda (exts)
(exts-timeout-at exts))
;; condition-wait
(lambda (t)
(lock-acquire lock)
(set! waiting-threads (cons t waiting-threads))
(lock-release lock)
;; awoken callback. for when thread is awoken
(lambda ()
(lock-acquire lock)
(if (memq t waiting-threads)
(begin
(set! waiting-threads (remove t waiting-threads eq?))
(set! awoken-threads (cons t awoken-threads))
(rktio_signal_received_at (rktio_get_signal_handle rktio))) ;; wakeup main thread if sleeping
(internal-error "thread is not a member of waiting-threads\n"))
(lock-release lock)))
;; condition-poll
(lambda (mode wakeup)
(lock-acquire lock)
(define at awoken-threads)
(set! awoken-threads '())
(lock-release lock)
(for-each (lambda (t)
(wakeup t)) at))
;; any-waiters?
(lambda ()
(or (not (null? waiting-threads)) (not (null? awoken-threads))))
;; lock
lock))))
(exts-timeout-at exts))))))

View File

@ -744,6 +744,8 @@
prim-knowns knowns imports mutated simples)])))]
[`,_
(let ([u-v (unwrap v)])
(when (eq? u-v 'ensure-place-wakeup-handle)
(log-error "here"))
(cond
[(not (symbol? u-v))
v]

View File

@ -31,7 +31,14 @@ GLOBALS = --no-global \
++global-ok place-wakeup \
++global-ok compute-memory-sizes \
++global-ok check-place-activity \
++global-ok make-place-ports+fds
++global-ok make-place-ports+fds \
++global-ok future-block-for-atomic \
++global-ok pthread-count \
++global-ok wakeup-this-place \
++global-ok ensure-place-wakeup-handle \
++global-ok futures-sync-for-custodian-shutdown \
++global-ok logging-future-events? \
++global-ok log-future-event
GENERATE_ARGS = -t main.rkt --submod main \
--check-depends $(BUILDDIR)compiled/thread-dep.rktd \

View File

@ -9,9 +9,15 @@
[poll-guard-evt raw:poll-guard-evt]
[choice-evt raw:choice-evt])
(only-in "sync.rkt"
sync/enable-break))
sync/enable-break)
(only-in "parameter.rkt"
[current-future raw:current-future])
(only-in "future.rkt"
future-block
currently-running-future))
(provide wrap-evt
(provide current-future
wrap-evt
handle-evt
handle-evt?
guard-evt
@ -23,6 +29,10 @@
call-with-semaphore
call-with-semaphore/enable-break)
(define (current-future)
(or (raw:current-future)
(currently-running-future)))
(define/who (choice-evt . args)
(for ([arg (in-list args)])
(check who evt? arg))

View File

@ -3,6 +3,7 @@
"host.rkt"
"place-local.rkt"
"internal-error.rkt"
"parameter.rkt"
"debug.rkt")
(provide atomically
@ -17,16 +18,18 @@
in-atomic-mode?
future-barrier
add-end-atomic-callback!
start-implicit-atomic-mode
end-implicit-atomic-mode
assert-atomic-mode)
assert-atomic-mode
;; This definition is specially recognized for Racket on
;; Chez Scheme and converted to use a virtual register:
(define current-atomic (make-pthread-parameter 0))
set-future-block!)
;; "atomically" is atomic within a place; when a future-running
;; pthread tries to enter atomic mode, it is suspended
(define-syntax-rule (atomically expr ...)
(begin
(start-atomic)
@ -41,9 +44,12 @@
(let () expr ...)
(end-atomic/no-interrupts))))
;; inlined in Chez Scheme embedding:
(define (start-atomic)
(future-barrier)
(current-atomic (fx+ (current-atomic) 1)))
;; inlined in Chez Scheme embedding:
(define (end-atomic)
(define n (fx- (current-atomic) 1))
(cond
@ -82,6 +88,11 @@
(define (in-atomic-mode?)
(positive? (current-atomic)))
;; inlined in Chez Scheme embedding:
(define (future-barrier)
(when (current-future)
(future-block-for-atomic)))
;; ----------------------------------------
;; A "list" of callbacks to run when exiting atomic mode,
@ -99,6 +110,13 @@
;; ----------------------------------------
(define future-block-for-atomic (lambda () (void)))
(define (set-future-block! block)
(set! future-block-for-atomic block))
;; ----------------------------------------
(debug-select
#:on
[(define current-implicit-atomic (make-pthread-parameter #t))

View File

@ -13,7 +13,7 @@
(provide register-place-symbol!
set-io-place-init!)
(define (make-engine thunk prompt-tag init-break-enabled-cell empty-config?)
(define (make-engine thunk prompt-tag abort-handler init-break-enabled-cell empty-config?)
(define ready-s (make-semaphore))
(define s (make-semaphore))
(define prefix void)
@ -280,7 +280,8 @@
(error "current-engine state: not ready"))
'make-mutex (lambda () (make-semaphore 1))
'mutex-acquire (lambda (s) (semaphore-wait s))
'mutex-release (lambda (s) (semaphore-post s))))
'mutex-release (lambda (s) (semaphore-post s))
'continuation-current-primitive (lambda (k) #f)))
;; add dummy definitions that implement pthreads and conditions etc.
;; dummy definitions that error

View File

@ -62,7 +62,7 @@
(let receive () ; loop if a retry is needed
((atomically
(define pw+v (queue-remove! (channel-put-queue ch)))
(define gw (current-thread))
(define gw (current-thread/in-atomic))
(cond
[(not pw+v)
(define gq (channel-get-queue ch))
@ -95,7 +95,7 @@
(define b (box #f))
(define gq (channel-get-queue ch))
(define gw (channel-select-waiter (poll-ctx-select-proc poll-ctx)
(current-thread)))
(current-thread/in-atomic)))
(define n (queue-add! gq (cons gw b)))
(values #f
(wrap-evt
@ -126,7 +126,7 @@
[else
((atomically
(define gw+b (queue-remove! (channel-get-queue ch)))
(define pw (current-thread))
(define pw (current-thread/in-atomic))
(cond
[(not gw+b)
(define pq (channel-put-queue ch))
@ -158,7 +158,7 @@
[else
(define pq (channel-put-queue ch))
(define pw (channel-select-waiter (poll-ctx-select-proc poll-ctx)
(current-thread)))
(current-thread/in-atomic)))
(define n (queue-add! pq (cons pw v)))
(values #f
(wrap-evt
@ -201,10 +201,11 @@
;; ----------------------------------------
;; in atomic mode
(define (not-matching-select-waiter w+b/v)
(define w (car w+b/v))
(or (not (channel-select-waiter? w))
(not (eq? (current-thread)
(not (eq? (current-thread/in-atomic)
(channel-select-waiter-thread w)))))
;; ----------------------------------------

View File

@ -0,0 +1,5 @@
#lang racket/base
(provide (all-defined-out))
(define TICKS 100000)

View File

@ -15,8 +15,9 @@
[place #:mutable] ; place containing the custodian
[memory-use #:mutable] ; set after a major GC
[gc-roots #:mutable] ; weak references to charge to custodian; access without interrupts
[memory-limits #:mutable] ; list of (cons limit cust)
[immediate-limit #:mutable]) ; limit on immediate allocation
[memory-limits #:mutable] ; list of (cons limit cust)
[immediate-limit #:mutable] ; limit on immediate allocation
[sync-futures? #:mutable]) ; whether a sync witht future threads is needed on shutdown
#:authentic)
(define (create-custodian parent)
@ -30,7 +31,8 @@
0 ; memory use
#f ; GC roots
null ; memory limits
#f)) ; immediate limit
#f ; immediate limit
#f)) ; sync-futures?
(define initial-place-root-custodian (create-custodian #f))

View File

@ -43,6 +43,9 @@
create-custodian
poll-custodian-will-executor))
(module+ for-future
(provide set-custodian-futures-sync!))
;; For `(struct custodian ...)`, see "custodian-object.rkt"
(struct custodian-box ([v #:mutable] sema)
@ -259,6 +262,8 @@
(define (do-custodian-shutdown-all c)
(unless (custodian-shut-down? c)
(set-custodian-shut-down?! c #t)
(when (custodian-sync-futures? c)
(futures-sync-for-custodian-shutdown))
(for ([(child callback) (in-hash (custodian-children c))])
(if (procedure-arity-includes? callback 2)
(callback child c)
@ -364,6 +369,13 @@
;; ----------------------------------------
(define futures-sync-for-custodian-shutdown (lambda () (void)))
(define (set-custodian-futures-sync! proc)
(set! futures-sync-for-custodian-shutdown proc))
;; ----------------------------------------
;; Disable interrupts before taking this lock, since it
;; guards values that are manipulated by a GC callback
(define memory-limit-lock (host:make-mutex))

View File

@ -562,6 +562,35 @@
(check (vector 'again) got5)
(check 0 (place-wait pl5))
(check #f (current-future))
(define f (future (lambda () 10)))
(check 10 (touch f))
(check 11 (touch (would-be-future (lambda () 11))))
(check 12 (touch (would-be-future (lambda () (sleep) 12)))) ; blocks on `(sleep)`
(define fx (future (lambda () (current-future))))
(check fx (touch fx))
(define f0s (for/list ([i (in-range 50)])
(future (lambda ()
(let loop ([i i])
(if (zero? i)
i
(add1 (loop (sub1 i)))))))))
(check (for/list ([i (in-range 50)]) i) (map touch f0s))
(define fs (make-fsemaphore 0))
(check (void) (fsemaphore-post fs))
(check (void) (fsemaphore-wait fs))
(define f1 (future (lambda () (fsemaphore-wait fs) 'f1)))
(define f2 (future (lambda () (touch f1) 'f2)))
(sync (system-idle-evt))
(check (void) (fsemaphore-post fs))
(check 'f2 (touch f2))
(check 'f1 (touch f1))
(set! done? #t)))
(unless done?

View File

@ -1,6 +1,10 @@
#lang racket/base
(require "check.rkt"
"semaphore.rkt")
"parameter.rkt"
"future-lock.rkt"
(submod "future.rkt" for-fsemaphore)
"evt.rkt"
"sync.rkt")
(provide fsemaphore?
make-fsemaphore
@ -9,24 +13,93 @@
fsemaphore-try-wait?
fsemaphore-count)
(struct fsemaphore (sema))
(struct fsemaphore ([c #:mutable] ; counter
lock
[dependents #:mutable] ; dependent futures
[dep-box #:mutable]) ; for waiting by non-futures
#:authentic)
(struct fsemaphore-box-evt (b)
#:property prop:evt (poller (lambda (fsb poll-ctx)
(define b (fsemaphore-box-evt-b fsb))
(cond
[(unbox b) (values '(#t) #f)]
[else (values #f fsb)]))))
(define/who (make-fsemaphore init)
(check who exact-nonnegative-integer? init)
(fsemaphore (make-semaphore init)))
(fsemaphore init
(make-lock)
#hasheq()
#f))
(define/who (fsemaphore-post fsema)
(check who fsemaphore? fsema)
(semaphore-post (fsemaphore-sema fsema)))
(define/who (fsemaphore-post fs)
(check who fsemaphore? fs)
(with-lock (fsemaphore-lock fs)
(define c (fsemaphore-c fs))
(cond
[(zero? c)
(define b (fsemaphore-dep-box fs))
(define deps (fsemaphore-dependents fs))
;; If a future is waiting on the semaphore, it wins over any
;; non-future threads that are blocked on the fsemaphore.
;; That's not a great choice, but it we don't have to worry
;; about keeping track of threads that are in still line versus
;; threads that have been interrupted.
(cond
[(not (hash-empty? deps))
(define f (hash-iterate-key deps (hash-iterate-first deps)))
(set-fsemaphore-dependents! fs (hash-remove deps f))
(future-notify-dependent f)]
[else
(when b
;; This is a kind of broadcast wakeup, and then the
;; awakened threads will compete for the fsemaphore:
(set-fsemaphore-dep-box! fs #f)
(set-box! b #t))
(set-fsemaphore-c! fs 1)])]
[else
(set-fsemaphore-c! fs (add1 c))])))
(define/who (fsemaphore-wait fsema)
(check who fsemaphore? fsema)
(semaphore-wait (fsemaphore-sema fsema)))
(define/who (fsemaphore-wait fs)
(check who fsemaphore? fs)
(lock-acquire (fsemaphore-lock fs))
(define c (fsemaphore-c fs))
(cond
[(zero? c)
(define me-f (current-future))
(cond
[me-f
(lock-acquire (future*-lock me-f))
(set-fsemaphore-dependents! fs (hash-set (fsemaphore-dependents fs) me-f #t))
(set-future*-state! me-f 'fsema)
(lock-release (fsemaphore-lock fs))
(future-suspend) ; expects lock on f and releases it
(void)]
[else
(define dep-box (or (fsemaphore-dep-box fs)
(let ([b (box #f)])
(set-fsemaphore-dep-box! fs b)
b)))
(lock-release (fsemaphore-lock fs))
(sync (fsemaphore-box-evt dep-box))
(fsemaphore-wait fs)])]
[else
(set-fsemaphore-c! fs (sub1 c))
(lock-release (fsemaphore-lock fs))]))
(define/who (fsemaphore-try-wait? fsema)
(check who fsemaphore? fsema)
(semaphore-try-wait? (fsemaphore-sema fsema)))
(define/who (fsemaphore-try-wait? fs)
(check who fsemaphore? fs)
(with-lock (fsemaphore-lock fs)
(define c (fsemaphore-c fs))
(cond
[(zero? c) #f]
[else
(set-fsemaphore-c! fs (sub1 c))
#t])))
(define/who (fsemaphore-count fsema)
(check who fsemaphore? fsema)
0)
(define/who (fsemaphore-count fs)
(check who fsemaphore? fs)
(with-lock (fsemaphore-lock fs)
(fsemaphore-c fs)))

View File

@ -0,0 +1,12 @@
#lang racket/base
(provide get-next-id)
(define ID (box 1))
(define get-next-id
(lambda ()
(let ([id (unbox ID)])
(if (box-cas! ID id (+ 1 id))
id
(get-next-id)))))

View File

@ -0,0 +1,62 @@
#lang racket/base
(require "internal-error.rkt"
"host.rkt"
"parameter.rkt"
"atomic.rkt")
;; This module implements a lightweight spinlock for futures and
;; fsemaphores.
;; The overall locking regime depends on this lock order (i.e.,
;; when multiple locks are held at once, they must be acquired
;; in this order):
;;
;; - fsemaphore [one at a time]
;; - schedule queue
;; - futures, lower ID before higher ID
;;
;; A future's lock must be held to change the future's fields, except
;; that the fields to implement the schedule queue should be modified
;; only with the schedule-queue lock held.
;;
;; A future with state #f is available to run, but it must be either
;; `would-be?` (never in the queue, only run by a Racket thread) or
;; currently in a queue for a future pthread to take ownership of the
;; future.
(provide with-lock
make-lock
lock-acquire
lock-release
start-future-uninterrupted
end-future-uninterrupted)
(define-syntax-rule (with-lock lock-expr expr ...)
(let ([lock lock-expr])
(lock-acquire lock)
(begin0
(let () expr ...)
(lock-release lock))))
(define (make-lock) (box 0))
(define (start-future-uninterrupted)
(if (current-future)
(current-atomic (add1 (current-atomic))) ; see `run-future-in-worker`
(start-atomic)))
(define (end-future-uninterrupted)
(if (current-future)
(current-atomic (sub1 (current-atomic))) ; see `run-future-in-worker`
(end-atomic)))
(define (lock-acquire lock)
(start-future-uninterrupted)
(let loop ()
(unless (box-cas! lock 0 1)
(loop))))
(define (lock-release lock)
(unless (box-cas! lock 1 0)
(internal-error "lock release failed!"))
(end-future-uninterrupted))

View File

@ -0,0 +1,94 @@
#lang racket/base
(require "host.rkt"
"parameter.rkt"
"future-object.rkt"
"place-local.rkt")
(provide log-future
logging-futures?
flush-future-log
install-future-logging-procs!)
(struct future-event (future-id proc-id action time prim-name user-data)
#:prefab)
(define-place-local events null)
;; called with no future locks held
(define (log-future action [future-id #f]
#:prim-name [prim-name #f]
#:data [data #f])
(cond
[(current-future)
=> (lambda (me-f)
(set! events (cons (future-event (or future-id (future*-id me-f))
(get-pthread-id)
action
(current-inexact-milliseconds)
prim-name
data)
events)))]
[(logging-futures?)
(flush-future-log)
(define id (or future-id
(let ([f (currently-running-future)])
(if f
(future*-id f)
-1))))
(log-future-event* (future-event id 0 action (current-inexact-milliseconds) prim-name data))]))
;; maybe in atomic mode and only in main pthread
(define (logging-futures?)
(logging-future-events?))
;; maybe in atomic mode and only in main pthread
(define (flush-future-log)
(define new-events events)
(unless (null? new-events)
(set! events null)
(when (logging-futures?)
(for ([e (in-list (reverse new-events))])
(log-future-event* e)))))
(define (log-future-event* e)
(define proc-id (future-event-proc-id e))
(define action (future-event-action e))
(define msg (string-append "id "
(number->string (future-event-future-id e))
", process "
(number->string proc-id)
": "
(if (and (eqv? proc-id 0)
(eq? action 'block))
(string-append "HANDLING: "
(symbol->string
(or (future-event-prim-name e)
'|[unknown]|)))
(action->string action))
"; time: "
(number->string (future-event-time e))))
(log-future-event msg e))
(define (action->string a)
(case a
[(create) "created"]
[(complete) "completed"]
[(start-work) "started work"]
[(end-work) "ended work"]
[(block) "BLOCKING on process 0"]
[(touch) "touching future: touch"]
[(result) "result determined"]
[(suspend) "suspended"]
[(touch-pause) "paused for touch"]
[(touch-resume) "resumed for touch"]
[else "[unknown action]"]))
;; ----------------------------------------
(define logging-future-events? (lambda () #f))
(define log-future-event (lambda (msg e) (void)))
(define (install-future-logging-procs! logging? log)
(set! logging-future-events? logging?)
(set! log-future-event log))

View File

@ -0,0 +1,35 @@
#lang racket/base
(require "host.rkt")
(provide (struct-out future*)
currently-running-future-key
currently-running-future)
;; ----------------------------------------
;; See "future-lock.rkt" for information on locking rules
(struct future* (id
lock
custodian ; don't run in future pthread if custodian is shut down
[would-be? #:mutable] ; transitions from #t to 'blocked after blocked
[thunk #:mutable] ; thunk or continuation
[prev #:mutable] ; queue previous
[next #:mutable] ; queue next
[results #:mutable]
[state #:mutable] ; #f (could run), 'running, 'blocked, 'done, 'aborted, 'fsema, or future waiting on
[dependents #:mutable]) ; futures that are blocked on this one
#:authentic
#:reflection-name 'future)
;; ----------------------------------------
(define currently-running-future-key (gensym 'future))
;; Only called in a Racket thread:
(define (currently-running-future)
(continuation-mark-set-first
#f
currently-running-future-key
#f
(unsafe-root-continuation-prompt-tag)))

View File

@ -1,100 +1,163 @@
#lang racket/base
(require "place-local.rkt"
(require "config.rkt"
"place-local.rkt"
"check.rkt"
"internal-error.rkt"
"host.rkt"
"atomic.rkt"
"parameter.rkt"
"../common/queue.rkt"
"atomic.rkt"
"custodian-object.rkt"
"thread.rkt"
"lock.rkt")
(submod "thread.rkt" for-future)
(submod "custodian.rkt" for-future)
"sync.rkt"
"evt.rkt"
"future-object.rkt"
"future-id.rkt"
"future-lock.rkt"
"future-logging.rkt")
(provide init-future-place!
futures-enabled?
current-future
future
future?
would-be-future
touch
future-block
future-wait
current-future-prompt
future:condition-broadcast
future:condition-signal
future:condition-wait
future:make-condition
signal-future
currently-running-future
reset-future-logs-for-tracing!
mark-future-trace-end!)
mark-future-trace-end!
set-processor-count!)
(define place-main-thread-id (make-pthread-parameter 0))
(module+ for-place
(provide set-place-future-procs!
kill-future-scheduler))
(module+ for-fsemaphore
(provide future*-lock
set-future*-state!
future-suspend
future-notify-dependent))
(define (init-future-place!)
(place-main-thread-id (get-pthread-id)))
(void))
;; not sure of order here...
(define (get-caller)
(cond
[(current-future)
(current-future)]
[(not (= (place-main-thread-id) (get-pthread-id)))
(get-pthread-id)]
[else
(current-thread)]))
(define (futures-enabled?)
(threaded?))
;; ----------------------------------------
;; ---------------------------- futures ----------------------------------
(struct future-evt (future)
#:property prop:evt (poller (lambda (fe poll-ctx)
(define f (future-evt-future fe))
(lock-acquire (future*-lock f))
(define s (future*-state f))
(lock-release (future*-lock f))
(cond
[(or (eq? s 'running)
(eq? s 'fsema))
(values #f fe)]
[else (values '(#t) #f)]))))
(define (create-future thunk cust would-be?)
(define id (get-next-id))
(log-future 'create #:data id)
(future* id
(make-lock) ; lock
cust
would-be?
thunk
#f ; prev
#f ; next
#f ; results
#f ; state
#hasheq())) ; dependents
(define ID (box 1))
(define (future? v)
(future*? v))
(define get-next-id
(lambda ()
(let ([id (unbox ID)])
(if (box-cas! ID id (+ 1 id))
id
(get-next-id)))))
(define futures-enabled? threaded?)
(struct future* (id cond lock prompt
would-be? [thunk #:mutable] [engine #:mutable]
[cont #:mutable] [result #:mutable] [done? #:mutable]
[blocked? #:mutable][resumed? #:mutable]
[cond-wait? #:mutable]))
(define (create-future would-be-future?)
(future* (get-next-id) ;; id
(future:make-condition) ;; cond
(make-lock) ;; lock
(make-continuation-prompt-tag 'future) ;; prompt
would-be-future? ;; would-be?
#f ;; thunk
#f ;; engine
#f ;; cont
#f ;; result
#f ;; done?
#f ;; blocked?
#f ;; resumed?
#f)) ;; cond-wait?
(define future? future*?)
(define current-future (make-pthread-parameter #f))
(define future-scheduler-prompt-tag (make-continuation-prompt-tag 'future-scheduler))
(define future-start-prompt-tag (make-continuation-prompt-tag 'future-star))
(define (current-future-prompt)
(if (current-future)
(future*-prompt (current-future))
(internal-error "Not running in a future.")))
future-scheduler-prompt-tag
(internal-error "not running in a future")))
(define (thunk-wrapper f thunk)
(lambda ()
(let ([result (thunk)])
(with-lock ((future*-lock f) (current-future))
(set-future*-result! f result)
(set-future*-done?! f #t)
(future:condition-broadcast (future*-cond f))))))
;; called with lock on f held;
;; in a non-main pthread, caller is responsible for logging 'end-work
(define (run-future f #:was-blocked? [was-blocked? #f])
(set-future*-state! f 'running)
(define thunk (future*-thunk f))
(set-future*-thunk! f #f)
(lock-release (future*-lock f))
(when was-blocked?
(when (logging-futures?)
(log-future 'block (future*-id f) #:prim-name (continuation-current-primitive
thunk
'(unsafe-start-atomic)))
(log-future 'result (future*-id f))))
(unless (eq? (future*-would-be? f) 'blocked)
(log-future 'start-work (future*-id f)))
(define (finish! results state)
(start-future-uninterrupted)
(lock-acquire (future*-lock f))
(set-future*-results! f results)
(set-future*-state! f state)
(define deps (future*-dependents f))
(set-future*-dependents! f #hasheq())
(lock-release (future*-lock f))
;; stay in uninterrupted mode here, because we need to make sure
;; that dependents get rescheduled
(future-notify-dependents deps)
(end-future-uninterrupted)
(log-future 'complete (future*-id f)))
(cond
[(current-future)
;; An attempt to escape will cause the future to block, so
;; we only need to handle success
(call-with-values (lambda ()
(call-with-continuation-prompt
thunk
future-start-prompt-tag
(lambda args (void))))
(lambda results
(finish! results 'done)))]
[(eq? (future*-would-be? f) #t)
;; Similar to `(current-future)` case, but retries
;; excplitily if the future blocks
(call-with-values (lambda ()
(call-with-continuation-prompt
(lambda ()
(current-future f)
(begin0
(thunk)
(current-future #f)))
future-start-prompt-tag
(lambda args
;; Blocked as a would-be future; `(current-future)` has been
;; reset to #f, and we can retry immediately
(set-future*-would-be?! f 'blocked)
(touch f))))
(lambda results
(when (eq? (future*-state f) 'running)
(finish! results 'done)
(log-future 'end-work (future*-id f)))))]
[else
;; No need for the future prompt tag
(dynamic-wind
(lambda () (void))
(lambda ()
(with-continuation-mark
currently-running-future-key f
(call-with-values thunk
(lambda results
(finish! results 'done)))))
(lambda ()
(unless (eq? (future*-state f) 'done)
(finish! #f 'aborted))
(log-future 'end-work (future*-id f))))]))
(define/who (future thunk)
(check who (procedure-arity-includes/c 0) thunk)
@ -102,311 +165,419 @@
[(not (futures-enabled?))
(would-be-future thunk)]
[else
(let ([f (create-future #f)])
(set-future*-engine! f (make-engine (thunk-wrapper f thunk) (future*-prompt f) #f #t))
(schedule-future f)
f)]))
(define me-f (current-future))
(define cust (future-custodian me-f))
(define f (create-future thunk cust #f))
(when cust
(unless me-f
(maybe-start-scheduler)
(set-custodian-sync-futures?! cust #t))
(schedule-future! f))
f]))
(define/who (would-be-future thunk)
(check who (procedure-arity-includes/c 0) thunk)
(let ([f (create-future #t)])
(set-future*-thunk! f (thunk-wrapper f thunk))
f))
(ensure-place-wakeup-handle)
(create-future thunk (future-custodian (current-future)) #t))
(define (future-custodian me-f)
(if me-f
(future*-custodian me-f)
(thread-representative-custodian (current-thread/in-atomic))))
;; When two futures interact, we may need to adjust both;
;; to keep locks ordered, take lock of future with the
;; lower ID, first; beware that the two futures make be
;; the same (in which case we're headed for a circular
;; dependency)
(define (lock-acquire-both f)
(define me-f (current-future))
(cond
[(or (not me-f)
(eq? me-f f))
(lock-acquire (future*-lock f))]
[((future*-id me-f) . < . (future*-id f))
(lock-acquire (future*-lock me-f))
(lock-acquire (future*-lock f))]
[else
(lock-acquire (future*-lock f))
(lock-acquire (future*-lock me-f))]))
(define (lock-release-both f)
(lock-release-current)
(lock-release (future*-lock f)))
(define (lock-release-current)
(define me-f (current-future))
(when me-f
(lock-release (future*-lock me-f))))
(define/who (touch f)
(check who future*? f)
(lock-acquire-both f)
(define s (future*-state f))
(cond
[(future*-done? f)
(future*-result f)]
[(future*-would-be? f)
((future*-thunk f))
(future*-result f)]
[(lock-acquire (future*-lock f) (get-caller) #f) ;; got lock
(when (or (and (not (future*-blocked? f)) (not (future*-done? f)))
(and (future*-blocked? f) (not (future*-cont f))))
(future:condition-wait (future*-cond f) (future*-lock f)))
(future-awoken f)]
[(eq? s 'done)
(lock-release-both f)
(apply values (future*-results f))]
[(eq? s 'aborted)
(lock-release-both f)
(raise (exn:fail "touch: future previously aborted"
(current-continuation-marks)))]
[(eq? s 'blocked)
(cond
[(current-future)
;; Can't run a blocked future in a future pthread
(dependent-on-future f)]
[else
;; Lock on f is held (and no current future to lock)
(run-future f #:was-blocked? #t)
(apply values (future*-results f))])]
[(eq? s #f)
(cond
[(current-future)
;; Need to wait on `f`, so deschedule current one;
;; we may pick `f` next the queue (or maybe later)
(dependent-on-future f)]
[(future*-would-be? f) ; => not scheduled
(lock-release-current)
;; Lock on f is held
(run-future f)
(apply values (future*-results f))]
[else
;; Give up locks in hope of geting `f` off the
;; schedule queue
(lock-release (future*-lock f))
(cond
[(try-deschedule-future? f)
;; lock on `f` is held...
(run-future f)
(apply values (future*-results f))]
[else
;; Contention, so try again
(touch f)])])]
[(eq? s 'running)
(cond
[(current-future)
;; Stop working on this one until `f` is done
(dependent-on-future f)]
[else
;; Have to wait until it's not running anywhere
(set-future*-dependents! f (hash-set (future*-dependents f) 'place #t))
(lock-release (future*-lock f))
(log-future 'touch-pause (future*-id f))
(sync (future-evt f))
(log-future 'touch-resume (future*-id f))
(touch f)])]
[(future? s)
(cond
[(current-future)
;; Waiting on `s` on, so give up on the current future for now
(dependent-on-future f)]
[else
;; Maybe we can start running `s` to get `f` moving...
(lock-release (future*-lock f))
(touch s)
(touch f)])]
[(box? s) ; => dependent on fsemaphore
(cond
[(current-future)
;; Lots to wait on, so give up on the current future for now
(dependent-on-future f)]
[else
;; Wait until fsemaphore post succeeds for the future, then try again.
(lock-release (future*-lock f))
(log-future 'touch-pause (future*-id f))
(sync (future-evt f))
(log-future 'touch-resume (future*-id f))
(touch f)])]
[else
(touch f)]))
(lock-release (future*-lock f))
(internal-error "unrecognized future state")]))
(define (future-awoken f)
(cond
[(future*-done? f) ;; someone else ran continuation
(lock-release (future*-lock f) (get-caller))
(future*-result f)]
[(future*-blocked? f) ;; we need to run continuation
(set-future*-blocked?! f #f)
(set-future*-resumed?! f #t)
(lock-release (future*-lock f) (get-caller))
((future*-cont f) '())
(future*-result f)]
[else
(internal-error "Awoken but future is neither blocked nor done.")]))
;; called in a futurre pthread;
;; called with lock held for both `f` and the current future
(define (dependent-on-future f)
;; in a future pthread, so set up a dependency and on `f` and
;; bail out, so the current future pthread can do other things;
;; note that `me-f` might be the same as `f`, in which case we'll
;; create a circular dependency
(define me-f (current-future))
(set-future*-dependents! f (hash-set (future*-dependents f) me-f #t))
(set-future*-state! me-f f)
(on-transition-to-unfinished)
(unless (eq? me-f f)
(lock-release (future*-lock f)))
;; almost the same as being blocked, but when `f` completes,
;; it will reschedule `me-f`
(future-suspend f)
;; on return from `future-suspend`, no locks are held
(touch f))
;; called from chez layer.
;; called in a future pthread;
;; can be called from Rumble layer
(define (future-block)
(define f (current-future))
(when (and f (not (future*-blocked? f)) (not (future*-resumed? f)))
(with-lock ((future*-lock f) f)
(set-future*-blocked?! f #t))
(engine-block)))
(define me-f (current-future))
(unless (future*-would-be? me-f)
(log-future 'block (future*-id me-f)))
(lock-acquire (future*-lock me-f))
(set-future*-state! me-f 'blocked)
(on-transition-to-unfinished)
(future-suspend))
;; called from chez layer.
;; this should never be called from outside a future.
(define (future-wait)
(define f (current-future))
(with-lock ((future*-lock f) f)
(future:condition-wait (future*-cond f) (future*-lock f))))
;; called with lock held on the current future
(define (future-suspend [touching-f #f])
(define me-f (current-future))
(call-with-composable-continuation
(lambda (k)
(set-future*-thunk! me-f k)
(lock-release (future*-lock me-f))
(when touching-f
(log-future 'touch (future*-id me-f) #:data (future*-id touching-f)))
(unless (future*-would-be? me-f)
(log-future 'suspend (future*-id me-f)))
(cond
[(future*-would-be? me-f)
(current-future #f)
(abort-current-continuation future-start-prompt-tag (void))]
[else
(abort-current-continuation future-scheduler-prompt-tag (void))]))
future-start-prompt-tag))
;; futures and conditions
;; ----------------------------------------
(define (wait-future f m)
(with-lock ((future*-lock f) f)
(set-future*-cond-wait?! f #t))
(lock-release m (get-caller))
(engine-block))
(define pthread-count 1)
(define (awaken-future f)
(with-lock ((future*-lock f) (get-caller))
(set-future*-cond-wait?! f #f)))
;; Called by io layer
(define (set-processor-count! n)
(set! pthread-count n))
;; --------------------------- conditions ------------------------------------
(define-place-local the-scheduler #f)
(struct future-condition* (queue lock))
(struct scheduler ([workers #:mutable]
[futures-head #:mutable]
[futures-tail #:mutable]
mutex ; guards futures chain; see "future-lock.rkt" for discipline
cond) ; signaled when chain goes from empty to non-empty
#:authentic)
(define (future:make-condition)
(future-condition* (make-queue) (make-lock)))
(struct worker (id
[die? #:mutable]
sync-state) ; box used to sync shutdowns: 'idle, 'running, or 'pending
#:authentic)
(define (future:condition-wait c m)
(define caller (get-caller))
(if (own-lock? m caller)
(begin
(with-lock ((future-condition*-lock c) caller)
(queue-add! (future-condition*-queue c) caller))
(if (future? caller)
(wait-future caller m)
(thread-condition-wait (lambda () (lock-release m caller))))
(lock-acquire m (get-caller))) ;; reaquire lock
(internal-error "Caller does not hold lock\n")))
(define (make-worker id)
(worker id
#f ; die?
(box 'idle)))
(define (signal-future f)
(future:condition-signal (future*-cond f)))
(define (future:condition-signal c)
(with-lock ((future-condition*-lock c) (get-caller))
(let ([waitees (future-condition*-queue c)])
(unless (queue-empty? waitees)
(let ([waitee (queue-remove! waitees)])
(if (future? waitee)
(awaken-future waitee)
(thread-condition-awaken waitee)))))))
(define (future:condition-broadcast c)
(with-lock ((future-condition*-lock c) (get-caller))
(define waitees '())
(queue-remove-all! (future-condition*-queue c)
(lambda (e)
(set! waitees (cons e waitees))))
(let loop ([q waitees])
(unless (null? q)
(let ([waitee (car q)])
(if (future? waitee)
(awaken-future waitee)
(thread-condition-awaken waitee))
(loop (cdr q)))))))
;; ------------------------------------- future scheduler ----------------------------------------
(define THREAD-COUNT 2)
(define TICKS 1000000000)
(define-place-local global-scheduler #f)
(define (scheduler-running?)
(not (not global-scheduler)))
(struct worker (id lock mutex cond
[queue #:mutable] [idle? #:mutable]
[pthread #:mutable #:auto] [die? #:mutable #:auto])
#:auto-value #f)
(struct scheduler ([workers #:mutable #:auto])
#:auto-value #f)
;; I think this atomically is sufficient to guarantee scheduler is only created once.
;; called in a Racket thread
(define (maybe-start-scheduler)
(atomically
(unless global-scheduler
(set! global-scheduler (scheduler))
(let ([workers (create-workers)])
(set-scheduler-workers! global-scheduler workers)
(start-workers workers)))))
(unless the-scheduler
(ensure-place-wakeup-handle)
(set! the-scheduler (scheduler '()
#f ; futures-head
#f ; futures-tail
(host:make-mutex)
(host:make-condition)))
(define workers
(for/list ([id (in-range 1 (add1 pthread-count))])
(define w (make-worker id))
(start-worker w)
w))
(set-scheduler-workers! the-scheduler workers))))
(define (kill-scheduler)
(when global-scheduler
(for-each (lambda (w)
(with-lock ((worker-lock w) (get-caller))
(set-worker-die?! w #t)))
(scheduler-workers global-scheduler))))
;; called in atomic mode
(define (kill-future-scheduler)
(when the-scheduler
(define s the-scheduler)
(host:mutex-acquire (scheduler-mutex s))
(for ([w (in-list (scheduler-workers s))])
(set-worker-die?! w #t))
(host:condition-signal (scheduler-cond s))
(host:mutex-release (scheduler-mutex s))
(futures-sync-for-shutdown)))
(define (create-workers)
(let loop ([id 1])
;; called in any pthread
;; called maybe holding an fsemaphore lock, but nothing else
(define (schedule-future! f #:front? [front? #f])
(define s the-scheduler)
(host:mutex-acquire (scheduler-mutex s))
(define old (if front?
(scheduler-futures-head s)
(scheduler-futures-tail s)))
(cond
[(not old)
(set-scheduler-futures-head! s f)
(set-scheduler-futures-tail! s f)
(host:condition-signal (scheduler-cond s))]
[front?
(set-future*-next! f old)
(set-future*-prev! old f)
(set-scheduler-futures-head! s f)]
[else
(set-future*-prev! f old)
(set-future*-next! old f)
(set-scheduler-futures-tail! s f)])
(host:mutex-release (scheduler-mutex s)))
;; called with queue lock held
(define (deschedule-future f)
(define s the-scheduler)
(cond
[(or (future*-prev f)
(future*-next f))
(if (future*-prev f)
(set-future*-next! (future*-prev f) (future*-next f))
(set-scheduler-futures-head! s (future*-next f)))
(if (future*-next f)
(set-future*-prev! (future*-next f) (future*-prev f))
(set-scheduler-futures-tail! s (future*-prev f)))
(set-future*-prev! f #f)
(set-future*-next! f #f)]
[(eq? f (scheduler-futures-head s))
(set-scheduler-futures-head! s #f)
(set-scheduler-futures-tail! s #f)]
[else
(internal-error "future is not in queue")]))
;; called with no locks held; if successful,
;; returns with lock held on f
(define (try-deschedule-future? f)
(define s the-scheduler)
(host:mutex-acquire (scheduler-mutex s))
(define ok?
(cond
[(< id (+ 1 THREAD-COUNT))
(cons (worker id (make-lock) (host:make-mutex) (host:make-condition) (make-queue) #t)
(loop (+ id 1)))]
[(and (not (future*-prev f))
(not (future*-next f))
(not (eq? f (scheduler-futures-head s))))
;; Was descheduled by someone else already, or maybe
;; hasn't yet made it back into the schedule after a
;; dependency triggered `future-notify-dependent`
#f]
[else
'()])))
(deschedule-future f)
(lock-acquire (future*-lock f))
#t]))
(host:mutex-release (scheduler-mutex s))
ok?)
;; When a new thread is forked it inherits the values of thread parameters from its creator
;; So, if current-atomic is set for the main thread and then new threads are forked, those new
;; threads current-atomic will be set and then never unset because they will not run code that
;; unsets it.
(define (start-workers workers)
(for-each (lambda (w)
(set-worker-pthread! w (fork-pthread (lambda ()
(current-atomic 0)
(current-thread #f)
(current-engine-state #f)
(current-future #f)
((worker-scheduler-func w))))))
workers))
(define (schedule-future f)
(maybe-start-scheduler)
(let ([w (pick-worker)])
(with-lock ((worker-lock w) (get-caller))
(host:mutex-acquire (worker-mutex w))
(queue-add! (worker-queue w) f)
(host:condition-signal (worker-cond w))
(host:mutex-release (worker-mutex w)))))
(define (pick-worker)
(define workers (scheduler-workers global-scheduler))
(let loop ([workers* (cdr workers)]
[best (car workers)])
;; called in any pthread
;; called maybe holding an fsemaphore lock, but nothing else
(define (future-notify-dependents deps)
(for ([f (in-hash-keys deps)])
(cond
[(or (null? workers*)
(queue-empty? (worker-queue best)))
best]
[(< (queue-length (worker-queue (car workers*)))
(queue-length (worker-queue best)))
(loop (cdr workers*)
(car workers*))]
[else
(loop (cdr workers*)
best)])))
[(eq? f 'place) (wakeup-this-place)]
[else (future-notify-dependent f)])))
(define (wait-for-work w)
(define m (worker-mutex w))
(let try ()
(cond
[(not (queue-empty? (worker-queue w))) ;; got work in meantime
(void)]
[(host:mutex-acquire m #f) ;; cannot acquire lock while worker is being given work.
(host:condition-wait (worker-cond w) m)
(host:mutex-release m)]
[else ;; try to get lock again.
(try)])))
;; called in any pthread
;; called maybe holding an fsemaphore lock, but nothing else
(define (future-notify-dependent f)
(with-lock (future*-lock f)
(set-future*-state! f #f))
(on-transition-to-unfinished)
(if (future*-would-be? f)
(wakeup-this-place)
(schedule-future! f #:front? #t)))
(define (worker-scheduler-func worker)
(lambda ()
(define (loop)
(lock-acquire (worker-lock worker) (get-pthread-id)) ;; block
(cond
[(worker-die? worker) ;; worker was killed
(lock-release (worker-lock worker) (get-pthread-id))]
[(queue-empty? (worker-queue worker)) ;; have lock. no work
(lock-release (worker-lock worker) (get-pthread-id))
(cond
[(steal-work worker)
(do-work)]
[else
(wait-for-work worker)])
(loop)]
[else
(do-work)
(loop)]))
(define (complete ticks args)
(void))
(define (expire future worker)
(lambda (new-eng timeout?)
(set-future*-engine! future new-eng)
(cond
[(positive? (current-atomic))
((future*-engine future) TICKS (prefix future) complete (expire future worker))]
[(future*-resumed? future) ;; run to completion
((future*-engine future) TICKS void complete (expire future worker))]
[(not (future*-cont future)) ;; don't want to reschedule future with a saved continuation
(with-lock ((worker-lock worker) (get-caller))
(host:mutex-acquire (worker-mutex worker))
(queue-add! (worker-queue worker) future)
(host:mutex-release (worker-mutex worker)))]
[else
(with-lock ((future*-lock future) (get-caller))
(future:condition-signal (future*-cond future)))])))
(define (prefix f)
(lambda ()
(when (future*-blocked? f)
(call-with-composable-continuation
(lambda (k)
(with-lock ((future*-lock f) (current-future))
(set-future*-cont! f k))
(engine-block))
(future*-prompt f)))))
;; ----------------------------------------
;; need to have lock here.
(define (do-work)
(let ([work (queue-remove! (worker-queue worker))])
(cond
[(future*-cond-wait? work)
(queue-add! (worker-queue worker) work)
(lock-release (worker-lock worker) (get-pthread-id))] ;; put back on queue
[else
(lock-release (worker-lock worker) (get-pthread-id))
(current-future work)
((future*-engine work) TICKS (prefix work) complete (expire work worker)) ;; call engine.
(current-future #f)])))
(loop)))
(define (start-worker w)
(define s the-scheduler)
(fork-pthread
(lambda ()
(current-future 'worker)
(host:mutex-acquire (scheduler-mutex s))
(let loop ()
(or (box-cas! (worker-sync-state w) 'idle 'running)
(box-cas! (worker-sync-state w) 'pending 'running))
(cond
[(worker-die? w) ; worker was killed
(host:mutex-release (scheduler-mutex s))
(box-cas! (worker-sync-state w) 'running 'idle)]
[(scheduler-futures-head s)
=> (lambda (f)
(deschedule-future f)
(host:mutex-release (scheduler-mutex s))
(lock-acquire (future*-lock f))
;; lock is held on f; run the future
(maybe-run-future-in-worker f w)
;; look for more work
(host:mutex-acquire (scheduler-mutex s))
(loop))]
[else
;; wait for work
(or (box-cas! (worker-sync-state w) 'pending 'idle)
(box-cas! (worker-sync-state w) 'running 'idle))
(host:condition-wait (scheduler-cond s) (scheduler-mutex s))
(loop)])))))
(define (order-workers w1 w2)
(cond
[(< (worker-id w1) (worker-id w2))
(values w1 w2)]
[else
(values w2 w1)]))
;; called with lock on f
(define (maybe-run-future-in-worker f w)
;; Don't start the future if the custodian is shut down,
;; because we may have transitioned from 'pending to
;; 'running without an intervening check
(cond
[(custodian-shut-down? (future*-custodian f))
(set-future*-state! f 'blocked)
(on-transition-to-unfinished)
(lock-release (future*-lock f))]
[else
(run-future-in-worker f w)]))
;; Acquire lock of peer with smallest id # first.
;; worker is attempting to steal work from peers
(define (steal-work worker)
(let loop ([q (scheduler-workers global-scheduler)])
(cond
[(null? q) #f] ;; failed to steal work.
[(not (eq? (worker-id worker) (worker-id (car q)))) ;; not ourselves
(let*-values ([(peer) (car q)]
[(w1 w2) (order-workers worker peer)]) ;; order them.
(lock-acquire (worker-lock w1) (get-pthread-id))
(lock-acquire (worker-lock w2) (get-pthread-id))
(cond
[(> (queue-length (worker-queue peer)) 2) ;; going to steal. Should likely made this # higher.
(do ([i (floor (/ (queue-length (worker-queue peer)) 2)) (- i 1)])
[(zero? i) (void)]
(let ([work (queue-remove-end! (worker-queue peer))])
(queue-add! (worker-queue worker) work)))
(lock-release (worker-lock peer) (get-pthread-id)) ;; don't want to release our own lock.
#t] ;; stole work
[else ;; try a different peer
(lock-release (worker-lock worker) (get-pthread-id))
(lock-release (worker-lock peer) (get-pthread-id))
(loop (cdr q))]))]
[else (loop (cdr q))])))
(define (run-future-in-worker f w)
(current-future f)
;; If we didn't need to check custodians, could be just
;; (call-with-continuation-prompt
;; (lambda () (run-future f))
;; future-scheduler-prompt-tag
;; void)
;; But use an engine so we can periodically check that the future is
;; still supposed to run.
;; We take advantage of `current-atomic`, which would otherwise
;; be unused, to disable interruptions.
(define e (make-engine (lambda () (run-future f))
future-scheduler-prompt-tag
void
break-enabled-default-cell
#t))
(let loop ([e e])
(e TICKS
(lambda ()
;; Check that the future should still run
(when (and (custodian-shut-down? (future*-custodian f))
(zero? (current-atomic)))
(lock-acquire (future*-lock f))
(set-future*-state! f #f)
(on-transition-to-unfinished)
(future-suspend)))
(lambda (leftover-ticks result)
;; Done --- completed or suspend (e.g., blocked)
(void))
(lambda (e timeout?)
(loop e))))
(log-future 'end-work (future*-id f))
(current-future 'worker))
;; in atomic mode
(define (futures-sync-for-shutdown)
;; Make sure any futures that are running in a future pthread
;; have had a chance to notice a custodian shutdown or a
;; future-scheduler shutdown.
;;
;; Move each 'running worker into the 'pending state:
(for ([w (in-list (scheduler-workers the-scheduler))])
(box-cas! (worker-sync-state w) 'running 'pending))
;; A worker that transitions from 'pending to 'running or 'idle
;; is guaranteed to not run a future chose custodian is
;; shutdown or run any future if the worker is terminated
(for ([w (in-list (scheduler-workers the-scheduler))])
(define bx (worker-sync-state w))
(let loop ()
(when (box-cas! bx 'pending 'pending)
(host:sleep 0.001) ; not much alternative to spinning
(loop)))))
;; ----------------------------------------
@ -415,3 +586,25 @@
(define (mark-future-trace-end!)
(void))
;; ----------------------------------------
;; When a future changes from a state where the main thread may be
;; waiting for it, then make sure there's a wakeup signal
(define (on-transition-to-unfinished)
(define me-f (current-future))
(when (and me-f
(not (future*-would-be? me-f)))
(wakeup-this-place)))
(define wakeup-this-place (lambda () (void)))
(define ensure-place-wakeup-handle (lambda () (void)))
(define (set-place-future-procs! wakeup ensure)
(set! wakeup-this-place wakeup)
(set! ensure-place-wakeup-handle ensure))
;; tell "atomic.rkt" layer how to block:
(void (set-future-block! future-block))
(void (set-custodian-futures-sync! futures-sync-for-shutdown))

View File

@ -93,4 +93,6 @@
[make-mutex host:make-mutex]
[mutex-acquire host:mutex-acquire]
[mutex-release host:mutex-release]
threaded?)
threaded?
continuation-current-primitive)

View File

@ -1,52 +0,0 @@
#lang racket/base
(require "internal-error.rkt")
(provide with-lock
make-lock
lock-acquire
lock-release
own-lock?)
(define-syntax-rule (with-lock (lock caller) expr ...)
(begin
(lock-acquire lock caller)
(begin0
(let () expr ...)
(lock-release lock caller))))
(struct future-lock* (box owner))
(define (lock-owner lock)
(unbox (future-lock*-owner lock)))
(define (make-lock)
(future-lock* (box 0) (box #f)))
(define (lock-acquire lock caller [block? #t])
(define box (future-lock*-box lock))
(let loop ()
(cond
[(and (= 0 (unbox box)) (box-cas! box 0 1)) ;; got lock
(unless (box-cas! (future-lock*-owner lock) #f caller)
(internal-error "Lock already has owner."))
#t]
[block?
(loop)]
[else
#f])))
(define (lock-release lock caller)
(when (eq? caller (unbox (future-lock*-owner lock)))
(unless (box-cas! (future-lock*-owner lock) caller #f)
(internal-error "Failed to reset owner\n"))
(unless (box-cas! (future-lock*-box lock) 1 0)
(internal-error "Lock release failed\n"))))
(define (own-lock? lock caller)
(and (eq? caller (unbox (future-lock*-owner lock)))
(begin0
#t
(unless (= 1 (unbox (future-lock*-box lock)))
(internal-error "Caller 'owns' lock but lock is free.")))))

View File

@ -28,6 +28,7 @@
"place.rkt"
"place-message.rkt"
"future.rkt"
"future-logging.rkt"
"fsemaphore.rkt"
"os-thread.rkt")
@ -188,11 +189,12 @@
would-be-future
current-future
future-block
future-wait
current-future-prompt
reset-future-logs-for-tracing!
mark-future-trace-end!
set-processor-count!
install-future-logging-procs!
fsemaphore?
make-fsemaphore
fsemaphore-post

View File

@ -2,6 +2,7 @@
(require "check.rkt"
"atomic.rkt"
"host.rkt"
"parameter.rkt"
"thread.rkt"
(except-in (submod "thread.rkt" scheduling)
thread
@ -58,7 +59,7 @@
(engine-block)))))
#:custodian cust)))
(atomically
(set-thread-forward-break-to! (current-thread) t))
(set-thread-forward-break-to! (current-thread/in-atomic) t))
(semaphore-post ready-sema) ; let the nested thread run
;; Wait for the nested thread to complete -- and any thread nested
@ -76,7 +77,7 @@
;; killed or aborted to the original continuation
(atomically
(set-thread-forward-break-to! (current-thread) #f))
(set-thread-forward-break-to! (current-thread/in-atomic) #f))
;; Propagate any leftover break, but give a propagated
;; exception priority over a break exception:

View File

@ -1,6 +1,24 @@
#lang racket/base
(require "host.rkt")
(provide current-thread)
(provide current-atomic
current-thread/in-atomic
current-future) ; not the one exported to Racket; see "api.rkt"
(define current-thread (make-pthread-parameter #f))
;; These definitions are specially recognized for Racket on
;; Chez Scheme and converted to use a virtual register.
(define current-atomic (make-pthread-parameter 0))
;; The `current-thread` wrapper disallows access to this
;; pthread-local value in a future pthread:
(define current-thread/in-atomic (make-pthread-parameter #f))
;; Normally #f for a place's main pthread (running a Racket thread)
;; and non-#f for a future pthread, but can be a would-be future
;; in the main pthread
(define current-future (make-pthread-parameter #f))
;; Calling `(current-thread/in-atomic)` is faster than
;; `(current-thread)`, but it's only valid in a place's main pthread
;; --- not in a future thread.

View File

@ -17,6 +17,7 @@
"semaphore.rkt"
"evt.rkt"
"sandman.rkt"
(submod "future.rkt" for-place)
"place-message.rkt")
(provide dynamic-place
@ -121,6 +122,7 @@
(do-custodian-shutdown-all orig-cust)
(for ([proc (in-list (place-post-shutdown new-place))])
(proc))
(kill-future-scheduler)
(host:mutex-acquire lock)
(set-place-result! new-place result)
(host:mutex-release lock)
@ -440,3 +442,10 @@
;; in atomic mode
(lambda (pl)
(wakeup-waiting pl))))
(void (set-place-future-procs!
(lambda ()
(place-has-activity! current-place))
;; in atomic mode
(lambda ()
(ensure-wakeup-handle!))))

View File

@ -29,14 +29,6 @@
do-merge-timeout ; <ext-event-set> <wake-up-date-as-msecs> -> <ext-event-set>
do-extract-timeout ; <ext-event-set> -> <wake-up-date-as-msecs>
do-condition-wait ; set a thread to wait on a condition
do-condition-poll ; reschedule awoken threads
do-any-waiters? ; -> boolean
lock
#;...) ; sandman implementations can add more methods
#:prefab)

View File

@ -41,9 +41,6 @@
sandman-wakeup
sandman-any-sleepers?
sandman-sleepers-external-events
sandman-condition-wait
sandman-condition-poll
sandman-any-waiters?
current-sandman)
@ -87,31 +84,6 @@
(define (sandman-sleepers-external-events)
((sandman-do-sleepers-external-events the-sandman)))
;; in atomic mode
(define (sandman-condition-wait thread)
((sandman-do-condition-wait the-sandman) thread))
;; in atomic mode
(define (sandman-condition-poll mode thread-wakeup)
((sandman-do-condition-poll the-sandman) mode thread-wakeup))
;; in atomic mode
(define (sandman-any-waiters?)
((sandman-do-any-waiters? the-sandman)))
;; created simple lock here to avoid cycle in loading from using lock defined in future.rkt
(define (make-lock)
(box 0))
(define (lock-acquire box)
(let loop ()
(unless (and (= 0 (unbox box)) (box-cas! box 0 1))
(loop))))
(define (lock-release box)
(unless (box-cas! box 1 0)
(internal-error "Failed to release lock\n")))
(define-place-local waiting-threads '())
(define-place-local awoken-threads '())
@ -194,37 +166,7 @@
(min sleep-until timeout-at)
timeout-at))
;; extract-timeout
(lambda (sleep-until) sleep-until)
;; condition-wait
(lambda (t)
(lock-acquire (sandman-lock the-sandman))
(set! waiting-threads (cons t waiting-threads))
(lock-release (sandman-lock the-sandman))
;; awoken callback. for when thread is awoken
(lambda (root-thread)
(lock-acquire (sandman-lock the-sandman))
(if (memq t waiting-threads)
(begin
(set! waiting-threads (remove t waiting-threads eq?))
(set! awoken-threads (cons t awoken-threads)))
(internal-error "thread is not a member of waiting-threads\n"))
(lock-release (sandman-lock the-sandman))))
;; condition-poll
(lambda (mode wakeup)
(lock-acquire (sandman-lock the-sandman))
(define at awoken-threads)
(set! awoken-threads '())
(lock-release (sandman-lock the-sandman))
(for-each (lambda (t)
(wakeup t)) at))
;; any waiters?
(lambda ()
(or (not (null? waiting-threads)) (not (null? awoken-threads))))
(make-lock)))
(lambda (sleep-until) sleep-until)))
(void (current-sandman the-default-sandman))

View File

@ -1,5 +1,6 @@
#lang racket/base
(require "place-local.rkt"
(require "config.rkt"
"place-local.rkt"
"place-object.rkt"
"atomic.rkt"
"host.rkt"
@ -14,7 +15,8 @@
"future.rkt"
"custodian.rkt"
(submod "custodian.rkt" scheduling)
"pre-poll.rkt")
"pre-poll.rkt"
"future-logging.rkt")
;; Many scheduler details are implemented in "thread.rkt", but this
;; module handles the thread selection, thread swapping, and
@ -26,8 +28,6 @@
set-check-place-activity!
thread-swap-count)
(define TICKS 100000)
;; Initializes the thread system:
(define (call-in-main-thread thunk)
(make-initial-thread (lambda ()
@ -64,6 +64,7 @@
(call-pre-poll-external-callbacks)
(check-place-activity)
(check-queued-custodian-shutdown)
(flush-future-log)
(when (and (null? callbacks)
(all-threads-poll-done?)
(waiting-on-external-or-idle?))
@ -83,7 +84,8 @@
(define e (thread-engine t))
(set-thread-engine! t 'running)
(set-thread-sched-info! t #f)
(current-thread t)
(current-future (thread-future t))
(current-thread/in-atomic t)
(set-place-current-thread! current-place t)
(set! thread-swap-count (add1 thread-swap-count))
(run-callbacks-in-engine
@ -101,8 +103,10 @@
(lambda args
(start-implicit-atomic-mode)
(accum-cpu-time! t #t)
(current-thread #f)
(set-thread-future! t #f)
(current-thread/in-atomic #f)
(set-place-current-thread! current-place #f)
(current-future #f)
(unless (zero? (current-atomic))
(internal-error "terminated in atomic mode!"))
(thread-dead! t)
@ -115,7 +119,9 @@
(cond
[(zero? (current-atomic))
(accum-cpu-time! t timeout?)
(current-thread #f)
(set-thread-future! t (current-future))
(current-thread/in-atomic #f)
(current-future #f)
(set-place-current-thread! current-place #f)
(unless (eq? (thread-engine t) 'done)
(set-thread-engine! t e))
@ -137,7 +143,6 @@
#:custodian #f)
(select-thread! callbacks)]
[(and (not (sandman-any-sleepers?))
(not (sandman-any-waiters?))
(not (any-idle-waiters?)))
;; all threads done or blocked
(cond
@ -161,10 +166,6 @@
(lambda (t)
(thread-reschedule! t)
(set! did? #t)))
(sandman-condition-poll mode
(lambda (t)
(thread-reschedule! t)
(set! did? #t)))
(when did?
(thread-did-work!))
did?)

View File

@ -139,7 +139,7 @@
(set-semaphore-count! s (sub1 c))
void]
[else
(define w (current-thread))
(define w (current-thread/in-atomic))
(define n (queue-add! s w))
(set-semaphore-count! s -1) ; so CAS not tried for `semaphore-post`
(waiter-suspend!

View File

@ -7,6 +7,7 @@
"channel.rkt"
(submod "channel.rkt" for-sync)
"thread.rkt"
"parameter.rkt"
(only-in (submod "thread.rkt" scheduling)
thread-descheduled?)
"schedule-info.rkt"
@ -149,7 +150,7 @@
(thread-pop-suspend+resume-callbacks!)
(thread-pop-kill-callback!)
(when local-break-cell
(thread-remove-ignored-break-cell! (current-thread) local-break-cell))
(thread-remove-ignored-break-cell! (current-thread/in-atomic) local-break-cell))
;; On escape, post nacks, etc.:
(syncing-abandon! s)))))
@ -587,7 +588,7 @@
;; don't suspend after all
void]
[else
(define t (current-thread))
(define t (current-thread/in-atomic))
(set-syncing-wakeup!
s
(lambda ()

View File

@ -54,15 +54,14 @@
thread-ignore-break-cell!
thread-remove-ignored-break-cell!
thread-representative-custodian
thread-send
thread-receive
thread-try-receive
thread-rewind-receive
thread-receive-evt
thread-condition-awaken
thread-condition-wait)
thread-receive-evt)
;; Exports needed by "schedule.rkt":
(module* scheduling #f
@ -93,6 +92,9 @@
break>?
thread-did-work!))
(module* for-future #f
(provide break-enabled-default-cell))
;; ----------------------------------------
(struct thread node (name
@ -125,6 +127,8 @@
[mailbox-wakeup #:mutable] ; callback to trigger (in atomic mode) on `thread-send`
[cpu-time #:mutable] ; accumulates CPU time in milliseconds
[future #:mutable] ; current would-be future
[condition-wakeup #:mutable])
#:property prop:waiter
@ -137,6 +141,10 @@
(define-place-local root-thread #f)
(define (current-thread)
(future-barrier)
(current-thread/in-atomic))
;; ----------------------------------------
;; Thread creation
@ -152,6 +160,7 @@
(current-thread-group)))
(define e (make-engine proc
(default-continuation-prompt-tag)
#f
(if (or initial? at-root?)
break-enabled-default-cell
(current-break-enabled-cell))
@ -190,6 +199,8 @@
0 ; cpu-time
#f ; future
void ; condition-wakeup
))
((atomically
@ -269,13 +280,13 @@
;; Called in atomic mode:
(define (thread-push-kill-callback! cb)
(assert-atomic-mode)
(define t (current-thread))
(define t (current-thread/in-atomic))
(set-thread-kill-callbacks! t (cons cb (thread-kill-callbacks t))))
;; Called in atomic mode:
(define (thread-pop-kill-callback!)
(assert-atomic-mode)
(define t (current-thread))
(define t (current-thread/in-atomic))
(set-thread-kill-callbacks! t (cdr (thread-kill-callbacks t))))
(define/who (kill-thread t)
@ -292,7 +303,7 @@
[else
(atomically
(do-kill-thread t))
(when (eq? t (current-thread))
(when (eq? t (current-thread/in-atomic))
(when (eq? t root-thread)
(force-exit 0))
(engine-block))
@ -318,6 +329,12 @@
[else
(do-kill-thread t)])))
(define (thread-representative-custodian t)
(atomically
(define cs (thread-custodian-references t))
(and (pair? cs)
(custodian-reference->custodian (car cs)))))
;; Called in atomic mode:
(define (run-kill-callbacks! t)
(assert-atomic-mode)
@ -407,7 +424,7 @@
(thread-group-remove! (thread-parent t) t)
(when timeout-at
(add-to-sleeping-threads! t (sandman-merge-timeout #f timeout-at)))
(when (eq? t (current-thread))
(when (eq? t (current-thread/in-atomic))
(thread-did-work!))
;; Beware that this thunk is not used when a thread is descheduled
;; by a custodian callback
@ -592,14 +609,14 @@
;; Given callbacks are also called in atomic mode
(define (thread-push-suspend+resume-callbacks! s-cb r-cb)
(assert-atomic-mode)
(define t (current-thread))
(define t (current-thread/in-atomic))
(set-thread-suspend+resume-callbacks! t (cons (cons s-cb r-cb)
(thread-suspend+resume-callbacks t))))
;; Called in atomic mode:
(define (thread-pop-suspend+resume-callbacks!)
(assert-atomic-mode)
(define t (current-thread))
(define t (current-thread/in-atomic))
(set-thread-suspend+resume-callbacks! t (cdr (thread-suspend+resume-callbacks t))))
;; Called in atomic mode:
@ -675,7 +692,7 @@
(schedule-info-did-work? sched-info))
(thread-did-work!)]
[else (thread-did-no-work!)])
(set-thread-sched-info! (current-thread) sched-info))
(set-thread-sched-info! (current-thread/in-atomic) sched-info))
(engine-block))
;; Sleep for a while
@ -752,32 +769,36 @@
;; changed, or when a thread is just swapped in, then
;; `check-for-break` should be called.
(define (check-for-break)
(define t (current-thread))
(when (and
;; allow `check-for-break` before threads are running:
t
;; quick pre-test before going atomic:
(thread-pending-break t))
((atomically
(cond
[(and (thread-pending-break t)
(break-enabled)
(not (thread-ignore-break-cell? t (current-break-enabled-cell)))
(>= (add1 (current-breakable-atomic)) (current-atomic)))
(define exn:break* (case (thread-pending-break t)
[(hang-up) exn:break:hang-up/non-engine]
[(terminate) exn:break:terminate/non-engine]
[else exn:break/non-engine]))
(set-thread-pending-break! t #f)
(lambda ()
;; Out of atomic mode
(call-with-escape-continuation
(lambda (k)
(raise (exn:break*
"user break"
(current-continuation-marks)
k)))))]
[else void])))))
(unless (current-future)
(define t (current-thread))
(when (and
;; allow `check-for-break` before threads are running:
t
;; quick pre-test before going atomic:
(thread-pending-break t))
((atomically
(cond
[(and (thread-pending-break t)
;; check atomicity early to avoid nested break checks,
;; since `continuation-mark-set-first` inside `break-enabled`
;; can take a while
(>= (add1 (current-breakable-atomic)) (current-atomic))
(break-enabled)
(not (thread-ignore-break-cell? t (current-break-enabled-cell))))
(define exn:break* (case (thread-pending-break t)
[(hang-up) exn:break:hang-up/non-engine]
[(terminate) exn:break:terminate/non-engine]
[else exn:break/non-engine]))
(set-thread-pending-break! t #f)
(lambda ()
;; Out of atomic mode
(call-with-escape-continuation
(lambda (k)
(raise (exn:break*
"user break"
(current-continuation-marks)
k)))))]
[else void]))))))
;; The break-enabled transition hook is called by the host
;; system when a control transfer (such as a continuation jump)
@ -912,36 +933,9 @@
[else
(lambda () #f)]))))
(define/who (thread-condition-awaken thd)
(check who thread? thd)
((atomically
(cond
[(not (thread-dead? thd))
(define wakeup (thread-condition-wakeup thd))
(set-thread-condition-wakeup! thd void)
wakeup] ;; should be called outside of atomic mode?
[else
(lambda () #f)]))))
(define (thread-condition-wait lock-release)
((atomically
(define t (current-thread))
(set-thread-condition-wakeup! t (sandman-condition-wait t))
(lock-release)
(define do-yield
(thread-deschedule! t
#f
void
(lambda ()
;; try again?
(do-yield))
))
(lambda ()
(do-yield)))))
(define (thread-receive)
((atomically
(define t (current-thread))
(define t (current-thread/in-atomic))
(cond
[(is-mail? t)
(define v (dequeue-mail! t))
@ -966,7 +960,7 @@
(define (thread-try-receive)
(atomically
(define t (current-thread))
(define t (current-thread/in-atomic))
(if (is-mail? t)
(dequeue-mail! t)
#f)))
@ -974,7 +968,7 @@
(define/who (thread-rewind-receive lst)
(check who list? lst)
(atomically
(define t (current-thread))
(define t (current-thread/in-atomic))
(for-each (lambda (msg)
(push-mail! t msg))
lst)))
@ -986,7 +980,7 @@
;; in atomic mode:
(lambda (self poll-ctx)
(assert-atomic-mode)
(define t (current-thread))
(define t (current-thread/in-atomic))
(cond
[(is-mail? t) (values (list self) #f)]
[(poll-ctx-poll? poll-ctx) (values #f self)]