thread: fix place-channel ephemeron problem
Retains a strong link to a place-channel write end when there's at least one waiting thread. This is symmetic to keeping a strong link to the read end when the place-channel queue is non-empty. The change repairs a problem building documentation with places in `racocs setup`.
This commit is contained in:
parent
c78787a259
commit
a7ae05a414
52
pkgs/racket-test/tests/racket/place-channel-compete.rkt
Normal file
52
pkgs/racket-test/tests/racket/place-channel-compete.rkt
Normal file
|
@ -0,0 +1,52 @@
|
|||
#lang racket/base
|
||||
(require racket/place)
|
||||
|
||||
;; Make sure multiple places can compete for a lock that is
|
||||
;; implemented by a place channel. This test turns out to also make
|
||||
;; sure that the link to the read end of a message queue is not too
|
||||
;; weak when a suspend thread is waiting on the channel.
|
||||
|
||||
(define (lock-via-channel lock-ch)
|
||||
(let ([saved-ch #f])
|
||||
(lambda (mode)
|
||||
(case mode
|
||||
[(lock)
|
||||
(define ch (sync lock-ch))
|
||||
(place-channel-put ch 'lock)
|
||||
(set! saved-ch ch)]
|
||||
[(unlock)
|
||||
(place-channel-put saved-ch 'done)
|
||||
(set! saved-ch #f)]))))
|
||||
|
||||
(define (go)
|
||||
(place pch
|
||||
#;(printf "start\n")
|
||||
(define lock-ch (place-channel-get pch))
|
||||
(define lock (lock-via-channel lock-ch))
|
||||
(for ([i (in-range 100)])
|
||||
#;(printf "~s\n" i)
|
||||
(lock 'lock)
|
||||
(lock 'unlock))
|
||||
#;(printf "done\n")))
|
||||
|
||||
(module+ main
|
||||
(define-values (lock-ch lock-ch-in) (place-channel))
|
||||
(thread (lambda ()
|
||||
(define-values (ch ch-in) (place-channel))
|
||||
(let loop ()
|
||||
(place-channel-put lock-ch-in ch)
|
||||
(unless (eq? (place-channel-get ch-in) 'lock)
|
||||
(error "bad lock"))
|
||||
(unless (eq? (place-channel-get ch-in) 'done)
|
||||
(error "bad unlock"))
|
||||
(loop))))
|
||||
|
||||
(define ps
|
||||
(for/list ([i (in-range 4)])
|
||||
(define p (go))
|
||||
(place-channel-put p lock-ch)
|
||||
p))
|
||||
|
||||
(map place-wait ps))
|
||||
|
||||
(module+ test (require (submod ".." main)))
|
|
@ -233,8 +233,9 @@
|
|||
(struct message-queue (lock
|
||||
[q #:mutable]
|
||||
[rev-q #:mutable]
|
||||
key-box ; holds write key when non-empty
|
||||
[waiters #:mutable]) ; hash of waiting place -> semaphore
|
||||
out-key-box ; holds write key when non-empty
|
||||
[waiters #:mutable] ; hash of waiting place -> semaphore
|
||||
in-key-box) ; holds read key when waiters
|
||||
#:authentic)
|
||||
|
||||
(define (make-message-queue)
|
||||
|
@ -242,7 +243,8 @@
|
|||
'()
|
||||
'()
|
||||
(box #f)
|
||||
#hash()))
|
||||
#hash()
|
||||
(box #f)))
|
||||
|
||||
(define (enqueue! mq msg wk)
|
||||
(define lock (message-queue-lock mq))
|
||||
|
@ -251,8 +253,11 @@
|
|||
(set-message-queue-rev-q! mq (cons msg (message-queue-rev-q mq)))
|
||||
(define waiters (message-queue-waiters mq))
|
||||
(set-message-queue-waiters! mq '#hash())
|
||||
(set-box! (message-queue-key-box mq) wk)
|
||||
(set-box! (message-queue-out-key-box mq) wk)
|
||||
(set-box! (message-queue-in-key-box mq) #f)
|
||||
(host:mutex-release lock)
|
||||
;; Waking all waiters is not great, but we don't know which of
|
||||
;; them is still waiting and can reliably succeed
|
||||
(for ([(pl s) (in-hash waiters)])
|
||||
(host:mutex-acquire (place-lock pl))
|
||||
(set-place-dequeue-semas! pl (cons s (place-dequeue-semas pl)))
|
||||
|
@ -266,7 +271,7 @@
|
|||
;; a message. Note that if the message queue becomes
|
||||
;; inaccessible (so no writers), then the semaphores
|
||||
;; become inaccessible.
|
||||
(define (dequeue! mq success-k fail-k)
|
||||
(define (dequeue! mq rk success-k fail-k)
|
||||
(ensure-wakeup-handle!)
|
||||
(define lock (message-queue-lock mq))
|
||||
(host:mutex-acquire lock)
|
||||
|
@ -286,28 +291,31 @@
|
|||
[else
|
||||
(define s (make-semaphore))
|
||||
(set-message-queue-waiters! mq (hash-set waiters current-place s))
|
||||
(set-box! (message-queue-in-key-box mq) rk)
|
||||
(host:mutex-release lock)
|
||||
(fail-k s)])]
|
||||
[else
|
||||
(define new-q (cdr q))
|
||||
(set-message-queue-q! mq new-q)
|
||||
(when (null? new-q)
|
||||
(set-box! (message-queue-key-box mq) #f))
|
||||
(set-box! (message-queue-out-key-box mq) #f))
|
||||
(host:mutex-release lock)
|
||||
(success-k (car q))]))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
(struct pchannel (in-mq-e ; ephemeron of writer key and message-queue
|
||||
out-mq-e ; ephemeron of reader key and message-queue
|
||||
(struct pchannel (in-mq-e ; ephemeron of writer key and message-queue
|
||||
out-mq-e ; ephemeron of reader key and message-queue
|
||||
reader-key
|
||||
writer-key
|
||||
in-key-box) ; causes in-mq-e to be retained when non-empty
|
||||
in-key-box ; causes in-mq-e value to be retained when non-empty
|
||||
out-key-box) ; causes out-mq-e value to be retained when waiters
|
||||
#:reflection-name 'place-channel
|
||||
#:property prop:evt (poller (lambda (self poll-ctx)
|
||||
(define in-mq (ephemeron-value (pchannel-in-mq-e self)))
|
||||
(if in-mq
|
||||
(dequeue! in-mq
|
||||
(pchannel-reader-key self)
|
||||
(lambda (v)
|
||||
(values #f
|
||||
(wrap-evt
|
||||
|
@ -336,8 +344,10 @@
|
|||
(define wk1 (gensym 'write))
|
||||
(define rk2 (gensym 'read))
|
||||
(define wk2 (gensym 'write))
|
||||
(values (pchannel (make-ephemeron wk1 mq1) (make-ephemeron rk2 mq2) rk1 wk2 (message-queue-key-box mq1))
|
||||
(pchannel (make-ephemeron wk2 mq2) (make-ephemeron rk1 mq1) rk2 wk1 (message-queue-key-box mq2))))
|
||||
(values (pchannel (make-ephemeron wk1 mq1) (make-ephemeron rk2 mq2) rk1 wk2
|
||||
(message-queue-out-key-box mq1) (message-queue-in-key-box mq2))
|
||||
(pchannel (make-ephemeron wk2 mq2) (make-ephemeron rk1 mq1) rk2 wk1
|
||||
(message-queue-out-key-box mq2) (message-queue-in-key-box mq1))))
|
||||
|
||||
(define/who (place-channel-get in-pch)
|
||||
(check who place-channel? in-pch)
|
||||
|
@ -347,6 +357,7 @@
|
|||
(begin
|
||||
(start-atomic)
|
||||
(dequeue! in-mq
|
||||
(pchannel-reader-key pch)
|
||||
(lambda (v)
|
||||
(end-atomic)
|
||||
(un-message-ize v))
|
||||
|
|
Loading…
Reference in New Issue
Block a user