diff --git a/pkgs/racket-test/tests/racket/place-channel-compete.rkt b/pkgs/racket-test/tests/racket/place-channel-compete.rkt new file mode 100644 index 0000000000..e027b67828 --- /dev/null +++ b/pkgs/racket-test/tests/racket/place-channel-compete.rkt @@ -0,0 +1,52 @@ +#lang racket/base +(require racket/place) + +;; Make sure multiple places can compete for a lock that is +;; implemented by a place channel. This test turns out to also make +;; sure that the link to the read end of a message queue is not too +;; weak when a suspend thread is waiting on the channel. + +(define (lock-via-channel lock-ch) + (let ([saved-ch #f]) + (lambda (mode) + (case mode + [(lock) + (define ch (sync lock-ch)) + (place-channel-put ch 'lock) + (set! saved-ch ch)] + [(unlock) + (place-channel-put saved-ch 'done) + (set! saved-ch #f)])))) + +(define (go) + (place pch + #;(printf "start\n") + (define lock-ch (place-channel-get pch)) + (define lock (lock-via-channel lock-ch)) + (for ([i (in-range 100)]) + #;(printf "~s\n" i) + (lock 'lock) + (lock 'unlock)) + #;(printf "done\n"))) + +(module+ main + (define-values (lock-ch lock-ch-in) (place-channel)) + (thread (lambda () + (define-values (ch ch-in) (place-channel)) + (let loop () + (place-channel-put lock-ch-in ch) + (unless (eq? (place-channel-get ch-in) 'lock) + (error "bad lock")) + (unless (eq? (place-channel-get ch-in) 'done) + (error "bad unlock")) + (loop)))) + + (define ps + (for/list ([i (in-range 4)]) + (define p (go)) + (place-channel-put p lock-ch) + p)) + + (map place-wait ps)) + +(module+ test (require (submod ".." main))) diff --git a/racket/src/thread/place.rkt b/racket/src/thread/place.rkt index 568c2ba59a..8ee8dac89c 100644 --- a/racket/src/thread/place.rkt +++ b/racket/src/thread/place.rkt @@ -233,8 +233,9 @@ (struct message-queue (lock [q #:mutable] [rev-q #:mutable] - key-box ; holds write key when non-empty - [waiters #:mutable]) ; hash of waiting place -> semaphore + out-key-box ; holds write key when non-empty + [waiters #:mutable] ; hash of waiting place -> semaphore + in-key-box) ; holds read key when waiters #:authentic) (define (make-message-queue) @@ -242,7 +243,8 @@ '() '() (box #f) - #hash())) + #hash() + (box #f))) (define (enqueue! mq msg wk) (define lock (message-queue-lock mq)) @@ -251,8 +253,11 @@ (set-message-queue-rev-q! mq (cons msg (message-queue-rev-q mq))) (define waiters (message-queue-waiters mq)) (set-message-queue-waiters! mq '#hash()) - (set-box! (message-queue-key-box mq) wk) + (set-box! (message-queue-out-key-box mq) wk) + (set-box! (message-queue-in-key-box mq) #f) (host:mutex-release lock) + ;; Waking all waiters is not great, but we don't know which of + ;; them is still waiting and can reliably succeed (for ([(pl s) (in-hash waiters)]) (host:mutex-acquire (place-lock pl)) (set-place-dequeue-semas! pl (cons s (place-dequeue-semas pl))) @@ -266,7 +271,7 @@ ;; a message. Note that if the message queue becomes ;; inaccessible (so no writers), then the semaphores ;; become inaccessible. -(define (dequeue! mq success-k fail-k) +(define (dequeue! mq rk success-k fail-k) (ensure-wakeup-handle!) (define lock (message-queue-lock mq)) (host:mutex-acquire lock) @@ -286,28 +291,31 @@ [else (define s (make-semaphore)) (set-message-queue-waiters! mq (hash-set waiters current-place s)) + (set-box! (message-queue-in-key-box mq) rk) (host:mutex-release lock) (fail-k s)])] [else (define new-q (cdr q)) (set-message-queue-q! mq new-q) (when (null? new-q) - (set-box! (message-queue-key-box mq) #f)) + (set-box! (message-queue-out-key-box mq) #f)) (host:mutex-release lock) (success-k (car q))])) ;; ---------------------------------------- -(struct pchannel (in-mq-e ; ephemeron of writer key and message-queue - out-mq-e ; ephemeron of reader key and message-queue +(struct pchannel (in-mq-e ; ephemeron of writer key and message-queue + out-mq-e ; ephemeron of reader key and message-queue reader-key writer-key - in-key-box) ; causes in-mq-e to be retained when non-empty + in-key-box ; causes in-mq-e value to be retained when non-empty + out-key-box) ; causes out-mq-e value to be retained when waiters #:reflection-name 'place-channel #:property prop:evt (poller (lambda (self poll-ctx) (define in-mq (ephemeron-value (pchannel-in-mq-e self))) (if in-mq (dequeue! in-mq + (pchannel-reader-key self) (lambda (v) (values #f (wrap-evt @@ -336,8 +344,10 @@ (define wk1 (gensym 'write)) (define rk2 (gensym 'read)) (define wk2 (gensym 'write)) - (values (pchannel (make-ephemeron wk1 mq1) (make-ephemeron rk2 mq2) rk1 wk2 (message-queue-key-box mq1)) - (pchannel (make-ephemeron wk2 mq2) (make-ephemeron rk1 mq1) rk2 wk1 (message-queue-key-box mq2)))) + (values (pchannel (make-ephemeron wk1 mq1) (make-ephemeron rk2 mq2) rk1 wk2 + (message-queue-out-key-box mq1) (message-queue-in-key-box mq2)) + (pchannel (make-ephemeron wk2 mq2) (make-ephemeron rk1 mq1) rk2 wk1 + (message-queue-out-key-box mq2) (message-queue-in-key-box mq1)))) (define/who (place-channel-get in-pch) (check who place-channel? in-pch) @@ -347,6 +357,7 @@ (begin (start-atomic) (dequeue! in-mq + (pchannel-reader-key pch) (lambda (v) (end-atomic) (un-message-ize v))