db: added prepared statement cache

This commit is contained in:
Ryan Culpepper 2012-01-07 14:05:49 -07:00
parent 6fe7e65ff0
commit f5711c6cc3
14 changed files with 251 additions and 93 deletions

View File

@ -89,15 +89,25 @@ Misc
- can get some tx data from ODBC...
- on the other hand, not supposed to do tx-SQL in ODBC anyway, so low-priority
- postgresql query path optimizations
- once all types have binary readers...
- can eliminate prepare step when args given (use unnamed statement)
- then, can remember what SQL in unnamed statement, avoid re-parse
- make implementation notes section of docs
- explain cursor impl (& rationale)
- explain nested tx impl
- ...
- mysql, sqlite3, odbc query path optimization
- can do something similar, but messier because no unnamed statement
- might make close-on-exec? obsolete
- add sql field to pstmt%, add sql=>pstmt hash in connection (update on pstmt finalize)
- use as stmt cache, avoid re-prepare
- sql field would be good for eventually implementing cursors, too
- PROBLEM: if schema changes, may invalidate pstmt, change types, etc
- better query path: IMMEDIATE STATEMENTS vs PREPARED STATEMENT CACHE
- IMMEDIATE STATEMENTS
In pg terms, pipeline Parse and Bind (ie, 1 round-trip instead of 2).
Can do immediate query with args using typeids of parameters
- problem: results in worse error messages
- problem: error invalidates tx (in pg)
- problem: same problem exists even with no args: arity errors
- benefit: I like the code cleanup.
- PREPARED STATEMENT CACHE
Add sql=>pstmt cache in collection.
- schema can change without warning, invalidate pst type information
- mitigation: only cache w/in tx (but is isolation level sufficient?)
and clear cache on any tx rollback or potential ddl stmt
- problem: (cached) prepare and query not done atomically (two lock entries)
so a statement can be invalidated between cache retrieval and execution
- support logging

View File

@ -69,7 +69,8 @@
(#:mode (or/c 'read-only 'read/write 'create)
#:busy-retry-limit (or/c exact-nonnegative-integer? +inf.0)
#:busy-retry-delay (and/c rational? (not/c negative?))
#:use-place boolean?)
#:use-place boolean?
#:debug? any/c)
connection?)]
;; Duplicates contracts at odbc.rkt

View File

@ -5,7 +5,9 @@
"interfaces.rkt")
(provide define-type-table
locking%
debugging%
transactions%
statement-cache%
isolation-symbol->string
make-sql-classifier
sql-skip-comments
@ -165,8 +167,27 @@
;; ----------------------------------------
(define transactions%
(define debugging%
(class locking%
(super-new)
(field [DEBUG? #f])
(define/public (debug debug?)
(set! DEBUG? debug?))
(define/public (dprintf fmt . args)
(when DEBUG? (apply fprintf (current-error-port) fmt args)))
))
;; ----------------------------------------
(define transactions%
(class debugging%
(inherit dprintf)
(inherit-field DEBUG?)
#|
A transaction created via SQL is "unmanaged".
A transaction created via start-tx, call-with-tx is "managed".
@ -202,6 +223,7 @@
(super call-with-lock fsym
(lambda ()
(begin0 (proc)
(when DEBUG? (dprintf " ** ~a\n" (tx-state->string)))
(when (and (eq? tx-status #f) (not (null? tx-stack)))
(error/internal fsym "managed transaction unexpectedly closed"))))))
@ -350,6 +372,97 @@
;; ----------------------------------------
(define statement-cache%
(class transactions%
(init-field [cache-statements 'in-transaction])
(inherit call-with-lock
get-tx-status
check-valid-tx-status
dprintf)
(super-new)
;; Statement Cache
;; updated by prepare; potentially invalidated by query (via check/invalidate-cache)
(define pst-cache '#hash())
(define/public (get-cached-statement stmt)
(cond [(use-cache?)
(let ([cached-pst (hash-ref pst-cache stmt #f)])
(cond [cached-pst
(dprintf " ** using cached statement\n")
cached-pst]
[else
(dprintf " ** statement not in cache\n")
#f]))]
[else
(dprintf " ** not using statement cache\n")
#f]))
(define/public (safe-statement-type? stmt-type)
(memq stmt-type '(select insert update delete with)))
(define/public (cache-statement! pst)
(when (and (use-cache?) (safe-statement-type? (send pst get-stmt-type)))
(let ([sql (send pst get-stmt)])
(when sql
(dprintf " ** caching statement\n")
(set! pst-cache (hash-set pst-cache sql pst))))))
(define/private (use-cache?)
(case cache-statements
((always) #t)
((never) #f)
((in-transaction) (eq? (get-tx-status) #t))))
;; check/invalidate-cache : statement/pst/symbol/#f -> hash/#f
;; Returns old cache on invalidation, or #f if stmt is safe.
(define/public (check/invalidate-cache x)
#|
Sufficient to check on every query execution whether statement type is safe
(ie, SELECT, INSERT, etc). All statements sent as strings are considered
unsafe, because they're usually transactional SQL.
|#
(cond [(statement-binding? x)
(check/invalidate-cache (statement-binding-pst x))]
[(prepared-statement? x)
(check/invalidate-cache (send x get-stmt-type))]
[else
(cond [(safe-statement-type? x)
#f]
[else
(dprintf " ** invalidating statement cache\n")
(begin0 pst-cache
(set! pst-cache '#hash()))])]))
;; Prepare
(define/public (prepare fsym stmt close-on-exec?)
(call-with-lock fsym
(lambda ()
(check-valid-tx-status fsym)
(prepare1 fsym stmt close-on-exec?))))
(define/public (prepare1 fsym stmt close-on-exec?)
(cond [close-on-exec?
(or (get-cached-statement stmt)
(let* ([stmt-type (classify-stmt stmt)]
[safe? (safe-statement-type? stmt-type)]
[pst (prepare1* fsym stmt (if safe? #f close-on-exec?) stmt-type)])
(when safe? (cache-statement! pst))
pst))]
[else (prepare1* fsym stmt #f (classify-stmt stmt))]))
(define/public (prepare1* fsym stmt close-on-exec?)
(error/internal 'prepare1* "not implemented"))
(define/public (classify-stmt stmt)
(error/internal 'classify-stmt "not implemented"))
))
;; ----------------------------------------
;; Isolation levels
(define (isolation-symbol->string isolation)

View File

@ -195,7 +195,8 @@ considered important.
(define sqlite3-data-source
(mk-specialized 'sqlite3-data-source 'sqlite3 0
'(#:database #:mode #:busy-retry-limit #:busy-retry-delay #:use-place)))
'(#:database #:mode #:busy-retry-limit #:busy-retry-delay
#:use-place #:debug?)))
(define odbc-data-source
(mk-specialized 'odbc-data-source 'odbc 0

View File

@ -288,9 +288,15 @@
(define (call-with-transaction c proc #:isolation [isolation #f])
(send c start-transaction '|call-with-transaction (start)| isolation #t)
(with-handlers ([(lambda (e) #t)
(with-handlers ([exn?
(lambda (e)
(send c end-transaction '|call-with-transaction (rollback)| 'rollback #t)
(with-handlers ([exn?
(lambda (e2)
(error 'call-with-transaction
"error during rollback: ~a\ncaused by underlying error: ~a"
(exn-message e2)
(exn-message e)))])
(send c end-transaction '|call-with-transaction (rollback)| 'rollback #t))
(raise e))])
(begin0 (call-with-continuation-barrier proc)
(send c end-transaction '|call-with-transaction (commit)| 'commit #t))))

View File

@ -17,7 +17,7 @@
;; ========================================
(define connection%
(class* transactions% (connection<%>)
(class* statement-cache% (connection<%>)
(init-private notice-handler)
(define inport #f)
(define outport #f)
@ -28,7 +28,10 @@
check-valid-tx-status
get-tx-status
set-tx-status!
check-statement/tx)
check-statement/tx
dprintf
prepare1
check/invalidate-cache)
(super-new)
@ -39,15 +42,6 @@
;; ========================================
;; == Debugging
(define DEBUG? #f)
(define/public (debug debug?)
(set! DEBUG? debug?))
;; ========================================
;; == Communication
;; (Must be called with lock acquired.)
@ -63,8 +57,7 @@
;; buffer-message : message -> void
(define/private (buffer-message msg)
(when DEBUG?
(fprintf (current-error-port) " >> ~s\n" msg))
(dprintf " >> ~s\n" msg)
(with-disconnect-on-error
(write-packet outport msg next-msg-num)
(set! next-msg-num (add1 next-msg-num))))
@ -94,8 +87,7 @@
(error/comm fsym))
(let-values ([(msg-num next) (parse-packet inport expectation field-dvecs)])
(set! next-msg-num (add1 msg-num))
(when DEBUG?
(eprintf " << ~s\n" next))
(dprintf " << ~s\n" next)
;; Update transaction status (see Transactions below)
(when (ok-packet? next)
(set-tx-status! fsym (bitwise-bit-set? (ok-packet-server-status next) 0)))
@ -144,8 +136,7 @@
(define/private (disconnect* lock-not-held?)
(define (go politely?)
(when DEBUG?
(eprintf " ** Disconnecting\n"))
(dprintf " ** Disconnecting\n")
(let ([outport* outport]
[inport* inport])
(when outport
@ -252,7 +243,6 @@
(query 'mysql-connect "set names 'utf8'" #f)
(void))
;; ========================================
;; == Query
@ -279,6 +269,10 @@
;; query1 : symbol Statement -> QueryResult
(define/private (query1 fsym stmt cursor? warnings?)
(let ([delenda (check/invalidate-cache stmt)])
;; Don't do anything with delenda; too slow!
;; (See comment in query method above.)
(void))
(let ([wbox (and warnings? (box 0))])
(fresh-exchange)
(query1:enqueue stmt cursor?)
@ -392,14 +386,9 @@
;; == Prepare
;; prepare : symbol string boolean -> PreparedStatement
(define/public (prepare fsym stmt close-on-exec?)
(check-valid-tx-status fsym)
(call-with-lock fsym
(lambda ()
(prepare1 fsym stmt close-on-exec?))))
(define/override (classify-stmt sql) (classify-my-sql sql))
(define/private (prepare1 fsym stmt close-on-exec?)
(define/override (prepare1* fsym stmt close-on-exec? stmt-type)
(fresh-exchange)
(send-message (make-command-packet 'statement-prepare stmt))
(let ([r (recv fsym 'prep-ok)])
@ -415,7 +404,7 @@
(param-typeids (map field-dvec->typeid param-dvecs))
(result-dvecs field-dvecs)
(stmt stmt)
(stmt-type (classify-my-sql stmt))
(stmt-type stmt-type)
(owner this)))])))
(define/private (prepare1:get-field-descriptions fsym)

View File

@ -61,6 +61,7 @@
;; We care about:
;; - determining whether commands must be prepared (to use binary data)
;; see http://dev.mysql.com/doc/refman/5.0/en/c-api-prepared-statements.html
;; - determining what statements are safe for the statement cache
;; - detecting commands that affect transaction status (maybe implicitly)
;; see http://dev.mysql.com/doc/refman/5.0/en/implicit-commit.html
@ -71,6 +72,11 @@
("SELECT" select)
("SHOW" show)
;; Do not invalidate statement cache
("INSERT" insert)
("DELETE" delete)
("UPDATE" update)
;; Explicit transaction commands
("ROLLBACK WORK TO" rollback-savepoint)
("ROLLBACK TO" rollback-savepoint)

View File

@ -18,6 +18,10 @@
;; == Connection
;; ODBC connections do not use statement-cache%
;; - safety depends on sql dialect
;; - transaction interactions more complicated
(define connection%
(class* transactions% (connection<%>)
(init-private db
@ -27,7 +31,6 @@
(init strict-parameter-types?)
(define statement-table (make-hasheq))
(define lock (make-semaphore 1))
(define use-describe-param?
(and strict-parameter-types?

View File

@ -23,7 +23,7 @@
start-connection-protocol)) ;; string string string/#f -> void
(define connection-base%
(class* transactions% (connection<%> connector<%>)
(class* statement-cache% (connection<%> connector<%>)
(init-private notice-handler
notification-handler
allow-cleartext-password?)
@ -38,7 +38,10 @@
set-tx-status!
check-valid-tx-status
check-statement/tx
tx-state->string)
tx-state->string
dprintf
prepare1
check/invalidate-cache)
(super-new)
@ -49,16 +52,6 @@
;; ========================================
;; == Debugging
;; Debugging
(define DEBUG? #f)
(define/public (debug debug?)
(set! DEBUG? debug?))
;; ========================================
;; == Communication
;; (Must be called with lock acquired.)
@ -66,8 +59,7 @@
(define/private (raw-recv)
(with-disconnect-on-error
(let ([r (parse-server-message inport)])
(when DEBUG?
(fprintf (current-error-port) " << ~s\n" r))
(dprintf " << ~s\n" r)
r)))
;; recv-message : symbol -> message
@ -90,8 +82,7 @@
;; buffer-message : message -> void
(define/private (buffer-message msg)
(when DEBUG?
(fprintf (current-error-port) " >> ~s\n" msg))
(dprintf " >> ~s\n" msg)
(with-disconnect-on-error
(write-message msg outport)))
@ -144,8 +135,7 @@
;; disconnect* : boolean -> void
(define/private (disconnect* no-lock-held?)
(define (go politely?)
(when DEBUG?
(fprintf (current-error-port) " ** Disconnecting\n"))
(dprintf " ** Disconnecting\n")
(let ([outport* outport]
[inport* inport])
(when outport*
@ -259,12 +249,11 @@
(define/private (query1 fsym stmt close-on-exec? cursor?)
;; if stmt is string, must take no params & results must be binary-readable
(let ([portal (query1:enqueue stmt close-on-exec? cursor?)])
(let* ([delenda (check/invalidate-cache stmt)]
[portal (query1:enqueue delenda stmt close-on-exec? cursor?)])
(send-message (make-Sync))
(begin0 (query1:collect fsym stmt portal (string? stmt) close-on-exec? cursor?)
(check-ready-for-query fsym #f)
(when DEBUG?
(fprintf (current-error-port) " ** ~a\n" (tx-state->string))))))
(begin0 (query1:collect fsym delenda stmt portal (string? stmt) close-on-exec? cursor?)
(check-ready-for-query fsym #f))))
;; check-statement : symbol statement -> statement-binding
;; Convert to statement-binding; need to prepare to get type information, used to
@ -281,12 +270,18 @@
(send pst bind fsym null))]))
;; query1:enqueue : Statement boolean boolean -> string
(define/private (query1:enqueue stmt close-on-exec? cursor?)
(define/private (query1:enqueue delenda stmt close-on-exec? cursor?)
(when delenda
(for ([(_sql pst) (in-hash delenda)])
(buffer-message (make-Close 'statement (send pst get-handle)))
(send pst set-handle #f)))
(let ([portal (if cursor? (generate-name) "")])
(cond [(statement-binding? stmt)
(let* ([pst (statement-binding-pst stmt)]
[pst-name (send pst get-handle)]
[params (statement-binding-params stmt)])
(unless pst-name
(error/internal 'query1:enqueue "statement was deleted: ~s" (send pst get-stmt)))
(buffer-message (make-Bind portal pst-name
(map typeid->format (send pst get-param-typeids))
params
@ -304,7 +299,12 @@
(send pst set-handle #f))))
portal))
(define/private (query1:collect fsym stmt portal simple? close-on-exec? cursor?)
(define/private (query1:collect fsym delenda stmt portal simple? close-on-exec? cursor?)
(when delenda
(for ([(_sql _pst) (in-hash delenda)])
(match (recv-message fsym)
[(struct CloseComplete ()) (void)]
[other-r (query1:error fsym other-r)])))
(when simple?
(match (recv-message fsym)
[(struct ParseComplete ()) (void)]
@ -423,25 +423,21 @@
;; == Prepare
(define/public (prepare fsym stmt close-on-exec?)
(call-with-lock fsym
(lambda ()
(check-valid-tx-status fsym)
(prepare1 fsym stmt close-on-exec?))))
(define/override (classify-stmt sql) (classify-pg-sql sql))
(define/private (prepare1 fsym stmt close-on-exec?)
(define/override (prepare1* fsym stmt close-on-exec? stmt-type)
;; name generation within exchange: synchronized
(let ([name (generate-name)])
(prepare1:enqueue name stmt)
(send-message (make-Sync))
(begin0 (prepare1:collect fsym name close-on-exec? (classify-pg-sql stmt))
(begin0 (prepare1:collect fsym stmt name close-on-exec? stmt-type)
(check-ready-for-query fsym #f))))
(define/private (prepare1:enqueue name stmt)
(buffer-message (make-Parse name stmt null))
(buffer-message (make-Describe 'statement name)))
(define/private (prepare1:collect fsym name close-on-exec? stmt-type)
(define/private (prepare1:collect fsym stmt name close-on-exec? stmt-type)
(match (recv-message fsym)
[(struct ParseComplete ()) (void)]
[other-r (prepare1:error fsym other-r)])
@ -452,6 +448,7 @@
(close-on-exec? close-on-exec?)
(param-typeids param-typeids)
(result-dvecs field-dvecs)
(stmt stmt)
(stmt-type stmt-type)
(owner this))))

View File

@ -48,13 +48,24 @@
;; ========================================
;; SQL "parsing"
;; We just care about detecting commands that affect transaction status.
;; We care about detecting:
;; - statements that affect transaction status
;; - statements that are safe for (vs invalidate) the statement cache
;; classify-pg-sql : string [nat] -> symbol/#f
(define classify-pg-sql
;; Source: http://www.postgresql.org/docs/current/static/sql-commands.html
(make-sql-classifier
`(("ABORT" rollback)
`(;; Statements that do not invalidate previously prepared statements
("SELECT" select)
("INSERT" insert)
("UPDATE" update)
("DELETE" delete)
("WITH" with)
;; Transactional statements
("ABORT" rollback)
("BEGIN" start)
;; COMMIT PREPARED itself is harmless.
("COMMIT PREPARED" #f) ;; Note: before COMMIT

View File

@ -14,7 +14,7 @@
;; == Connection
(define connection%
(class* transactions% (connection<%>)
(class* statement-cache% (connection<%>)
(init db)
(init-private busy-retry-limit
busy-retry-delay)
@ -28,7 +28,11 @@
get-tx-status
set-tx-status!
check-valid-tx-status
check-statement/tx)
check-statement/tx
dprintf
prepare1
check/invalidate-cache)
(inherit-field DEBUG?)
(define/override (call-with-lock fsym proc)
(call-with-lock* fsym (lambda () (set! saved-tx-status (get-tx-status)) (proc)) #f #t))
@ -51,7 +55,13 @@
[params (statement-binding-params stmt)])
(when check-tx? (check-statement/tx fsym (send pst get-stmt-type)))
(let ([db (get-db fsym)]
[delenda (check/invalidate-cache stmt)]
[stmt (send pst get-handle)])
(when DEBUG?
(dprintf " >> query statement #x~x with ~e\n" (cast stmt _pointer _uintptr) params))
(when delenda
(for ([pst (in-hash-values delenda)])
(send pst finalize #f)))
(HANDLE fsym (sqlite3_reset stmt))
(HANDLE fsym (sqlite3_clear_bindings stmt))
(for ([i (in-naturals 1)]
@ -149,14 +159,11 @@
fsym "unknown column type: ~e" type)]))))
vec)])))
(define/public (prepare fsym stmt close-on-exec?)
(call-with-lock fsym
(lambda ()
(check-valid-tx-status fsym)
(prepare1 fsym stmt close-on-exec?))))
(define/override (classify-stmt sql) (classify-sl-sql sql))
(define/private (prepare1 fsym sql close-on-exec?)
(define/override (prepare1* fsym sql close-on-exec? stmt-type)
;; no time between sqlite3_prepare and table entry
(dprintf " >> prepare ~e~a\n" sql (if close-on-exec? " close-on-exec" ""))
(let*-values ([(db) (get-db fsym)]
[(prep-status stmt)
(HANDLE fsym
@ -166,6 +173,8 @@
(when stmt (sqlite3_finalize stmt))
(uerror fsym "multiple SQL statements given: ~e" sql))
(values prep-status stmt)))])
(when DEBUG?
(dprintf " << prepared statement #x~x\n" (cast stmt _pointer _uintptr)))
(unless stmt (uerror fsym "SQL syntax error in ~e" sql))
(let* ([param-typeids
(for/list ([i (in-range (sqlite3_bind_parameter_count stmt))])
@ -178,7 +187,8 @@
(close-on-exec? close-on-exec?)
(param-typeids param-typeids)
(result-dvecs result-dvecs)
(stmt-type (classify-sl-sql sql))
(stmt-type stmt-type)
(stmt sql)
(owner this))])
(hash-set! statement-table pst #t)
pst)))

View File

@ -45,7 +45,13 @@
;; classify-sl-sql : string [nat] -> symbol/#f
(define classify-sl-sql
(make-sql-classifier
'(;; Explicit transaction commands
'(;; Statements that do not invalidate the statement cache
("SELECT" select)
("INSERT" insert)
("UPDATE" update)
("DELETE" delete)
;; Explicit transaction commands
("ROLLBACK TRANSACTION TO" rollback-savepoint)
("ROLLBACK TO" rollback-savepoint)
("RELEASE" release-savepoint)

View File

@ -11,6 +11,7 @@
#:mode [mode 'read/write]
#:busy-retry-delay [busy-retry-delay 0.1]
#:busy-retry-limit [busy-retry-limit 10]
#:debug? [debug? #f]
#:use-place [use-place #f])
(let ([path
(case path
@ -40,10 +41,13 @@
((create)
(+ SQLITE_OPEN_READWRITE SQLITE_OPEN_CREATE))))])
(handle-status* 'sqlite3-connect open-status db)
(new connection%
(db db)
(busy-retry-limit busy-retry-limit)
(busy-retry-delay busy-retry-delay))))])))
(let ([c
(new connection%
(db db)
(busy-retry-limit busy-retry-limit)
(busy-retry-delay busy-retry-delay))])
(when debug? (send c debug #t))
c)))])))
(define sqlite-place-proxy%
(class place-proxy-connection%

View File

@ -10,5 +10,6 @@
(#:mode (or/c 'read-only 'read/write 'create)
#:busy-retry-limit (or/c exact-nonnegative-integer? +inf.0)
#:busy-retry-delay (and/c rational? (not/c negative?))
#:use-place any/c)
#:use-place any/c
#:debug? any/c)
connection?)])