From e92253f53100cdba3a9ceb94dfd5b0594a253af7 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Fri, 21 Apr 2006 17:27:01 +0000 Subject: [PATCH] more concurrency repairs svn: r2735 --- collects/openssl/mzssl2.ss | 159 +++++++++++++++++++++++-------------- 1 file changed, 100 insertions(+), 59 deletions(-) diff --git a/collects/openssl/mzssl2.ss b/collects/openssl/mzssl2.ss index aa10566fdc..dcb67bd5da 100644 --- a/collects/openssl/mzssl2.ss +++ b/collects/openssl/mzssl2.ss @@ -3,8 +3,14 @@ ;; It will soon replace "mzssl.c". ;; Warn clients: when a (non-blocking) write fails to write all the -;; data, the stream is actually committed to writing the given data -;; in the future. (This requirement comes from the SSL library.) +;; data, the stream is actually committed to writing the given data +;; in the future. (This requirement comes from the SSL library.) + +;; Another warning: data that is written and not buffered may still be +;; in flight between MzScheme and the underlying ports. A `flush-output' +;; won't return until sent data is actually in the underlying port. +;; (This is due to the fact that unbuffered data cannot be written +;; without blocking.) (module mzssl2 mzscheme (require (lib "foreign.ss") @@ -331,17 +337,15 @@ ;; SSL ports (define (mzssl-release mzssl) - (call-with-semaphore - (mzssl-lock mzssl) - (lambda () - (set-mzssl-refcount! mzssl (sub1 (mzssl-refcount mzssl))) - (when (zero? (mzssl-refcount mzssl)) - (atomically - (set-box! (mzssl-finalizer-cancel mzssl) #f) - (SSL_free (mzssl-ssl mzssl))) - (when (mzssl-close? mzssl) - (close-input-port (mzssl-i mzssl)) - (close-output-port (mzssl-o mzssl))))))) + ;; Lock must be held + (set-mzssl-refcount! mzssl (sub1 (mzssl-refcount mzssl))) + (when (zero? (mzssl-refcount mzssl)) + (atomically + (set-box! (mzssl-finalizer-cancel mzssl) #f) + (SSL_free (mzssl-ssl mzssl))) + (when (mzssl-close? mzssl) + (close-input-port (mzssl-i mzssl)) + (close-output-port (mzssl-o mzssl))))) (define (pump-input-once mzssl need-progress?/out) (let ([buffer (mzssl-buffer mzssl)] @@ -450,7 +454,10 @@ #f ;; close proc: (lambda () - (mzssl-release mzssl)))) + (call-with-semaphore + (mzssl-lock mzssl) + (lambda () + (mzssl-release mzssl)))))) (define (flush-ssl mzssl enable-break?) ;; Make sure that this SSL connection has said everything that it @@ -494,13 +501,12 @@ (kernel-thread (lambda () (let loop () (sync flush-ch) - (let flush-loop () - (sync flush-ch) - (semaphore-wait (mzssl-lock mzssl)) - (flush-ssl mzssl #f) - (set-mzssl-flushing?! mzssl #f) - (semaphore-post (mzssl-lock mzssl)) - (loop))))) + (semaphore-wait (mzssl-lock mzssl)) + (flush-ssl mzssl #f) + (semaphore-post (mzssl-flushing? mzssl)) + (set-mzssl-flushing?! mzssl #f) + (semaphore-post (mzssl-lock mzssl)) + (loop)))) ;; Create the output port: (make-output-port (format "SSL ~a" (object-name (mzssl-o mzssl))) @@ -521,10 +527,21 @@ (let ([n (SSL_write (mzssl-ssl mzssl) xfer-buffer len)]) (if (n . > . 0) (begin - ;; Start flush in bg thread, if necessary: - (unless (and (not non-block?) - (eq? buffer-mode 'block)) - (channel-put flush-ch #t)) + ;; Start flush as necessary: + (cond + [non-block? + ;; We can't block, so start the background thread + ;; to flush from SSL to the underlying ports: + (set-mzssl-flushing?! mzssl (make-semaphore)) + (channel-put flush-ch #t)] + [(eq? buffer-mode 'block) + ;; Otherwise, we could have buffered it, so it might as + ;; well sit between SSL and the underlying ports. + (void)] + [else + ;; Since we're allowed to block but not buffer, try to + ;; flush all the way through: + (flush-ssl mzssl enable-break?)]) n) (let ([err (SSL_get_error (mzssl-ssl mzssl) n)]) (cond @@ -548,10 +565,10 @@ [top-write (lambda (buffer s e non-block? enable-break?) (if (mzssl-flushing? mzssl) - ;; Oops -- wait until flush done + ;; Need to wait until flush done (if (= s e) - ;; Ok, it's as good as flushed: - 0 + ;; Let the background flush finish: + (list (semaphore-peek-evt (mzssl-flushing? mzssl))) ;; Try again later: (wrap-evt always-evt (lambda (v) #f))) ;; Normal write (since no flush is active): @@ -562,38 +579,62 @@ (lambda () (wrap-evt (semaphore-peek-evt (mzssl-lock mzssl)) (lambda (x) #f)))]) (lambda (buffer s e non-block? enable-break?) - (call-with-semaphore - (mzssl-lock mzssl) - top-write - lock-unavailable - buffer s e non-block? enable-break?))) + (let ([v (call-with-semaphore + (mzssl-lock mzssl) + top-write + lock-unavailable + buffer s e non-block? enable-break?)]) + (if (pair? v) + (begin + ;; Wait on background flush to implement requested flush + (sync (car v)) + 0) + v)))) ;; close proc: - (lambda () - ;; issue shutdown (i.e., EOF on read end) - (let loop ([cnt 0]) - (let ([out-blocked? (pump-output mzssl)]) - (let ([n (SSL_shutdown (mzssl-ssl mzssl))]) - (unless (= n 1) - (let ([err (SSL_get_error (mzssl-ssl mzssl) n)]) - (cond - [(= err SSL_ERROR_WANT_READ) - (pump-input-once mzssl (if out-blocked? (mzssl-o mzssl) #t)) - (loop cnt)] - [(= err SSL_ERROR_WANT_WRITE) - (pump-output-once mzssl #t #f) - (loop cnt)] - [else - (if (= n 0) - ;; When 0 is returned, the SSL object no longer correctly - ;; reports what it wants (e.g., a write). If pumping blocked - ;; or if this is our first time around, then wait on the - ;; underlying output and try again. - (when (or (zero? cnt) out-blocked?) - (sync (mzssl-o mzssl)) - (loop (add1 cnt))) - (error 'read-bytes "SSL shutdown failed ~a" - (get-error-message (ERR_get_error))))])))))) - (mzssl-release mzssl)) + (letrec ([do-close + (lambda () + (if (mzssl-flushing? mzssl) + (semaphore-peek-evt (mzssl-flushing? mzssl)) + ;; issue shutdown (i.e., EOF on read end) + (begin + (let loop ([cnt 0]) + (let ([out-blocked? (pump-output mzssl)]) + (let ([n (SSL_shutdown (mzssl-ssl mzssl))]) + (unless (= n 1) + (let ([err (SSL_get_error (mzssl-ssl mzssl) n)]) + (cond + [(= err SSL_ERROR_WANT_READ) + (pump-input-once mzssl (if out-blocked? (mzssl-o mzssl) #t)) + (loop cnt)] + [(= err SSL_ERROR_WANT_WRITE) + (pump-output-once mzssl #t #f) + (loop cnt)] + [else + (if (= n 0) + ;; When 0 is returned, the SSL object no longer correctly + ;; reports what it wants (e.g., a write). If pumping blocked + ;; or if this is our first time around, then wait on the + ;; underlying output and try again. + (when (or (zero? cnt) out-blocked?) + (flush-ssl mzssl #f) + (loop (add1 cnt))) + (error 'read-bytes "SSL shutdown failed ~a" + (get-error-message (ERR_get_error))))])))))) + (mzssl-release mzssl) + #f)))] + [close-loop + (lambda () + (let ([v (call-with-semaphore + (mzssl-lock mzssl) + do-close)]) + (if v + (begin + ;; Wait for background flush to finish: + (sync v) + (close-loop)) + v)))]) + (lambda () + (close-loop))) ;; Unimplemented port methods: #f #f #f #f void 1