more concurrency repairs

svn: r2735
This commit is contained in:
Matthew Flatt 2006-04-21 17:27:01 +00:00
parent 4d867f1cb0
commit e92253f531

View File

@ -6,6 +6,12 @@
;; data, the stream is actually committed to writing the given data ;; data, the stream is actually committed to writing the given data
;; in the future. (This requirement comes from the SSL library.) ;; in the future. (This requirement comes from the SSL library.)
;; Another warning: data that is written and not buffered may still be
;; in flight between MzScheme and the underlying ports. A `flush-output'
;; won't return until sent data is actually in the underlying port.
;; (This is due to the fact that unbuffered data cannot be written
;; without blocking.)
(module mzssl2 mzscheme (module mzssl2 mzscheme
(require (lib "foreign.ss") (require (lib "foreign.ss")
(lib "port.ss") (lib "port.ss")
@ -331,9 +337,7 @@
;; SSL ports ;; SSL ports
(define (mzssl-release mzssl) (define (mzssl-release mzssl)
(call-with-semaphore ;; Lock must be held
(mzssl-lock mzssl)
(lambda ()
(set-mzssl-refcount! mzssl (sub1 (mzssl-refcount mzssl))) (set-mzssl-refcount! mzssl (sub1 (mzssl-refcount mzssl)))
(when (zero? (mzssl-refcount mzssl)) (when (zero? (mzssl-refcount mzssl))
(atomically (atomically
@ -341,7 +345,7 @@
(SSL_free (mzssl-ssl mzssl))) (SSL_free (mzssl-ssl mzssl)))
(when (mzssl-close? mzssl) (when (mzssl-close? mzssl)
(close-input-port (mzssl-i mzssl)) (close-input-port (mzssl-i mzssl))
(close-output-port (mzssl-o mzssl))))))) (close-output-port (mzssl-o mzssl)))))
(define (pump-input-once mzssl need-progress?/out) (define (pump-input-once mzssl need-progress?/out)
(let ([buffer (mzssl-buffer mzssl)] (let ([buffer (mzssl-buffer mzssl)]
@ -450,7 +454,10 @@
#f #f
;; close proc: ;; close proc:
(lambda () (lambda ()
(mzssl-release mzssl)))) (call-with-semaphore
(mzssl-lock mzssl)
(lambda ()
(mzssl-release mzssl))))))
(define (flush-ssl mzssl enable-break?) (define (flush-ssl mzssl enable-break?)
;; Make sure that this SSL connection has said everything that it ;; Make sure that this SSL connection has said everything that it
@ -493,14 +500,13 @@
;; runs with the highest possible custodian: ;; runs with the highest possible custodian:
(kernel-thread (lambda () (kernel-thread (lambda ()
(let loop () (let loop ()
(sync flush-ch)
(let flush-loop ()
(sync flush-ch) (sync flush-ch)
(semaphore-wait (mzssl-lock mzssl)) (semaphore-wait (mzssl-lock mzssl))
(flush-ssl mzssl #f) (flush-ssl mzssl #f)
(semaphore-post (mzssl-flushing? mzssl))
(set-mzssl-flushing?! mzssl #f) (set-mzssl-flushing?! mzssl #f)
(semaphore-post (mzssl-lock mzssl)) (semaphore-post (mzssl-lock mzssl))
(loop))))) (loop))))
;; Create the output port: ;; Create the output port:
(make-output-port (make-output-port
(format "SSL ~a" (object-name (mzssl-o mzssl))) (format "SSL ~a" (object-name (mzssl-o mzssl)))
@ -521,10 +527,21 @@
(let ([n (SSL_write (mzssl-ssl mzssl) xfer-buffer len)]) (let ([n (SSL_write (mzssl-ssl mzssl) xfer-buffer len)])
(if (n . > . 0) (if (n . > . 0)
(begin (begin
;; Start flush in bg thread, if necessary: ;; Start flush as necessary:
(unless (and (not non-block?) (cond
(eq? buffer-mode 'block)) [non-block?
(channel-put flush-ch #t)) ;; We can't block, so start the background thread
;; to flush from SSL to the underlying ports:
(set-mzssl-flushing?! mzssl (make-semaphore))
(channel-put flush-ch #t)]
[(eq? buffer-mode 'block)
;; Otherwise, we could have buffered it, so it might as
;; well sit between SSL and the underlying ports.
(void)]
[else
;; Since we're allowed to block but not buffer, try to
;; flush all the way through:
(flush-ssl mzssl enable-break?)])
n) n)
(let ([err (SSL_get_error (mzssl-ssl mzssl) n)]) (let ([err (SSL_get_error (mzssl-ssl mzssl) n)])
(cond (cond
@ -548,10 +565,10 @@
[top-write [top-write
(lambda (buffer s e non-block? enable-break?) (lambda (buffer s e non-block? enable-break?)
(if (mzssl-flushing? mzssl) (if (mzssl-flushing? mzssl)
;; Oops -- wait until flush done ;; Need to wait until flush done
(if (= s e) (if (= s e)
;; Ok, it's as good as flushed: ;; Let the background flush finish:
0 (list (semaphore-peek-evt (mzssl-flushing? mzssl)))
;; Try again later: ;; Try again later:
(wrap-evt always-evt (lambda (v) #f))) (wrap-evt always-evt (lambda (v) #f)))
;; Normal write (since no flush is active): ;; Normal write (since no flush is active):
@ -562,14 +579,24 @@
(lambda () (wrap-evt (semaphore-peek-evt (mzssl-lock mzssl)) (lambda () (wrap-evt (semaphore-peek-evt (mzssl-lock mzssl))
(lambda (x) #f)))]) (lambda (x) #f)))])
(lambda (buffer s e non-block? enable-break?) (lambda (buffer s e non-block? enable-break?)
(call-with-semaphore (let ([v (call-with-semaphore
(mzssl-lock mzssl) (mzssl-lock mzssl)
top-write top-write
lock-unavailable lock-unavailable
buffer s e non-block? enable-break?))) buffer s e non-block? enable-break?)])
(if (pair? v)
(begin
;; Wait on background flush to implement requested flush
(sync (car v))
0)
v))))
;; close proc: ;; close proc:
(letrec ([do-close
(lambda () (lambda ()
(if (mzssl-flushing? mzssl)
(semaphore-peek-evt (mzssl-flushing? mzssl))
;; issue shutdown (i.e., EOF on read end) ;; issue shutdown (i.e., EOF on read end)
(begin
(let loop ([cnt 0]) (let loop ([cnt 0])
(let ([out-blocked? (pump-output mzssl)]) (let ([out-blocked? (pump-output mzssl)])
(let ([n (SSL_shutdown (mzssl-ssl mzssl))]) (let ([n (SSL_shutdown (mzssl-ssl mzssl))])
@ -589,11 +616,25 @@
;; or if this is our first time around, then wait on the ;; or if this is our first time around, then wait on the
;; underlying output and try again. ;; underlying output and try again.
(when (or (zero? cnt) out-blocked?) (when (or (zero? cnt) out-blocked?)
(sync (mzssl-o mzssl)) (flush-ssl mzssl #f)
(loop (add1 cnt))) (loop (add1 cnt)))
(error 'read-bytes "SSL shutdown failed ~a" (error 'read-bytes "SSL shutdown failed ~a"
(get-error-message (ERR_get_error))))])))))) (get-error-message (ERR_get_error))))]))))))
(mzssl-release mzssl)) (mzssl-release mzssl)
#f)))]
[close-loop
(lambda ()
(let ([v (call-with-semaphore
(mzssl-lock mzssl)
do-close)])
(if v
(begin
;; Wait for background flush to finish:
(sync v)
(close-loop))
v)))])
(lambda ()
(close-loop)))
;; Unimplemented port methods: ;; Unimplemented port methods:
#f #f #f #f #f #f #f #f
void 1 void 1