From f142a1c5f2e70b0daa6177c3a239ff8f47eb1b95 Mon Sep 17 00:00:00 2001 From: Ryan Culpepper Date: Sun, 15 Jan 2012 23:18:03 -0700 Subject: [PATCH] db: clean up communication, locking Disconnect on break exn within lock; other break-safety fixes. --- collects/db/TODO | 2 + collects/db/private/generic/common.rkt | 30 ++- collects/db/private/generic/connect-util.rkt | 6 +- collects/db/private/mysql/connection.rkt | 198 ++++++++++-------- collects/db/private/mysql/message.rkt | 7 +- collects/db/private/odbc/connection.rkt | 35 ++-- collects/db/private/postgresql/connection.rkt | 166 ++++++++------- collects/db/private/sqlite3/connection.rkt | 27 ++- 8 files changed, 271 insertions(+), 200 deletions(-) diff --git a/collects/db/TODO b/collects/db/TODO index 94ad041425..25f76e28dd 100644 --- a/collects/db/TODO +++ b/collects/db/TODO @@ -111,3 +111,5 @@ Misc so a statement can be invalidated between cache retrieval and execution - support logging + +- audit: make sure no output within atomic section (in drracket, may thread switch) diff --git a/collects/db/private/generic/common.rkt b/collects/db/private/generic/common.rkt index 31ece5ac30..d7544844b3 100644 --- a/collects/db/private/generic/common.rkt +++ b/collects/db/private/generic/common.rkt @@ -118,9 +118,11 @@ (define/public-final (call-with-lock* who proc hopeless require-connected?) (let ([me (thread-dead-evt (current-thread))] + [eb? (break-enabled)] [result (sync outer-lock lock-holder)]) (cond [(eq? result outer-lock) ;; Got past outer stage + (break-enabled #f) (let ([proceed? (begin (start-atomic) (let ([proceed? (semaphore-try-wait? inner-lock)]) @@ -133,21 +135,30 @@ ;; Acquired lock ;; - lock-holder = me, and outer-lock is closed again (when (and require-connected? (not (connected?))) - (unlock) + (break-enabled eb?) + (unlock #f) (error/not-connected who)) - (with-handlers ([values (lambda (e) (unlock) (raise e))]) - (begin0 (proc) (unlock)))] + (with-handlers ([(lambda (e) #t) + (lambda (e) + (when (exn:break? e) (on-break-within-lock)) + (unlock #f) + (raise e))]) + (break-enabled eb?) + (begin0 (proc) (unlock #t)))] [else ;; Didn't acquire lock; retry + (break-enabled eb?) (call-with-lock* who proc hopeless require-connected?)]))] [(eq? result lock-holder) ;; Thread holding lock is dead (if hopeless (hopeless) (error/hopeless who))] + [(eq? me lock-holder) + (error/internal who "attempted to recursively acquire lock")] [else ;; lock-holder was stale; retry (call-with-lock* who proc hopeless require-connected?)]))) - (define/private (unlock) + (define/private (unlock run-async-calls?) (let ([async-calls (reverse delayed-async-calls)]) (set! delayed-async-calls null) (start-atomic) @@ -155,14 +166,21 @@ (semaphore-post inner-lock) (semaphore-post outer-sema) (end-atomic) - (for-each call-with-continuation-barrier async-calls))) + (when run-async-calls? + (for-each call-with-continuation-barrier async-calls)))) ;; needs overriding (define/public (connected?) #f) - (define/public-final (add-delayed-call! proc) + (define/public (add-delayed-call! proc) (set! delayed-async-calls (cons proc delayed-async-calls))) + ;; on-break-within-lock : -> void + ;; Called before unlock; makes it easy to disconnect on any break + ;; within lock. + (define/public (on-break-within-lock) + (void)) + (super-new))) ;; ---------------------------------------- diff --git a/collects/db/private/generic/connect-util.rkt b/collects/db/private/generic/connect-util.rkt index 372f0dd2d4..a19b277ad3 100644 --- a/collects/db/private/generic/connect-util.rkt +++ b/collects/db/private/generic/connect-util.rkt @@ -119,7 +119,9 @@ ;; timeout? = if connection open, then wait longer (let* ([c (hash-ref key=>conn key #f)] [in-trans? (with-handlers ([exn:fail? (lambda (e) #f)]) - (and c (send c transaction-status 'virtual-connection)))]) + (and c + (send c connected?) + (send c transaction-status 'virtual-connection)))]) (cond [(not c) (void)] [(and timeout? in-trans?) (hash-set! alarms c (fresh-alarm-for key))] @@ -182,7 +184,7 @@ (#t '_ (fetch/cursor fsym stmt fetch-size)) (#t '_ (start-transaction fsym isolation cwt?)) (#f (void) (end-transaction fsym mode cwt?)) - (#f #f (transaction-status fsym)) + (#t '_ (transaction-status fsym)) (#t '_ (list-tables fsym schema))) (define/public (get-base) diff --git a/collects/db/private/mysql/connection.rkt b/collects/db/private/mysql/connection.rkt index 65cd9b5cfb..d305453357 100644 --- a/collects/db/private/mysql/connection.rkt +++ b/collects/db/private/mysql/connection.rkt @@ -35,44 +35,61 @@ (super-new) - ;; with-disconnect-on-error - (define-syntax-rule (with-disconnect-on-error . body) - (with-handlers ([exn:fail? (lambda (e) (disconnect* #f) (raise e))]) - . body)) - ;; ======================================== ;; == Communication - ;; (Must be called with lock acquired.) + #| + During initial setup, okay to send and recv directly, since reference + to connection does not escape to user. In particular, no danger of trying + to start a new exchange on top of an incomplete failed one. + + After initial setup, communication can only happen within lock, and any + error (other than exn:fail:sql) that occurs between sending the message + buffer (flush-message-buffer) and receiving the last message (recv) + must cause the connection to disconnect. Such errors include communication + errors and breaks. + |# + + (define msg-buffer null) (define next-msg-num 0) (define/private (fresh-exchange) + (set! msg-buffer null) (set! next-msg-num 0)) + ;; buffer-message : message -> void + (define/private (buffer-message msg) + (dprintf " >> ~s\n" msg) + (set! msg-buffer (cons (cons msg next-msg-num) msg-buffer)) + (set! next-msg-num (add1 next-msg-num))) + + ;; flush-message-buffer : -> void + (define/private (flush-message-buffer) + (for ([msg+num (in-list (reverse msg-buffer))]) + (write-packet outport (car msg+num) (cdr msg+num))) + (flush-output outport)) + ;; send-message : message -> void (define/private (send-message msg) (buffer-message msg) (flush-message-buffer)) - ;; buffer-message : message -> void - (define/private (buffer-message msg) - (dprintf " >> ~s\n" msg) - (with-disconnect-on-error - (write-packet outport msg next-msg-num) - (set! next-msg-num (add1 next-msg-num)))) - - ;; flush-message-buffer : -> void - (define/private (flush-message-buffer) - (with-disconnect-on-error - (flush-output outport))) + (define/private (call-with-sync fsym proc) + (with-handlers ([(lambda (e) #t) + (lambda (e) + ;; Anything but exn:fail:sql (raised by recv-message) indicates + ;; a communication error. + (unless (exn:fail:sql? e) + (disconnect* #f)) + (raise e))]) + (flush-message-buffer) + (proc))) ;; recv : symbol/#f [(list-of symbol)] -> message ;; Automatically handles asynchronous messages (define/private (recv fsym expectation [field-dvecs #f]) - (define r - (with-disconnect-on-error - (recv* fsym expectation field-dvecs))) + (define r (recv* fsym expectation field-dvecs)) (when (error-packet? r) (raise-backend-error fsym r)) r) @@ -126,36 +143,34 @@ (err next)]) next)) + (define/override (on-break-within-lock) + (disconnect* #f)) + ;; ======================================== ;; Connection management - ;; disconnect : -> (void) + ;; disconnect : -> void (define/public (disconnect) - (disconnect* #t)) + (when (connected?) + (call-with-lock* 'disconnect + (lambda () (disconnect* #t)) + (lambda () (disconnect* #f)) + #f))) - (define/private (disconnect* lock-not-held?) - (define (go politely?) - (dprintf " ** Disconnecting\n") - (let ([outport* outport] - [inport* inport]) - (when outport - (when politely? - (fresh-exchange) - (send-message (make-command-packet 'quit ""))) - (close-output-port outport) - (set! outport #f)) - (when inport - (close-input-port inport) - (set! inport #f)))) - ;; If we don't hold the lock, try to acquire it and disconnect politely. - ;; Except, if already disconnected, no need to acquire lock. - (cond [(and lock-not-held? (connected?)) - (call-with-lock* 'disconnect - (lambda () (go #t)) - (lambda () (go #f)) - #f)] - [else (go #f)])) + (define/private (disconnect* politely?) + (dprintf " ** Disconnecting\n") + (let ([outport* outport] + [inport* inport]) + (when outport* + (when politely? + (fresh-exchange) + (send-message (make-command-packet 'quit ""))) + (close-output-port outport*) + (set! outport #f)) + (when inport* + (close-input-port inport*) + (set! inport #f)))) ;; connected? : -> boolean (define/override (connected?) @@ -176,31 +191,30 @@ ;; start-connection-protocol : string/#f string string/#f -> void (define/public (start-connection-protocol dbname username password ssl ssl-context) - (with-disconnect-on-error - (fresh-exchange) - (let ([r (recv 'mysql-connect 'handshake)]) - (match r - [(struct handshake-packet (pver sver tid scramble capabilities charset status auth)) - (check-required-flags capabilities) - (unless (member auth '("mysql_native_password" #f)) - (uerror 'mysql-connect "unsupported authentication plugin: ~s" auth)) - (define do-ssl? - (and (case ssl ((yes optional) #t) ((no) #f)) - (memq 'ssl capabilities))) - (when (and (eq? ssl 'yes) (not do-ssl?)) - (uerror 'mysql-connect "server refused SSL connection")) - (define wanted-capabilities (desired-capabilities capabilities do-ssl? dbname)) - (when do-ssl? - (send-message (make-abbrev-client-auth-packet wanted-capabilities)) - (let-values ([(sin sout) - (ports->ssl-ports inport outport - #:mode 'connect - #:context ssl-context - #:close-original? #t)]) - (attach-to-ports sin sout))) - (authenticate wanted-capabilities username password dbname - (or auth "mysql_native_password") scramble)] - [_ (error/comm 'mysql-connect "during authentication")])))) + (fresh-exchange) + (let ([r (recv 'mysql-connect 'handshake)]) + (match r + [(struct handshake-packet (pver sver tid scramble capabilities charset status auth)) + (check-required-flags capabilities) + (unless (member auth '("mysql_native_password" #f)) + (uerror 'mysql-connect "unsupported authentication plugin: ~s" auth)) + (define do-ssl? + (and (case ssl ((yes optional) #t) ((no) #f)) + (memq 'ssl capabilities))) + (when (and (eq? ssl 'yes) (not do-ssl?)) + (uerror 'mysql-connect "server refused SSL connection")) + (define wanted-capabilities (desired-capabilities capabilities do-ssl? dbname)) + (when do-ssl? + (send-message (make-abbrev-client-auth-packet wanted-capabilities)) + (let-values ([(sin sout) + (ports->ssl-ports inport outport + #:mode 'connect + #:context ssl-context + #:close-original? #t)]) + (attach-to-ports sin sout))) + (authenticate wanted-capabilities username password dbname + (or auth "mysql_native_password") scramble)] + [_ (error/comm 'mysql-connect "during authentication")]))) (define/private (authenticate capabilities username password dbname auth-plugin scramble) (let loop ([auth-plugin auth-plugin] [scramble scramble] [first? #t]) @@ -276,7 +290,8 @@ (let ([wbox (and warnings? (box 0))]) (fresh-exchange) (query1:enqueue stmt cursor?) - (begin0 (query1:collect fsym stmt (not (string? stmt)) cursor? wbox) + (begin0 (call-with-sync fsym + (lambda () (query1:collect fsym stmt (not (string? stmt)) cursor? wbox))) (when (and warnings? (not (zero? (unbox wbox)))) (fetch-warnings fsym))))) @@ -307,11 +322,11 @@ [id (send pst get-handle)] [params (statement-binding-params stmt)] [null-map (map sql-null? params)]) - (send-message + (buffer-message (let ([flags (if cursor? '(cursor/read-only) '())]) (make-execute-packet id flags null-map params))))] [else ;; string - (send-message (make-command-packet 'query stmt))])) + (buffer-message (make-command-packet 'query stmt))])) ;; query1:collect : symbol bool -> QueryResult stream (define/private (query1:collect fsym stmt binary? cursor? wbox) @@ -379,8 +394,9 @@ [else (let ([wbox (box 0)]) (fresh-exchange) - (send-message (make-fetch-packet (send pst get-handle) fetch-size)) - (begin0 (query1:get-rows fsym field-dvecs #t wbox end-box) + (buffer-message (make-fetch-packet (send pst get-handle) fetch-size)) + (begin0 (call-with-sync fsym + (lambda () (query1:get-rows fsym field-dvecs #t wbox end-box))) (when (not (zero? (unbox wbox))) (fetch-warnings fsym))))])))))) @@ -390,22 +406,24 @@ (define/override (prepare1* fsym stmt close-on-exec? stmt-type) (fresh-exchange) - (send-message (make-command-packet 'statement-prepare stmt)) - (let ([r (recv fsym 'prep-ok)]) - (match r - [(struct ok-prepared-statement-packet (id fields params)) - (let ([param-dvecs - (if (zero? params) null (prepare1:get-field-descriptions fsym))] - [field-dvecs - (if (zero? fields) null (prepare1:get-field-descriptions fsym))]) - (new prepared-statement% - (handle id) - (close-on-exec? close-on-exec?) - (param-typeids (map field-dvec->typeid param-dvecs)) - (result-dvecs field-dvecs) - (stmt stmt) - (stmt-type stmt-type) - (owner this)))]))) + (buffer-message (make-command-packet 'statement-prepare stmt)) + (call-with-sync fsym + (lambda () + (let ([r (recv fsym 'prep-ok)]) + (match r + [(struct ok-prepared-statement-packet (id fields params)) + (let ([param-dvecs + (if (zero? params) null (prepare1:get-field-descriptions fsym))] + [field-dvecs + (if (zero? fields) null (prepare1:get-field-descriptions fsym))]) + (new prepared-statement% + (handle id) + (close-on-exec? close-on-exec?) + (param-typeids (map field-dvec->typeid param-dvecs)) + (result-dvecs field-dvecs) + (stmt stmt) + (stmt-type stmt-type) + (owner this)))]))))) (define/private (prepare1:get-field-descriptions fsym) (let ([r (recv fsym 'field)]) diff --git a/collects/db/private/mysql/message.rkt b/collects/db/private/mysql/message.rkt index ef789f0b16..ff949cabd2 100644 --- a/collects/db/private/mysql/message.rkt +++ b/collects/db/private/mysql/message.rkt @@ -397,12 +397,7 @@ Based on protocol documentation here: ;; [inp (subport in len)] [bs (read-bytes len in)] [inp (open-input-bytes bs)] - [msg - (with-handlers ([exn? - (lambda (e) - (eprintf "packet was: ~s\n" (bytes->list bs)) - (raise e))]) - (parse-packet/1 inp expect len field-dvecs))]) + [msg (parse-packet/1 inp expect len field-dvecs)]) (when (port-has-bytes? inp) (error/internal 'parse-packet "bytes left over after parsing ~s; bytes were: ~s" msg (io:read-bytes-to-eof inp))) diff --git a/collects/db/private/odbc/connection.rkt b/collects/db/private/odbc/connection.rkt index 6d14074f0e..c04452b7cf 100644 --- a/collects/db/private/odbc/connection.rkt +++ b/collects/db/private/odbc/connection.rkt @@ -51,6 +51,9 @@ check-valid-tx-status check-statement/tx) + (define/override (on-break-within-lock) + (disconnect*)) + (define/public (get-db fsym) (unless db (error/not-connected fsym)) @@ -477,23 +480,25 @@ (vector name type size digits))))) (define/public (disconnect) - (define (go) - (start-atomic) - (let ([db* db] - [env* env]) - (set! db #f) - (set! env #f) - (end-atomic) - (when db* - (let ([statements (hash-map statement-table (lambda (k v) k))]) - (for ([pst (in-list statements)]) - (free-statement* 'disconnect pst)) - (handle-status 'disconnect (SQLDisconnect db*) db*) - (handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_DBC db*)) - (handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_ENV env*)) - (void))))) + (define (go) (disconnect*)) (call-with-lock* 'disconnect go go #f)) + (define/private (disconnect*) + (start-atomic) + (let ([db* db] + [env* env]) + (set! db #f) + (set! env #f) + (end-atomic) + (when db* + (let ([statements (hash-map statement-table (lambda (k v) k))]) + (for ([pst (in-list statements)]) + (free-statement* 'disconnect pst)) + (handle-status 'disconnect (SQLDisconnect db*) db*) + (handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_DBC db*)) + (handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_ENV env*)) + (void))))) + (define/public (get-base) this) (define/public (free-statement pst need-lock?) diff --git a/collects/db/private/postgresql/connection.rkt b/collects/db/private/postgresql/connection.rkt index d55e1036f2..4c71371135 100644 --- a/collects/db/private/postgresql/connection.rkt +++ b/collects/db/private/postgresql/connection.rkt @@ -45,22 +45,58 @@ (super-new) - ;; with-disconnect-on-error - (define-syntax-rule (with-disconnect-on-error . body) - (with-handlers ([exn:fail? (lambda (e) (disconnect* #f) (raise e))]) - . body)) - ;; ======================================== ;; == Communication - ;; (Must be called with lock acquired.) + #| + During initial setup, okay to send and recv directly, since reference + to connection does not escape to user. In particular, no danger of trying + to start a new exchange on top of an incomplete failed one. - ;; raw-recv : -> message - (define/private (raw-recv) - (with-disconnect-on-error - (let ([r (parse-server-message inport)]) - (dprintf " << ~s\n" r) - r))) + After initial setup, communication can only happen within lock, and any + error (other than exn:fail:sql) that occurs between sending the message + buffer (flush-message-buffer) and receiving the last message (recv-message) + must cause the connection to disconnect. Such errors include communication + errors and breaks. + |# + + ;; message-buffer : reversed list of messages + (define message-buffer null) + + ;; fresh-exchange : -> void + (define/private (fresh-exchange) + (set! message-buffer null)) + + ;; buffer-message : message -> void + (define/private (buffer-message msg) + (dprintf " >> ~s\n" msg) + (set! message-buffer (cons msg message-buffer))) + + ;; flush-message-buffer : -> void + (define/private (flush-message-buffer) + (for ([msg (in-list (reverse message-buffer))]) + (write-message msg outport)) + (set! message-buffer null) + (flush-output outport)) + + ;; send-message : message -> void + (define/private (send-message msg) + (buffer-message msg) + (flush-message-buffer)) + + (define/private (call-with-sync fsym proc) + (buffer-message (make-Sync)) + (with-handlers ([(lambda (e) #t) + (lambda (e) + ;; Anything but exn:fail:sql (raised by recv-message) indicates + ;; a communication error. + ;; FIXME: alternatively, have check-ready-for-query set an ok flag + (unless (exn:fail:sql? e) + (disconnect* #f)) + (raise e))]) + (flush-message-buffer) + (begin0 (proc) + (check-ready-for-query fsym #f)))) ;; recv-message : symbol -> message (define/private (recv-message fsym) @@ -75,21 +111,11 @@ (recv-message fsym)] [else r]))) - ;; send-message : message -> void - (define/private (send-message msg) - (buffer-message msg) - (flush-message-buffer)) - - ;; buffer-message : message -> void - (define/private (buffer-message msg) - (dprintf " >> ~s\n" msg) - (with-disconnect-on-error - (write-message msg outport))) - - ;; flush-message-buffer : -> void - (define/private (flush-message-buffer) - (with-disconnect-on-error - (flush-output outport))) + ;; raw-recv : -> message + (define/private (raw-recv) + (let ([r (parse-server-message inport)]) + (dprintf " << ~s\n" r) + r)) ;; check-ready-for-query : symbol -> void (define/private (check-ready-for-query fsym or-eof?) @@ -104,6 +130,9 @@ [(and or-eof? (eof-object? r)) (void)] [else (error/comm fsym "expected ready")]))) + (define/override (on-break-within-lock) + (disconnect* #f)) + ;; == Asynchronous messages ;; handle-async-message : message -> void @@ -128,32 +157,27 @@ ;; == Connection management - ;; disconnect : [boolean] -> (void) + ;; disconnect : -> void (define/public (disconnect) - (disconnect* #t)) + (when (connected?) + (call-with-lock* 'disconnect + (lambda () (disconnect* #t)) + (lambda () (disconnect* #f)) + #f))) ;; disconnect* : boolean -> void - (define/private (disconnect* no-lock-held?) - (define (go politely?) - (dprintf " ** Disconnecting\n") - (let ([outport* outport] - [inport* inport]) - (when outport* - (when politely? - (send-message (make-Terminate))) - (close-output-port outport*) - (set! outport #f)) - (when inport* - (close-input-port inport*) - (set! inport #f)))) - ;; If we don't hold the lock, try to acquire it and disconnect politely. - ;; Except, if already disconnected, no need to acquire lock. - (cond [(and no-lock-held? (connected?)) - (call-with-lock* 'disconnect - (lambda () (go #t)) - (lambda () (go #f)) - #f)] - [else (go #f)])) + (define/private (disconnect* politely?) + (dprintf " ** Disconnecting\n") + (let ([outport* outport] + [inport* inport]) + (when outport* + (when politely? + (send-message (make-Terminate))) + (close-output-port outport*) + (set! outport #f)) + (when inport* + (close-input-port inport*) + (set! inport #f)))) ;; connected? : -> boolean (define/override (connected?) @@ -176,8 +200,7 @@ ;; start-connection-protocol : string string string/#f -> void (define/public (start-connection-protocol dbname username password) - (with-disconnect-on-error - (call-with-lock 'postgresql-connect + (call-with-lock 'postgresql-connect (lambda () (send-message (make-StartupMessage @@ -185,7 +208,7 @@ (cons "database" dbname) (cons "client_encoding" "UTF8") (cons "DateStyle" "ISO, MDY")))) - (connect:expect-auth username password))))) + (connect:expect-auth username password)))) ;; connect:expect-auth : string/#f -> ConnectionResult (define/private (connect:expect-auth username password) @@ -249,11 +272,12 @@ (define/private (query1 fsym stmt close-on-exec? cursor?) ;; if stmt is string, must take no params & results must be binary-readable + (fresh-exchange) (let* ([delenda (check/invalidate-cache stmt)] [portal (query1:enqueue delenda stmt close-on-exec? cursor?)]) - (send-message (make-Sync)) - (begin0 (query1:collect fsym delenda stmt portal (string? stmt) close-on-exec? cursor?) - (check-ready-for-query fsym #f)))) + (call-with-sync fsym + (lambda () + (query1:collect fsym delenda stmt portal (string? stmt) close-on-exec? cursor?))))) ;; check-statement : symbol statement -> statement-binding ;; Convert to statement-binding; need to prepare to get type information, used to @@ -402,10 +426,11 @@ (lambda () (cond [(unbox end-box) #f] [else + (fresh-exchange) (buffer-message (make-Execute portal fetch-size)) - (send-message (make-Sync)) - (let ([rows (query1:data-loop fsym end-box)]) - (check-ready-for-query fsym #f) + (let ([rows + (call-with-sync fsym + (lambda () (query1:data-loop fsym end-box)))]) (when (unbox end-box) (cursor:close fsym pst portal)) rows)])))]) @@ -413,13 +438,13 @@ (define/private (cursor:close fsym pst portal) (let ([close-on-exec? (send pst get-close-on-exec?)]) + (fresh-exchange) (buffer-message (make-Close 'portal portal)) (when close-on-exec? (buffer-message (make-Close 'statement (send pst get-handle))) (send pst set-handle #f)) - (send-message (make-Sync)) - (query1:expect-close-complete fsym close-on-exec?) - (check-ready-for-query fsym #f))) + (call-with-sync fsym + (lambda () (query1:expect-close-complete fsym close-on-exec?))))) ;; == Prepare @@ -428,10 +453,10 @@ (define/override (prepare1* fsym stmt close-on-exec? stmt-type) ;; name generation within exchange: synchronized (let ([name (generate-name)]) + (fresh-exchange) (prepare1:enqueue name stmt) - (send-message (make-Sync)) - (begin0 (prepare1:collect fsym stmt name close-on-exec? stmt-type) - (check-ready-for-query fsym #f)))) + (call-with-sync fsym + (lambda () (prepare1:collect fsym stmt name close-on-exec? stmt-type))))) (define/private (prepare1:enqueue name stmt) (buffer-message (make-Parse name stmt null)) @@ -483,12 +508,13 @@ (let ([name (send pst get-handle)]) (when (and name outport) ;; outport = connected? (send pst set-handle #f) + (fresh-exchange) (buffer-message (make-Close 'statement name)) - (buffer-message (make-Sync)) - (let ([r (recv-message 'free-statement)]) - (cond [(CloseComplete? r) (void)] - [else (error/comm 'free-statement)]) - (check-ready-for-query 'free-statement #t))))) + (call-with-sync 'free-statement + (lambda () + (let ([r (recv-message 'free-statement)]) + (cond [(CloseComplete? r) (void)] + [else (error/comm 'free-statement)]))))))) (if need-lock? (call-with-lock* 'free-statement do-free-statement diff --git a/collects/db/private/sqlite3/connection.rkt b/collects/db/private/sqlite3/connection.rkt index 504277eb79..a30b59d27e 100644 --- a/collects/db/private/sqlite3/connection.rkt +++ b/collects/db/private/sqlite3/connection.rkt @@ -37,6 +37,9 @@ (define/override (call-with-lock fsym proc) (call-with-lock* fsym (lambda () (set! saved-tx-status (get-tx-status)) (proc)) #f #t)) + (define/override (on-break-within-lock) + (disconnect*)) + (define/private (get-db fsym) (or -db (error/not-connected fsym))) @@ -194,19 +197,21 @@ pst))) (define/public (disconnect) - (define (go) - (start-atomic) - (let ([db -db]) - (set! -db #f) - (end-atomic) - (when db - (let ([statements (hash-map statement-table (lambda (k v) k))]) - (for ([pst (in-list statements)]) - (do-free-statement 'disconnect pst)) - (HANDLE 'disconnect2 (sqlite3_close db)) - (void))))) + (define (go) (disconnect*)) (call-with-lock* 'disconnect go go #f)) + (define/private (disconnect*) + (start-atomic) + (let ([db -db]) + (set! -db #f) + (end-atomic) + (when db + (let ([statements (hash-map statement-table (lambda (k v) k))]) + (for ([pst (in-list statements)]) + (do-free-statement 'disconnect pst)) + (HANDLE 'disconnect (sqlite3_close db)) + (void))))) + (define/public (get-base) this) (define/public (free-statement pst need-lock?)