db: clean up communication, locking

Disconnect on break exn within lock; other break-safety fixes.
This commit is contained in:
Ryan Culpepper 2012-01-15 23:18:03 -07:00
parent f5711c6cc3
commit f142a1c5f2
8 changed files with 271 additions and 200 deletions

View File

@ -111,3 +111,5 @@ Misc
so a statement can be invalidated between cache retrieval and execution
- support logging
- audit: make sure no output within atomic section (in drracket, may thread switch)

View File

@ -118,9 +118,11 @@
(define/public-final (call-with-lock* who proc hopeless require-connected?)
(let ([me (thread-dead-evt (current-thread))]
[eb? (break-enabled)]
[result (sync outer-lock lock-holder)])
(cond [(eq? result outer-lock)
;; Got past outer stage
(break-enabled #f)
(let ([proceed?
(begin (start-atomic)
(let ([proceed? (semaphore-try-wait? inner-lock)])
@ -133,21 +135,30 @@
;; Acquired lock
;; - lock-holder = me, and outer-lock is closed again
(when (and require-connected? (not (connected?)))
(unlock)
(break-enabled eb?)
(unlock #f)
(error/not-connected who))
(with-handlers ([values (lambda (e) (unlock) (raise e))])
(begin0 (proc) (unlock)))]
(with-handlers ([(lambda (e) #t)
(lambda (e)
(when (exn:break? e) (on-break-within-lock))
(unlock #f)
(raise e))])
(break-enabled eb?)
(begin0 (proc) (unlock #t)))]
[else
;; Didn't acquire lock; retry
(break-enabled eb?)
(call-with-lock* who proc hopeless require-connected?)]))]
[(eq? result lock-holder)
;; Thread holding lock is dead
(if hopeless (hopeless) (error/hopeless who))]
[(eq? me lock-holder)
(error/internal who "attempted to recursively acquire lock")]
[else
;; lock-holder was stale; retry
(call-with-lock* who proc hopeless require-connected?)])))
(define/private (unlock)
(define/private (unlock run-async-calls?)
(let ([async-calls (reverse delayed-async-calls)])
(set! delayed-async-calls null)
(start-atomic)
@ -155,14 +166,21 @@
(semaphore-post inner-lock)
(semaphore-post outer-sema)
(end-atomic)
(for-each call-with-continuation-barrier async-calls)))
(when run-async-calls?
(for-each call-with-continuation-barrier async-calls))))
;; needs overriding
(define/public (connected?) #f)
(define/public-final (add-delayed-call! proc)
(define/public (add-delayed-call! proc)
(set! delayed-async-calls (cons proc delayed-async-calls)))
;; on-break-within-lock : -> void
;; Called before unlock; makes it easy to disconnect on any break
;; within lock.
(define/public (on-break-within-lock)
(void))
(super-new)))
;; ----------------------------------------

View File

@ -119,7 +119,9 @@
;; timeout? = if connection open, then wait longer
(let* ([c (hash-ref key=>conn key #f)]
[in-trans? (with-handlers ([exn:fail? (lambda (e) #f)])
(and c (send c transaction-status 'virtual-connection)))])
(and c
(send c connected?)
(send c transaction-status 'virtual-connection)))])
(cond [(not c) (void)]
[(and timeout? in-trans?)
(hash-set! alarms c (fresh-alarm-for key))]
@ -182,7 +184,7 @@
(#t '_ (fetch/cursor fsym stmt fetch-size))
(#t '_ (start-transaction fsym isolation cwt?))
(#f (void) (end-transaction fsym mode cwt?))
(#f #f (transaction-status fsym))
(#t '_ (transaction-status fsym))
(#t '_ (list-tables fsym schema)))
(define/public (get-base)

View File

@ -35,44 +35,61 @@
(super-new)
;; with-disconnect-on-error
(define-syntax-rule (with-disconnect-on-error . body)
(with-handlers ([exn:fail? (lambda (e) (disconnect* #f) (raise e))])
. body))
;; ========================================
;; == Communication
;; (Must be called with lock acquired.)
#|
During initial setup, okay to send and recv directly, since reference
to connection does not escape to user. In particular, no danger of trying
to start a new exchange on top of an incomplete failed one.
After initial setup, communication can only happen within lock, and any
error (other than exn:fail:sql) that occurs between sending the message
buffer (flush-message-buffer) and receiving the last message (recv)
must cause the connection to disconnect. Such errors include communication
errors and breaks.
|#
(define msg-buffer null)
(define next-msg-num 0)
(define/private (fresh-exchange)
(set! msg-buffer null)
(set! next-msg-num 0))
;; buffer-message : message -> void
(define/private (buffer-message msg)
(dprintf " >> ~s\n" msg)
(set! msg-buffer (cons (cons msg next-msg-num) msg-buffer))
(set! next-msg-num (add1 next-msg-num)))
;; flush-message-buffer : -> void
(define/private (flush-message-buffer)
(for ([msg+num (in-list (reverse msg-buffer))])
(write-packet outport (car msg+num) (cdr msg+num)))
(flush-output outport))
;; send-message : message -> void
(define/private (send-message msg)
(buffer-message msg)
(flush-message-buffer))
;; buffer-message : message -> void
(define/private (buffer-message msg)
(dprintf " >> ~s\n" msg)
(with-disconnect-on-error
(write-packet outport msg next-msg-num)
(set! next-msg-num (add1 next-msg-num))))
;; flush-message-buffer : -> void
(define/private (flush-message-buffer)
(with-disconnect-on-error
(flush-output outport)))
(define/private (call-with-sync fsym proc)
(with-handlers ([(lambda (e) #t)
(lambda (e)
;; Anything but exn:fail:sql (raised by recv-message) indicates
;; a communication error.
(unless (exn:fail:sql? e)
(disconnect* #f))
(raise e))])
(flush-message-buffer)
(proc)))
;; recv : symbol/#f [(list-of symbol)] -> message
;; Automatically handles asynchronous messages
(define/private (recv fsym expectation [field-dvecs #f])
(define r
(with-disconnect-on-error
(recv* fsym expectation field-dvecs)))
(define r (recv* fsym expectation field-dvecs))
(when (error-packet? r)
(raise-backend-error fsym r))
r)
@ -126,36 +143,34 @@
(err next)])
next))
(define/override (on-break-within-lock)
(disconnect* #f))
;; ========================================
;; Connection management
;; disconnect : -> (void)
;; disconnect : -> void
(define/public (disconnect)
(disconnect* #t))
(when (connected?)
(call-with-lock* 'disconnect
(lambda () (disconnect* #t))
(lambda () (disconnect* #f))
#f)))
(define/private (disconnect* lock-not-held?)
(define (go politely?)
(dprintf " ** Disconnecting\n")
(let ([outport* outport]
[inport* inport])
(when outport
(when politely?
(fresh-exchange)
(send-message (make-command-packet 'quit "")))
(close-output-port outport)
(set! outport #f))
(when inport
(close-input-port inport)
(set! inport #f))))
;; If we don't hold the lock, try to acquire it and disconnect politely.
;; Except, if already disconnected, no need to acquire lock.
(cond [(and lock-not-held? (connected?))
(call-with-lock* 'disconnect
(lambda () (go #t))
(lambda () (go #f))
#f)]
[else (go #f)]))
(define/private (disconnect* politely?)
(dprintf " ** Disconnecting\n")
(let ([outport* outport]
[inport* inport])
(when outport*
(when politely?
(fresh-exchange)
(send-message (make-command-packet 'quit "")))
(close-output-port outport*)
(set! outport #f))
(when inport*
(close-input-port inport*)
(set! inport #f))))
;; connected? : -> boolean
(define/override (connected?)
@ -176,31 +191,30 @@
;; start-connection-protocol : string/#f string string/#f -> void
(define/public (start-connection-protocol dbname username password ssl ssl-context)
(with-disconnect-on-error
(fresh-exchange)
(let ([r (recv 'mysql-connect 'handshake)])
(match r
[(struct handshake-packet (pver sver tid scramble capabilities charset status auth))
(check-required-flags capabilities)
(unless (member auth '("mysql_native_password" #f))
(uerror 'mysql-connect "unsupported authentication plugin: ~s" auth))
(define do-ssl?
(and (case ssl ((yes optional) #t) ((no) #f))
(memq 'ssl capabilities)))
(when (and (eq? ssl 'yes) (not do-ssl?))
(uerror 'mysql-connect "server refused SSL connection"))
(define wanted-capabilities (desired-capabilities capabilities do-ssl? dbname))
(when do-ssl?
(send-message (make-abbrev-client-auth-packet wanted-capabilities))
(let-values ([(sin sout)
(ports->ssl-ports inport outport
#:mode 'connect
#:context ssl-context
#:close-original? #t)])
(attach-to-ports sin sout)))
(authenticate wanted-capabilities username password dbname
(or auth "mysql_native_password") scramble)]
[_ (error/comm 'mysql-connect "during authentication")]))))
(fresh-exchange)
(let ([r (recv 'mysql-connect 'handshake)])
(match r
[(struct handshake-packet (pver sver tid scramble capabilities charset status auth))
(check-required-flags capabilities)
(unless (member auth '("mysql_native_password" #f))
(uerror 'mysql-connect "unsupported authentication plugin: ~s" auth))
(define do-ssl?
(and (case ssl ((yes optional) #t) ((no) #f))
(memq 'ssl capabilities)))
(when (and (eq? ssl 'yes) (not do-ssl?))
(uerror 'mysql-connect "server refused SSL connection"))
(define wanted-capabilities (desired-capabilities capabilities do-ssl? dbname))
(when do-ssl?
(send-message (make-abbrev-client-auth-packet wanted-capabilities))
(let-values ([(sin sout)
(ports->ssl-ports inport outport
#:mode 'connect
#:context ssl-context
#:close-original? #t)])
(attach-to-ports sin sout)))
(authenticate wanted-capabilities username password dbname
(or auth "mysql_native_password") scramble)]
[_ (error/comm 'mysql-connect "during authentication")])))
(define/private (authenticate capabilities username password dbname auth-plugin scramble)
(let loop ([auth-plugin auth-plugin] [scramble scramble] [first? #t])
@ -276,7 +290,8 @@
(let ([wbox (and warnings? (box 0))])
(fresh-exchange)
(query1:enqueue stmt cursor?)
(begin0 (query1:collect fsym stmt (not (string? stmt)) cursor? wbox)
(begin0 (call-with-sync fsym
(lambda () (query1:collect fsym stmt (not (string? stmt)) cursor? wbox)))
(when (and warnings? (not (zero? (unbox wbox))))
(fetch-warnings fsym)))))
@ -307,11 +322,11 @@
[id (send pst get-handle)]
[params (statement-binding-params stmt)]
[null-map (map sql-null? params)])
(send-message
(buffer-message
(let ([flags (if cursor? '(cursor/read-only) '())])
(make-execute-packet id flags null-map params))))]
[else ;; string
(send-message (make-command-packet 'query stmt))]))
(buffer-message (make-command-packet 'query stmt))]))
;; query1:collect : symbol bool -> QueryResult stream
(define/private (query1:collect fsym stmt binary? cursor? wbox)
@ -379,8 +394,9 @@
[else
(let ([wbox (box 0)])
(fresh-exchange)
(send-message (make-fetch-packet (send pst get-handle) fetch-size))
(begin0 (query1:get-rows fsym field-dvecs #t wbox end-box)
(buffer-message (make-fetch-packet (send pst get-handle) fetch-size))
(begin0 (call-with-sync fsym
(lambda () (query1:get-rows fsym field-dvecs #t wbox end-box)))
(when (not (zero? (unbox wbox)))
(fetch-warnings fsym))))]))))))
@ -390,22 +406,24 @@
(define/override (prepare1* fsym stmt close-on-exec? stmt-type)
(fresh-exchange)
(send-message (make-command-packet 'statement-prepare stmt))
(let ([r (recv fsym 'prep-ok)])
(match r
[(struct ok-prepared-statement-packet (id fields params))
(let ([param-dvecs
(if (zero? params) null (prepare1:get-field-descriptions fsym))]
[field-dvecs
(if (zero? fields) null (prepare1:get-field-descriptions fsym))])
(new prepared-statement%
(handle id)
(close-on-exec? close-on-exec?)
(param-typeids (map field-dvec->typeid param-dvecs))
(result-dvecs field-dvecs)
(stmt stmt)
(stmt-type stmt-type)
(owner this)))])))
(buffer-message (make-command-packet 'statement-prepare stmt))
(call-with-sync fsym
(lambda ()
(let ([r (recv fsym 'prep-ok)])
(match r
[(struct ok-prepared-statement-packet (id fields params))
(let ([param-dvecs
(if (zero? params) null (prepare1:get-field-descriptions fsym))]
[field-dvecs
(if (zero? fields) null (prepare1:get-field-descriptions fsym))])
(new prepared-statement%
(handle id)
(close-on-exec? close-on-exec?)
(param-typeids (map field-dvec->typeid param-dvecs))
(result-dvecs field-dvecs)
(stmt stmt)
(stmt-type stmt-type)
(owner this)))])))))
(define/private (prepare1:get-field-descriptions fsym)
(let ([r (recv fsym 'field)])

View File

@ -397,12 +397,7 @@ Based on protocol documentation here:
;; [inp (subport in len)]
[bs (read-bytes len in)]
[inp (open-input-bytes bs)]
[msg
(with-handlers ([exn?
(lambda (e)
(eprintf "packet was: ~s\n" (bytes->list bs))
(raise e))])
(parse-packet/1 inp expect len field-dvecs))])
[msg (parse-packet/1 inp expect len field-dvecs)])
(when (port-has-bytes? inp)
(error/internal 'parse-packet "bytes left over after parsing ~s; bytes were: ~s"
msg (io:read-bytes-to-eof inp)))

View File

@ -51,6 +51,9 @@
check-valid-tx-status
check-statement/tx)
(define/override (on-break-within-lock)
(disconnect*))
(define/public (get-db fsym)
(unless db
(error/not-connected fsym))
@ -477,23 +480,25 @@
(vector name type size digits)))))
(define/public (disconnect)
(define (go)
(start-atomic)
(let ([db* db]
[env* env])
(set! db #f)
(set! env #f)
(end-atomic)
(when db*
(let ([statements (hash-map statement-table (lambda (k v) k))])
(for ([pst (in-list statements)])
(free-statement* 'disconnect pst))
(handle-status 'disconnect (SQLDisconnect db*) db*)
(handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_DBC db*))
(handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_ENV env*))
(void)))))
(define (go) (disconnect*))
(call-with-lock* 'disconnect go go #f))
(define/private (disconnect*)
(start-atomic)
(let ([db* db]
[env* env])
(set! db #f)
(set! env #f)
(end-atomic)
(when db*
(let ([statements (hash-map statement-table (lambda (k v) k))])
(for ([pst (in-list statements)])
(free-statement* 'disconnect pst))
(handle-status 'disconnect (SQLDisconnect db*) db*)
(handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_DBC db*))
(handle-status 'disconnect (SQLFreeHandle SQL_HANDLE_ENV env*))
(void)))))
(define/public (get-base) this)
(define/public (free-statement pst need-lock?)

View File

@ -45,22 +45,58 @@
(super-new)
;; with-disconnect-on-error
(define-syntax-rule (with-disconnect-on-error . body)
(with-handlers ([exn:fail? (lambda (e) (disconnect* #f) (raise e))])
. body))
;; ========================================
;; == Communication
;; (Must be called with lock acquired.)
#|
During initial setup, okay to send and recv directly, since reference
to connection does not escape to user. In particular, no danger of trying
to start a new exchange on top of an incomplete failed one.
;; raw-recv : -> message
(define/private (raw-recv)
(with-disconnect-on-error
(let ([r (parse-server-message inport)])
(dprintf " << ~s\n" r)
r)))
After initial setup, communication can only happen within lock, and any
error (other than exn:fail:sql) that occurs between sending the message
buffer (flush-message-buffer) and receiving the last message (recv-message)
must cause the connection to disconnect. Such errors include communication
errors and breaks.
|#
;; message-buffer : reversed list of messages
(define message-buffer null)
;; fresh-exchange : -> void
(define/private (fresh-exchange)
(set! message-buffer null))
;; buffer-message : message -> void
(define/private (buffer-message msg)
(dprintf " >> ~s\n" msg)
(set! message-buffer (cons msg message-buffer)))
;; flush-message-buffer : -> void
(define/private (flush-message-buffer)
(for ([msg (in-list (reverse message-buffer))])
(write-message msg outport))
(set! message-buffer null)
(flush-output outport))
;; send-message : message -> void
(define/private (send-message msg)
(buffer-message msg)
(flush-message-buffer))
(define/private (call-with-sync fsym proc)
(buffer-message (make-Sync))
(with-handlers ([(lambda (e) #t)
(lambda (e)
;; Anything but exn:fail:sql (raised by recv-message) indicates
;; a communication error.
;; FIXME: alternatively, have check-ready-for-query set an ok flag
(unless (exn:fail:sql? e)
(disconnect* #f))
(raise e))])
(flush-message-buffer)
(begin0 (proc)
(check-ready-for-query fsym #f))))
;; recv-message : symbol -> message
(define/private (recv-message fsym)
@ -75,21 +111,11 @@
(recv-message fsym)]
[else r])))
;; send-message : message -> void
(define/private (send-message msg)
(buffer-message msg)
(flush-message-buffer))
;; buffer-message : message -> void
(define/private (buffer-message msg)
(dprintf " >> ~s\n" msg)
(with-disconnect-on-error
(write-message msg outport)))
;; flush-message-buffer : -> void
(define/private (flush-message-buffer)
(with-disconnect-on-error
(flush-output outport)))
;; raw-recv : -> message
(define/private (raw-recv)
(let ([r (parse-server-message inport)])
(dprintf " << ~s\n" r)
r))
;; check-ready-for-query : symbol -> void
(define/private (check-ready-for-query fsym or-eof?)
@ -104,6 +130,9 @@
[(and or-eof? (eof-object? r)) (void)]
[else (error/comm fsym "expected ready")])))
(define/override (on-break-within-lock)
(disconnect* #f))
;; == Asynchronous messages
;; handle-async-message : message -> void
@ -128,32 +157,27 @@
;; == Connection management
;; disconnect : [boolean] -> (void)
;; disconnect : -> void
(define/public (disconnect)
(disconnect* #t))
(when (connected?)
(call-with-lock* 'disconnect
(lambda () (disconnect* #t))
(lambda () (disconnect* #f))
#f)))
;; disconnect* : boolean -> void
(define/private (disconnect* no-lock-held?)
(define (go politely?)
(dprintf " ** Disconnecting\n")
(let ([outport* outport]
[inport* inport])
(when outport*
(when politely?
(send-message (make-Terminate)))
(close-output-port outport*)
(set! outport #f))
(when inport*
(close-input-port inport*)
(set! inport #f))))
;; If we don't hold the lock, try to acquire it and disconnect politely.
;; Except, if already disconnected, no need to acquire lock.
(cond [(and no-lock-held? (connected?))
(call-with-lock* 'disconnect
(lambda () (go #t))
(lambda () (go #f))
#f)]
[else (go #f)]))
(define/private (disconnect* politely?)
(dprintf " ** Disconnecting\n")
(let ([outport* outport]
[inport* inport])
(when outport*
(when politely?
(send-message (make-Terminate)))
(close-output-port outport*)
(set! outport #f))
(when inport*
(close-input-port inport*)
(set! inport #f))))
;; connected? : -> boolean
(define/override (connected?)
@ -176,8 +200,7 @@
;; start-connection-protocol : string string string/#f -> void
(define/public (start-connection-protocol dbname username password)
(with-disconnect-on-error
(call-with-lock 'postgresql-connect
(call-with-lock 'postgresql-connect
(lambda ()
(send-message
(make-StartupMessage
@ -185,7 +208,7 @@
(cons "database" dbname)
(cons "client_encoding" "UTF8")
(cons "DateStyle" "ISO, MDY"))))
(connect:expect-auth username password)))))
(connect:expect-auth username password))))
;; connect:expect-auth : string/#f -> ConnectionResult
(define/private (connect:expect-auth username password)
@ -249,11 +272,12 @@
(define/private (query1 fsym stmt close-on-exec? cursor?)
;; if stmt is string, must take no params & results must be binary-readable
(fresh-exchange)
(let* ([delenda (check/invalidate-cache stmt)]
[portal (query1:enqueue delenda stmt close-on-exec? cursor?)])
(send-message (make-Sync))
(begin0 (query1:collect fsym delenda stmt portal (string? stmt) close-on-exec? cursor?)
(check-ready-for-query fsym #f))))
(call-with-sync fsym
(lambda ()
(query1:collect fsym delenda stmt portal (string? stmt) close-on-exec? cursor?)))))
;; check-statement : symbol statement -> statement-binding
;; Convert to statement-binding; need to prepare to get type information, used to
@ -402,10 +426,11 @@
(lambda ()
(cond [(unbox end-box) #f]
[else
(fresh-exchange)
(buffer-message (make-Execute portal fetch-size))
(send-message (make-Sync))
(let ([rows (query1:data-loop fsym end-box)])
(check-ready-for-query fsym #f)
(let ([rows
(call-with-sync fsym
(lambda () (query1:data-loop fsym end-box)))])
(when (unbox end-box)
(cursor:close fsym pst portal))
rows)])))])
@ -413,13 +438,13 @@
(define/private (cursor:close fsym pst portal)
(let ([close-on-exec? (send pst get-close-on-exec?)])
(fresh-exchange)
(buffer-message (make-Close 'portal portal))
(when close-on-exec?
(buffer-message (make-Close 'statement (send pst get-handle)))
(send pst set-handle #f))
(send-message (make-Sync))
(query1:expect-close-complete fsym close-on-exec?)
(check-ready-for-query fsym #f)))
(call-with-sync fsym
(lambda () (query1:expect-close-complete fsym close-on-exec?)))))
;; == Prepare
@ -428,10 +453,10 @@
(define/override (prepare1* fsym stmt close-on-exec? stmt-type)
;; name generation within exchange: synchronized
(let ([name (generate-name)])
(fresh-exchange)
(prepare1:enqueue name stmt)
(send-message (make-Sync))
(begin0 (prepare1:collect fsym stmt name close-on-exec? stmt-type)
(check-ready-for-query fsym #f))))
(call-with-sync fsym
(lambda () (prepare1:collect fsym stmt name close-on-exec? stmt-type)))))
(define/private (prepare1:enqueue name stmt)
(buffer-message (make-Parse name stmt null))
@ -483,12 +508,13 @@
(let ([name (send pst get-handle)])
(when (and name outport) ;; outport = connected?
(send pst set-handle #f)
(fresh-exchange)
(buffer-message (make-Close 'statement name))
(buffer-message (make-Sync))
(let ([r (recv-message 'free-statement)])
(cond [(CloseComplete? r) (void)]
[else (error/comm 'free-statement)])
(check-ready-for-query 'free-statement #t)))))
(call-with-sync 'free-statement
(lambda ()
(let ([r (recv-message 'free-statement)])
(cond [(CloseComplete? r) (void)]
[else (error/comm 'free-statement)])))))))
(if need-lock?
(call-with-lock* 'free-statement
do-free-statement

View File

@ -37,6 +37,9 @@
(define/override (call-with-lock fsym proc)
(call-with-lock* fsym (lambda () (set! saved-tx-status (get-tx-status)) (proc)) #f #t))
(define/override (on-break-within-lock)
(disconnect*))
(define/private (get-db fsym)
(or -db (error/not-connected fsym)))
@ -194,19 +197,21 @@
pst)))
(define/public (disconnect)
(define (go)
(start-atomic)
(let ([db -db])
(set! -db #f)
(end-atomic)
(when db
(let ([statements (hash-map statement-table (lambda (k v) k))])
(for ([pst (in-list statements)])
(do-free-statement 'disconnect pst))
(HANDLE 'disconnect2 (sqlite3_close db))
(void)))))
(define (go) (disconnect*))
(call-with-lock* 'disconnect go go #f))
(define/private (disconnect*)
(start-atomic)
(let ([db -db])
(set! -db #f)
(end-atomic)
(when db
(let ([statements (hash-map statement-table (lambda (k v) k))])
(for ([pst (in-list statements)])
(do-free-statement 'disconnect pst))
(HANDLE 'disconnect (sqlite3_close db))
(void)))))
(define/public (get-base) this)
(define/public (free-statement pst need-lock?)