.
original commit: 73a717a5bbe4e70bba9355fd6cf45034223ee4a4
This commit is contained in:
parent
bad87cc12b
commit
1f21e73490
|
@ -3,87 +3,150 @@
|
|||
(require (lib "etc.ss")
|
||||
(lib "contract.ss"))
|
||||
|
||||
;; This library implements buffered channels with
|
||||
;; and optional buffer limit (so that puts block
|
||||
;; if the buffer is full).
|
||||
|
||||
;; We make a fancy structure just so an async-channel
|
||||
;; can be supplied directly to 'object-wait-multiple'.
|
||||
;; The alternative is to use `define-struct' and supply
|
||||
;; a `make-async-channel-get-waitable' procedure.
|
||||
(define-values (struct:ac make-ac async-channel? ac-ref ac-set!)
|
||||
(make-struct-type 'async-channel #f 4 0 #f
|
||||
(make-struct-type 'async-channel #f 5 0 #f
|
||||
(list (cons prop:waitable
|
||||
;; This is the guard that is called when
|
||||
;; we use an async-channel as a waitable
|
||||
;; (to get).
|
||||
(lambda (ac)
|
||||
;; Make sure queue manager is running:
|
||||
(thread-resume (ac-thread ac) (current-thread))
|
||||
(ac-dequeue-ch ac))))
|
||||
(async-channel-get-guard ac))))
|
||||
(current-inspector) #f))
|
||||
|
||||
(define ac-enqueue-ch (make-struct-field-accessor ac-ref 0))
|
||||
(define ac-dequeue-ch (make-struct-field-accessor ac-ref 1))
|
||||
(define ac-empty-ch (make-struct-field-accessor ac-ref 2))
|
||||
(define ac-thread (make-struct-field-accessor ac-ref 3))
|
||||
(define ac-full-ch (make-struct-field-accessor ac-ref 3))
|
||||
(define ac-thread (make-struct-field-accessor ac-ref 4))
|
||||
|
||||
;; Make ----------------------------------------
|
||||
|
||||
(define make-async-channel
|
||||
(opt-lambda ([limit #f])
|
||||
(let* ([enqueue-ch (make-channel)]
|
||||
[dequeue-ch (make-channel)]
|
||||
[empty-ch (make-channel)]
|
||||
[queue-first (cons #f null)]
|
||||
[queue-last queue-first]
|
||||
(let* ([enqueue-ch (make-channel)] ; for puts
|
||||
[dequeue-ch (make-channel)] ; for gets
|
||||
[empty-ch (make-channel)] ; for get polls
|
||||
[full-ch (make-channel)] ; for put polls
|
||||
[queue-first (cons #f null)] ; queue head
|
||||
[queue-last queue-first] ; queue tail
|
||||
;; Waitables:
|
||||
[tell-empty
|
||||
(make-channel-put-waitable empty-ch (make-semaphore))] ; see poll->ch
|
||||
[tell-full
|
||||
(make-channel-put-waitable full-ch (make-semaphore))] ; see poll->ch
|
||||
[enqueue (make-wrapped-waitable
|
||||
enqueue-ch
|
||||
(lambda (v)
|
||||
;; We received a put; enqueue it:
|
||||
(let ([p (cons #f null)])
|
||||
(set-car! queue-last v)
|
||||
(set-cdr! queue-last p)
|
||||
(set! queue-last p))))]
|
||||
[mk-dequeue
|
||||
(lambda ()
|
||||
(make-wrapped-waitable
|
||||
(make-channel-put-waitable dequeue-ch (car queue-first))
|
||||
(lambda (ignored)
|
||||
;; A get succeeded; dequeue it:
|
||||
(set! queue-first (cdr queue-first)))))]
|
||||
[manager-thread
|
||||
;; This thread is the part that makes the channel asynchronous.
|
||||
;; It waits for a combination of gets and puts as appropriate.
|
||||
;; Note that we start it with `thread/suspend-kill', and we
|
||||
;; resume the manager thread with the current thread everytime
|
||||
;; we want to talk to the manager thread, which effectively
|
||||
;; means that the manager thread is not bound by a custodian
|
||||
;; that is weaker than any of its user's custodians (and thus,
|
||||
;; from the user's perspective, is not bound by any custodian
|
||||
;; at all).
|
||||
(thread/suspend-to-kill
|
||||
(lambda ()
|
||||
(let loop ()
|
||||
(let ([mk-dequeue
|
||||
(lambda ()
|
||||
(make-wrapped-waitable
|
||||
(make-channel-put-waitable dequeue-ch (car queue-first))
|
||||
(lambda (ignored)
|
||||
(set! queue-first (cdr queue-first))
|
||||
(loop))))]
|
||||
[mk-enqueue
|
||||
(lambda ()
|
||||
(make-wrapped-waitable
|
||||
enqueue-ch
|
||||
(lambda (v)
|
||||
(let ([p (cons #f null)])
|
||||
(set-car! queue-last v)
|
||||
(set-cdr! queue-last p)
|
||||
(set! queue-last p)
|
||||
(loop)))))]
|
||||
[mk-empty
|
||||
(lambda ()
|
||||
(make-wrapped-waitable
|
||||
(make-channel-put-waitable empty-ch #f)
|
||||
(lambda (ignored)
|
||||
(loop))))])
|
||||
(cond
|
||||
[(= 1 (length queue-first))
|
||||
(object-wait-multiple #f (mk-enqueue) (mk-empty))]
|
||||
[(or (not limit) ((sub1 (length queue-first)) . < . limit))
|
||||
(object-wait-multiple #f (mk-enqueue) (mk-dequeue))]
|
||||
[else
|
||||
(object-wait-multiple #f (mk-dequeue))])))))])
|
||||
(make-ac enqueue-ch dequeue-ch empty-ch manager-thread))))
|
||||
(cond
|
||||
[(= 1 (length queue-first))
|
||||
;; The queue is currently empty:
|
||||
(object-wait-multiple #f enqueue tell-empty)]
|
||||
[(or (not limit) ((sub1 (length queue-first)) . < . limit))
|
||||
(object-wait-multiple #f enqueue (mk-dequeue))]
|
||||
[else
|
||||
(object-wait-multiple #f (mk-dequeue) tell-full)])
|
||||
(loop))))])
|
||||
(make-ac enqueue-ch dequeue-ch empty-ch full-ch manager-thread))))
|
||||
|
||||
;; Get ----------------------------------------
|
||||
|
||||
(define (async-channel-get-guard ac)
|
||||
;; Make sure queue manager is running:
|
||||
(thread-resume (ac-thread ac) (current-thread))
|
||||
;; If it the channel is being polled, it's not
|
||||
;; good enough to poll the dequeue channel, because
|
||||
;; the server thread may be looping. In that case,
|
||||
;; block on the dequeue channel and the empty
|
||||
;; channel, and create a new waitable to report
|
||||
;; the result.
|
||||
(make-poll-guard-waitable
|
||||
(lambda (poll?)
|
||||
(if poll?
|
||||
(poll->ch (ac-dequeue-ch ac) (ac-empty-ch ac))
|
||||
(ac-dequeue-ch ac)))))
|
||||
|
||||
(define (async-channel-get ac)
|
||||
(object-wait-multiple #f ac))
|
||||
|
||||
(define (async-channel-try-get ac)
|
||||
(object-wait-multiple #f ac (ac-empty-ch ac)))
|
||||
(object-wait-multiple 0 ac))
|
||||
|
||||
;; Put ----------------------------------------
|
||||
|
||||
(define (make-async-channel-put-waitable ac v)
|
||||
(letrec ([p
|
||||
(make-wrapped-waitable
|
||||
(make-guard-waitable
|
||||
(lambda ()
|
||||
;; Make sure queue manager is running:
|
||||
(thread-resume (ac-thread ac) (current-thread))
|
||||
(make-channel-put-waitable (ac-enqueue-ch ac) v)))
|
||||
(lambda (ignored) p))])
|
||||
(letrec ([p (make-wrapped-waitable
|
||||
(make-guard-waitable
|
||||
(lambda ()
|
||||
;; Make sure queue manager is running:
|
||||
(thread-resume (ac-thread ac) (current-thread))
|
||||
(let ([p (make-channel-put-waitable (ac-enqueue-ch ac) v)])
|
||||
;; Poll handling, as in `async-channel-get-guard':
|
||||
(make-poll-guard-waitable
|
||||
(lambda (poll?)
|
||||
(if poll?
|
||||
(poll->ch p (ac-full-ch ac))
|
||||
p))))))
|
||||
(lambda (ignored) p))])
|
||||
p))
|
||||
|
||||
(define (async-channel-put ac v)
|
||||
(object-wait-multiple #f (make-async-channel-put-waitable ac v))
|
||||
(thread-resume (ac-thread ac) (current-thread))
|
||||
(object-wait-multiple #f (make-channel-put-waitable (ac-enqueue-ch ac) v))
|
||||
(void))
|
||||
|
||||
;; Poll helper ----------------------------------------
|
||||
|
||||
(define (poll->ch normal not-ready)
|
||||
(object-wait-multiple #f
|
||||
;; If a value becomes available,
|
||||
;; create a waitable that returns
|
||||
;; the value:
|
||||
(make-wrapped-waitable
|
||||
normal
|
||||
(lambda (v)
|
||||
;; Return a waitable for a successful poll:
|
||||
(make-wrapped-waitable
|
||||
(make-semaphore 1)
|
||||
(lambda (ignored) v))))
|
||||
;; If not-ready becomes available,
|
||||
;; the result is supposed to be
|
||||
;; a never-ready waitable:
|
||||
not-ready))
|
||||
|
||||
;; Provides ----------------------------------------
|
||||
|
||||
(provide async-channel?)
|
||||
(provide/contract (make-async-channel (case->
|
||||
(-> async-channel?)
|
||||
|
|
Loading…
Reference in New Issue
Block a user