diff --git a/pkgs/base/info.rkt b/pkgs/base/info.rkt index f094655a3f..e2867e9cea 100644 --- a/pkgs/base/info.rkt +++ b/pkgs/base/info.rkt @@ -12,7 +12,7 @@ (define collection 'multi) -(define version "7.4.0.10") +(define version "7.4.0.11") (define deps `("racket-lib" ["racket" #:version ,version])) diff --git a/pkgs/racket-test/tests/future/future.rkt b/pkgs/racket-test/tests/future/future.rkt index 88d0d75f54..67d1cad1de 100644 --- a/pkgs/racket-test/tests/future/future.rkt +++ b/pkgs/racket-test/tests/future/future.rkt @@ -27,22 +27,31 @@ We should also test deep continuations. (define-struct future-event (future-id process-id what time prim-name target-fid) #:prefab) -(define (get-events-of-type type log) - (filter (λ (e) - (equal? (future-event-what e) type)) +(define (get-events-of-type type log) + (filter (λ (e) + (equal? (future-event-what e) type)) log)) (define (get-blocks log) - (get-events-of-type 'block log)) + (get-events-of-type 'block log)) - (define (get-touch-blocks log) - (get-events-of-type 'touch log)) +(define (get-sync-blocks log) + (get-events-of-type 'sync log)) - (define (get-blocks-on prim log) - (filter (λ (e) - (equal? (future-event-prim-name e) - prim)) - (get-blocks log))) +(define (get-touch-blocks log) + (get-events-of-type 'touch log)) + +(define (get-blocks-on prim log) + (filter (λ (e) + (equal? (future-event-prim-name e) + prim)) + (get-blocks log))) + +(define (get-sync-blocks-on prim log) + (filter (λ (e) + (equal? (future-event-prim-name e) + prim)) + (get-sync-blocks log))) (when (futures-enabled?) (define recv (make-log-receiver (current-logger) 'debug)) @@ -96,7 +105,25 @@ We should also test deep continuations. (check-equal? 5 (length (get-blocks log))) (check-equal? 1 (length (get-touch-blocks log))) (check-equal? 4 (length (get-blocks-on 'printf log))) - (check-equal? 1 (length (get-blocks-on 'would-be-future log)))])))) + (check-equal? 1 (length (get-blocks-on 'would-be-future log)))]))) + + (let ([f (let ([ht (make-hasheq)]) + (would-be-future (λ () + (hash-set! ht 'ok 5) + (hash-ref ht 'ok))))]) + (touch f) + (let ([log (raw-log-output)]) + ;; Racket CS syncs on hash-ref, traditional Racket blocks + (case (system-type 'vm) + [(chez-scheme) + (check-equal? 2 (length (get-sync-blocks log))) + (check-equal? 1 (length (get-sync-blocks-on 'hash-set! log))) + (check-equal? 1 (length (get-sync-blocks-on 'hash-ref log)))] + [else + (check-equal? 2 (length (get-blocks log))) + (check-equal? 1 (length (get-blocks-on 'hash-set! log))) + (check-equal? 1 (length (get-blocks-on 'hash-ref log)))]) + (check-equal? 0 (length (get-touch-blocks log)))))) ;; ---------------------------------------- diff --git a/racket/src/cs/Makefile b/racket/src/cs/Makefile index 7e99ff9c29..8502cefaec 100644 --- a/racket/src/cs/Makefile +++ b/racket/src/cs/Makefile @@ -298,6 +298,7 @@ RUMBLE_SRCS = rumble/define.ss \ rumble/place.ss \ rumble/errno-data.ss \ rumble/foreign.ss \ + rumble/async-callback.ss \ rumble/future.ss \ rumble/version.ss \ rumble/inline.ss \ diff --git a/racket/src/cs/place-register.ss b/racket/src/cs/place-register.ss index 37a2accc29..279d3d9a13 100644 --- a/racket/src/cs/place-register.ss +++ b/racket/src/cs/place-register.ss @@ -4,7 +4,7 @@ ;; Allocation of place registers to built-in subsystems, where the ;; first index is reserved for Rumble: -(meta chez:define thread-register-start 1) +(meta chez:define thread-register-start 3) (meta chez:define thread-register-count 31) (meta chez:define io-register-start (+ thread-register-start thread-register-count)) diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index 8d38dcf597..064a8b921f 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -707,6 +707,7 @@ set-future-callbacks! install-primitives-table! continuation-current-primitive + call-as-asynchronous-callback ;; compile-time use in "thread.sls" current-atomic-virtual-register @@ -792,6 +793,7 @@ (include "rumble/place.ss") (include "rumble/errno-data.ss") (include "rumble/foreign.ss") + (include "rumble/async-callback.ss") (include "rumble/future.ss") (include "rumble/inline.ss") diff --git a/racket/src/cs/rumble/async-callback.ss b/racket/src/cs/rumble/async-callback.ss new file mode 100644 index 0000000000..7bd343ecc8 --- /dev/null +++ b/racket/src/cs/rumble/async-callback.ss @@ -0,0 +1,59 @@ + +(define-record async-callback-queue (lock condition in wakeup)) + +(define (current-async-callback-queue) + (place-async-callback-queue)) + +(define (async-callback-place-init!) + (place-async-callback-queue (make-async-callback-queue (make-mutex) + (make-condition) + '() + (make-async-callback-poll-wakeup)))) + +(define (call-as-asynchronous-callback thunk) + (with-interrupts-disabled* + (async-callback-queue-call (current-async-callback-queue) thunk #t))) + +(define (async-callback-queue-call async-callback-queue thunk need-interrupts?) + (let* ([result-done? (box #f)] + [result #f] + [q async-callback-queue] + [m (async-callback-queue-lock q)]) + (mutex-acquire m) + (set-async-callback-queue-in! q (cons (lambda () + (set! result (thunk)) + (mutex-acquire m) + (set-box! result-done? #t) + (condition-broadcast (async-callback-queue-condition q)) + (mutex-release m)) + (async-callback-queue-in q))) + ((async-callback-queue-wakeup q)) + (let loop () + (unless (unbox result-done?) + (when need-interrupts? + ;; Enable interrupts so that the thread is deactivated + ;; when we wait on the condition + (enable-interrupts)) + (condition-wait (async-callback-queue-condition q) m) + (when need-interrupts? (disable-interrupts)) + (loop))) + (mutex-release m) + result)) + +(define make-async-callback-poll-wakeup (lambda () void)) +(define (set-make-async-callback-poll-wakeup! make-wakeup) + (set! make-async-callback-poll-wakeup make-wakeup)) + +;; Returns callbacks to run in atomic mode +(define (poll-async-callbacks) + (let ([q (current-async-callback-queue)]) + (mutex-acquire (async-callback-queue-lock q)) + (let ([in (async-callback-queue-in q)]) + (cond + [(null? in) + (mutex-release (async-callback-queue-lock q)) + '()] + [else + (set-async-callback-queue-in! q '()) + (mutex-release (async-callback-queue-lock q)) + (reverse in)])))) diff --git a/racket/src/cs/rumble/datum.ss b/racket/src/cs/rumble/datum.ss index 496ac1afcd..14a686dfbf 100644 --- a/racket/src/cs/rumble/datum.ss +++ b/racket/src/cs/rumble/datum.ss @@ -4,13 +4,14 @@ (define (set-intern-regexp?! p) (set! intern-regexp? p)) (define (datum-intern-literal v) + (when (current-future) (block-future)) (cond [(or (number? v) (string? v) (char? v) (bytes? v) (intern-regexp? v)) - (with-interrupts-disabled + (with-interrupts-disabled* (or (weak-hash-ref-key datums v #f) (let ([v (cond [(string? v) (string->immutable-string v)] diff --git a/racket/src/cs/rumble/foreign.ss b/racket/src/cs/rumble/foreign.ss index a858d10b62..dfc30accc9 100644 --- a/racket/src/cs/rumble/foreign.ss +++ b/racket/src/cs/rumble/foreign.ss @@ -1545,7 +1545,7 @@ (#%$keep-live v) ... result)) -(define call-locks (make-hasheq)) +(define call-locks (make-eq-hashtable)) (define (ffi-call/callable call? in-types out-type abi save-errno lock-name blocking? atomic? async-apply) (let* ([conv (case abi @@ -1633,10 +1633,11 @@ [arg-makers (cddr gen-proc+ret-maker+arg-makers)] [async-callback-queue (and (procedure? async-apply) (current-async-callback-queue))] [lock (and lock-name - (or (hash-ref call-locks (string->symbol lock-name) #f) - (let ([lock (make-mutex)]) - (hash-set! call-locks (string->symbol lock-name) lock) - lock)))]) + (with-global-lock + (or (eq-hashtable-ref call-locks (string->symbol lock-name) #f) + (let ([lock (make-mutex)]) + (eq-hashtable-set! call-locks (string->symbol lock-name) lock) + lock))))]) (cond [call? (cond @@ -1847,13 +1848,7 @@ (define-virtual-register place-thread-category PLACE-KNOWN-THREAD) (define (register-as-place-main!) (place-thread-category PLACE-MAIN-THREAD) - (foreign-place-init!)) - -(define (foreign-place-init!) - (current-async-callback-queue (make-async-callback-queue (make-mutex) - (make-condition) - '() - (make-async-callback-poll-wakeup)))) + (async-callback-place-init!)) ;; Can be called in any Scheme thread (define (call-as-atomic-callback thunk atomic? async-apply async-callback-queue) @@ -1881,35 +1876,12 @@ [else ;; Not in a place's main thread; queue an async callback ;; and wait for the response - (let* ([result-done? (box #f)] - [result #f] - [q async-callback-queue] - [m (async-callback-queue-lock q)] - [need-interrupts? - ;; If we created this thread by `fork-pthread`, we must - ;; have gotten here by a foreign call, so interrupts are - ;; currently disabled - (eqv? (place-thread-category) PLACE-KNOWN-THREAD)]) - (mutex-acquire m) - (set-async-callback-queue-in! q (cons (lambda () - (set! result (|#%app| async-apply thunk)) - (mutex-acquire m) - (set-box! result-done? #t) - (condition-broadcast (async-callback-queue-condition q)) - (mutex-release m)) - (async-callback-queue-in q))) - ((async-callback-queue-wakeup q)) - (let loop () - (unless (unbox result-done?) - (when need-interrupts? - ;; Enable interrupts so that the thread is deactivated - ;; when we wait on the condition - (enable-interrupts)) - (condition-wait (async-callback-queue-condition q) m) - (when need-interrupts? (disable-interrupts)) - (loop))) - (mutex-release m) - result)])) + (async-callback-queue-call async-callback-queue + (lambda () (|#%app| async-apply thunk)) + ;; If we created this thread by `fork-pthread`, we must + ;; have gotten here by a foreign call, so interrupts are + ;; currently disabled + (eqv? (place-thread-category) PLACE-KNOWN-THREAD))])) (define scheduler-start-atomic void) (define scheduler-end-atomic void) @@ -1917,28 +1889,6 @@ (set! scheduler-start-atomic start-atomic) (set! scheduler-end-atomic end-atomic)) -(define make-async-callback-poll-wakeup (lambda () void)) -(define (set-make-async-callback-poll-wakeup! make-wakeup) - (set! make-async-callback-poll-wakeup make-wakeup)) - -(define-record async-callback-queue (lock condition in wakeup)) - -(define-virtual-register current-async-callback-queue #f) - -;; Returns callbacks to run in atomic mode -(define (poll-async-callbacks) - (let ([q (current-async-callback-queue)]) - (mutex-acquire (async-callback-queue-lock q)) - (let ([in (async-callback-queue-in q)]) - (cond - [(null? in) - (mutex-release (async-callback-queue-lock q)) - '()] - [else - (set-async-callback-queue-in! q '()) - (mutex-release (async-callback-queue-lock q)) - (reverse in)])))) - ;; ---------------------------------------- (define-record-type (callback create-callback ffi-callback?) diff --git a/racket/src/cs/rumble/future.ss b/racket/src/cs/rumble/future.ss index c65b90a31c..14bb04bc92 100644 --- a/racket/src/cs/rumble/future.ss +++ b/racket/src/cs/rumble/future.ss @@ -7,8 +7,24 @@ #'(virtual-register pos))])) (define block-future (lambda () (void))) +(define sync-future (lambda (who thunk) (thunk))) (define current-future-prompt (lambda () (void))) -(define (set-future-callbacks! block current-prompt) +(define (set-future-callbacks! block sync current-prompt) (set! block-future block) + (set! sync-future sync) (set! current-future-prompt current-prompt)) + +;; Call `thunk` in the main thread to synchronizel the thunk must be +;; constant-time, never rasse an exception, and return a single value +(define (future-sync who thunk) + (let ([disabled? (> (disable-interrupts) 1)]) + (enable-interrupts) + (cond + [disabled? + ;; Interrupts were already disabled, so we're holding the global + ;; lock, in a garbage collection, or something like that --- as + ;; synchronized as possible already + (thunk)] + [else + (sync-future who thunk)]))) diff --git a/racket/src/cs/rumble/hash.ss b/racket/src/cs/rumble/hash.ss index e104f61105..c40859c475 100644 --- a/racket/src/cs/rumble/hash.ss +++ b/racket/src/cs/rumble/hash.ss @@ -1,3 +1,6 @@ +;; Mutable hash table are safe for engine-based concurrency, but they +;; are not safe for concurrent access across Scheme threads. + ;; Mutable and weak-equal hash tables need a lock ;; and an iteration vector (define-record locked-iterable-hash (lock @@ -8,7 +11,11 @@ ;; tables in a `mutable-hash` record (define-record mutable-hash locked-iterable-hash (ht)) ; Chez Scheme hashtable +(define-record eq-mutable-hash mutable-hash + ()) + (define (create-mutable-hash ht kind) (make-mutable-hash (make-lock kind) #f #f ht)) +(define (create-eq-mutable-hash ht) (make-eq-mutable-hash (make-lock 'eq?) #f #f ht)) (define (mutable-hash-lock ht) (locked-iterable-hash-lock ht)) (define (mutable-hash-cells ht) (locked-iterable-hash-cells ht)) @@ -25,17 +32,17 @@ (define make-hasheq (case-lambda - [() (create-mutable-hash (make-eq-hashtable) 'eq?)] + [() (create-eq-mutable-hash (make-eq-hashtable))] [(alist) (fill-hash! 'make-hasheq (make-hasheq) alist)])) (define (eq-hashtable->hash ht) - (create-mutable-hash ht 'eq?)) + (create-eq-mutable-hash ht)) (define (hash->eq-hashtable ht) (mutable-hash-ht ht)) (define make-weak-hasheq (case-lambda - [() (create-mutable-hash (make-weak-eq-hashtable) 'eq?)] + [() (create-eq-mutable-hash (make-weak-eq-hashtable))] [(alist) (fill-hash! 'make-weak-hasheq (make-weak-hasheq) alist)])) (define make-hasheqv @@ -89,10 +96,10 @@ (define (hash-set! ht k v) (cond [(mutable-hash? ht) - (lock-acquire (mutable-hash-lock ht)) - (hashtable-set! (mutable-hash-ht ht) k v) - (set-locked-iterable-hash-retry?! ht #t) - (lock-release (mutable-hash-lock ht))] + (cond + [(and (current-future) (eq-mutable-hash? ht)) + (future-sync 'hash-set! (lambda () (mutable-hash-set! ht k v)))] + [else (mutable-hash-set! ht k v)])] [(weak-equal-hash? ht) (weak-hash-set! ht k v)] [(and (impersonator? ht) (let ([ht (impersonator-val ht)]) @@ -101,22 +108,19 @@ (impersonate-hash-set! ht k v)] [else (raise-argument-error 'hash-set! "(and/c hash? (not/c immutable?))" ht)])) +(define (mutable-hash-set! ht k v) + (lock-acquire (mutable-hash-lock ht)) + (hashtable-set! (mutable-hash-ht ht) k v) + (set-locked-iterable-hash-retry?! ht #t) + (lock-release (mutable-hash-lock ht))) + (define (hash-remove! ht k) (cond [(mutable-hash? ht) - (lock-acquire (mutable-hash-lock ht)) - (let ([cell (and (mutable-hash-cells ht) - (hashtable-ref-cell (mutable-hash-ht ht) k))]) - (cond - [cell - (hashtable-delete! (mutable-hash-ht ht) k) - ;; Clear cell, because it may be in `(locked-iterable-hash-cells ht)` - (set-car! cell #!bwp) - (set-cdr! cell #!bwp) - (set-locked-iterable-hash-retry?! ht #t)] - [else - (hashtable-delete! (mutable-hash-ht ht) k)])) - (lock-release (mutable-hash-lock ht))] + (cond + [(and (current-future) (eq-mutable-hash? ht)) + (future-sync 'hash-remove! (lambda () (mutable-hash-remove! ht k)))] + [else (mutable-hash-remove! ht k)])] [(weak-equal-hash? ht) (weak-hash-remove! ht k)] [(and (impersonator? ht) (let ([ht (impersonator-val ht)]) @@ -125,13 +129,27 @@ (impersonate-hash-remove! ht k)] [else (raise-argument-error 'hash-remove! "(and/c hash? (not/c immutable?))" ht)])) +(define (mutable-hash-remove! ht k) + (lock-acquire (mutable-hash-lock ht)) + (let ([cell (and (mutable-hash-cells ht) + (hashtable-ref-cell (mutable-hash-ht ht) k))]) + (cond + [cell + (hashtable-delete! (mutable-hash-ht ht) k) + ;; Clear cell, because it may be in `(locked-iterable-hash-cells ht)` + (set-car! cell #!bwp) + (set-cdr! cell #!bwp) + (set-locked-iterable-hash-retry?! ht #t)] + [else + (hashtable-delete! (mutable-hash-ht ht) k)])) + (lock-release (mutable-hash-lock ht))) + (define (hash-clear! ht) (cond [(mutable-hash? ht) - (lock-acquire (mutable-hash-lock ht)) - (set-locked-iterable-hash-cells! ht #f) - (hashtable-clear! (mutable-hash-ht ht)) - (lock-release (mutable-hash-lock ht))] + (cond + [(current-future) (future-sync 'hash-clear! (lambda () (mutable-hash-clear! ht)))] + [else (mutable-hash-clear! ht)])] [(weak-equal-hash? ht) (weak-hash-clear! ht)] [(and (impersonator? ht) (let ([ht (impersonator-val ht)]) @@ -145,17 +163,19 @@ (loop (hash-iterate-next ht i)))))] [else (raise-argument-error 'hash-clear! "(and/c hash? (not/c immutable?))" ht)])) +(define (mutable-hash-clear! ht) + (lock-acquire (mutable-hash-lock ht)) + (set-locked-iterable-hash-cells! ht #f) + (hashtable-clear! (mutable-hash-ht ht)) + (lock-release (mutable-hash-lock ht))) + (define (hash-copy ht) (cond [(mutable-hash? ht) - (lock-acquire (mutable-hash-lock ht)) - (let ([new-ht (create-mutable-hash (hashtable-copy (mutable-hash-ht ht) #t) - (cond - [(hash-eq? ht) 'eq?] - [(hash-eqv? ht) 'eqv?] - [else 'equal?]))]) - (lock-release (mutable-hash-lock ht)) - new-ht)] + (cond + [(and (current-future) (eq-mutable-hash? ht)) + (future-sync 'hash-copy (lambda () (mutable-hash-copy ht)))] + [else (mutable-hash-copy ht)])] [(weak-equal-hash? ht) (weak-hash-copy ht)] [(intmap? ht) (let ([new-ht (cond @@ -173,6 +193,17 @@ (impersonate-hash-copy ht)] [else (raise-argument-error 'hash-copy "hash?" ht)])) +(define (mutable-hash-copy ht) + (lock-acquire (mutable-hash-lock ht)) + (let ([new-ht (if (eq-mutable-hash? ht) + (create-eq-mutable-hash (hashtable-copy (mutable-hash-ht ht) #t)) + (create-mutable-hash (hashtable-copy (mutable-hash-ht ht) #t) + (cond + [(hash-eqv? ht) 'eqv?] + [else 'equal?])))]) + (lock-release (mutable-hash-lock ht)) + new-ht)) + (define (hash-set ht k v) (cond [(intmap? ht) (intmap-set ht k v)] @@ -209,8 +240,7 @@ (define (hash-eq? ht) (cond - [(mutable-hash? ht) - (eq? (hashtable-equivalence-function (mutable-hash-ht ht)) eq?)] + [(mutable-hash? ht) (eq-mutable-hash? ht)] [(intmap? ht) (intmap-eq? ht)] [(weak-equal-hash? ht) #f] @@ -273,10 +303,18 @@ (define (hash-ref/none ht k) (cond [(mutable-hash? ht) - (lock-acquire (mutable-hash-lock ht)) - (let ([v (hashtable-ref (mutable-hash-ht ht) k none)]) - (lock-release (mutable-hash-lock ht)) - v)] + (cond + [(eq-mutable-hash? ht) + ;; As long as we'e not in a future thread, it's an atomic action + ;; to access the mutable hash table using `eq-hashtable-ref`: + (if (current-future) + (future-sync 'hash-ref (lambda () (eq-hashtable-ref (mutable-hash-ht ht) k none))) + (eq-hashtable-ref (mutable-hash-ht ht) k none))] + [else + (lock-acquire (mutable-hash-lock ht)) + (let ([v (hashtable-ref (mutable-hash-ht ht) k none)]) + (lock-release (mutable-hash-lock ht)) + v)])] [(intmap? ht) (intmap-ref ht k none)] [(weak-equal-hash? ht) @@ -306,11 +344,11 @@ (define (hash-ref-key/none ht k) (cond [(mutable-hash? ht) - (lock-acquire (mutable-hash-lock ht)) - (let* ([pair (hashtable-ref-cell (mutable-hash-ht ht) k)] - [v (if pair (car pair) none)]) - (lock-release (mutable-hash-lock ht)) - v)] + (cond + [(and (current-future) (eq-mutable-hash? ht)) + (future-sync 'hash-ref-key (lambda () (mutable-hash-ref-key/none ht k)))] + [else + (mutable-hash-ref-key/none ht k)])] [(intmap? ht) (intmap-ref-key ht k none)] [(weak-equal-hash? ht) @@ -321,6 +359,13 @@ [else (raise-argument-error 'hash-ref-key "hash?" ht)])) +(define (mutable-hash-ref-key/none ht k) + (lock-acquire (mutable-hash-lock ht)) + (let* ([pair (hashtable-ref-cell (mutable-hash-ht ht) k)] + [v (if pair (car pair) none)]) + (lock-release (mutable-hash-lock ht)) + v)) + (define (fail-hash-ref who default) (if (procedure? default) (if (procedure-arity-includes? default 0) @@ -430,7 +475,12 @@ (define (hash-count ht) (cond - [(mutable-hash? ht) (hashtable-size (mutable-hash-ht ht))] + [(mutable-hash? ht) + (cond + [(current-future) + (future-sync 'hash-count (lambda () (hashtable-size (mutable-hash-ht ht))))] + [else + (hashtable-size (mutable-hash-ht ht))])] [(intmap? ht) (intmap-count ht)] [(weak-equal-hash? ht) (weak-hash-count ht)] [(and (impersonator? ht) diff --git a/racket/src/cs/rumble/lock.ss b/racket/src/cs/rumble/lock.ss index 9d14c8cc24..99ddbdeea6 100644 --- a/racket/src/cs/rumble/lock.ss +++ b/racket/src/cs/rumble/lock.ss @@ -13,32 +13,6 @@ [(not (threaded?)) ;; Using a Chez Scheme build without thread support, ;; but we need to cooperate with engine-based threads. - - ;; `eqv?`- and `eq?`-based tables appear to run with - ;; interrupts disabled, so they're safe for engine-based - ;; threads; just create a Racket-visible lock for - ;; `equal?`-based hash tables - (define (make-lock for-kind) - (and (eq? for-kind 'equal?) - (make-scheduler-lock))) - - (define lock-acquire - (case-lambda ;; so it matches the one below - [(lock) - (when lock - ;; Thread layer sets this callback to wait - ;; on a semaphore: - (scheduler-lock-acquire lock))] - [(lock _) - (when lock - ;; Thread layer sets this callback to wait - ;; on a semaphore: - (scheduler-lock-acquire lock))])) - - (define (lock-release lock) - (when lock - (scheduler-lock-release lock))) - ;; Use `with-global-lock*` when no lock is needed absent threads (define-syntax-rule (with-global-lock* e ...) (begin e ...)) @@ -49,144 +23,38 @@ (with-interrupts-disabled e ...))] [else - ;; Using a Chez Scheme build with thread support; make hash-table - ;; access thread-safe at that level for `eq?`- and `eqv?`-based - ;; tables. - ;; An `equal?`-based table is made safe at the level of Racket - ;; threads, but not at Chez threads. Blocking a Chez thread might - ;; block the Racket scheduler itself, so we just don't support it. + ;; Using a Chez Scheme build with thread support, so a stronger + ;; global lock is needed. - ;; Taking a lock disables interrupts, which ensures that the GC - ;; callback or other atomic actions can use hash tables without - ;; deadlocking. - - ;; Assume low contention on `eq?`- and `eqv?`-based tables across - ;; Chez Scheme threads, in which case a compare-and-set spinlock is - ;; usually good enough. But if not, transition to a real lock; use a - ;; mutex, but transitioning requires using an inintermediate - ;; semaphore. - (define (make-spinlock) - ;; Box content: #f (unlocked), #t (locked), sema (transitioning), or mutex - (box #f)) - (define (spinlock? v) (#%box? v)) - (define (spinlock-acquire q) - (let loop ([n 0]) - (disable-interrupts) - (cond - [(#%box-cas! q #f #t) - ;; Took lock - (#%void)] - [(eq? #t (#%unbox q)) - ;; Spin.. - (enable-interrupts) - (cond - [(fx= n 1000) - ;; There's contention after all, so trasition to a semaphore, - ;; where the current lock holder implicitly owns the semaphore. - ;; That lock holder can replace the semaphore with a mutex, - ;; which is cheaper to acquire and release. - (let ([lk (new-sema)]) - (#%box-cas! q #t lk) - (loop 0))] - [else - (loop (fx+ n 1))])] - [else - (let ([l (#%unbox q)]) - (cond - [(sema? l) - ;; Transitioning to slower lock; wait on semaphore, then - ;; try again - (enable-interrupts) - (sema-wait l) - (loop 0)] - [(mutex? l) - ;; Using (permanent) mutex as lock - (mutex-acquire l)] - [else - (enable-interrupts) - (loop 0)]))]))) - - (define (spinlock-release q) - (unless (#%box-cas! q #t #f) - ;; Contention must have promoted to a semaphore or mutex... - (let ([l (#%unbox q)]) - (cond - [(mutex? l) - ;; Must have been acquired as a plain mutex - (mutex-release l)] - [else - ;; Transitioning, so finish transition to a plain mutex - (#%set-box! q (make-mutex)) - (sema-post-all l)]))) - (enable-interrupts) - (#%void)) - - ;; Semaphores that include a "post all" operation - (define-record sema (v m c)) - (define (new-sema) - (make-sema 0 (make-mutex) (make-condition))) - (define (sema-wait l) - (mutex-acquire (sema-m l)) - (let loop () - (let ([v (sema-v l)]) - (cond - [(eqv? v #t) ; posted all - (mutex-release (sema-m l))] - [(eqv? 0 v) - (condition-wait (sema-c l) (sema-m l)) - (loop)] - [else - (set-sema-v! l (sub1 v)) - (mutex-release (sema-m l))])))) - (define (sema-post l) - (mutex-acquire (sema-m l)) - (set-sema-v! l (add1 (sema-v l))) - (condition-signal (sema-c l)) - (mutex-release (sema-m l))) - (define (sema-post-all l) - (mutex-acquire (sema-m l)) - (set-sema-v! l #t) - (condition-broadcast (sema-c l)) - (mutex-release (sema-m l))) - - (define (make-lock for-kind) - (cond - [(eq? for-kind 'equal?) - (make-scheduler-lock)] - [else - (make-spinlock)])) - - (define lock-acquire - (case-lambda - [(lock) - (cond - [(not lock) (#%void)] - [(spinlock? lock) - (spinlock-acquire lock)] - [else - (scheduler-lock-acquire lock)])] - [(lock block?) - (cond - [(not lock) (#%void)] - [(spinlock? lock) - (spinlock-acquire lock block?)] - [else - (scheduler-lock-acquire lock)])])) - - (define (lock-release lock) - (cond - [(not lock) (#%void)] - [(spinlock? lock) - (spinlock-release lock)] - [else - (scheduler-lock-release lock)])) - - (define global-lock (make-spinlock)) + (define global-lock (make-mutex)) (define-syntax-rule (with-global-lock* e ...) (with-global-lock e ...)) (define-syntax-rule (with-global-lock e ...) (begin - (spinlock-acquire global-lock) + (mutex-acquire global-lock) (begin0 (begin e ...) - (spinlock-release global-lock))))]) + (mutex-release global-lock))))]) + +;; ------------------------------------------------------------ +;; Locks used for hash tables + +(define (make-lock for-kind) + (cond + [(eq? for-kind 'equal?) + (make-scheduler-lock)] + [else #f])) + +(define lock-acquire + (case-lambda + [(lock) + (cond + [(not lock) (disable-interrupts)] + [else + (scheduler-lock-acquire lock)])])) + +(define (lock-release lock) + (cond + [(not lock) (enable-interrupts) (#%void)] + [else + (scheduler-lock-release lock)])) diff --git a/racket/src/cs/rumble/place.ss b/racket/src/cs/rumble/place.ss index b231ee570b..23c863f6ee 100644 --- a/racket/src/cs/rumble/place.ss +++ b/racket/src/cs/rumble/place.ss @@ -4,16 +4,21 @@ ;; that are all in the same place. ;; The first slot in the vector holds a hash table for allocated -;; place-local values, and the rest are used by the thread, io, etc., -;; layers for directly accessed variables. +;; place-local values, the last is used by "async-callback.ss", and +;; the rest are used by the thread, io, etc., layers for directly +;; accessed variables. -(define NUM-PLACE-REGISTERS 128) +(define NUM-PLACE-REGISTERS 128) ; 3 thorugh 126 available for subsystems + +(define LOCAL_TABLE-INDEX 0) +(define ASYNC-CALLBACK-REGISTER-INDEX 1) +;; index 2 is available (define-virtual-register place-registers (#%make-vector NUM-PLACE-REGISTERS 0)) (define place-register-inits (#%make-vector NUM-PLACE-REGISTERS 0)) (define (init-place-locals!) - (#%vector-set! (place-registers) 0 (make-weak-hasheq))) + (#%vector-set! (place-registers) LOCAL_TABLE-INDEX (make-weak-hasheq))) (define-record place-local (default-v)) @@ -21,13 +26,13 @@ (make-place-local v)) (define (unsafe-place-local-ref pl) - (let ([v (hash-ref (#%vector-ref (place-registers) 0) pl none)]) + (let ([v (hash-ref (#%vector-ref (place-registers) LOCAL_TABLE-INDEX) pl none)]) (if (eq? v none) (place-local-default-v pl) v))) (define (unsafe-place-local-set! pl v) - (hash-set! (#%vector-ref (place-registers) 0) pl v)) + (hash-set! (#%vector-ref (place-registers) LOCAL_TABLE-INDEX) pl v)) (define (place-local-register-ref i) (#%vector-ref (place-registers) i)) @@ -47,6 +52,16 @@ ;; ---------------------------------------- +(define place-async-callback-queue + (case-lambda + [() (let ([v (#%vector-ref (place-registers) ASYNC-CALLBACK-REGISTER-INDEX)]) + (if (eqv? v 0) + #f + v))] + [(v) (#%vector-set! (place-registers) ASYNC-CALLBACK-REGISTER-INDEX v)])) + +;; ---------------------------------------- + (define place-specific-table (unsafe-make-place-local #f)) (define (unsafe-get-place-table) diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index 13cf3904ef..7eb045d84e 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -28,6 +28,7 @@ [get-thread-id rumble:get-thread-id] [get-initial-pthread rumble:get-initial-pthread] [current-place-roots rumble:current-place-roots] + [call-as-asynchronous-callback rumble:call-as-asynchronous-callback] [set-ctl-c-handler! rumble:set-ctl-c-handler!] [set-break-enabled-transition-hook! rumble:set-break-enabled-transition-hook!] [set-reachable-size-increments-callback! rumble:set-reachable-size-increments-callback!] @@ -162,6 +163,7 @@ 'call-with-current-pthread-continuation call/cc 'exit place-exit 'pthread? rumble:thread? + 'call-as-asynchronous-callback rumble:call-as-asynchronous-callback 'get-thread-id rumble:get-thread-id 'make-condition rumble:make-condition 'condition-wait rumble:condition-wait @@ -194,4 +196,4 @@ (lambda () (current-atomic (fx- (current-atomic) 1)))) - (set-future-callbacks! future-block current-future-prompt)) + (set-future-callbacks! future-block future-sync current-future-prompt)) diff --git a/racket/src/racket/src/schvers.h b/racket/src/racket/src/schvers.h index 60aeba9489..e5554623d4 100644 --- a/racket/src/racket/src/schvers.h +++ b/racket/src/racket/src/schvers.h @@ -16,7 +16,7 @@ #define MZSCHEME_VERSION_X 7 #define MZSCHEME_VERSION_Y 4 #define MZSCHEME_VERSION_Z 0 -#define MZSCHEME_VERSION_W 10 +#define MZSCHEME_VERSION_W 11 /* A level of indirection makes `#` work as needed: */ #define AS_a_STR_HELPER(x) #x diff --git a/racket/src/thread/bootstrap.rkt b/racket/src/thread/bootstrap.rkt index 64e8e8ffa5..ba4701e1e9 100644 --- a/racket/src/thread/bootstrap.rkt +++ b/racket/src/thread/bootstrap.rkt @@ -283,6 +283,7 @@ 'make-mutex (lambda () (make-semaphore 1)) 'mutex-acquire (lambda (s) (semaphore-wait s)) 'mutex-release (lambda (s) (semaphore-post s)) + 'call-as-asynchronous-callback (lambda (thunk) (thunk)) 'continuation-current-primitive (lambda (k) #f))) ;; add dummy definitions that implement pthreads and conditions etc. diff --git a/racket/src/thread/future-logging.rkt b/racket/src/thread/future-logging.rkt index 703abfaf68..0e834fbaac 100644 --- a/racket/src/thread/future-logging.rkt +++ b/racket/src/thread/future-logging.rkt @@ -71,8 +71,11 @@ (number->string proc-id) ": " (if (and (eqv? proc-id 0) - (eq? action 'block)) - (string-append "HANDLING: " + (or (eq? action 'block) + (eq? action 'sync))) + (string-append (if (eq? action 'block) + "HANDLING: " + "synchronizing: ") (symbol->string (or (future-event-prim-name e) '|[unknown]|))) @@ -88,6 +91,7 @@ [(start-work) "started work"] [(end-work) "ended work"] [(block) "BLOCKING on process 0"] + [(sync) "synchronizing with process 0"] [(touch) "touching future: touch"] [(result) "result determined"] [(suspend) "suspended"] diff --git a/racket/src/thread/future.rkt b/racket/src/thread/future.rkt index 70e72866d4..a694aab3aa 100644 --- a/racket/src/thread/future.rkt +++ b/racket/src/thread/future.rkt @@ -28,6 +28,7 @@ would-be-future touch future-block + future-sync current-future-prompt currently-running-future reset-future-logs-for-tracing! @@ -353,6 +354,35 @@ ;; ---------------------------------------- +;; Call `thunk` in the place's main thread: +(define (future-sync who thunk) + (define me-f (current-future)) + (cond + [(future*-would-be? me-f) + (current-future #f) + (log-future 'sync (future*-id me-f) #:prim-name who) + (let ([v (thunk)]) + (log-future 'result (future*-id me-f)) + (current-future me-f) + v)] + [else + ;; Atomic mode prevents getting terminated or swapped out + ;; while we block on the main thread + (current-atomic (add1 (current-atomic))) + (begin0 + ;; Host's `call-as-asynchronous-callback` will post `thunk` + ;; so that it's returned by `host:poll-async-callbacks` to + ;; the scheduler in the place's main thread + (host:call-as-asynchronous-callback + (lambda () + (log-future 'sync (future*-id me-f) #:prim-name who) + (let ([v (thunk)]) + (log-future 'result (future*-id me-f)) + v))) + (current-atomic (sub1 (current-atomic))))])) + +;; ---------------------------------------- + (define pthread-count 1) ;; Called by io layer diff --git a/racket/src/thread/host.rkt b/racket/src/thread/host.rkt index ec6c85115c..e0c24fedbf 100644 --- a/racket/src/thread/host.rkt +++ b/racket/src/thread/host.rkt @@ -95,4 +95,6 @@ [mutex-release host:mutex-release] threaded? + [call-as-asynchronous-callback host:call-as-asynchronous-callback] + continuation-current-primitive) diff --git a/racket/src/thread/main.rkt b/racket/src/thread/main.rkt index cee6e7c747..92710fb8ba 100644 --- a/racket/src/thread/main.rkt +++ b/racket/src/thread/main.rkt @@ -191,6 +191,7 @@ would-be-future current-future future-block + future-sync current-future-prompt reset-future-logs-for-tracing! mark-future-trace-end!