cs: change mutable hash tables to be safe only for Racket threads

Mutable `eq?`- and `eqv?`-based hash tables were formerly guarded by a
lock that made them safe for Scheme threads (i.e., OS-level threads).
In particular, that futures could concurrently access hash tables. But
the cost of that lock appears to be too high for such a rarely-used
capability.

Switching `eq?`- and `eqv?`-based hash tables so that they're safe
only for Racket threads means that the lock on a hash table can be
much cheaper. A lock is still needed to because the Rumble layer adds
extra fields for iteration. In the specific case of `hash-ref` on
`eq?`-based tables, however, the lock can be ignored, which makes one
of the most common `hash-ref`s much faster.

Overall, `hash-ref` on a mutable `eq?`-based hash table is now 4-5
times as fast, which makes it about twice as fast as traditional
Racket's `hash-ref`. A `hash-set!` operation is about twice as fast as
before, which puts it on par with traditional Rackets `hash-set!`. The
`hash-ref` improvement makes `send` about twice as fast as before in
Racket CS, making it a little faster than traditional Racket.

Since futures can no longer concurrently access `eq?`- and
`eqv?`-based hash tables, they have to synchronize with the main
thread for access. Racket CS had avoided the "sync" action on futures
that traditional Racket sometimes uses, but this change introduces
sync actions to Racket CS, since it's appropriate for accessing
mutable `eq?`- and `eqv?`-based hash tables.
This commit is contained in:
Matthew Flatt 2019-10-05 14:46:40 -06:00
parent f8bc4e8fa1
commit f574583907
19 changed files with 322 additions and 293 deletions

View File

@ -12,7 +12,7 @@
(define collection 'multi)
(define version "7.4.0.10")
(define version "7.4.0.11")
(define deps `("racket-lib"
["racket" #:version ,version]))

View File

@ -27,22 +27,31 @@ We should also test deep continuations.
(define-struct future-event (future-id process-id what time prim-name target-fid)
#:prefab)
(define (get-events-of-type type log)
(filter (λ (e)
(equal? (future-event-what e) type))
(define (get-events-of-type type log)
(filter (λ (e)
(equal? (future-event-what e) type))
log))
(define (get-blocks log)
(get-events-of-type 'block log))
(get-events-of-type 'block log))
(define (get-touch-blocks log)
(get-events-of-type 'touch log))
(define (get-sync-blocks log)
(get-events-of-type 'sync log))
(define (get-blocks-on prim log)
(filter (λ (e)
(equal? (future-event-prim-name e)
prim))
(get-blocks log)))
(define (get-touch-blocks log)
(get-events-of-type 'touch log))
(define (get-blocks-on prim log)
(filter (λ (e)
(equal? (future-event-prim-name e)
prim))
(get-blocks log)))
(define (get-sync-blocks-on prim log)
(filter (λ (e)
(equal? (future-event-prim-name e)
prim))
(get-sync-blocks log)))
(when (futures-enabled?)
(define recv (make-log-receiver (current-logger) 'debug))
@ -96,7 +105,25 @@ We should also test deep continuations.
(check-equal? 5 (length (get-blocks log)))
(check-equal? 1 (length (get-touch-blocks log)))
(check-equal? 4 (length (get-blocks-on 'printf log)))
(check-equal? 1 (length (get-blocks-on 'would-be-future log)))]))))
(check-equal? 1 (length (get-blocks-on 'would-be-future log)))])))
(let ([f (let ([ht (make-hasheq)])
(would-be-future (λ ()
(hash-set! ht 'ok 5)
(hash-ref ht 'ok))))])
(touch f)
(let ([log (raw-log-output)])
;; Racket CS syncs on hash-ref, traditional Racket blocks
(case (system-type 'vm)
[(chez-scheme)
(check-equal? 2 (length (get-sync-blocks log)))
(check-equal? 1 (length (get-sync-blocks-on 'hash-set! log)))
(check-equal? 1 (length (get-sync-blocks-on 'hash-ref log)))]
[else
(check-equal? 2 (length (get-blocks log)))
(check-equal? 1 (length (get-blocks-on 'hash-set! log)))
(check-equal? 1 (length (get-blocks-on 'hash-ref log)))])
(check-equal? 0 (length (get-touch-blocks log))))))
;; ----------------------------------------

View File

@ -298,6 +298,7 @@ RUMBLE_SRCS = rumble/define.ss \
rumble/place.ss \
rumble/errno-data.ss \
rumble/foreign.ss \
rumble/async-callback.ss \
rumble/future.ss \
rumble/version.ss \
rumble/inline.ss \

View File

@ -4,7 +4,7 @@
;; Allocation of place registers to built-in subsystems, where the
;; first index is reserved for Rumble:
(meta chez:define thread-register-start 1)
(meta chez:define thread-register-start 3)
(meta chez:define thread-register-count 31)
(meta chez:define io-register-start (+ thread-register-start thread-register-count))

View File

@ -707,6 +707,7 @@
set-future-callbacks!
install-primitives-table!
continuation-current-primitive
call-as-asynchronous-callback
;; compile-time use in "thread.sls"
current-atomic-virtual-register
@ -792,6 +793,7 @@
(include "rumble/place.ss")
(include "rumble/errno-data.ss")
(include "rumble/foreign.ss")
(include "rumble/async-callback.ss")
(include "rumble/future.ss")
(include "rumble/inline.ss")

View File

@ -0,0 +1,59 @@
(define-record async-callback-queue (lock condition in wakeup))
(define (current-async-callback-queue)
(place-async-callback-queue))
(define (async-callback-place-init!)
(place-async-callback-queue (make-async-callback-queue (make-mutex)
(make-condition)
'()
(make-async-callback-poll-wakeup))))
(define (call-as-asynchronous-callback thunk)
(with-interrupts-disabled*
(async-callback-queue-call (current-async-callback-queue) thunk #t)))
(define (async-callback-queue-call async-callback-queue thunk need-interrupts?)
(let* ([result-done? (box #f)]
[result #f]
[q async-callback-queue]
[m (async-callback-queue-lock q)])
(mutex-acquire m)
(set-async-callback-queue-in! q (cons (lambda ()
(set! result (thunk))
(mutex-acquire m)
(set-box! result-done? #t)
(condition-broadcast (async-callback-queue-condition q))
(mutex-release m))
(async-callback-queue-in q)))
((async-callback-queue-wakeup q))
(let loop ()
(unless (unbox result-done?)
(when need-interrupts?
;; Enable interrupts so that the thread is deactivated
;; when we wait on the condition
(enable-interrupts))
(condition-wait (async-callback-queue-condition q) m)
(when need-interrupts? (disable-interrupts))
(loop)))
(mutex-release m)
result))
(define make-async-callback-poll-wakeup (lambda () void))
(define (set-make-async-callback-poll-wakeup! make-wakeup)
(set! make-async-callback-poll-wakeup make-wakeup))
;; Returns callbacks to run in atomic mode
(define (poll-async-callbacks)
(let ([q (current-async-callback-queue)])
(mutex-acquire (async-callback-queue-lock q))
(let ([in (async-callback-queue-in q)])
(cond
[(null? in)
(mutex-release (async-callback-queue-lock q))
'()]
[else
(set-async-callback-queue-in! q '())
(mutex-release (async-callback-queue-lock q))
(reverse in)]))))

View File

@ -4,13 +4,14 @@
(define (set-intern-regexp?! p) (set! intern-regexp? p))
(define (datum-intern-literal v)
(when (current-future) (block-future))
(cond
[(or (number? v)
(string? v)
(char? v)
(bytes? v)
(intern-regexp? v))
(with-interrupts-disabled
(with-interrupts-disabled*
(or (weak-hash-ref-key datums v #f)
(let ([v (cond
[(string? v) (string->immutable-string v)]

View File

@ -1545,7 +1545,7 @@
(#%$keep-live v) ...
result))
(define call-locks (make-hasheq))
(define call-locks (make-eq-hashtable))
(define (ffi-call/callable call? in-types out-type abi save-errno lock-name blocking? atomic? async-apply)
(let* ([conv (case abi
@ -1633,10 +1633,11 @@
[arg-makers (cddr gen-proc+ret-maker+arg-makers)]
[async-callback-queue (and (procedure? async-apply) (current-async-callback-queue))]
[lock (and lock-name
(or (hash-ref call-locks (string->symbol lock-name) #f)
(let ([lock (make-mutex)])
(hash-set! call-locks (string->symbol lock-name) lock)
lock)))])
(with-global-lock
(or (eq-hashtable-ref call-locks (string->symbol lock-name) #f)
(let ([lock (make-mutex)])
(eq-hashtable-set! call-locks (string->symbol lock-name) lock)
lock))))])
(cond
[call?
(cond
@ -1847,13 +1848,7 @@
(define-virtual-register place-thread-category PLACE-KNOWN-THREAD)
(define (register-as-place-main!)
(place-thread-category PLACE-MAIN-THREAD)
(foreign-place-init!))
(define (foreign-place-init!)
(current-async-callback-queue (make-async-callback-queue (make-mutex)
(make-condition)
'()
(make-async-callback-poll-wakeup))))
(async-callback-place-init!))
;; Can be called in any Scheme thread
(define (call-as-atomic-callback thunk atomic? async-apply async-callback-queue)
@ -1881,35 +1876,12 @@
[else
;; Not in a place's main thread; queue an async callback
;; and wait for the response
(let* ([result-done? (box #f)]
[result #f]
[q async-callback-queue]
[m (async-callback-queue-lock q)]
[need-interrupts?
;; If we created this thread by `fork-pthread`, we must
;; have gotten here by a foreign call, so interrupts are
;; currently disabled
(eqv? (place-thread-category) PLACE-KNOWN-THREAD)])
(mutex-acquire m)
(set-async-callback-queue-in! q (cons (lambda ()
(set! result (|#%app| async-apply thunk))
(mutex-acquire m)
(set-box! result-done? #t)
(condition-broadcast (async-callback-queue-condition q))
(mutex-release m))
(async-callback-queue-in q)))
((async-callback-queue-wakeup q))
(let loop ()
(unless (unbox result-done?)
(when need-interrupts?
;; Enable interrupts so that the thread is deactivated
;; when we wait on the condition
(enable-interrupts))
(condition-wait (async-callback-queue-condition q) m)
(when need-interrupts? (disable-interrupts))
(loop)))
(mutex-release m)
result)]))
(async-callback-queue-call async-callback-queue
(lambda () (|#%app| async-apply thunk))
;; If we created this thread by `fork-pthread`, we must
;; have gotten here by a foreign call, so interrupts are
;; currently disabled
(eqv? (place-thread-category) PLACE-KNOWN-THREAD))]))
(define scheduler-start-atomic void)
(define scheduler-end-atomic void)
@ -1917,28 +1889,6 @@
(set! scheduler-start-atomic start-atomic)
(set! scheduler-end-atomic end-atomic))
(define make-async-callback-poll-wakeup (lambda () void))
(define (set-make-async-callback-poll-wakeup! make-wakeup)
(set! make-async-callback-poll-wakeup make-wakeup))
(define-record async-callback-queue (lock condition in wakeup))
(define-virtual-register current-async-callback-queue #f)
;; Returns callbacks to run in atomic mode
(define (poll-async-callbacks)
(let ([q (current-async-callback-queue)])
(mutex-acquire (async-callback-queue-lock q))
(let ([in (async-callback-queue-in q)])
(cond
[(null? in)
(mutex-release (async-callback-queue-lock q))
'()]
[else
(set-async-callback-queue-in! q '())
(mutex-release (async-callback-queue-lock q))
(reverse in)]))))
;; ----------------------------------------
(define-record-type (callback create-callback ffi-callback?)

View File

@ -7,8 +7,24 @@
#'(virtual-register pos))]))
(define block-future (lambda () (void)))
(define sync-future (lambda (who thunk) (thunk)))
(define current-future-prompt (lambda () (void)))
(define (set-future-callbacks! block current-prompt)
(define (set-future-callbacks! block sync current-prompt)
(set! block-future block)
(set! sync-future sync)
(set! current-future-prompt current-prompt))
;; Call `thunk` in the main thread to synchronizel the thunk must be
;; constant-time, never rasse an exception, and return a single value
(define (future-sync who thunk)
(let ([disabled? (> (disable-interrupts) 1)])
(enable-interrupts)
(cond
[disabled?
;; Interrupts were already disabled, so we're holding the global
;; lock, in a garbage collection, or something like that --- as
;; synchronized as possible already
(thunk)]
[else
(sync-future who thunk)])))

View File

@ -1,3 +1,6 @@
;; Mutable hash table are safe for engine-based concurrency, but they
;; are not safe for concurrent access across Scheme threads.
;; Mutable and weak-equal hash tables need a lock
;; and an iteration vector
(define-record locked-iterable-hash (lock
@ -8,7 +11,11 @@
;; tables in a `mutable-hash` record
(define-record mutable-hash locked-iterable-hash
(ht)) ; Chez Scheme hashtable
(define-record eq-mutable-hash mutable-hash
())
(define (create-mutable-hash ht kind) (make-mutable-hash (make-lock kind) #f #f ht))
(define (create-eq-mutable-hash ht) (make-eq-mutable-hash (make-lock 'eq?) #f #f ht))
(define (mutable-hash-lock ht) (locked-iterable-hash-lock ht))
(define (mutable-hash-cells ht) (locked-iterable-hash-cells ht))
@ -25,17 +32,17 @@
(define make-hasheq
(case-lambda
[() (create-mutable-hash (make-eq-hashtable) 'eq?)]
[() (create-eq-mutable-hash (make-eq-hashtable))]
[(alist) (fill-hash! 'make-hasheq (make-hasheq) alist)]))
(define (eq-hashtable->hash ht)
(create-mutable-hash ht 'eq?))
(create-eq-mutable-hash ht))
(define (hash->eq-hashtable ht)
(mutable-hash-ht ht))
(define make-weak-hasheq
(case-lambda
[() (create-mutable-hash (make-weak-eq-hashtable) 'eq?)]
[() (create-eq-mutable-hash (make-weak-eq-hashtable))]
[(alist) (fill-hash! 'make-weak-hasheq (make-weak-hasheq) alist)]))
(define make-hasheqv
@ -89,10 +96,10 @@
(define (hash-set! ht k v)
(cond
[(mutable-hash? ht)
(lock-acquire (mutable-hash-lock ht))
(hashtable-set! (mutable-hash-ht ht) k v)
(set-locked-iterable-hash-retry?! ht #t)
(lock-release (mutable-hash-lock ht))]
(cond
[(and (current-future) (eq-mutable-hash? ht))
(future-sync 'hash-set! (lambda () (mutable-hash-set! ht k v)))]
[else (mutable-hash-set! ht k v)])]
[(weak-equal-hash? ht) (weak-hash-set! ht k v)]
[(and (impersonator? ht)
(let ([ht (impersonator-val ht)])
@ -101,22 +108,19 @@
(impersonate-hash-set! ht k v)]
[else (raise-argument-error 'hash-set! "(and/c hash? (not/c immutable?))" ht)]))
(define (mutable-hash-set! ht k v)
(lock-acquire (mutable-hash-lock ht))
(hashtable-set! (mutable-hash-ht ht) k v)
(set-locked-iterable-hash-retry?! ht #t)
(lock-release (mutable-hash-lock ht)))
(define (hash-remove! ht k)
(cond
[(mutable-hash? ht)
(lock-acquire (mutable-hash-lock ht))
(let ([cell (and (mutable-hash-cells ht)
(hashtable-ref-cell (mutable-hash-ht ht) k))])
(cond
[cell
(hashtable-delete! (mutable-hash-ht ht) k)
;; Clear cell, because it may be in `(locked-iterable-hash-cells ht)`
(set-car! cell #!bwp)
(set-cdr! cell #!bwp)
(set-locked-iterable-hash-retry?! ht #t)]
[else
(hashtable-delete! (mutable-hash-ht ht) k)]))
(lock-release (mutable-hash-lock ht))]
(cond
[(and (current-future) (eq-mutable-hash? ht))
(future-sync 'hash-remove! (lambda () (mutable-hash-remove! ht k)))]
[else (mutable-hash-remove! ht k)])]
[(weak-equal-hash? ht) (weak-hash-remove! ht k)]
[(and (impersonator? ht)
(let ([ht (impersonator-val ht)])
@ -125,13 +129,27 @@
(impersonate-hash-remove! ht k)]
[else (raise-argument-error 'hash-remove! "(and/c hash? (not/c immutable?))" ht)]))
(define (mutable-hash-remove! ht k)
(lock-acquire (mutable-hash-lock ht))
(let ([cell (and (mutable-hash-cells ht)
(hashtable-ref-cell (mutable-hash-ht ht) k))])
(cond
[cell
(hashtable-delete! (mutable-hash-ht ht) k)
;; Clear cell, because it may be in `(locked-iterable-hash-cells ht)`
(set-car! cell #!bwp)
(set-cdr! cell #!bwp)
(set-locked-iterable-hash-retry?! ht #t)]
[else
(hashtable-delete! (mutable-hash-ht ht) k)]))
(lock-release (mutable-hash-lock ht)))
(define (hash-clear! ht)
(cond
[(mutable-hash? ht)
(lock-acquire (mutable-hash-lock ht))
(set-locked-iterable-hash-cells! ht #f)
(hashtable-clear! (mutable-hash-ht ht))
(lock-release (mutable-hash-lock ht))]
(cond
[(current-future) (future-sync 'hash-clear! (lambda () (mutable-hash-clear! ht)))]
[else (mutable-hash-clear! ht)])]
[(weak-equal-hash? ht) (weak-hash-clear! ht)]
[(and (impersonator? ht)
(let ([ht (impersonator-val ht)])
@ -145,17 +163,19 @@
(loop (hash-iterate-next ht i)))))]
[else (raise-argument-error 'hash-clear! "(and/c hash? (not/c immutable?))" ht)]))
(define (mutable-hash-clear! ht)
(lock-acquire (mutable-hash-lock ht))
(set-locked-iterable-hash-cells! ht #f)
(hashtable-clear! (mutable-hash-ht ht))
(lock-release (mutable-hash-lock ht)))
(define (hash-copy ht)
(cond
[(mutable-hash? ht)
(lock-acquire (mutable-hash-lock ht))
(let ([new-ht (create-mutable-hash (hashtable-copy (mutable-hash-ht ht) #t)
(cond
[(hash-eq? ht) 'eq?]
[(hash-eqv? ht) 'eqv?]
[else 'equal?]))])
(lock-release (mutable-hash-lock ht))
new-ht)]
(cond
[(and (current-future) (eq-mutable-hash? ht))
(future-sync 'hash-copy (lambda () (mutable-hash-copy ht)))]
[else (mutable-hash-copy ht)])]
[(weak-equal-hash? ht) (weak-hash-copy ht)]
[(intmap? ht)
(let ([new-ht (cond
@ -173,6 +193,17 @@
(impersonate-hash-copy ht)]
[else (raise-argument-error 'hash-copy "hash?" ht)]))
(define (mutable-hash-copy ht)
(lock-acquire (mutable-hash-lock ht))
(let ([new-ht (if (eq-mutable-hash? ht)
(create-eq-mutable-hash (hashtable-copy (mutable-hash-ht ht) #t))
(create-mutable-hash (hashtable-copy (mutable-hash-ht ht) #t)
(cond
[(hash-eqv? ht) 'eqv?]
[else 'equal?])))])
(lock-release (mutable-hash-lock ht))
new-ht))
(define (hash-set ht k v)
(cond
[(intmap? ht) (intmap-set ht k v)]
@ -209,8 +240,7 @@
(define (hash-eq? ht)
(cond
[(mutable-hash? ht)
(eq? (hashtable-equivalence-function (mutable-hash-ht ht)) eq?)]
[(mutable-hash? ht) (eq-mutable-hash? ht)]
[(intmap? ht)
(intmap-eq? ht)]
[(weak-equal-hash? ht) #f]
@ -273,10 +303,18 @@
(define (hash-ref/none ht k)
(cond
[(mutable-hash? ht)
(lock-acquire (mutable-hash-lock ht))
(let ([v (hashtable-ref (mutable-hash-ht ht) k none)])
(lock-release (mutable-hash-lock ht))
v)]
(cond
[(eq-mutable-hash? ht)
;; As long as we'e not in a future thread, it's an atomic action
;; to access the mutable hash table using `eq-hashtable-ref`:
(if (current-future)
(future-sync 'hash-ref (lambda () (eq-hashtable-ref (mutable-hash-ht ht) k none)))
(eq-hashtable-ref (mutable-hash-ht ht) k none))]
[else
(lock-acquire (mutable-hash-lock ht))
(let ([v (hashtable-ref (mutable-hash-ht ht) k none)])
(lock-release (mutable-hash-lock ht))
v)])]
[(intmap? ht)
(intmap-ref ht k none)]
[(weak-equal-hash? ht)
@ -306,11 +344,11 @@
(define (hash-ref-key/none ht k)
(cond
[(mutable-hash? ht)
(lock-acquire (mutable-hash-lock ht))
(let* ([pair (hashtable-ref-cell (mutable-hash-ht ht) k)]
[v (if pair (car pair) none)])
(lock-release (mutable-hash-lock ht))
v)]
(cond
[(and (current-future) (eq-mutable-hash? ht))
(future-sync 'hash-ref-key (lambda () (mutable-hash-ref-key/none ht k)))]
[else
(mutable-hash-ref-key/none ht k)])]
[(intmap? ht)
(intmap-ref-key ht k none)]
[(weak-equal-hash? ht)
@ -321,6 +359,13 @@
[else
(raise-argument-error 'hash-ref-key "hash?" ht)]))
(define (mutable-hash-ref-key/none ht k)
(lock-acquire (mutable-hash-lock ht))
(let* ([pair (hashtable-ref-cell (mutable-hash-ht ht) k)]
[v (if pair (car pair) none)])
(lock-release (mutable-hash-lock ht))
v))
(define (fail-hash-ref who default)
(if (procedure? default)
(if (procedure-arity-includes? default 0)
@ -430,7 +475,12 @@
(define (hash-count ht)
(cond
[(mutable-hash? ht) (hashtable-size (mutable-hash-ht ht))]
[(mutable-hash? ht)
(cond
[(current-future)
(future-sync 'hash-count (lambda () (hashtable-size (mutable-hash-ht ht))))]
[else
(hashtable-size (mutable-hash-ht ht))])]
[(intmap? ht) (intmap-count ht)]
[(weak-equal-hash? ht) (weak-hash-count ht)]
[(and (impersonator? ht)

View File

@ -13,32 +13,6 @@
[(not (threaded?))
;; Using a Chez Scheme build without thread support,
;; but we need to cooperate with engine-based threads.
;; `eqv?`- and `eq?`-based tables appear to run with
;; interrupts disabled, so they're safe for engine-based
;; threads; just create a Racket-visible lock for
;; `equal?`-based hash tables
(define (make-lock for-kind)
(and (eq? for-kind 'equal?)
(make-scheduler-lock)))
(define lock-acquire
(case-lambda ;; so it matches the one below
[(lock)
(when lock
;; Thread layer sets this callback to wait
;; on a semaphore:
(scheduler-lock-acquire lock))]
[(lock _)
(when lock
;; Thread layer sets this callback to wait
;; on a semaphore:
(scheduler-lock-acquire lock))]))
(define (lock-release lock)
(when lock
(scheduler-lock-release lock)))
;; Use `with-global-lock*` when no lock is needed absent threads
(define-syntax-rule (with-global-lock* e ...)
(begin e ...))
@ -49,144 +23,38 @@
(with-interrupts-disabled
e ...))]
[else
;; Using a Chez Scheme build with thread support; make hash-table
;; access thread-safe at that level for `eq?`- and `eqv?`-based
;; tables.
;; An `equal?`-based table is made safe at the level of Racket
;; threads, but not at Chez threads. Blocking a Chez thread might
;; block the Racket scheduler itself, so we just don't support it.
;; Using a Chez Scheme build with thread support, so a stronger
;; global lock is needed.
;; Taking a lock disables interrupts, which ensures that the GC
;; callback or other atomic actions can use hash tables without
;; deadlocking.
;; Assume low contention on `eq?`- and `eqv?`-based tables across
;; Chez Scheme threads, in which case a compare-and-set spinlock is
;; usually good enough. But if not, transition to a real lock; use a
;; mutex, but transitioning requires using an inintermediate
;; semaphore.
(define (make-spinlock)
;; Box content: #f (unlocked), #t (locked), sema (transitioning), or mutex
(box #f))
(define (spinlock? v) (#%box? v))
(define (spinlock-acquire q)
(let loop ([n 0])
(disable-interrupts)
(cond
[(#%box-cas! q #f #t)
;; Took lock
(#%void)]
[(eq? #t (#%unbox q))
;; Spin..
(enable-interrupts)
(cond
[(fx= n 1000)
;; There's contention after all, so trasition to a semaphore,
;; where the current lock holder implicitly owns the semaphore.
;; That lock holder can replace the semaphore with a mutex,
;; which is cheaper to acquire and release.
(let ([lk (new-sema)])
(#%box-cas! q #t lk)
(loop 0))]
[else
(loop (fx+ n 1))])]
[else
(let ([l (#%unbox q)])
(cond
[(sema? l)
;; Transitioning to slower lock; wait on semaphore, then
;; try again
(enable-interrupts)
(sema-wait l)
(loop 0)]
[(mutex? l)
;; Using (permanent) mutex as lock
(mutex-acquire l)]
[else
(enable-interrupts)
(loop 0)]))])))
(define (spinlock-release q)
(unless (#%box-cas! q #t #f)
;; Contention must have promoted to a semaphore or mutex...
(let ([l (#%unbox q)])
(cond
[(mutex? l)
;; Must have been acquired as a plain mutex
(mutex-release l)]
[else
;; Transitioning, so finish transition to a plain mutex
(#%set-box! q (make-mutex))
(sema-post-all l)])))
(enable-interrupts)
(#%void))
;; Semaphores that include a "post all" operation
(define-record sema (v m c))
(define (new-sema)
(make-sema 0 (make-mutex) (make-condition)))
(define (sema-wait l)
(mutex-acquire (sema-m l))
(let loop ()
(let ([v (sema-v l)])
(cond
[(eqv? v #t) ; posted all
(mutex-release (sema-m l))]
[(eqv? 0 v)
(condition-wait (sema-c l) (sema-m l))
(loop)]
[else
(set-sema-v! l (sub1 v))
(mutex-release (sema-m l))]))))
(define (sema-post l)
(mutex-acquire (sema-m l))
(set-sema-v! l (add1 (sema-v l)))
(condition-signal (sema-c l))
(mutex-release (sema-m l)))
(define (sema-post-all l)
(mutex-acquire (sema-m l))
(set-sema-v! l #t)
(condition-broadcast (sema-c l))
(mutex-release (sema-m l)))
(define (make-lock for-kind)
(cond
[(eq? for-kind 'equal?)
(make-scheduler-lock)]
[else
(make-spinlock)]))
(define lock-acquire
(case-lambda
[(lock)
(cond
[(not lock) (#%void)]
[(spinlock? lock)
(spinlock-acquire lock)]
[else
(scheduler-lock-acquire lock)])]
[(lock block?)
(cond
[(not lock) (#%void)]
[(spinlock? lock)
(spinlock-acquire lock block?)]
[else
(scheduler-lock-acquire lock)])]))
(define (lock-release lock)
(cond
[(not lock) (#%void)]
[(spinlock? lock)
(spinlock-release lock)]
[else
(scheduler-lock-release lock)]))
(define global-lock (make-spinlock))
(define global-lock (make-mutex))
(define-syntax-rule (with-global-lock* e ...)
(with-global-lock e ...))
(define-syntax-rule (with-global-lock e ...)
(begin
(spinlock-acquire global-lock)
(mutex-acquire global-lock)
(begin0
(begin e ...)
(spinlock-release global-lock))))])
(mutex-release global-lock))))])
;; ------------------------------------------------------------
;; Locks used for hash tables
(define (make-lock for-kind)
(cond
[(eq? for-kind 'equal?)
(make-scheduler-lock)]
[else #f]))
(define lock-acquire
(case-lambda
[(lock)
(cond
[(not lock) (disable-interrupts)]
[else
(scheduler-lock-acquire lock)])]))
(define (lock-release lock)
(cond
[(not lock) (enable-interrupts) (#%void)]
[else
(scheduler-lock-release lock)]))

View File

@ -4,16 +4,21 @@
;; that are all in the same place.
;; The first slot in the vector holds a hash table for allocated
;; place-local values, and the rest are used by the thread, io, etc.,
;; layers for directly accessed variables.
;; place-local values, the last is used by "async-callback.ss", and
;; the rest are used by the thread, io, etc., layers for directly
;; accessed variables.
(define NUM-PLACE-REGISTERS 128)
(define NUM-PLACE-REGISTERS 128) ; 3 thorugh 126 available for subsystems
(define LOCAL_TABLE-INDEX 0)
(define ASYNC-CALLBACK-REGISTER-INDEX 1)
;; index 2 is available
(define-virtual-register place-registers (#%make-vector NUM-PLACE-REGISTERS 0))
(define place-register-inits (#%make-vector NUM-PLACE-REGISTERS 0))
(define (init-place-locals!)
(#%vector-set! (place-registers) 0 (make-weak-hasheq)))
(#%vector-set! (place-registers) LOCAL_TABLE-INDEX (make-weak-hasheq)))
(define-record place-local (default-v))
@ -21,13 +26,13 @@
(make-place-local v))
(define (unsafe-place-local-ref pl)
(let ([v (hash-ref (#%vector-ref (place-registers) 0) pl none)])
(let ([v (hash-ref (#%vector-ref (place-registers) LOCAL_TABLE-INDEX) pl none)])
(if (eq? v none)
(place-local-default-v pl)
v)))
(define (unsafe-place-local-set! pl v)
(hash-set! (#%vector-ref (place-registers) 0) pl v))
(hash-set! (#%vector-ref (place-registers) LOCAL_TABLE-INDEX) pl v))
(define (place-local-register-ref i)
(#%vector-ref (place-registers) i))
@ -47,6 +52,16 @@
;; ----------------------------------------
(define place-async-callback-queue
(case-lambda
[() (let ([v (#%vector-ref (place-registers) ASYNC-CALLBACK-REGISTER-INDEX)])
(if (eqv? v 0)
#f
v))]
[(v) (#%vector-set! (place-registers) ASYNC-CALLBACK-REGISTER-INDEX v)]))
;; ----------------------------------------
(define place-specific-table (unsafe-make-place-local #f))
(define (unsafe-get-place-table)

View File

@ -28,6 +28,7 @@
[get-thread-id rumble:get-thread-id]
[get-initial-pthread rumble:get-initial-pthread]
[current-place-roots rumble:current-place-roots]
[call-as-asynchronous-callback rumble:call-as-asynchronous-callback]
[set-ctl-c-handler! rumble:set-ctl-c-handler!]
[set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!]
[set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback!]
@ -162,6 +163,7 @@
'call-with-current-pthread-continuation call/cc
'exit place-exit
'pthread? rumble:thread?
'call-as-asynchronous-callback rumble:call-as-asynchronous-callback
'get-thread-id rumble:get-thread-id
'make-condition rumble:make-condition
'condition-wait rumble:condition-wait
@ -194,4 +196,4 @@
(lambda ()
(current-atomic (fx- (current-atomic) 1))))
(set-future-callbacks! future-block current-future-prompt))
(set-future-callbacks! future-block future-sync current-future-prompt))

View File

@ -16,7 +16,7 @@
#define MZSCHEME_VERSION_X 7
#define MZSCHEME_VERSION_Y 4
#define MZSCHEME_VERSION_Z 0
#define MZSCHEME_VERSION_W 10
#define MZSCHEME_VERSION_W 11
/* A level of indirection makes `#` work as needed: */
#define AS_a_STR_HELPER(x) #x

View File

@ -283,6 +283,7 @@
'make-mutex (lambda () (make-semaphore 1))
'mutex-acquire (lambda (s) (semaphore-wait s))
'mutex-release (lambda (s) (semaphore-post s))
'call-as-asynchronous-callback (lambda (thunk) (thunk))
'continuation-current-primitive (lambda (k) #f)))
;; add dummy definitions that implement pthreads and conditions etc.

View File

@ -71,8 +71,11 @@
(number->string proc-id)
": "
(if (and (eqv? proc-id 0)
(eq? action 'block))
(string-append "HANDLING: "
(or (eq? action 'block)
(eq? action 'sync)))
(string-append (if (eq? action 'block)
"HANDLING: "
"synchronizing: ")
(symbol->string
(or (future-event-prim-name e)
'|[unknown]|)))
@ -88,6 +91,7 @@
[(start-work) "started work"]
[(end-work) "ended work"]
[(block) "BLOCKING on process 0"]
[(sync) "synchronizing with process 0"]
[(touch) "touching future: touch"]
[(result) "result determined"]
[(suspend) "suspended"]

View File

@ -28,6 +28,7 @@
would-be-future
touch
future-block
future-sync
current-future-prompt
currently-running-future
reset-future-logs-for-tracing!
@ -353,6 +354,35 @@
;; ----------------------------------------
;; Call `thunk` in the place's main thread:
(define (future-sync who thunk)
(define me-f (current-future))
(cond
[(future*-would-be? me-f)
(current-future #f)
(log-future 'sync (future*-id me-f) #:prim-name who)
(let ([v (thunk)])
(log-future 'result (future*-id me-f))
(current-future me-f)
v)]
[else
;; Atomic mode prevents getting terminated or swapped out
;; while we block on the main thread
(current-atomic (add1 (current-atomic)))
(begin0
;; Host's `call-as-asynchronous-callback` will post `thunk`
;; so that it's returned by `host:poll-async-callbacks` to
;; the scheduler in the place's main thread
(host:call-as-asynchronous-callback
(lambda ()
(log-future 'sync (future*-id me-f) #:prim-name who)
(let ([v (thunk)])
(log-future 'result (future*-id me-f))
v)))
(current-atomic (sub1 (current-atomic))))]))
;; ----------------------------------------
(define pthread-count 1)
;; Called by io layer

View File

@ -95,4 +95,6 @@
[mutex-release host:mutex-release]
threaded?
[call-as-asynchronous-callback host:call-as-asynchronous-callback]
continuation-current-primitive)

View File

@ -191,6 +191,7 @@
would-be-future
current-future
future-block
future-sync
current-future-prompt
reset-future-logs-for-tracing!
mark-future-trace-end!