From 21e9e21eca52e2fd15f0bfefb069e5c72ac7578d Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Tue, 22 Jul 2003 21:16:16 +0000 Subject: [PATCH] . original commit: 0c20f0c3c777b4b9bfd0e9eb849aa10676815b3c --- collects/mzlib/thread.ss | 110 ++++++++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 37 deletions(-) diff --git a/collects/mzlib/thread.ss b/collects/mzlib/thread.ss index 8749569..3bee805 100644 --- a/collects/mzlib/thread.ss +++ b/collects/mzlib/thread.ss @@ -1,6 +1,7 @@ (module thread mzscheme - (require "spidey.ss") + (require "spidey.ss" + "etc.ss") (provide consumer-thread with-semaphore @@ -12,7 +13,8 @@ merge-input copy-port - run-server) + run-server + make-limited-input-port) #| t accepts a function, f, and creates a thread. It returns the thread and a @@ -144,38 +146,72 @@ (copy a) (copy b) rd))])) - - (define (run-server port-number handler connection-timeout) - (let ([l (tcp-listen port-number)] - [can-break? (break-enabled)]) - (dynamic-wind - void - (lambda () - ;; loop to handle connections - (let loop () - (with-handlers ([not-break-exn? void]) - ;; Make a custodian for the next session: - (let ([c (make-custodian)]) - (parameterize ([current-custodian c]) - ;; disable breaks during session set-up... - (parameterize ([break-enabled #f]) - ;; ... but enable breaks while blocked on an accept: - (let-values ([(r w) ((if can-break? - tcp-accept/enable-break - tcp-accept) - l)]) - ;; Handler thread: - (let ([t (thread (lambda () - (when can-break? - (break-enabled #t)) - (handler r w)))]) - ;; Clean-up and timeout thread: - (thread (lambda () - (object-wait-multiple connection-timeout t) - (when (thread-running? t) - ;; Only happens if connection-timeout is not #f - (break-thread t)) - (object-wait-multiple connection-timeout t) - (custodian-shutdown-all c))))))))) - (loop))) - (lambda () (tcp-close l)))))) + + (define run-server + (opt-lambda (port-number + handler + connection-timeout + [handle-exn void] + [tcp-listen tcp-listen] + [tcp-close tcp-close] + [tcp-accept tcp-accept] + [tcp-accept/enable-break tcp-accept/enable-break]) + (let ([l (tcp-listen port-number)] + [can-break? (break-enabled)]) + (dynamic-wind + void + (lambda () + ;; loop to handle connections + (let loop () + (with-handlers ([not-break-exn? handle-exn]) + ;; Make a custodian for the next session: + (let ([c (make-custodian)]) + (parameterize ([current-custodian c]) + ;; disable breaks during session set-up... + (parameterize ([break-enabled #f]) + ;; ... but enable breaks while blocked on an accept: + (let-values ([(r w) ((if can-break? + tcp-accept/enable-break + tcp-accept) + l)]) + ;; Handler thread: + (let ([t (thread (lambda () + (when can-break? + (break-enabled #t)) + (handler r w)))]) + ;; Clean-up and timeout thread: + (thread (lambda () + (object-wait-multiple connection-timeout t) + (when (thread-running? t) + ;; Only happens if connection-timeout is not #f + (break-thread t)) + (object-wait-multiple connection-timeout t) + (custodian-shutdown-all c))))))))) + (loop))) + (lambda () (tcp-close l)))))) + + (define make-limited-input-port + (opt-lambda (port limit [close-orig? #t]) + (let ([got 0]) + (make-custom-input-port + (lambda (str) + (let ([count (min (- limit got) (string-length str))]) + (if (zero? count) + eof + (let ([n (read-string-avail!* str port 0 count)]) + (cond + [(eq? n 0) port] + [(number? n) (set! got (+ got n)) n] + [else n]))))) + (lambda (str skip) + (let ([count (max 0 (min (- limit got skip) (string-length str)))]) + (if (zero? count) + eof + (let ([n (peek-string-avail!* str skip port 0 count)]) + (if (eq? n 0) + port + n))))) + (lambda () + (when close-orig? + (close-input-port port)))))))) +