Removing singletons from Web Server internals

This commit is contained in:
Jay McCarthy 2013-11-27 11:38:42 -07:00
parent bb5429a039
commit 033065f632
11 changed files with 192 additions and 148 deletions

View File

@ -21,12 +21,17 @@ for doing this.
The connection will last until @racket[timer] triggers.
}
@defproc[(connection-manager? [x any/c]) boolean?]{
Determines if @racket[x] is a connection manager.
}
@defproc[(start-connection-manager)
void]{
connection-manager?]{
Runs the connection manager (now just the timer manager).
}
@defproc[(new-connection [timeout number?]
@defproc[(new-connection [cm connection-manager?]
[timeout number?]
[i-port input-port?]
[o-port output-port?]
[cust custodian?]

View File

@ -9,7 +9,12 @@
This module provides a functionality for running
procedures after a given amount of time, that may be extended.
@defstruct[timer ([evt evt?]
@defproc[(timer-manager? [x any/c]) boolean?]{
Determines if @racket[x] is a timer manager.
}
@defstruct[timer ([tm timer-manager?]
[evt evt?]
[expire-seconds number?]
[action (-> void)])]{
@racket[evt] is an @racket[alarm-evt] that is ready at @racket[expire-seconds].
@ -17,11 +22,12 @@ procedures after a given amount of time, that may be extended.
}
@defproc[(start-timer-manager)
void]{
timer-manager?]{
Handles the execution and management of timers.
}
@defproc[(start-timer [s number?]
@defproc[(start-timer [tm timer-manager?]
[s number?]
[action (-> void)])
timer?]{
Registers a timer that runs @racket[action] after @racket[s] seconds.

View File

@ -6,11 +6,11 @@
web-server/http
web-server/servlet/servlet-structs)
(provide/contract
[create-timeout-manager
[create-timeout-manager
(->
(or/c false/c
(request? . -> . can-be-response?))
number? number?
number? number?
manager?)])
;; Utility
@ -20,27 +20,31 @@
(set! i (add1 i))
i)))
(define-struct (timeout-manager manager) (instance-expiration-handler
instance-timer-length
continuation-timer-length
; Private
instances
next-instance-id))
(define-struct (timeout-manager manager)
(instance-expiration-handler
instance-timer-length
continuation-timer-length
; Private
instances
next-instance-id))
(define (create-timeout-manager
instance-expiration-handler
instance-timer-length
continuation-timer-length)
(define tm (start-timer-manager))
;; Instances
(define instances (make-hasheq))
(define next-instance-id (make-counter))
(define-struct instance (k-table timer))
(define next-instance-id (make-counter))
(define-struct instance (k-table timer))
(define (create-instance expire-fn)
(define instance-id (next-instance-id))
(hash-set! instances
instance-id
(make-instance (create-k-table)
(start-timer instance-timer-length
(start-timer tm
instance-timer-length
(lambda ()
(expire-fn)
(hash-remove! instances instance-id)))))
@ -48,7 +52,7 @@
(define (adjust-timeout! instance-id secs)
(reset-timer! (instance-timer (instance-lookup instance-id #f))
secs))
(define (instance-lookup instance-id peek?)
(define instance
(hash-ref instances instance-id
@ -61,23 +65,23 @@
(increment-timer! (instance-timer instance)
instance-timer-length))
instance)
;; Continuation table
(define-struct k-table (next-id-fn htable))
(define (create-k-table)
(make-k-table (make-counter) (make-hasheq)))
;; Interface
;; Interface
(define (clear-continuations! instance-id)
(match (instance-lookup instance-id #f)
[(struct instance ((and k-table (struct k-table (next-id-fn htable))) instance-timer))
(hash-for-each
htable
(match-lambda*
[(list k-id (list salt k expiration-handler k-timer))
(hash-set! htable k-id
(list salt #f expiration-handler k-timer))]))]))
[(list k-id (list salt k expiration-handler k-timer))
(hash-set! htable k-id
(list salt #f expiration-handler k-timer))]))]))
(define (continuation-store! instance-id k expiration-handler)
(match (instance-lookup instance-id #t)
[(struct instance ((struct k-table (next-id-fn htable)) instance-timer))
@ -86,11 +90,11 @@
(hash-set! htable
k-id
(list salt k expiration-handler
(start-timer continuation-timer-length
(start-timer tm continuation-timer-length
(lambda ()
(hash-set! htable k-id
(list salt #f expiration-handler
(start-timer 0 void)))))))
(start-timer tm 0 void)))))))
(list k-id salt)]))
(define (continuation-lookup* instance-id a-k-id a-salt peek?)
(match (instance-lookup instance-id peek?)
@ -110,28 +114,28 @@
(not k)
(and (custodian-box? k)
(not (custodian-box-value k))))
(raise (make-exn:fail:servlet-manager:no-continuation
(format "No continuation for id: ~a" a-k-id)
(current-continuation-marks)
(if expiration-handler
expiration-handler
instance-expiration-handler)))
k)])]))
(raise (make-exn:fail:servlet-manager:no-continuation
(format "No continuation for id: ~a" a-k-id)
(current-continuation-marks)
(if expiration-handler
expiration-handler
instance-expiration-handler)))
k)])]))
(define (continuation-lookup instance-id a-k-id a-salt)
(continuation-lookup* instance-id a-k-id a-salt #f))
(define (continuation-peek instance-id a-k-id a-salt)
(continuation-lookup* instance-id a-k-id a-salt #t))
(make-timeout-manager create-instance
(make-timeout-manager create-instance
adjust-timeout!
clear-continuations!
continuation-store!
continuation-lookup
continuation-peek
; Specific
; Specific
instance-expiration-handler
instance-timer-length
continuation-timer-length
; Private
; Private
instances
next-instance-id))

View File

@ -1,7 +1,9 @@
#lang racket/base
(require racket/contract
racket/match
"timer.rkt")
(struct connection-manager (i tm))
(define-struct connection (id timer i-port o-port custodian close?)
#:mutable)
@ -13,20 +15,25 @@
[o-port output-port?]
[custodian custodian?]
[close? boolean?])]
[start-connection-manager (-> void)]
[new-connection (number? input-port? output-port? custodian? boolean? . -> . connection?)]
[kill-connection! (connection? . -> . void)]
[adjust-connection-timeout! (connection? number? . -> . void)])
[start-connection-manager
(-> connection-manager?)]
[new-connection
(-> connection-manager? number? input-port? output-port? custodian? boolean?
connection?)]
[kill-connection!
(connection? . -> . void)]
[adjust-connection-timeout!
(connection? number? . -> . void)])
;; start-connection-manager: custodian -> void
;; start-connection-manager: custodian -> connection-manager
;; calls the timer manager
(define (start-connection-manager)
(start-timer-manager))
(connection-manager (box 0) (start-timer-manager)))
;; new-connection: number i-port o-port custodian -> connection
;; new-connection: connection-manager number i-port o-port custodian -> connection
;; ask the connection manager for a new connection
(define i (box 0))
(define (new-connection time-to-live i-port o-port cust close?)
(define (new-connection cm time-to-live i-port o-port cust close?)
(match-define (connection-manager i tm) cm)
(define conn
(make-connection
;; The id is just for debugging and isn't normally useful
@ -35,7 +42,8 @@
(define conn-wb (make-weak-box conn))
(set-connection-timer!
conn
(start-timer time-to-live
(start-timer tm
time-to-live
(lambda ()
(cond
[(weak-box-value conn-wb)

View File

@ -23,28 +23,33 @@
(parameterize ([current-custodian the-server-custodian]
[current-server-custodian the-server-custodian]
#;[current-thread-initial-stack-size 3])
(start-connection-manager)
(define cm (start-connection-manager))
(thread
(lambda ()
(run-server 1 ; This is the port argument, but because we specialize listen, it is ignored.
handle-connection
#f
(lambda (exn)
((error-display-handler)
(format "Connection error: ~a" (exn-message exn))
exn))
(lambda (_ mw re)
(with-handlers ([exn?
(λ (x)
(async-channel-put* confirmation-channel x)
(raise x))])
(define listener (tcp-listen config:port config:max-waiting #t config:listen-ip))
(let-values ([(local-addr local-port end-addr end-port) (tcp-addresses listener #t)])
(async-channel-put* confirmation-channel local-port))
listener))
tcp-close
tcp-accept
tcp-accept/enable-break))))
(run-server
;; This is the port argument, but because we specialize listen, it is ignored.
1
(handle-connection/cm cm)
#f
(lambda (exn)
((error-display-handler)
(format "Connection error: ~a" (exn-message exn))
exn))
(lambda (_ mw re)
(with-handlers ([exn?
(λ (x)
(async-channel-put* confirmation-channel x)
(raise x))])
(define listener
(tcp-listen config:port config:max-waiting #t config:listen-ip))
(let-values
([(local-addr local-port end-addr end-port)
(tcp-addresses listener #t)])
(async-channel-put* confirmation-channel local-port))
listener))
tcp-close
tcp-accept
tcp-accept/enable-break))))
(lambda ()
(custodian-shutdown-all the-server-custodian)))
@ -57,7 +62,8 @@
(parameterize ([current-custodian server-cust]
[current-server-custodian server-cust])
(define connection-cust (make-custodian))
(start-connection-manager)
(define cm (start-connection-manager))
(define handle-connection (handle-connection/cm cm))
(parameterize ([current-custodian connection-cust])
(thread
(lambda ()
@ -67,11 +73,12 @@
(values "127.0.0.1"
"127.0.0.1"))))))))
;; handle-connection : input-port output-port (input-port -> string string) -> void
(define (handle-connection ip op
#:port-addresses [port-addresses tcp-addresses])
;; handle-connection : connection-manager input-port output-port (input-port -> string string) -> void
(define ((handle-connection/cm cm)
ip op
#:port-addresses [port-addresses tcp-addresses])
(define conn
(new-connection config:initial-connection-timeout
(new-connection cm config:initial-connection-timeout
ip op (current-custodian) #f))
(with-handlers
([(λ (x)
@ -93,7 +100,7 @@
;; the connection is closed, then peek will get the EOF and the
;; connection will be closed. This shouldn't change any other
;; behavior: read-request is already blocking, peeking doesn't
;; consume a byte, etc.
;; consume a byte, etc.
(define the-evt
(choice-evt
(handle-evt

View File

@ -2,60 +2,62 @@
(require racket/contract
racket/async-channel)
(define-struct timer (evt expire-seconds action)
(struct timer-manager (thread timer-ch))
(define-struct timer (tm evt expire-seconds action)
#:mutable)
(define timer-ch (make-async-channel))
; start-timer-manager : -> void
; The timer manager thread
;; start-timer-manager : -> timer-manager?
;; The timer manager thread
(define (start-timer-manager)
(thread
(lambda ()
(let loop ([timers null])
;; (printf "Timers: ~a\n" (length timers))
;; Wait for either...
(apply sync
;; ... a timer-request message ...
(handle-evt
timer-ch
(lambda (req)
;; represent a req as a (timer-list -> timer-list) function:
;; add/remove/change timer evet:
(loop (req timers))))
;; ... or a timer
(map (lambda (timer)
(handle-evt
(timer-evt timer)
(lambda (_)
;; execute timer
((timer-action timer))
(loop (remq timer timers)))))
timers)))))
(void))
(define timer-ch (make-async-channel))
(timer-manager
(thread
(lambda ()
(let loop ([timers null])
;; (printf "Timers: ~a\n" (length timers))
;; Wait for either...
(apply sync
;; ... a timer-request message ...
(handle-evt
timer-ch
(lambda (req)
;; represent a req as a (timer-list -> timer-list) function:
;; add/remove/change timer evet:
(loop (req timers))))
;; ... or a timer
(map (lambda (timer)
(handle-evt
(timer-evt timer)
(lambda (_)
;; execute timer
((timer-action timer))
(loop (remq timer timers)))))
timers)))))
timer-ch))
;; Limitation on this add-timer: thunk cannot make timer
;; requests directly, because it's executed directly by
;; the timer-manager thread
;; add-timer : number (-> void) -> timer
(define (add-timer msecs thunk)
;; add-timer : timer-manager number (-> void) -> timer
(define (add-timer manager msecs thunk)
(define now (current-inexact-milliseconds))
(define timer
(make-timer (alarm-evt (+ now msecs))
(+ now msecs)
thunk))
(async-channel-put
timer-ch
(define t
(timer manager
(alarm-evt (+ now msecs))
(+ now msecs)
thunk))
(async-channel-put
(timer-manager-timer-ch manager)
(lambda (timers)
(list* timer timers)))
timer)
(list* t timers)))
t)
; revise-timer! : timer msecs (-> void) -> timer
; revise the timer to ring msecs from now
;; revise-timer! : timer msecs (-> void) -> timer
;; revise the timer to ring msecs from now
(define (revise-timer! timer msecs thunk)
(define now (current-inexact-milliseconds))
(async-channel-put
timer-ch
(async-channel-put
(timer-manager-timer-ch (timer-tm timer))
(lambda (timers)
(set-timer-evt! timer (alarm-evt (+ now msecs)))
(set-timer-expire-seconds! timer (+ now msecs))
@ -64,22 +66,22 @@
(define (cancel-timer! timer)
(async-channel-put
timer-ch
(timer-manager-timer-ch (timer-tm timer))
(lambda (timers)
(remq timer timers))))
; start-timer : num (-> void) -> timer
; to make a timer that calls to-do after sec from make-timer's application
(define (start-timer secs to-do)
(add-timer (* 1000 secs) to-do))
;; start-timer : timer-manager num (-> void) -> timer
;; to make a timer that calls to-do after sec from make-timer's application
(define (start-timer tm secs to-do)
(add-timer tm (* 1000 secs) to-do))
; reset-timer : timer num -> void
; to cause timer to expire after sec from the adjust-msec-to-live's application
;; reset-timer : timer num -> void
;; to cause timer to expire after sec from the adjust-msec-to-live's application
(define (reset-timer! timer secs)
(revise-timer! timer (* 1000 secs) (timer-action timer)))
; increment-timer! : timer num -> void
; add secs to the timer, rather than replace
;; increment-timer! : timer num -> void
;; add secs to the timer, rather than replace
(define (increment-timer! timer secs)
(revise-timer! timer
(+ (- (timer-expire-seconds timer) (current-inexact-milliseconds))
@ -88,21 +90,24 @@
(provide/contract
[struct timer ([evt evt?]
[timer-manager?
(-> any/c boolean?)]
[struct timer ([tm timer-manager?]
[evt evt?]
[expire-seconds number?]
[action (-> void)])]
[start-timer-manager (-> void)]
[start-timer (number? (-> void) . -> . timer?)]
[action (-> void)])]
[start-timer-manager (-> timer-manager?)]
[start-timer (timer-manager? number? (-> void) . -> . timer?)]
[reset-timer! (timer? number? . -> . void)]
[increment-timer! (timer? number? . -> . void)]
[cancel-timer! (timer? . -> . void)])
; --- timeout plan
;; --- timeout plan
; start timeout on connection startup
; for POST requests increase the timeout proportionally when content-length is read
; adjust timeout in read-to-eof
; adjust timeout to starting timeout for next request with persistent connections
;; start timeout on connection startup
;; for POST requests increase the timeout proportionally when content-length is read
;; adjust timeout in read-to-eof
;; adjust timeout to starting timeout for next request with persistent connections
; adjust timeout proportionally when responding
; for servlet - make it a day until the output is produced
;; adjust timeout proportionally when responding
;; for servlet - make it a day until the output is produced

View File

@ -91,7 +91,8 @@
(define (make-mock-connection ib)
(define ip (open-input-bytes ib))
(define op (open-output-bytes))
(values (make-connection 0 (make-timer never-evt +inf.0 (lambda () (void)))
(define tm (start-timer-manager))
(values (make-connection 0 (make-timer tm never-evt +inf.0 (lambda () (void)))
ip op (current-custodian) #t)
ip
op))

View File

@ -6,10 +6,11 @@
web-server/http
"../util.rkt")
(define tm (start-timer-manager))
(define (write-response r [redact? #t])
(define-values (i-port o-port) (make-pipe))
(define conn
(connection 0 (start-timer +inf.0 void)
(connection 0 (start-timer tm +inf.0 void)
i-port o-port (make-custodian) #t))
(output-response conn r)
(close-output-port o-port)

View File

@ -3,7 +3,11 @@
web-server/private/connection-manager)
(provide connection-manager-tests)
(start-connection-manager)
(define cm (start-connection-manager))
(module+ test
(require rackunit/text-ui)
(run-tests connection-manager-tests))
(define connection-manager-tests
(test-suite
@ -15,7 +19,7 @@
(check-true
(let ([ib (open-input-bytes #"")]
[ob (open-output-bytes)])
(new-connection 1 ib ob (make-custodian) #t)
(new-connection cm 1 ib ob (make-custodian) #t)
(sleep 2)
(with-handlers ([exn? (lambda _ #t)])
(read ib) #f))))
@ -25,7 +29,7 @@
(check-true
(let ([ib (open-input-bytes #"")]
[ob (open-output-bytes)])
(new-connection 1 ib ob (make-custodian) #t)
(new-connection cm 1 ib ob (make-custodian) #t)
(sleep 2)
(with-handlers ([exn? (lambda _ #t)])
(write 1 ob) #f))))
@ -35,7 +39,7 @@
(check-true
(let* ([ib (open-input-bytes #"")]
[ob (open-output-bytes)]
[c (new-connection 1 ib ob (make-custodian) #t)])
[c (new-connection cm 1 ib ob (make-custodian) #t)])
(kill-connection! c)
(and (with-handlers ([exn? (lambda _ #t)])
(read ib) #f)
@ -47,9 +51,9 @@
(check-true
(let* ([ib (open-input-bytes #"")]
[ob (open-output-bytes)]
[c (new-connection 1 ib ob (make-custodian) #t)])
[c (new-connection cm 2 ib ob (make-custodian) #t)])
(adjust-connection-timeout! c 1)
(sleep 2)
(sleep 4)
(and (with-handlers ([exn? (lambda _ #t)])
(read ib) #f)
(with-handlers ([exn? (lambda _ #t)])

View File

@ -19,7 +19,7 @@
(number->string (bytes-length b)))))]
[ip (open-input-bytes b)]
[op (open-output-bytes)])
(values (make-connection 0 (make-timer ip +inf.0 (lambda () (void)))
(values (make-connection 0 (make-timer tm ip +inf.0 (lambda () (void)))
ip op (make-custodian) #f)
headers)))
@ -38,11 +38,13 @@
(read-bindings&post-data/raw (connection-i-port conn) #"POST" (string->url "http://localhost") headers))
(lambda (f s) s)))
(define tm (start-timer-manager))
(define (test-read-request b)
(define ip (open-input-bytes b))
(define op (open-output-bytes))
(define c
(make-connection 0 (make-timer ip +inf.0 (lambda () (void)))
(make-connection 0 (make-timer tm ip +inf.0 (lambda () (void)))
ip op (make-custodian) #f))
(define-values (req flag)
(read-request c 80 (λ (_) (values "to" "from"))))
@ -82,7 +84,7 @@
(lambda ()
(define ip (open-input-string "GET http://127.0.0.1:8080/servlets/examples/hello.rkt?a=1&b: HTTP/1.1"))
(read-request
(make-connection 0 (make-timer ip +inf.0 (lambda () (void)))
(make-connection 0 (make-timer tm ip +inf.0 (lambda () (void)))
ip
(open-output-bytes) (make-custodian) #f)
8081

View File

@ -52,7 +52,8 @@
(define (make-mock-connection ib)
(define ip (open-input-bytes ib))
(define op (open-output-bytes))
(values (make-connection 0 (make-timer never-evt +inf.0 (lambda () (void)))
(define tm (start-timer-manager))
(values (make-connection 0 (make-timer tm never-evt +inf.0 (lambda () (void)))
ip op (make-custodian) #t)
ip
op))