original commit: 76f27c9ab9282d5437d058897201bf816c37523f
This commit is contained in:
Robby Findler 2004-05-25 13:59:11 +00:00
parent e927bacea3
commit 492e73514c

View File

@ -1006,10 +1006,9 @@ WARNING: printf is rebound in the body of the unit to always
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; syncronization code
;; output port syncronization code
;;
;; flush-chan : (channel (evt void))
;; signals that the buffer-thread should flush pending output
;; the evt inside is waited on to indicate the flush has occurred
@ -1032,9 +1031,6 @@ WARNING: printf is rebound in the body of the unit to always
;; (channel ...)))
(define readers-chan (make-channel))
;; progress-event-chan : (channel (channel event))
(define progress-event-chan (make-channel))
;; queue-insertion : (listof (cons (union string snip) style-delta)) evt -> void
;; txt is in the reverse order of the things to be inserted.
;; the evt is waited on when the text has actually been inserted
@ -1076,95 +1072,7 @@ WARNING: printf is rebound in the body of the unit to always
(lock locked?)
(end-edit-sequence)))
(define input-buffer-thread
(thread
(lambda ()
;; these vars are like arguments to the loop function
;; they are only set right before loop is called.
;; This is done to avoid passing the same arguments
;; over and over to loop.
(define peeker-sema (make-semaphore 0))
(define peeker-evt (semaphore-peek-evt peeker-sema))
(define bytes-peeked 0)
(define response-evts '())
(define waiters '())
(define data (empty-queue))
(let loop ()
(define (service-waiter peeker)
;; need a nack for the response!
(let* ([bytes (peeker-bytes peeker)]
[skip-count (peeker-count peeker)]
[pe (peeker-pe peeker)]
[resp-chan (peeker-resp-chan peeker)])
(cond
[(not (eq? pe peeker-evt))
(make-channel-put-evt resp-chan #f)]
[(queue-has-n? data (+ skip-count 1))
(let ([nth (queue-nth data (+ skip-count 1))])
(if (byte? nth)
(begin
(byte-set! bytes 0 fst)
(make-channel-put-evt resp-chan 1))
(build-answer-evt
(make-channel-put-evt
resp-chan
(lambda (src line col pos)
nth)))))]
[else #f])))
(let-values ([(not-ready-peekers extended-response-evts)
(let loop ([peekers peekers]
[response-evts response-evts]
[not-ready-peekers '()])
(cond
[(null? peekers) (values response-evts not-read-peekers)]
[else (let* ([peeker (car peekers)]
[maybe-evt (server-waiter peeker)])
(if maybe-evt
(loop (cdr peekers)
(cons maybe-evt response-evts)
not-ready-peekers)
(loop (cdr peekers)
response-evts
(cons peeker not-ready-peekers))))]))])
(sync
(finish-evt
progress-event-chan
(lambda (return-pr)
(let ([return-chan (car return-pr)]
[return-nack (cdr return-pr)])
(set! response-evts
(cons (choice-evt
return-nack
(channel-put-evt return-chan peeker-evt))
response-evts))
(loop))))
(finish-evt
peek-evt
(lambda (peek-request)
(let ([bytes (peek-req-bytes peek-request)]
[skip-count (peek-req-skip-count peek-request)]
[pe (peek-req-pe peek-request)]
[resp-chan (peek-req-resp-chan request)]
[resp-nack (peek-req-resp-nack request)])
(set! peekers
(cons (make-peeker bytes
skip-count
pe
resp-chan
resp-nack)
waiting-peekers))
(loop))))
(apply choice-evt
(map (lambda (resp-evt)
(finish-evt
(lambda (_)
(set! response-evts (remq resp-evt response-evts))
(loop))))
response-evts))))))))
(define output-buffer-thread
(let ([buffer-full 40]
[converter (bytes-open-converter "UTF-8-permissive" "UTF-8")])
@ -1272,6 +1180,103 @@ WARNING: printf is rebound in the body of the unit to always
flush-proc
out-close-proc
(make-write-special-proc value-sd))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;
;; input port sync code
;;
;; progress-event-chan : (channel (cons (channel event) nack-evt)))
(define progress-event-chan (make-channel))
;; peek-chan : (channel peeker)
(define peek-chan (make-channel))
(define input-buffer-thread
(thread
(lambda ()
;; these vars are like arguments to the loop function
;; they are only set right before loop is called.
;; This is done to avoid passing the same arguments
;; over and over to loop.
(define peeker-sema (make-semaphore 0))
(define peeker-evt (semaphore-peek-evt peeker-sema))
(define bytes-peeked 0)
(define response-evts '())
(define waiters '())
(define data (empty-queue))
(let loop ()
;; service-waiter : peeker -> (union #f evt)
;; if the peeker can be serviced, build an event to service it
;; otherwise return #f
(define (service-waiter peeker)
(let* ([bytes (peeker-bytes peeker)]
[skip-count (peeker-count peeker)]
[pe (peeker-pe peeker)]
[resp-chan (peeker-resp-chan peeker)]
[nack-evt (peeker-nack-evt peeker)]
(cond
[(not (eq? pe peeker-evt))
(choice-evt (make-channel-put-evt resp-chan #f)
nack-evt)]
[(queue-has-n? data (+ skip-count 1))
(let ([nth (queue-nth data (+ skip-count 1))])
(choice-evt
nack-evt
(if (byte? nth)
(begin
(byte-set! bytes 0 fst)
(make-channel-put-evt resp-chan 1))
(build-answer-evt
(make-channel-put-evt
resp-chan
(lambda (src line col pos)
nth))))))]
[else #f]))))
(let-values ([(not-ready-peekers new-response-evts)
(let loop ([peekers peekers]
[response-evts response-evts]
[not-ready-peekers '()])
(cond
[(null? peekers) (values response-evts not-read-peekers)]
[else (let* ([peeker (car peekers)]
[maybe-evt (server-waiter peeker)])
(if maybe-evt
(loop (cdr peekers)
(cons maybe-evt response-evts)
not-ready-peekers)
(loop (cdr peekers)
response-evts
(cons peeker not-ready-peekers))))]))])
(set! peekers not-ready-peekers)
(set! response-evts new-response-events)
(sync
(finish-evt
progress-event-chan
(lambda (return-pr)
(let ([return-chan (car return-pr)]
[return-nack (cdr return-pr)])
(set! response-evts
(cons (choice-evt
return-nack
(channel-put-evt return-chan peeker-evt))
response-evts))
(loop))))
(finish-evt
peek-evt
(lambda (peeker)
(set! peekers (cons peeker waiting-peekers))
(loop)))
(apply choice-evt
(map (lambda (resp-evt)
(finish-evt
(lambda (_)
(set! response-evts (remq resp-evt response-evts))
(loop))))
response-evts))))))))
(define/private (init-input-port)
@ -1295,15 +1300,25 @@ WARNING: printf is rebound in the body of the unit to always
0)]))) ; try again
(define (peek-proc bstr skip-count progress-evt)
...)
(nack-guard-evt
(lambda (nack)
(let ([chan (make-channel)])
(channel-put peek-chan (make-peeker bstr skip-count progress-evt chan nack))
(channel-get-evt chan)))))
(define (progress-evt-proc)
(let ([c (make-channel)])
(channel-put process-event-chan c)
(channel-get c)))
(nack-guard-evt
(lambda (nack)
(let ([c (make-channel)])
(channel-put process-event-chan (cons c nack))
(channel-get-evt c)))))
(define (optional-commit-proc ...)
...)
(define (optional-commit-proc kr progress-evt done-evt)
(nack-guard-evt
(lambda (nack)
(let ([chan (make-channel)])
(channel-put commit-chan (make-committer kr progress-evt done-evt chan nack))
(channel-get-evt chan)))))
(define (in-close-proc) (void))