.
original commit: d17e7a03f3c4d549bcdc4529acb30e6dc8478fa3
This commit is contained in:
parent
df61ab8623
commit
fff6db208c
|
@ -855,6 +855,9 @@ WARNING: printf is rebound in the body of the unit to always
|
|||
get-err-port
|
||||
get-value-port))
|
||||
|
||||
(define-struct peeker (bytes skip-count pe resp-chan))
|
||||
(define-struct peeker-req (bytes skip-count pe resp-chan resp-nack))
|
||||
|
||||
(define ports-mixin
|
||||
(mixin ((class->interface text%) #;scheme:text<%>) (ports<%>)
|
||||
(inherit begin-edit-sequence
|
||||
|
@ -1076,70 +1079,92 @@ WARNING: printf is rebound in the body of the unit to always
|
|||
(define input-buffer-thread
|
||||
(thread
|
||||
(lambda ()
|
||||
(let loop ([data (empty-queue)]
|
||||
[readers (empty-queue)]
|
||||
[peeker-sema #f]
|
||||
[peeker-evt #f])
|
||||
(let ([send-data-evt
|
||||
(if (or (queue-empty? data)
|
||||
(queue-empty? readers))
|
||||
never-evt
|
||||
(let* ([data-hd (queue-first data)]
|
||||
[reader-hd (queue-first readers)]
|
||||
[reader-succeed (car reader-hd)]
|
||||
[reader-fail (cadr reader-hd)])
|
||||
(choice-evt
|
||||
(finish-evt
|
||||
(channel-put-evt reader-succeed data-hd)
|
||||
(lambda (v)
|
||||
(semaphore-post peeker-sema)
|
||||
(loop (queue-rest data) (queue-rest readers) #f #f)))
|
||||
(finish-evt
|
||||
reader-fail
|
||||
(lambda (v)
|
||||
(loop data (queue-rest readers) peeker-sema peeker-evt))))))])
|
||||
|
||||
;; these vars are like arguments to the loop function
|
||||
;; they are only set right before loop is called.
|
||||
;; This is done to avoid passing the same arguments
|
||||
;; over and over to loop.
|
||||
(define peeker-sema (make-semaphore 0))
|
||||
(define peeker-evt (semaphore-peek-evt peeker-sema))
|
||||
(define bytes-peeked 0)
|
||||
(define response-evts '())
|
||||
(define waiters '())
|
||||
(define data (empty-queue))
|
||||
(let loop ()
|
||||
|
||||
(define (service-waiter peeker)
|
||||
;; need a nack for the response!
|
||||
(let* ([bytes (peeker-bytes peeker)]
|
||||
[skip-count (peeker-count peeker)]
|
||||
[pe (peeker-pe peeker)]
|
||||
[resp-chan (peeker-resp-chan peeker)])
|
||||
(cond
|
||||
[(not (eq? pe peeker-evt))
|
||||
(make-channel-put-evt resp-chan #f)]
|
||||
[(queue-has-n? data (+ skip-count 1))
|
||||
(let ([nth (queue-nth data (+ skip-count 1))])
|
||||
(if (byte? nth)
|
||||
(begin
|
||||
(byte-set! bytes 0 fst)
|
||||
(make-channel-put-evt resp-chan 1))
|
||||
(build-answer-evt
|
||||
(make-channel-put-evt
|
||||
resp-chan
|
||||
(lambda (src line col pos)
|
||||
nth)))))]
|
||||
[else #f])))
|
||||
|
||||
(let-values ([(not-ready-peekers extended-response-evts)
|
||||
(let loop ([peekers peekers]
|
||||
[response-evts response-evts]
|
||||
[not-ready-peekers '()])
|
||||
(cond
|
||||
[(null? peekers) (values response-evts not-read-peekers)]
|
||||
[else (let* ([peeker (car peekers)]
|
||||
[maybe-evt (server-waiter peeker)])
|
||||
(if maybe-evt
|
||||
(loop (cdr peekers)
|
||||
(cons maybe-evt response-evts)
|
||||
not-ready-peekers)
|
||||
(loop (cdr peekers)
|
||||
response-evts
|
||||
(cons peeker not-ready-peekers))))]))])
|
||||
(sync
|
||||
send-data-evt
|
||||
(finish-evt
|
||||
read-chan
|
||||
(lambda (new-data)
|
||||
(loop (enqueue new-data data) readers peeker-sema peeker-evt)))
|
||||
(finish-evt
|
||||
readers-chan
|
||||
(lambda (new-reader)
|
||||
(loop data (enqueue new-reader readers) peeker-sema peeker-evt)))
|
||||
(finish-evt
|
||||
progress-event-chan
|
||||
(lambda (return-chan)
|
||||
(let* ([peeker-sema (or peeker-sema (make-semaphore 0))]
|
||||
[peeker-evt (or peeker-evt (semaphore-peek-evt peeker-sema))])
|
||||
(thread (lambda () (channel-put return-chan peeker-evt)))
|
||||
(loop data
|
||||
readers
|
||||
peeker-sema
|
||||
peeker-evt))))
|
||||
(lambda (return-pr)
|
||||
(let ([return-chan (car return-pr)]
|
||||
[return-nack (cdr return-pr)])
|
||||
(set! response-evts
|
||||
(cons (choice-evt
|
||||
return-nack
|
||||
(channel-put-evt return-chan peeker-evt))
|
||||
response-evts))
|
||||
(loop))))
|
||||
(finish-evt
|
||||
peek-evt
|
||||
(lambda (vals)
|
||||
(let ([bytes (first vals)]
|
||||
[skip-count (second vals)]
|
||||
[pe (third vals)]
|
||||
[resp (fourth vals)])
|
||||
(cond
|
||||
[(not (eq? pe peeker-evt))
|
||||
(thread (lambda () (channel-put resp #f)))]
|
||||
[(queue-has-n? data (+ skip-count 1))
|
||||
(let ([nth (queue-nth data (+ skip-count 1))])
|
||||
(if (byte? nth)
|
||||
(begin (byte-set! bytes 0 fst)
|
||||
(thread (lambda () (channel-put resp 1))))
|
||||
...special...))]
|
||||
[else
|
||||
|
||||
|
||||
|
||||
))))))
|
||||
|
||||
(lambda (peek-request)
|
||||
(let ([bytes (peek-req-bytes peek-request)]
|
||||
[skip-count (peek-req-skip-count peek-request)]
|
||||
[pe (peek-req-pe peek-request)]
|
||||
[resp-chan (peek-req-resp-chan request)]
|
||||
[resp-nack (peek-req-resp-nack request)])
|
||||
(set! peekers
|
||||
(cons (make-peeker bytes
|
||||
skip-count
|
||||
pe
|
||||
resp-chan
|
||||
resp-nack)
|
||||
waiting-peekers))
|
||||
(loop))))
|
||||
(apply choice-evt
|
||||
(map (lambda (resp-evt)
|
||||
(finish-evt
|
||||
(lambda (_)
|
||||
(set! response-evts (remq resp-evt response-evts))
|
||||
(loop))))
|
||||
response-evts))))))))
|
||||
|
||||
(define output-buffer-thread
|
||||
(let ([buffer-full 40]
|
||||
[converter (bytes-open-converter "UTF-8-permissive" "UTF-8")])
|
||||
|
|
Loading…
Reference in New Issue
Block a user