diff --git a/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/connection-manager.scrbl b/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/connection-manager.scrbl index c741693c38..0e1e7bf519 100644 --- a/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/connection-manager.scrbl +++ b/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/connection-manager.scrbl @@ -21,12 +21,17 @@ for doing this. The connection will last until @racket[timer] triggers. } +@defproc[(connection-manager? [x any/c]) boolean?]{ + Determines if @racket[x] is a connection manager. +} + @defproc[(start-connection-manager) - void]{ + connection-manager?]{ Runs the connection manager (now just the timer manager). } -@defproc[(new-connection [timeout number?] +@defproc[(new-connection [cm connection-manager?] + [timeout number?] [i-port input-port?] [o-port output-port?] [cust custodian?] diff --git a/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/timer.scrbl b/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/timer.scrbl index dabd444ae9..c2a05eddae 100644 --- a/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/timer.scrbl +++ b/pkgs/web-server-pkgs/web-server-doc/web-server/scribblings/timer.scrbl @@ -9,7 +9,12 @@ This module provides a functionality for running procedures after a given amount of time, that may be extended. -@defstruct[timer ([evt evt?] +@defproc[(timer-manager? [x any/c]) boolean?]{ + Determines if @racket[x] is a timer manager. +} + +@defstruct[timer ([tm timer-manager?] + [evt evt?] [expire-seconds number?] [action (-> void)])]{ @racket[evt] is an @racket[alarm-evt] that is ready at @racket[expire-seconds]. @@ -17,11 +22,12 @@ procedures after a given amount of time, that may be extended. } @defproc[(start-timer-manager) - void]{ + timer-manager?]{ Handles the execution and management of timers. } -@defproc[(start-timer [s number?] +@defproc[(start-timer [tm timer-manager?] + [s number?] [action (-> void)]) timer?]{ Registers a timer that runs @racket[action] after @racket[s] seconds. diff --git a/pkgs/web-server-pkgs/web-server-lib/web-server/managers/timeouts.rkt b/pkgs/web-server-pkgs/web-server-lib/web-server/managers/timeouts.rkt index 6a8db43f31..66b2df35a3 100644 --- a/pkgs/web-server-pkgs/web-server-lib/web-server/managers/timeouts.rkt +++ b/pkgs/web-server-pkgs/web-server-lib/web-server/managers/timeouts.rkt @@ -6,11 +6,11 @@ web-server/http web-server/servlet/servlet-structs) (provide/contract - [create-timeout-manager + [create-timeout-manager (-> (or/c false/c (request? . -> . can-be-response?)) - number? number? + number? number? manager?)]) ;; Utility @@ -20,27 +20,31 @@ (set! i (add1 i)) i))) -(define-struct (timeout-manager manager) (instance-expiration-handler - instance-timer-length - continuation-timer-length - ; Private - instances - next-instance-id)) +(define-struct (timeout-manager manager) + (instance-expiration-handler + instance-timer-length + continuation-timer-length + ; Private + instances + next-instance-id)) (define (create-timeout-manager instance-expiration-handler instance-timer-length continuation-timer-length) + (define tm (start-timer-manager)) + ;; Instances (define instances (make-hasheq)) - (define next-instance-id (make-counter)) - - (define-struct instance (k-table timer)) + (define next-instance-id (make-counter)) + + (define-struct instance (k-table timer)) (define (create-instance expire-fn) (define instance-id (next-instance-id)) (hash-set! instances instance-id (make-instance (create-k-table) - (start-timer instance-timer-length + (start-timer tm + instance-timer-length (lambda () (expire-fn) (hash-remove! instances instance-id))))) @@ -48,7 +52,7 @@ (define (adjust-timeout! instance-id secs) (reset-timer! (instance-timer (instance-lookup instance-id #f)) secs)) - + (define (instance-lookup instance-id peek?) (define instance (hash-ref instances instance-id @@ -61,23 +65,23 @@ (increment-timer! (instance-timer instance) instance-timer-length)) instance) - + ;; Continuation table (define-struct k-table (next-id-fn htable)) (define (create-k-table) (make-k-table (make-counter) (make-hasheq))) - - ;; Interface + + ;; Interface (define (clear-continuations! instance-id) (match (instance-lookup instance-id #f) [(struct instance ((and k-table (struct k-table (next-id-fn htable))) instance-timer)) (hash-for-each htable (match-lambda* - [(list k-id (list salt k expiration-handler k-timer)) - (hash-set! htable k-id - (list salt #f expiration-handler k-timer))]))])) - + [(list k-id (list salt k expiration-handler k-timer)) + (hash-set! htable k-id + (list salt #f expiration-handler k-timer))]))])) + (define (continuation-store! instance-id k expiration-handler) (match (instance-lookup instance-id #t) [(struct instance ((struct k-table (next-id-fn htable)) instance-timer)) @@ -86,11 +90,11 @@ (hash-set! htable k-id (list salt k expiration-handler - (start-timer continuation-timer-length + (start-timer tm continuation-timer-length (lambda () (hash-set! htable k-id (list salt #f expiration-handler - (start-timer 0 void))))))) + (start-timer tm 0 void))))))) (list k-id salt)])) (define (continuation-lookup* instance-id a-k-id a-salt peek?) (match (instance-lookup instance-id peek?) @@ -110,28 +114,28 @@ (not k) (and (custodian-box? k) (not (custodian-box-value k)))) - (raise (make-exn:fail:servlet-manager:no-continuation - (format "No continuation for id: ~a" a-k-id) - (current-continuation-marks) - (if expiration-handler - expiration-handler - instance-expiration-handler))) - k)])])) + (raise (make-exn:fail:servlet-manager:no-continuation + (format "No continuation for id: ~a" a-k-id) + (current-continuation-marks) + (if expiration-handler + expiration-handler + instance-expiration-handler))) + k)])])) (define (continuation-lookup instance-id a-k-id a-salt) (continuation-lookup* instance-id a-k-id a-salt #f)) (define (continuation-peek instance-id a-k-id a-salt) (continuation-lookup* instance-id a-k-id a-salt #t)) - - (make-timeout-manager create-instance + + (make-timeout-manager create-instance adjust-timeout! clear-continuations! continuation-store! continuation-lookup continuation-peek - ; Specific + ; Specific instance-expiration-handler instance-timer-length continuation-timer-length - ; Private + ; Private instances next-instance-id)) diff --git a/pkgs/web-server-pkgs/web-server-lib/web-server/private/connection-manager.rkt b/pkgs/web-server-pkgs/web-server-lib/web-server/private/connection-manager.rkt index 0188c1daa1..6ddfe94635 100644 --- a/pkgs/web-server-pkgs/web-server-lib/web-server/private/connection-manager.rkt +++ b/pkgs/web-server-pkgs/web-server-lib/web-server/private/connection-manager.rkt @@ -1,7 +1,9 @@ #lang racket/base (require racket/contract + racket/match "timer.rkt") +(struct connection-manager (i tm)) (define-struct connection (id timer i-port o-port custodian close?) #:mutable) @@ -13,20 +15,25 @@ [o-port output-port?] [custodian custodian?] [close? boolean?])] - [start-connection-manager (-> void)] - [new-connection (number? input-port? output-port? custodian? boolean? . -> . connection?)] - [kill-connection! (connection? . -> . void)] - [adjust-connection-timeout! (connection? number? . -> . void)]) + [start-connection-manager + (-> connection-manager?)] + [new-connection + (-> connection-manager? number? input-port? output-port? custodian? boolean? + connection?)] + [kill-connection! + (connection? . -> . void)] + [adjust-connection-timeout! + (connection? number? . -> . void)]) -;; start-connection-manager: custodian -> void +;; start-connection-manager: custodian -> connection-manager ;; calls the timer manager (define (start-connection-manager) - (start-timer-manager)) + (connection-manager (box 0) (start-timer-manager))) -;; new-connection: number i-port o-port custodian -> connection +;; new-connection: connection-manager number i-port o-port custodian -> connection ;; ask the connection manager for a new connection -(define i (box 0)) -(define (new-connection time-to-live i-port o-port cust close?) +(define (new-connection cm time-to-live i-port o-port cust close?) + (match-define (connection-manager i tm) cm) (define conn (make-connection ;; The id is just for debugging and isn't normally useful @@ -35,7 +42,8 @@ (define conn-wb (make-weak-box conn)) (set-connection-timer! conn - (start-timer time-to-live + (start-timer tm + time-to-live (lambda () (cond [(weak-box-value conn-wb) diff --git a/pkgs/web-server-pkgs/web-server-lib/web-server/private/dispatch-server-unit.rkt b/pkgs/web-server-pkgs/web-server-lib/web-server/private/dispatch-server-unit.rkt index 736b08637c..26a4434d03 100644 --- a/pkgs/web-server-pkgs/web-server-lib/web-server/private/dispatch-server-unit.rkt +++ b/pkgs/web-server-pkgs/web-server-lib/web-server/private/dispatch-server-unit.rkt @@ -23,28 +23,33 @@ (parameterize ([current-custodian the-server-custodian] [current-server-custodian the-server-custodian] #;[current-thread-initial-stack-size 3]) - (start-connection-manager) + (define cm (start-connection-manager)) (thread (lambda () - (run-server 1 ; This is the port argument, but because we specialize listen, it is ignored. - handle-connection - #f - (lambda (exn) - ((error-display-handler) - (format "Connection error: ~a" (exn-message exn)) - exn)) - (lambda (_ mw re) - (with-handlers ([exn? - (λ (x) - (async-channel-put* confirmation-channel x) - (raise x))]) - (define listener (tcp-listen config:port config:max-waiting #t config:listen-ip)) - (let-values ([(local-addr local-port end-addr end-port) (tcp-addresses listener #t)]) - (async-channel-put* confirmation-channel local-port)) - listener)) - tcp-close - tcp-accept - tcp-accept/enable-break)))) + (run-server + ;; This is the port argument, but because we specialize listen, it is ignored. + 1 + (handle-connection/cm cm) + #f + (lambda (exn) + ((error-display-handler) + (format "Connection error: ~a" (exn-message exn)) + exn)) + (lambda (_ mw re) + (with-handlers ([exn? + (λ (x) + (async-channel-put* confirmation-channel x) + (raise x))]) + (define listener + (tcp-listen config:port config:max-waiting #t config:listen-ip)) + (let-values + ([(local-addr local-port end-addr end-port) + (tcp-addresses listener #t)]) + (async-channel-put* confirmation-channel local-port)) + listener)) + tcp-close + tcp-accept + tcp-accept/enable-break)))) (lambda () (custodian-shutdown-all the-server-custodian))) @@ -57,7 +62,8 @@ (parameterize ([current-custodian server-cust] [current-server-custodian server-cust]) (define connection-cust (make-custodian)) - (start-connection-manager) + (define cm (start-connection-manager)) + (define handle-connection (handle-connection/cm cm)) (parameterize ([current-custodian connection-cust]) (thread (lambda () @@ -67,11 +73,12 @@ (values "127.0.0.1" "127.0.0.1")))))))) -;; handle-connection : input-port output-port (input-port -> string string) -> void -(define (handle-connection ip op - #:port-addresses [port-addresses tcp-addresses]) +;; handle-connection : connection-manager input-port output-port (input-port -> string string) -> void +(define ((handle-connection/cm cm) + ip op + #:port-addresses [port-addresses tcp-addresses]) (define conn - (new-connection config:initial-connection-timeout + (new-connection cm config:initial-connection-timeout ip op (current-custodian) #f)) (with-handlers ([(λ (x) @@ -93,7 +100,7 @@ ;; the connection is closed, then peek will get the EOF and the ;; connection will be closed. This shouldn't change any other ;; behavior: read-request is already blocking, peeking doesn't - ;; consume a byte, etc. + ;; consume a byte, etc. (define the-evt (choice-evt (handle-evt diff --git a/pkgs/web-server-pkgs/web-server-lib/web-server/private/timer.rkt b/pkgs/web-server-pkgs/web-server-lib/web-server/private/timer.rkt index 7a9217f21a..f3a50d288f 100644 --- a/pkgs/web-server-pkgs/web-server-lib/web-server/private/timer.rkt +++ b/pkgs/web-server-pkgs/web-server-lib/web-server/private/timer.rkt @@ -2,60 +2,62 @@ (require racket/contract racket/async-channel) -(define-struct timer (evt expire-seconds action) +(struct timer-manager (thread timer-ch)) +(define-struct timer (tm evt expire-seconds action) #:mutable) -(define timer-ch (make-async-channel)) - -; start-timer-manager : -> void -; The timer manager thread +;; start-timer-manager : -> timer-manager? +;; The timer manager thread (define (start-timer-manager) - (thread - (lambda () - (let loop ([timers null]) - ;; (printf "Timers: ~a\n" (length timers)) - ;; Wait for either... - (apply sync - ;; ... a timer-request message ... - (handle-evt - timer-ch - (lambda (req) - ;; represent a req as a (timer-list -> timer-list) function: - ;; add/remove/change timer evet: - (loop (req timers)))) - ;; ... or a timer - (map (lambda (timer) - (handle-evt - (timer-evt timer) - (lambda (_) - ;; execute timer - ((timer-action timer)) - (loop (remq timer timers))))) - timers))))) - (void)) + (define timer-ch (make-async-channel)) + (timer-manager + (thread + (lambda () + (let loop ([timers null]) + ;; (printf "Timers: ~a\n" (length timers)) + ;; Wait for either... + (apply sync + ;; ... a timer-request message ... + (handle-evt + timer-ch + (lambda (req) + ;; represent a req as a (timer-list -> timer-list) function: + ;; add/remove/change timer evet: + (loop (req timers)))) + ;; ... or a timer + (map (lambda (timer) + (handle-evt + (timer-evt timer) + (lambda (_) + ;; execute timer + ((timer-action timer)) + (loop (remq timer timers))))) + timers))))) + timer-ch)) ;; Limitation on this add-timer: thunk cannot make timer ;; requests directly, because it's executed directly by ;; the timer-manager thread -;; add-timer : number (-> void) -> timer -(define (add-timer msecs thunk) +;; add-timer : timer-manager number (-> void) -> timer +(define (add-timer manager msecs thunk) (define now (current-inexact-milliseconds)) - (define timer - (make-timer (alarm-evt (+ now msecs)) - (+ now msecs) - thunk)) - (async-channel-put - timer-ch + (define t + (timer manager + (alarm-evt (+ now msecs)) + (+ now msecs) + thunk)) + (async-channel-put + (timer-manager-timer-ch manager) (lambda (timers) - (list* timer timers))) - timer) + (list* t timers))) + t) -; revise-timer! : timer msecs (-> void) -> timer -; revise the timer to ring msecs from now +;; revise-timer! : timer msecs (-> void) -> timer +;; revise the timer to ring msecs from now (define (revise-timer! timer msecs thunk) (define now (current-inexact-milliseconds)) - (async-channel-put - timer-ch + (async-channel-put + (timer-manager-timer-ch (timer-tm timer)) (lambda (timers) (set-timer-evt! timer (alarm-evt (+ now msecs))) (set-timer-expire-seconds! timer (+ now msecs)) @@ -64,22 +66,22 @@ (define (cancel-timer! timer) (async-channel-put - timer-ch + (timer-manager-timer-ch (timer-tm timer)) (lambda (timers) (remq timer timers)))) -; start-timer : num (-> void) -> timer -; to make a timer that calls to-do after sec from make-timer's application -(define (start-timer secs to-do) - (add-timer (* 1000 secs) to-do)) +;; start-timer : timer-manager num (-> void) -> timer +;; to make a timer that calls to-do after sec from make-timer's application +(define (start-timer tm secs to-do) + (add-timer tm (* 1000 secs) to-do)) -; reset-timer : timer num -> void -; to cause timer to expire after sec from the adjust-msec-to-live's application +;; reset-timer : timer num -> void +;; to cause timer to expire after sec from the adjust-msec-to-live's application (define (reset-timer! timer secs) (revise-timer! timer (* 1000 secs) (timer-action timer))) -; increment-timer! : timer num -> void -; add secs to the timer, rather than replace +;; increment-timer! : timer num -> void +;; add secs to the timer, rather than replace (define (increment-timer! timer secs) (revise-timer! timer (+ (- (timer-expire-seconds timer) (current-inexact-milliseconds)) @@ -88,21 +90,24 @@ (provide/contract - [struct timer ([evt evt?] + [timer-manager? + (-> any/c boolean?)] + [struct timer ([tm timer-manager?] + [evt evt?] [expire-seconds number?] - [action (-> void)])] - [start-timer-manager (-> void)] - [start-timer (number? (-> void) . -> . timer?)] + [action (-> void)])] + [start-timer-manager (-> timer-manager?)] + [start-timer (timer-manager? number? (-> void) . -> . timer?)] [reset-timer! (timer? number? . -> . void)] [increment-timer! (timer? number? . -> . void)] [cancel-timer! (timer? . -> . void)]) -; --- timeout plan +;; --- timeout plan -; start timeout on connection startup -; for POST requests increase the timeout proportionally when content-length is read -; adjust timeout in read-to-eof -; adjust timeout to starting timeout for next request with persistent connections +;; start timeout on connection startup +;; for POST requests increase the timeout proportionally when content-length is read +;; adjust timeout in read-to-eof +;; adjust timeout to starting timeout for next request with persistent connections -; adjust timeout proportionally when responding -; for servlet - make it a day until the output is produced +;; adjust timeout proportionally when responding +;; for servlet - make it a day until the output is produced diff --git a/pkgs/web-server-pkgs/web-server-lib/web-server/test.rkt b/pkgs/web-server-pkgs/web-server-lib/web-server/test.rkt index 7cef89316f..9eeae691f1 100644 --- a/pkgs/web-server-pkgs/web-server-lib/web-server/test.rkt +++ b/pkgs/web-server-pkgs/web-server-lib/web-server/test.rkt @@ -91,7 +91,8 @@ (define (make-mock-connection ib) (define ip (open-input-bytes ib)) (define op (open-output-bytes)) - (values (make-connection 0 (make-timer never-evt +inf.0 (lambda () (void))) + (define tm (start-timer-manager)) + (values (make-connection 0 (make-timer tm never-evt +inf.0 (lambda () (void))) ip op (current-custodian) #t) ip op)) diff --git a/pkgs/web-server-pkgs/web-server-test/tests/web-server/http/xexpr.rkt b/pkgs/web-server-pkgs/web-server-test/tests/web-server/http/xexpr.rkt index a77b44838f..f715739539 100644 --- a/pkgs/web-server-pkgs/web-server-test/tests/web-server/http/xexpr.rkt +++ b/pkgs/web-server-pkgs/web-server-test/tests/web-server/http/xexpr.rkt @@ -6,10 +6,11 @@ web-server/http "../util.rkt") +(define tm (start-timer-manager)) (define (write-response r [redact? #t]) (define-values (i-port o-port) (make-pipe)) (define conn - (connection 0 (start-timer +inf.0 void) + (connection 0 (start-timer tm +inf.0 void) i-port o-port (make-custodian) #t)) (output-response conn r) (close-output-port o-port) diff --git a/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/connection-manager-test.rkt b/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/connection-manager-test.rkt index c1e1607c1a..9320852594 100644 --- a/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/connection-manager-test.rkt +++ b/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/connection-manager-test.rkt @@ -3,7 +3,11 @@ web-server/private/connection-manager) (provide connection-manager-tests) -(start-connection-manager) +(define cm (start-connection-manager)) + +(module+ test + (require rackunit/text-ui) + (run-tests connection-manager-tests)) (define connection-manager-tests (test-suite @@ -15,7 +19,7 @@ (check-true (let ([ib (open-input-bytes #"")] [ob (open-output-bytes)]) - (new-connection 1 ib ob (make-custodian) #t) + (new-connection cm 1 ib ob (make-custodian) #t) (sleep 2) (with-handlers ([exn? (lambda _ #t)]) (read ib) #f)))) @@ -25,7 +29,7 @@ (check-true (let ([ib (open-input-bytes #"")] [ob (open-output-bytes)]) - (new-connection 1 ib ob (make-custodian) #t) + (new-connection cm 1 ib ob (make-custodian) #t) (sleep 2) (with-handlers ([exn? (lambda _ #t)]) (write 1 ob) #f)))) @@ -35,7 +39,7 @@ (check-true (let* ([ib (open-input-bytes #"")] [ob (open-output-bytes)] - [c (new-connection 1 ib ob (make-custodian) #t)]) + [c (new-connection cm 1 ib ob (make-custodian) #t)]) (kill-connection! c) (and (with-handlers ([exn? (lambda _ #t)]) (read ib) #f) @@ -47,9 +51,9 @@ (check-true (let* ([ib (open-input-bytes #"")] [ob (open-output-bytes)] - [c (new-connection 1 ib ob (make-custodian) #t)]) + [c (new-connection cm 2 ib ob (make-custodian) #t)]) (adjust-connection-timeout! c 1) - (sleep 2) + (sleep 4) (and (with-handlers ([exn? (lambda _ #t)]) (read ib) #f) (with-handlers ([exn? (lambda _ #t)]) diff --git a/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/request-test.rkt b/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/request-test.rkt index 5ab62136c3..86525447be 100644 --- a/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/request-test.rkt +++ b/pkgs/web-server-pkgs/web-server-test/tests/web-server/private/request-test.rkt @@ -19,7 +19,7 @@ (number->string (bytes-length b)))))] [ip (open-input-bytes b)] [op (open-output-bytes)]) - (values (make-connection 0 (make-timer ip +inf.0 (lambda () (void))) + (values (make-connection 0 (make-timer tm ip +inf.0 (lambda () (void))) ip op (make-custodian) #f) headers))) @@ -38,11 +38,13 @@ (read-bindings&post-data/raw (connection-i-port conn) #"POST" (string->url "http://localhost") headers)) (lambda (f s) s))) +(define tm (start-timer-manager)) + (define (test-read-request b) (define ip (open-input-bytes b)) (define op (open-output-bytes)) (define c - (make-connection 0 (make-timer ip +inf.0 (lambda () (void))) + (make-connection 0 (make-timer tm ip +inf.0 (lambda () (void))) ip op (make-custodian) #f)) (define-values (req flag) (read-request c 80 (λ (_) (values "to" "from")))) @@ -82,7 +84,7 @@ (lambda () (define ip (open-input-string "GET http://127.0.0.1:8080/servlets/examples/hello.rkt?a=1&b: HTTP/1.1")) (read-request - (make-connection 0 (make-timer ip +inf.0 (lambda () (void))) + (make-connection 0 (make-timer tm ip +inf.0 (lambda () (void))) ip (open-output-bytes) (make-custodian) #f) 8081 diff --git a/pkgs/web-server-pkgs/web-server-test/tests/web-server/util.rkt b/pkgs/web-server-pkgs/web-server-test/tests/web-server/util.rkt index ef5e995ab0..d92ae2f19e 100644 --- a/pkgs/web-server-pkgs/web-server-test/tests/web-server/util.rkt +++ b/pkgs/web-server-pkgs/web-server-test/tests/web-server/util.rkt @@ -52,7 +52,8 @@ (define (make-mock-connection ib) (define ip (open-input-bytes ib)) (define op (open-output-bytes)) - (values (make-connection 0 (make-timer never-evt +inf.0 (lambda () (void))) + (define tm (start-timer-manager)) + (values (make-connection 0 (make-timer tm never-evt +inf.0 (lambda () (void))) ip op (make-custodian) #t) ip op))