* code reformatting
* use kw.ss instead of opt-lambda * moved a debugging comment into an `if' svn: r4273
This commit is contained in:
parent
a0ccd557f5
commit
a0c4d3b454
|
@ -1,9 +1,9 @@
|
|||
|
||||
(module thread mzscheme
|
||||
(require "etc.ss" "contract.ss")
|
||||
(require "kw.ss" "contract.ss")
|
||||
|
||||
(provide run-server
|
||||
consumer-thread)
|
||||
consumer-thread)
|
||||
|
||||
#|
|
||||
t accepts a function, f, and creates a thread. It returns the thread and a
|
||||
|
@ -12,194 +12,188 @@
|
|||
block.
|
||||
|#
|
||||
|
||||
(define consumer-thread
|
||||
(case-lambda
|
||||
[(f) (consumer-thread f void)]
|
||||
[(f init)
|
||||
(unless (procedure? f) (raise-type-error 'consumer-thread "procedure" f))
|
||||
(let ([sema (make-semaphore 0)]
|
||||
[protect (make-semaphore 1)]
|
||||
[front-state null]
|
||||
[back-state null])
|
||||
(values
|
||||
(thread
|
||||
(letrec ([loop
|
||||
(lambda ()
|
||||
(semaphore-wait sema)
|
||||
(let ([local-state
|
||||
(begin
|
||||
(semaphore-wait protect)
|
||||
(if (null? back-state)
|
||||
(let ([new-front (reverse front-state)])
|
||||
(set! back-state (cdr new-front))
|
||||
(set! front-state null)
|
||||
(semaphore-post protect)
|
||||
(car new-front))
|
||||
(begin0
|
||||
(car back-state)
|
||||
(set! back-state (cdr back-state))
|
||||
(semaphore-post protect))))])
|
||||
(apply f local-state))
|
||||
(loop))])
|
||||
(lambda ()
|
||||
(init)
|
||||
(loop))))
|
||||
(lambda new-state
|
||||
(let ([num (length new-state)])
|
||||
(unless (procedure-arity-includes? f num)
|
||||
(raise
|
||||
(make-exn:fail:contract:arity
|
||||
(string->immutable-string
|
||||
(format "<procedure-from-consumer-thread>: consumer procedure arity is ~e; provided ~s argument~a"
|
||||
(procedure-arity f) num (if (= 1 num) "" "s")))
|
||||
(current-continuation-marks)))))
|
||||
(semaphore-wait protect)
|
||||
(set! front-state (cons new-state front-state))
|
||||
(semaphore-post protect)
|
||||
(semaphore-post sema))))]))
|
||||
(define/kw (consumer-thread f #:optional [init void])
|
||||
(unless (procedure? f) (raise-type-error 'consumer-thread "procedure" f))
|
||||
(let ([sema (make-semaphore 0)]
|
||||
[protect (make-semaphore 1)]
|
||||
[front-state null]
|
||||
[back-state null])
|
||||
(values
|
||||
(thread
|
||||
(letrec ([loop
|
||||
(lambda ()
|
||||
(semaphore-wait sema)
|
||||
(let ([local-state
|
||||
(begin
|
||||
(semaphore-wait protect)
|
||||
(if (null? back-state)
|
||||
(let ([new-front (reverse front-state)])
|
||||
(set! back-state (cdr new-front))
|
||||
(set! front-state null)
|
||||
(semaphore-post protect)
|
||||
(car new-front))
|
||||
(begin0
|
||||
(car back-state)
|
||||
(set! back-state (cdr back-state))
|
||||
(semaphore-post protect))))])
|
||||
(apply f local-state))
|
||||
(loop))])
|
||||
(lambda ()
|
||||
(init)
|
||||
(loop))))
|
||||
(lambda new-state
|
||||
(let ([num (length new-state)])
|
||||
(unless (procedure-arity-includes? f num)
|
||||
(raise
|
||||
(make-exn:fail:contract:arity
|
||||
(string->immutable-string
|
||||
(format "<procedure-from-consumer-thread>: consumer procedure arity is ~e; provided ~s argument~a"
|
||||
(procedure-arity f) num (if (= 1 num) "" "s")))
|
||||
(current-continuation-marks)))))
|
||||
(semaphore-wait protect)
|
||||
(set! front-state (cons new-state front-state))
|
||||
(semaphore-post protect)
|
||||
(semaphore-post sema)))))
|
||||
|
||||
(define run-server
|
||||
(opt-lambda (port-number
|
||||
handler
|
||||
connection-timeout
|
||||
[handle-exn void]
|
||||
[tcp-listen tcp-listen]
|
||||
[tcp-close tcp-close]
|
||||
[tcp-accept tcp-accept]
|
||||
[tcp-accept/enable-break tcp-accept/enable-break])
|
||||
(let ([l (tcp-listen port-number 5 #t)]
|
||||
[can-break? (break-enabled)])
|
||||
(dynamic-wind
|
||||
void
|
||||
(lambda ()
|
||||
;; All connections should use the same parameterization,
|
||||
;; to facilitate transferring continuations from one
|
||||
;; connection to another:
|
||||
(let ([paramz (current-parameterization)])
|
||||
;; Loop to handle connections:
|
||||
(let loop ()
|
||||
(with-handlers ([exn:fail:network? handle-exn])
|
||||
;; Make a custodian for the next session:
|
||||
(let ([c (make-custodian)])
|
||||
(parameterize ([current-custodian c])
|
||||
;; disable breaks during session set-up...
|
||||
(parameterize-break #f
|
||||
;; ... but enable breaks while blocked on an accept:
|
||||
(let-values ([(r w) ((if can-break?
|
||||
tcp-accept/enable-break
|
||||
tcp-accept)
|
||||
l)])
|
||||
;; Handler thread:
|
||||
(let ([t (thread (lambda ()
|
||||
;; First, install the parameterization
|
||||
;; used for all connections:
|
||||
(call-with-parameterization
|
||||
paramz
|
||||
(lambda ()
|
||||
;; Install this connection's custodian
|
||||
;; for this thread in the shared
|
||||
;; parameterization:
|
||||
(current-custodian c)
|
||||
;; Enable breaking:
|
||||
(when can-break?
|
||||
(break-enabled #t))
|
||||
;; Call the handler
|
||||
(handler r w)))))])
|
||||
;; Clean-up and timeout thread:
|
||||
(thread (lambda ()
|
||||
(sync/timeout connection-timeout t)
|
||||
(when (thread-running? t)
|
||||
;; Only happens if connection-timeout is not #f
|
||||
(break-thread t))
|
||||
(sync/timeout connection-timeout t)
|
||||
(custodian-shutdown-all c)))))))))
|
||||
(loop))))
|
||||
(lambda () (tcp-close l))))))
|
||||
(define/kw (run-server port-number handler connection-timeout
|
||||
#:optional
|
||||
[handle-exn void]
|
||||
[tcp-listen tcp-listen]
|
||||
[tcp-close tcp-close]
|
||||
[tcp-accept tcp-accept]
|
||||
[tcp-accept/enable-break tcp-accept/enable-break])
|
||||
(let ([l (tcp-listen port-number 5 #t)]
|
||||
[can-break? (break-enabled)])
|
||||
(dynamic-wind
|
||||
void
|
||||
(lambda ()
|
||||
;; All connections should use the same parameterization,
|
||||
;; to facilitate transferring continuations from one
|
||||
;; connection to another:
|
||||
(let ([paramz (current-parameterization)])
|
||||
;; Loop to handle connections:
|
||||
(let loop ()
|
||||
(with-handlers ([exn:fail:network? handle-exn])
|
||||
;; Make a custodian for the next session:
|
||||
(let ([c (make-custodian)])
|
||||
(parameterize ([current-custodian c])
|
||||
;; disable breaks during session set-up...
|
||||
(parameterize-break #f
|
||||
;; ... but enable breaks while blocked on an accept:
|
||||
(let-values ([(r w) ((if can-break?
|
||||
tcp-accept/enable-break
|
||||
tcp-accept)
|
||||
l)])
|
||||
;; Handler thread:
|
||||
(let ([t (thread (lambda ()
|
||||
;; First, install the parameterization
|
||||
;; used for all connections:
|
||||
(call-with-parameterization
|
||||
paramz
|
||||
(lambda ()
|
||||
;; Install this connection's custodian
|
||||
;; for this thread in the shared
|
||||
;; parameterization:
|
||||
(current-custodian c)
|
||||
;; Enable breaking:
|
||||
(when can-break?
|
||||
(break-enabled #t))
|
||||
;; Call the handler
|
||||
(handler r w)))))])
|
||||
;; Clean-up and timeout thread:
|
||||
(thread (lambda ()
|
||||
(sync/timeout connection-timeout t)
|
||||
(when (thread-running? t)
|
||||
;; Only happens if connection-timeout is not #f
|
||||
(break-thread t))
|
||||
(sync/timeout connection-timeout t)
|
||||
(custodian-shutdown-all c)))))))))
|
||||
(loop))))
|
||||
(lambda () (tcp-close l)))))
|
||||
|
||||
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Couroutine
|
||||
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; An X-coroutine-object is
|
||||
;; An X-coroutine-object is
|
||||
;; (make-coroutine-object thread semaphore channel channel X)
|
||||
(define-struct coroutine-object (worker can-stop-lock done-ch ex-ch result))
|
||||
|
||||
|
||||
;; coroutine : ((bool ->) -> X) -> X-coroutine-object
|
||||
(define (coroutine f)
|
||||
;;(printf "2. new coroutine~n")
|
||||
(let* ((can-stop-lock (make-semaphore 1))
|
||||
(done-ch (make-channel))
|
||||
(ex-ch (make-channel))
|
||||
(proceed-sema (make-semaphore))
|
||||
(stop-enabled? #t)
|
||||
(enable-stop
|
||||
(let* ([can-stop-lock (make-semaphore 1)]
|
||||
[done-ch (make-channel)]
|
||||
[ex-ch (make-channel)]
|
||||
[proceed-sema (make-semaphore)]
|
||||
[stop-enabled? #t]
|
||||
[enable-stop
|
||||
(lambda (enable?)
|
||||
;;(printf "3. enabling ~a~n" enable?)
|
||||
(cond
|
||||
((and enable? (not stop-enabled?))
|
||||
(semaphore-post can-stop-lock)
|
||||
(set! stop-enabled? #t))
|
||||
((and (not enable?) stop-enabled?)
|
||||
(semaphore-wait can-stop-lock)
|
||||
(set! stop-enabled? #f)))
|
||||
[(and enable? (not stop-enabled?))
|
||||
(semaphore-post can-stop-lock)
|
||||
(set! stop-enabled? #t)]
|
||||
[(and (not enable?) stop-enabled?)
|
||||
(semaphore-wait can-stop-lock)
|
||||
(set! stop-enabled? #f)])
|
||||
;;(printf "3. finished enabling~n")
|
||||
))
|
||||
(tid (thread (lambda ()
|
||||
(semaphore-wait proceed-sema)
|
||||
;;(printf "3. creating coroutine thread~n")
|
||||
(with-handlers (((lambda (exn) #t)
|
||||
(lambda (exn)
|
||||
(enable-stop #t)
|
||||
(channel-put ex-ch exn))))
|
||||
(let ([v (f enable-stop)])
|
||||
(enable-stop #t)
|
||||
(channel-put done-ch v)))))))
|
||||
(begin0
|
||||
(make-coroutine-object tid can-stop-lock done-ch ex-ch #f)
|
||||
(thread-suspend tid)
|
||||
(semaphore-post proceed-sema))))
|
||||
|
||||
)]
|
||||
[tid (thread (lambda ()
|
||||
(semaphore-wait proceed-sema)
|
||||
;;(printf "3. creating coroutine thread~n")
|
||||
(with-handlers ([(lambda (exn) #t)
|
||||
(lambda (exn)
|
||||
(enable-stop #t)
|
||||
(channel-put ex-ch exn))])
|
||||
(let ([v (f enable-stop)])
|
||||
(enable-stop #t)
|
||||
(channel-put done-ch v)))))])
|
||||
(begin0 (make-coroutine-object tid can-stop-lock done-ch ex-ch #f)
|
||||
(thread-suspend tid)
|
||||
(semaphore-post proceed-sema))))
|
||||
|
||||
;; coroutine : real-number X-coroutine-object -> bool
|
||||
(define (coroutine-run timeout w)
|
||||
(if (coroutine-object-worker w)
|
||||
#;(printf "2. starting coroutine~n")
|
||||
(let ((can-stop-lock (coroutine-object-can-stop-lock w))
|
||||
(worker (coroutine-object-worker w)))
|
||||
(thread-resume worker)
|
||||
(dynamic-wind
|
||||
void
|
||||
;; Let the co-routine run...
|
||||
(lambda ()
|
||||
(sync (choice-evt (wrap-evt (alarm-evt (+ timeout (current-inexact-milliseconds)))
|
||||
(lambda (x)
|
||||
#;(printf "2. alarm-evt~n")
|
||||
(semaphore-wait can-stop-lock)
|
||||
(thread-suspend worker)
|
||||
(semaphore-post can-stop-lock)
|
||||
#f))
|
||||
(wrap-evt (coroutine-object-done-ch w)
|
||||
(lambda (res)
|
||||
#;(printf "2. coroutine-done-evt~n")
|
||||
(set-coroutine-object-result! w res)
|
||||
(coroutine-kill w)
|
||||
#t))
|
||||
(wrap-evt (coroutine-object-ex-ch w)
|
||||
(lambda (exn)
|
||||
#;(printf "2. ex-evt~n")
|
||||
(coroutine-kill w)
|
||||
(raise exn))))))
|
||||
;; In case we escape through a break:
|
||||
(lambda ()
|
||||
(when (thread-running? worker)
|
||||
(semaphore-wait can-stop-lock)
|
||||
(thread-suspend worker)
|
||||
(semaphore-post can-stop-lock)))))
|
||||
#t))
|
||||
(let ([can-stop-lock (coroutine-object-can-stop-lock w)]
|
||||
[worker (coroutine-object-worker w)])
|
||||
#;(printf "2. starting coroutine~n")
|
||||
(thread-resume worker)
|
||||
(dynamic-wind
|
||||
void
|
||||
;; Let the co-routine run...
|
||||
(lambda ()
|
||||
(sync (choice-evt (wrap-evt (alarm-evt (+ timeout (current-inexact-milliseconds)))
|
||||
(lambda (x)
|
||||
#;(printf "2. alarm-evt~n")
|
||||
(semaphore-wait can-stop-lock)
|
||||
(thread-suspend worker)
|
||||
(semaphore-post can-stop-lock)
|
||||
#f))
|
||||
(wrap-evt (coroutine-object-done-ch w)
|
||||
(lambda (res)
|
||||
#;(printf "2. coroutine-done-evt~n")
|
||||
(set-coroutine-object-result! w res)
|
||||
(coroutine-kill w)
|
||||
#t))
|
||||
(wrap-evt (coroutine-object-ex-ch w)
|
||||
(lambda (exn)
|
||||
#;(printf "2. ex-evt~n")
|
||||
(coroutine-kill w)
|
||||
(raise exn))))))
|
||||
;; In case we escape through a break:
|
||||
(lambda ()
|
||||
(when (thread-running? worker)
|
||||
(semaphore-wait can-stop-lock)
|
||||
(thread-suspend worker)
|
||||
(semaphore-post can-stop-lock)))))
|
||||
#t))
|
||||
|
||||
;; coroutine-result : X-coroutine-object -> X
|
||||
(define (coroutine-result w)
|
||||
(coroutine-object-result w))
|
||||
|
||||
|
||||
;; coroutine-kill : X-coroutine-object ->
|
||||
(define (coroutine-kill w)
|
||||
(set-coroutine-object-can-stop-lock! w #f)
|
||||
|
@ -213,9 +207,8 @@
|
|||
(coroutine-object? x))
|
||||
|
||||
(provide coroutine?)
|
||||
(provide/contract
|
||||
(provide/contract
|
||||
(coroutine (((any/c . -> . any) . -> . any) . -> . coroutine?))
|
||||
(coroutine-run (real? coroutine? . -> . boolean?))
|
||||
(coroutine-result (coroutine? . -> . any))
|
||||
(coroutine-kill (coroutine? . -> . any))))
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user