diff --git a/collects/framework/private/text.ss b/collects/framework/private/text.ss index 05381023..f969a4d7 100644 --- a/collects/framework/private/text.ss +++ b/collects/framework/private/text.ss @@ -9,6 +9,7 @@ WARNING: printf is rebound in the body of the unit to always (require (lib "string-constant.ss" "string-constants") (lib "unitsig.ss") (lib "class.ss") + (lib "plt-match.ss") "sig.ss" "../macro.ss" "../gui-utils.ss" @@ -1081,17 +1082,17 @@ WARNING: printf is rebound in the body of the unit to always (let loop (;; text-to-insert : (queue (cons (union snip bytes) style)) [text-to-insert (empty-queue)]) (sync - (finish-evt + (handle-evt flush-chan (lambda (return-evt) (let-values ([(viable-bytes remaining-queue) (split-queue converter text-to-insert)]) (queue-insertion viable-bytes return-evt) (loop remaining-queue)))) - (finish-evt + (handle-evt clear-output-chan (lambda (_) (loop (empty-queue)))) - (finish-evt + (handle-evt write-chan (lambda (pr) (cond @@ -1146,13 +1147,14 @@ WARNING: printf is rebound in the body of the unit to always (define (make-write-special-proc style) (lambda (bytes start-i end-i can-buffer? enable-breaks?) + #; (cond - [(eq? (current-thread) (eventspace-handler-thread eventspace)) - (error 'write-bytes-proc "cannot write to port on eventspace main thread")] - [else - (if (is-a? special snip%) - (channel-put write-chan (cons special style)) - (channel-put write-chan (cons (string->bytes/utf-8 (format "~s" special)) style)))]) + [(eq? (current-thread) (eventspace-handler-thread eventspace)) + (error 'write-bytes-proc "cannot write to port on eventspace main thread")] + [else + (if (is-a? special snip%) + (channel-put write-chan (cons special style)) + (channel-put write-chan (cons (string->bytes/utf-8 (format "~s" special)) style)))]) (- start-i end-i))) (define out-sd (make-object style-delta% 'change-normal)) @@ -1192,6 +1194,9 @@ WARNING: printf is rebound in the body of the unit to always ;; peek-chan : (channel peeker) (define peek-chan (make-channel)) + ;; commit-chan : (channel committer) + (define commit-chan (make-channel)) + (define input-buffer-thread (thread (lambda () @@ -1206,55 +1211,23 @@ WARNING: printf is rebound in the body of the unit to always (define response-evts '()) (define waiters '()) (define data (empty-queue)) - (let loop () - - ;; service-waiter : peeker -> (union #f evt) - ;; if the peeker can be serviced, build an event to service it - ;; otherwise return #f - (define (service-waiter peeker) - (let* ([bytes (peeker-bytes peeker)] - [skip-count (peeker-count peeker)] - [pe (peeker-pe peeker)] - [resp-chan (peeker-resp-chan peeker)] - [nack-evt (peeker-nack-evt peeker)] - (cond - [(not (eq? pe peeker-evt)) - (choice-evt (make-channel-put-evt resp-chan #f) - nack-evt)] - [(queue-has-n? data (+ skip-count 1)) - (let ([nth (queue-nth data (+ skip-count 1))]) - (choice-evt - nack-evt - (if (byte? nth) - (begin - (byte-set! bytes 0 fst) - (make-channel-put-evt resp-chan 1)) - (build-answer-evt - (make-channel-put-evt - resp-chan - (lambda (src line col pos) - nth))))))] - [else #f])))) - - (let-values ([(not-ready-peekers new-response-evts) - (let loop ([peekers peekers] - [response-evts response-evts] - [not-ready-peekers '()]) - (cond - [(null? peekers) (values response-evts not-read-peekers)] - [else (let* ([peeker (car peekers)] - [maybe-evt (server-waiter peeker)]) - (if maybe-evt - (loop (cdr peekers) - (cons maybe-evt response-evts) - not-ready-peekers) - (loop (cdr peekers) - response-evts - (cons peeker not-ready-peekers))))]))]) + + ;; loop : -> alpha + ;; the main loop for this thread + (define (loop) + (let-values ([(not-ready-peekers new-peek-response-evts) + (separate service-waiter waiters)] + [(potential-commits new-commit-response-evts) + (separate + (service-committer peeker-evt data) + committers)]) (set! peekers not-ready-peekers) - (set! response-evts new-response-events) + (set! committers potential-commits) + (set! response-evts (append response-evts + new-peek-response-events + new-commit-response-events)) (sync - (finish-evt + (handle-evt progress-event-chan (lambda (return-pr) (let ([return-chan (car return-pr)] @@ -1265,18 +1238,159 @@ WARNING: printf is rebound in the body of the unit to always (channel-put-evt return-chan peeker-evt)) response-evts)) (loop)))) - (finish-evt + (handle-evt peek-evt (lambda (peeker) (set! peekers (cons peeker waiting-peekers)) (loop))) + (handle-evt + (channel-recv-evt commit-chan) + (lambda (committer) + (set! committers (cons committer committers)) + (loop))) + (apply + choice-evt + (map + (lambda (committer) + (match (make-committer kr + commit-peeker-evt + done-evt + resp-chan + resp-nack) + (choice-evt + (handle-evt + commit-peeker-evt + (lambda (_) + ;; this committer will be thrown out in next iteration + (loop))) + (handle-evt + done-evt + (lambda (v) + (set! data (drop-some-data data)) + (semaphore-post peeker-sema) + (set! peeker-sema (make-semaphore 0)) + (set! peeker-evt (semaphore-peek-evt peeker-sema)) + (set! committers (remq committer committers)) + (set! resp-evts + (cons + (choice-evt + resp-nack + (channel-put-evt resp-chan #t)) + resp-evts)) + (loop)))))) + committers)) + (apply + choice-evt + (map + (lambda (committer) + (match (make-committer kr commit-peeker-evt + done-evt resp-chan resp-nack) + (let ([size (queue-size data)]) + (cond + [(not (eq? peeker-evt commit-peeker-evt)) + (set! resp-evts + (cons + (choice-evt + resp-nack + (channel-put-evt resp-chan #f)) + resp-evts)) + (loop)] + [(< size kr) + (set! resp-evts + (cons + (choice-evt + resp-nack + (channel-put-evt resp-chan 'commit-failure)) + resp-evts)) + (loop)] + [else ;; commit succeeds + (semaphore-post peeker-sema) + (set! peeker-sema (make-semaphore 0)) + (set! peeker-evt (semaphore-peek-evt peeker-sema)) + (set! data (dequeue-n kr data)) + (set! resp-evts + (cons + (choice-evt + resp-nack + (channel-put-evt resp-chan #t)) + resp-evts)) + (loop)])))) + committers)) (apply choice-evt (map (lambda (resp-evt) - (finish-evt + (handle-evt (lambda (_) (set! response-evts (remq resp-evt response-evts)) (loop)))) - response-evts)))))))) + response-evts))))) + + ;; separate (listof X) (X -> (union #f Y)) -> (values (listof X) (listof Y)) + ;; separates `eles' into two lists -- those that `f' returns #f for + ;; and then the results of calling `f' for those where `f' doesn't return #f + (define (separate eles f) + (let loop ([eles eles] + [transformed '()] + [left-alone '()]) + (cond + [(null? peekers) (values left-alone transformed)] + [else (let* ([ele (car eles)] + [maybe (f ele)]) + (if maybe-evt + (loop (cdr eles) + (cons maybe transformed) + left-alone) + (loop (cdr eles) + transformed + (cons ele left-alone))))]))) + + ;; service-committer : queue evt -> committer -> (union #f evt) + ;; if the committer can be dumped, return an evt that + ;; does the dumping. otherwise, return #f + (define ((service-committer data peeker-evt) committer) + (match (make-committer kr commit-peeker-evt + done-evt resp-chan resp-nack) + (let ([size (queue-size data)]) + (cond + [(not (eq? peeker-evt commit-peeker-evt)) + (choice-evt + resp-nack + (channel-put-evt resp-chan #f))] + [(< size kr) + (choice-evt + resp-nack + (channel-put-evt resp-chan 'commit-failure))] + [else ;; commit succeeds + #f])))) + + ;; service-waiter : peeker -> (union #f evt) + ;; if the peeker can be serviced, build an event to service it + ;; otherwise return #f + (define (service-waiter peeker) + (let* ([bytes (peeker-bytes peeker)] + [skip-count (peeker-count peeker)] + [pe (peeker-pe peeker)] + [resp-chan (peeker-resp-chan peeker)] + [nack-evt (peeker-nack-evt peeker)] + (cond + [(not (eq? pe peeker-evt)) + (choice-evt (make-channel-put-evt resp-chan #f) + nack-evt)] + [(queue-has-n? data (+ skip-count 1)) + (let ([nth (queue-nth data (+ skip-count 1))]) + (choice-evt + nack-evt + (if (byte? nth) + (begin + (byte-set! bytes 0 fst) + (make-channel-put-evt resp-chan 1)) + (build-answer-evt + (make-channel-put-evt + resp-chan + (lambda (src line col pos) + nth))))))] + [else #f])))) + + (loop)))) (define/private (init-input-port) @@ -1290,7 +1404,7 @@ WARNING: printf is rebound in the body of the unit to always [v (peek-proc bstr 0 progress-evt)]) (cond [(sync/timeout 0 progress-evt) 0] ; try again - [(evt? v) (convert-evt v (lambda (x) 0))] ; sync, then try again + [(evt? v) (wrap-evt v (lambda (x) 0))] ; sync, then try again [(and (number? v) (zero? v)) 0] ; try again [else (if (optional-commit-proc (if (number? v) v 1) @@ -1304,21 +1418,22 @@ WARNING: printf is rebound in the body of the unit to always (lambda (nack) (let ([chan (make-channel)]) (channel-put peek-chan (make-peeker bstr skip-count progress-evt chan nack)) - (channel-get-evt chan))))) + (channel-recv-evt chan))))) (define (progress-evt-proc) (nack-guard-evt (lambda (nack) (let ([c (make-channel)]) (channel-put process-event-chan (cons c nack)) - (channel-get-evt c))))) + (channel-recv-evt c))))) (define (optional-commit-proc kr progress-evt done-evt) - (nack-guard-evt - (lambda (nack) - (let ([chan (make-channel)]) - (channel-put commit-chan (make-committer kr progress-evt done-evt chan nack)) - (channel-get-evt chan))))) + (sync + (nack-guard-evt + (lambda (nack) + (let ([chan (make-channel)]) + (channel-put commit-chan (list kr progress-evt done-evt chan nack)) + (channel-recv-evt chan)))))) (define (in-close-proc) (void)) @@ -1421,6 +1536,7 @@ WARNING: printf is rebound in the body of the unit to always (init-input-port) (init-output-ports) (super-new))) + #| (define (drscheme-pretty-print-size-hook x _ port) (and (or (eq? port this-out)