original commit: c1fa2d671f47f4cce2e30456b68f5acd076ce7b9
This commit is contained in:
Robby Findler 2004-05-19 20:17:07 +00:00
parent 2ecf76485e
commit df61ab8623

View File

@ -920,7 +920,7 @@ WARNING: printf is rebound in the body of the unit to always
(define/public (clear-input-port)
(channel-put clear-input-chan (void))
(make-input-port))
(init-input-port))
(define/public (clear-output-ports)
(channel-put clear-output-chan (void))
@ -1029,8 +1029,8 @@ WARNING: printf is rebound in the body of the unit to always
;; (channel ...)))
(define readers-chan (make-channel))
;; readers-waiting-chan : (channel (channel boolean))
(define readers-waiting-chan (make-channel))
;; progress-event-chan : (channel (channel event))
(define progress-event-chan (make-channel))
;; queue-insertion : (listof (cons (union string snip) style-delta)) evt -> void
;; txt is in the reverse order of the things to be inserted.
@ -1076,77 +1076,69 @@ WARNING: printf is rebound in the body of the unit to always
(define input-buffer-thread
(thread
(lambda ()
(define (data-waiting data)
(sync
(finish-evt
readers-waiting-chan
(lambda (result)
(channel-put result #t)
(data-waiting data)))
(finish-evt
clear-input-chan
(lambda (_)
(data-and-readers-waiting (empty-queue) (empty-queue))))
(finish-evt
read-chan
(lambda (new-data)
(data-waiting (enqueue new-data data))))
(finish-evt
readers-chan
(lambda (new-reader)
(data-and-readers-waiting data (enqueue new-reader (empty-queue)))))))
(define (readers-waiting readers)
(sync
(finish-evt
clear-input-chan
(lambda (_)
(data-and-readers-waiting (empty-queue) (empty-queue))))
(finish-evt
readers-waiting-chan
(lambda (result)
(channel-put result #f)
(readers-waiting readers)))
(finish-evt
read-chan
(lambda (new-data)
(data-and-readers-waiting (enqueue new-data (empty-queue)) readers)))
(finish-evt
readers-chan
(lambda (new-reader)
(readers-waiting (enqueue new-reader readers))))))
(define (data-and-readers-waiting data readers)
(cond
[(queue-empty? data) (readers-waiting readers)]
[(queue-empty? readers) (data-waiting data)]
[else
(let* ([data-hd (queue-first data)]
[reader-hd (queue-first readers)]
[reader-succeed (car reader-hd)]
[reader-fail (cadr reader-hd)])
(sync
(finish-evt
clear-input-chan
(lambda (_)
(data-and-readers-waiting (empty-queue) (empty-queue))))
(finish-evt
readers-waiting-chan
(lambda (result)
(channel-put result #t)
(data-and-readers-waiting data readers)))
(finish-evt
(channel-put-evt reader-succeed data-hd)
(lambda (v)
(data-and-readers-waiting (queue-rest data)
(queue-rest readers))))
(finish-evt
reader-fail
(lambda (v)
(data-and-readers-waiting data
(queue-rest readers))))))]))
(data-and-readers-waiting (empty-queue) (empty-queue)))))
(let loop ([data (empty-queue)]
[readers (empty-queue)]
[peeker-sema #f]
[peeker-evt #f])
(let ([send-data-evt
(if (or (queue-empty? data)
(queue-empty? readers))
never-evt
(let* ([data-hd (queue-first data)]
[reader-hd (queue-first readers)]
[reader-succeed (car reader-hd)]
[reader-fail (cadr reader-hd)])
(choice-evt
(finish-evt
(channel-put-evt reader-succeed data-hd)
(lambda (v)
(semaphore-post peeker-sema)
(loop (queue-rest data) (queue-rest readers) #f #f)))
(finish-evt
reader-fail
(lambda (v)
(loop data (queue-rest readers) peeker-sema peeker-evt))))))])
(sync
send-data-evt
(finish-evt
read-chan
(lambda (new-data)
(loop (enqueue new-data data) readers peeker-sema peeker-evt)))
(finish-evt
readers-chan
(lambda (new-reader)
(loop data (enqueue new-reader readers) peeker-sema peeker-evt)))
(finish-evt
progress-event-chan
(lambda (return-chan)
(let* ([peeker-sema (or peeker-sema (make-semaphore 0))]
[peeker-evt (or peeker-evt (semaphore-peek-evt peeker-sema))])
(thread (lambda () (channel-put return-chan peeker-evt)))
(loop data
readers
peeker-sema
peeker-evt))))
(finish-evt
peek-evt
(lambda (vals)
(let ([bytes (first vals)]
[skip-count (second vals)]
[pe (third vals)]
[resp (fourth vals)])
(cond
[(not (eq? pe peeker-evt))
(thread (lambda () (channel-put resp #f)))]
[(queue-has-n? data (+ skip-count 1))
(let ([nth (queue-nth data (+ skip-count 1))])
(if (byte? nth)
(begin (byte-set! bytes 0 fst)
(thread (lambda () (channel-put resp 1))))
...special...))]
[else
))))))
(define output-buffer-thread
(let ([buffer-full 40]
@ -1263,29 +1255,30 @@ WARNING: printf is rebound in the body of the unit to always
;; the following must be able to run
;; in any thread (even concurrently)
;;
(define (read-bytes-proc bytes)
;; this shouldn't return 0. it should return a evt and
;; let the system block and then re-call into this thing.
;; yuck.
(let ([readers-waiting-answer-chan (make-channel)])
(channel-put readers-waiting-chan readers-waiting-answer-chan)
(if (channel-get readers-waiting-answer-chan)
(let ([s/c
(sync
(nack-guard-evt
(lambda (fail-channel)
(let ([return-channel (make-channel)])
(channel-put readers-chan (list return-channel fail-channel))
return-channel))))])
(cond
[(byte? s/c)
(bytes-set! bytes 0 s/c)
1]
[(eof-object? s/c) s/c]
[else
(lambda (src line column position)
(values s/c 1))]))
0)))
(define (read-proc bstr)
(let* ([progress-evt (progress-evt-proc)]
[v (peek-proc bstr 0 progress-evt)])
(cond
[(sync/timeout 0 progress-evt) 0] ; try again
[(evt? v) (convert-evt v (lambda (x) 0))] ; sync, then try again
[(and (number? v) (zero? v)) 0] ; try again
[else
(if (optional-commit-proc (if (number? v) v 1)
progress-evt
always-evt)
v ; got a result
0)]))) ; try again
(define (peek-proc bstr skip-count progress-evt)
...)
(define (progress-evt-proc)
(let ([c (make-channel)])
(channel-put process-event-chan c)
(channel-get c)))
(define (optional-commit-proc ...)
...)
(define (in-close-proc) (void))