original commit: 8ab4460cea563b7cb47bc38ad7f087f671844295
This commit is contained in:
Robby Findler 2004-05-28 20:54:31 +00:00
parent 6e40114a1e
commit b5c6e65d04

View File

@ -1209,14 +1209,15 @@ WARNING: printf is rebound in the body of the unit to always
(define peeker-evt (semaphore-peek-evt peeker-sema)) (define peeker-evt (semaphore-peek-evt peeker-sema))
(define bytes-peeked 0) (define bytes-peeked 0)
(define response-evts '()) (define response-evts '())
(define waiters '()) (define peekers '())
(define committers '())
(define data (empty-queue)) (define data (empty-queue))
;; loop : -> alpha ;; loop : -> alpha
;; the main loop for this thread ;; the main loop for this thread
(define (loop) (define (loop)
(let-values ([(not-ready-peekers new-peek-response-evts) (let-values ([(not-ready-peekers new-peek-response-evts)
(separate service-waiter waiters)] (separate service-waiter peekers)]
[(potential-commits new-commit-response-evts) [(potential-commits new-commit-response-evts)
(separate (separate
(service-committer peeker-evt data) (service-committer peeker-evt data)
@ -1224,8 +1225,8 @@ WARNING: printf is rebound in the body of the unit to always
(set! peekers not-ready-peekers) (set! peekers not-ready-peekers)
(set! committers potential-commits) (set! committers potential-commits)
(set! response-evts (append response-evts (set! response-evts (append response-evts
new-peek-response-events new-peek-response-evts
new-commit-response-events)) new-commit-response-evts))
(sync (sync
(handle-evt (handle-evt
progress-event-chan progress-event-chan
@ -1239,12 +1240,12 @@ WARNING: printf is rebound in the body of the unit to always
response-evts)) response-evts))
(loop)))) (loop))))
(handle-evt (handle-evt
peek-evt peek-chan
(lambda (peeker) (lambda (peeker)
(set! peekers (cons peeker waiting-peekers)) (set! peekers (cons peeker peekers))
(loop))) (loop)))
(handle-evt (handle-evt
(channel-recv-evt commit-chan) commit-chan
(lambda (committer) (lambda (committer)
(set! committers (cons committer committers)) (set! committers (cons committer committers))
(loop))) (loop)))
@ -1332,7 +1333,7 @@ WARNING: printf is rebound in the body of the unit to always
[transformed '()] [transformed '()]
[left-alone '()]) [left-alone '()])
(cond (cond
[(null? peekers) (values left-alone transformed)] [(null? eles) (values left-alone transformed)]
[else (let* ([ele (car eles)] [else (let* ([ele (car eles)]
[maybe (f ele)]) [maybe (f ele)])
(if maybe-evt (if maybe-evt
@ -1399,7 +1400,7 @@ WARNING: printf is rebound in the body of the unit to always
;; the following must be able to run ;; the following must be able to run
;; in any thread (even concurrently) ;; in any thread (even concurrently)
;; ;;
(define (read-proc bstr) (define (read-bytes-proc bstr)
(let* ([progress-evt (progress-evt-proc)] (let* ([progress-evt (progress-evt-proc)]
[v (peek-proc bstr 0 progress-evt)]) [v (peek-proc bstr 0 progress-evt)])
(cond (cond
@ -1407,7 +1408,7 @@ WARNING: printf is rebound in the body of the unit to always
[(evt? v) (wrap-evt v (lambda (x) 0))] ; sync, then try again [(evt? v) (wrap-evt v (lambda (x) 0))] ; sync, then try again
[(and (number? v) (zero? v)) 0] ; try again [(and (number? v) (zero? v)) 0] ; try again
[else [else
(if (optional-commit-proc (if (number? v) v 1) (if (commit-proc (if (number? v) v 1)
progress-evt progress-evt
always-evt) always-evt)
v ; got a result v ; got a result
@ -1418,29 +1419,31 @@ WARNING: printf is rebound in the body of the unit to always
(lambda (nack) (lambda (nack)
(let ([chan (make-channel)]) (let ([chan (make-channel)])
(channel-put peek-chan (make-peeker bstr skip-count progress-evt chan nack)) (channel-put peek-chan (make-peeker bstr skip-count progress-evt chan nack))
(channel-recv-evt chan))))) chan))))
(define (progress-evt-proc) (define (progress-evt-proc)
(nack-guard-evt (nack-guard-evt
(lambda (nack) (lambda (nack)
(let ([c (make-channel)]) (let ([chan (make-channel)])
(channel-put process-event-chan (cons c nack)) (channel-put progress-event-chan (cons chan nack))
(channel-recv-evt c))))) chan))))
(define (optional-commit-proc kr progress-evt done-evt) (define (commit-proc kr progress-evt done-evt)
(sync (sync
(nack-guard-evt (nack-guard-evt
(lambda (nack) (lambda (nack)
(let ([chan (make-channel)]) (let ([chan (make-channel)])
(channel-put commit-chan (list kr progress-evt done-evt chan nack)) (channel-put commit-chan (list kr progress-evt done-evt chan nack))
(channel-recv-evt chan)))))) chan)))))
(define (in-close-proc) (void)) (define (close-proc) (void))
(set! in-port (make-input-port this (set! in-port (make-input-port this
read-bytes-proc read-bytes-proc
#f peek-proc
in-close-proc))) close-proc
progress-evt-proc
commit-proc)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;