diff --git a/collects/mzlib/async-channel.ss b/collects/mzlib/async-channel.ss index d51cb97..12f5c7a 100644 --- a/collects/mzlib/async-channel.ss +++ b/collects/mzlib/async-channel.ss @@ -4,7 +4,7 @@ (lib "contract.ss")) (define-values (struct:ac make-ac async-channel? ac-ref ac-set!) - (make-struct-type 'async-channel #f 3 0 #f + (make-struct-type 'async-channel #f 4 0 #f (list (cons prop:waitable ;; This is the guard that is called when ;; we use an async-channel as a waitable @@ -13,16 +13,18 @@ ;; Make sure queue manager is running: (thread-resume (ac-thread ac) (current-thread)) (ac-dequeue-ch ac)))) - (current-inspector) #f '(0))) + (current-inspector) #f)) (define ac-enqueue-ch (make-struct-field-accessor ac-ref 0)) (define ac-dequeue-ch (make-struct-field-accessor ac-ref 1)) - (define ac-thread (make-struct-field-accessor ac-ref 2)) + (define ac-empty-ch (make-struct-field-accessor ac-ref 2)) + (define ac-thread (make-struct-field-accessor ac-ref 3)) (define make-async-channel (opt-lambda ([limit #f]) (let* ([enqueue-ch (make-channel)] [dequeue-ch (make-channel)] + [empty-ch (make-channel)] [queue-first (cons #f null)] [queue-last queue-first] [manager-thread @@ -41,31 +43,42 @@ (make-wrapped-waitable enqueue-ch (lambda (v) - (let ([p (cons v null)]) + (let ([p (cons #f null)]) + (set-car! queue-last v) (set-cdr! queue-last p) (set! queue-last p) - (loop)))))]) + (loop)))))] + [mk-empty + (lambda () + (make-wrapped-waitable + (make-channel-put-waitable empty-ch #f) + (lambda (ignored) + (loop))))]) (cond [(= 1 (length queue-first)) - (object-wait-multiple #f (mk-enqueue))] + (object-wait-multiple #f (mk-enqueue) (mk-empty))] [(or (not limit) ((sub1 (length queue-first)) . < . limit)) (object-wait-multiple #f (mk-enqueue) (mk-dequeue))] [else (object-wait-multiple #f (mk-dequeue))])))))]) - (make-ac enqueue-ch dequeue-ch manager-thread)))) + (make-ac enqueue-ch dequeue-ch empty-ch manager-thread)))) (define (async-channel-get ac) (object-wait-multiple #f ac)) (define (async-channel-try-get ac) - (object-wait-multiple 0 ac)) + (object-wait-multiple #f ac (ac-empty-ch ac))) (define (make-async-channel-put-waitable ac v) - (make-guard-waitable - (lambda () - ;; Make sure queue manager is running: - (thread-resume (ac-thread ac) (current-thread)) - (make-channel-put-waitable (ac-enqueue-ch ac) v)))) + (letrec ([p + (make-wrapped-waitable + (make-guard-waitable + (lambda () + ;; Make sure queue manager is running: + (thread-resume (ac-thread ac) (current-thread)) + (make-channel-put-waitable (ac-enqueue-ch ac) v))) + (lambda (ignored) p))]) + p)) (define (async-channel-put ac v) (object-wait-multiple #f (make-async-channel-put-waitable ac v))