.
original commit: 8073caef42f71b954df36522e1cfcec22cf46f92
This commit is contained in:
parent
bc93ee7b9f
commit
ba5acd44ee
|
@ -245,15 +245,16 @@
|
|||
(cond
|
||||
[(null? l) null]
|
||||
[(pair? l)
|
||||
(let* ([keep? (f (car l))]
|
||||
[frest (loop (cdr l))])
|
||||
(let* ([keep? (f (car l))])
|
||||
(if keep?
|
||||
(cons (car l) frest)
|
||||
frest))]
|
||||
[else (raise (make-exn:application:mismatch
|
||||
(format "filter: second argument must be a (proper) list; given ~e" list)
|
||||
(current-continuation-marks)
|
||||
list))])))))
|
||||
(cons (car l) (loop (cdr l)))
|
||||
(loop (cdr l))))]
|
||||
[else (raise-type-error
|
||||
'filter
|
||||
"proper list"
|
||||
1 ; i.e., 2nd argument
|
||||
f
|
||||
list)])))))
|
||||
|
||||
(define first (polymorphic (lambda (x)
|
||||
(unless (pair? x)
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
(provide consumer-thread
|
||||
with-semaphore
|
||||
semaphore-wait-multiple
|
||||
|
||||
dynamic-disable-break
|
||||
dynamic-enable-break
|
||||
|
@ -75,65 +74,6 @@
|
|||
(semaphore-wait s)
|
||||
(begin0 (f)
|
||||
(semaphore-post s))))
|
||||
|
||||
(define semaphore-wait-multiple
|
||||
(case-lambda
|
||||
[(semaphores) (semaphore-wait-multiple semaphores #f #f)]
|
||||
[(semaphores timeout) (semaphore-wait-multiple semaphores timeout #f)]
|
||||
[(semaphores timeout allow-break?)
|
||||
(let ([break-enabled? (or allow-break? (break-enabled))])
|
||||
(parameterize ([break-enabled #f])
|
||||
(for-each
|
||||
(lambda (s)
|
||||
(or (semaphore? s)
|
||||
(raise-type-error 'semaphore-wait-multiple "list of semaphores" semaphores)))
|
||||
semaphores)
|
||||
(or (not timeout) (real? timeout) (>= timeout 0)
|
||||
(raise-type-error 'semaphore-wait-multiple "positive real number" timeout))
|
||||
(let* ([result-l null]
|
||||
[ok? #f]
|
||||
[set-l (make-semaphore 1)]
|
||||
[one-done (make-semaphore)]
|
||||
[threads (let loop ([l semaphores])
|
||||
(if (null? l)
|
||||
null
|
||||
(cons (let ([s (car l)])
|
||||
(thread (lambda ()
|
||||
(let/ec
|
||||
k
|
||||
(current-exception-handler k)
|
||||
(semaphore-wait/enable-break s)
|
||||
(with-semaphore
|
||||
set-l
|
||||
(lambda () (set! result-l
|
||||
(cons s result-l))))
|
||||
(semaphore-post one-done)))))
|
||||
(loop (cdr l)))))]
|
||||
[timer-thread (if timeout
|
||||
(thread (lambda () (sleep timeout) (semaphore-post one-done)))
|
||||
#f)])
|
||||
(dynamic-wind
|
||||
void
|
||||
(lambda ()
|
||||
; wait until someone is done
|
||||
((if break-enabled? semaphore-wait/enable-break semaphore-wait) one-done)
|
||||
(set! ok? #t))
|
||||
(lambda ()
|
||||
; tell everyone to stop
|
||||
(for-each (lambda (th) (break-thread th)) threads)
|
||||
(when timer-thread (break-thread timer-thread))
|
||||
; wait until everyone's done
|
||||
(for-each thread-wait threads)
|
||||
; If more that too manay suceeded, repost to the extras
|
||||
(let ([extras (if ok?
|
||||
(if (null? result-l)
|
||||
null
|
||||
(cdr result-l))
|
||||
result-l)])
|
||||
(for-each (lambda (s) (semaphore-post s)) extras))))
|
||||
(if (null? result-l)
|
||||
#f
|
||||
(car result-l)))))]))
|
||||
|
||||
(define dynamic-enable-break
|
||||
(polymorphic
|
||||
|
@ -218,16 +158,13 @@
|
|||
(when can-break?
|
||||
(break-enabled #t))
|
||||
(handler r w)))])
|
||||
;; Clean-up thread:
|
||||
;; Clean-up and timeout thread:
|
||||
(thread (lambda ()
|
||||
(thread-wait t)
|
||||
(custodian-shutdown-all c)))
|
||||
;; Timeout thread, if any:
|
||||
(when connection-timeout
|
||||
(thread (lambda ()
|
||||
(sleep connection-timeout)
|
||||
(break-thread t)
|
||||
(sleep connection-timeout)
|
||||
(kill-thread t))))))))))
|
||||
(object-wait-multiple connection-timeout t)
|
||||
(when (thread-running? t)
|
||||
;; Only happens if connection-timeout is not #f
|
||||
(break-thread t))
|
||||
(object-wait-multiple connection-timeout t)
|
||||
(custodian-shutdown-all c)))))))))
|
||||
(loop)))
|
||||
(lambda () (tcp-close l))))))
|
||||
|
|
Loading…
Reference in New Issue
Block a user