db: finalize close-on-exec psts within lock
This commit is contained in:
parent
4c817d0f7f
commit
618173c97e
|
@ -66,7 +66,7 @@
|
|||
(query fsym stmt)
|
||||
(prepare fsym stmt close-on-exec?)
|
||||
(get-base)
|
||||
(free-statement stmt)
|
||||
(free-statement stmt need-lock?)
|
||||
(transaction-status fsym)
|
||||
(start-transaction fsym isolation cwt?)
|
||||
(end-transaction fsym mode cwt?)
|
||||
|
@ -200,7 +200,7 @@
|
|||
(error fsym "cannot prepare statement with virtual connection"))
|
||||
(send (get-connection #t) prepare fsym stmt close-on-exec?))
|
||||
|
||||
(define/public (free-statement stmt)
|
||||
(define/public (free-statement stmt need-lock?)
|
||||
(error 'free-statement
|
||||
"internal error: virtual connection does not own statements"))))
|
||||
|
||||
|
@ -339,7 +339,7 @@
|
|||
(query fsym stmt)
|
||||
(prepare fsym stmt close-on-exec?)
|
||||
(get-base)
|
||||
(free-statement stmt)
|
||||
(free-statement stmt need-lock?)
|
||||
(transaction-status fsym)
|
||||
(start-transaction fsym isolation cwt?)
|
||||
(end-transaction fsym mode cwt?)
|
||||
|
|
|
@ -41,7 +41,7 @@
|
|||
start-transaction ;; symbol (U 'serializable ...) boolean -> void
|
||||
end-transaction ;; symbol (U 'commit 'rollback) boolean -> void
|
||||
transaction-status ;; symbol -> (U boolean 'invalid)
|
||||
free-statement)) ;; prepared-statement<%> -> void
|
||||
free-statement)) ;; prepared-statement<%> boolean -> void
|
||||
|
||||
;; dbsystem<%>
|
||||
;; Represents brand of database system, SQL dialect, etc
|
||||
|
@ -68,7 +68,8 @@
|
|||
get-handle ;; -> Handle (depends on database system)
|
||||
set-handle ;; Handle -> void
|
||||
|
||||
after-exec ;; -> void (for close-after-exec)
|
||||
get-close-on-exec? ;; -> boolean
|
||||
after-exec ;; boolean -> void (for close-on-exec)
|
||||
|
||||
get-param-count ;; -> nat or #f
|
||||
get-param-typeids ;; -> (listof typeid)
|
||||
|
@ -80,9 +81,6 @@
|
|||
check-owner ;; symbol connection any -> #t (or error)
|
||||
bind ;; symbol (listof param) -> statement-binding
|
||||
|
||||
;; extension hooks: usually shouldn't need to override
|
||||
finalize ;; -> void
|
||||
|
||||
;; inspection only
|
||||
get-param-types ;; -> (listof TypeDesc)
|
||||
get-result-types ;; -> (listof TypeDesc)
|
||||
|
|
|
@ -87,13 +87,13 @@
|
|||
(define/public (list-tables fsym schema)
|
||||
(call 'list-tables fsym schema))
|
||||
|
||||
(define/public (free-statement pst)
|
||||
(define/public (free-statement pst need-lock?)
|
||||
(start-atomic)
|
||||
(let ([handle (send pst get-handle)])
|
||||
(send pst set-handle #f)
|
||||
(end-atomic)
|
||||
(when channel
|
||||
(call/d 'free-statement handle))))
|
||||
(call/d 'free-statement handle need-lock?))))
|
||||
|
||||
(define/private (sexpr->result x)
|
||||
(match x
|
||||
|
|
|
@ -95,8 +95,8 @@ server -> client: (or (list 'values result ...)
|
|||
[(list 'disconnect)
|
||||
(send connection disconnect)
|
||||
(set! connection #f)]
|
||||
[(list 'free-statement pstmt-index)
|
||||
(send connection free-statement (hash-ref pstmt-table pstmt-index))
|
||||
[(list 'free-statement pstmt-index need-lock?)
|
||||
(send connection free-statement (hash-ref pstmt-table pstmt-index) need-lock?)
|
||||
(hash-remove! pstmt-table pstmt-index)]
|
||||
[(list 'query fsym stmt)
|
||||
(send connection query fsym (sexpr->statement stmt))]
|
||||
|
|
|
@ -25,9 +25,10 @@
|
|||
(define/public (get-handle) handle)
|
||||
(define/public (set-handle h) (set! handle h))
|
||||
|
||||
(define/public (after-exec)
|
||||
(define/public (get-close-on-exec?) close-on-exec?)
|
||||
(define/public (after-exec need-lock?)
|
||||
(when close-on-exec? ;; indicates ad-hoc prepared statement
|
||||
(finalize)))
|
||||
(finalize need-lock?)))
|
||||
|
||||
(define/public (get-param-count) (length param-typeids))
|
||||
(define/public (get-param-typeids) param-typeids)
|
||||
|
@ -47,11 +48,11 @@
|
|||
(define/public (check-results fsym checktype obj)
|
||||
(cond [(eq? checktype 'rows)
|
||||
(unless (positive? (get-result-count))
|
||||
(when close-on-exec? (finalize))
|
||||
(when close-on-exec? (finalize #t))
|
||||
(error fsym "expected statement producing rows, got ~e" obj))]
|
||||
[(exact-positive-integer? checktype)
|
||||
(unless (= (get-result-count) checktype)
|
||||
(when close-on-exec? (finalize))
|
||||
(when close-on-exec? (finalize #t))
|
||||
(error fsym
|
||||
"expected statement producing rows with ~a ~a, got ~e"
|
||||
checktype
|
||||
|
@ -66,21 +67,22 @@
|
|||
(define/public (bind fsym params)
|
||||
(statement-binding this (apply-type-handlers fsym params param-handlers)))
|
||||
|
||||
(define/public (finalize)
|
||||
(let ([owner (weak-box-value owner)])
|
||||
(when owner
|
||||
(send owner free-statement this))))
|
||||
(define/public (finalize need-lock?)
|
||||
(when handle
|
||||
(let ([owner (weak-box-value owner)])
|
||||
(when owner
|
||||
(send owner free-statement this need-lock?)))))
|
||||
|
||||
(define/public (register-finalizer)
|
||||
(thread-resume finalizer-thread (current-thread))
|
||||
(will-register will-executor this (lambda (pst) (send pst finalize))))
|
||||
(will-register will-executor this (lambda (pst) (send pst finalize #t))))
|
||||
|
||||
(super-new)
|
||||
(register-finalizer)))
|
||||
|
||||
(define (statement:after-exec stmt)
|
||||
(define (statement:after-exec stmt need-lock?)
|
||||
(when (statement-binding? stmt)
|
||||
(send (statement-binding-pst stmt) after-exec)))
|
||||
(send (statement-binding-pst stmt) after-exec need-lock?)))
|
||||
|
||||
(define (apply-type-handlers fsym params param-handlers)
|
||||
(let ([given-len (length params)]
|
||||
|
|
|
@ -261,19 +261,21 @@
|
|||
;; query : symbol Statement -> QueryResult
|
||||
(define/public (query fsym stmt)
|
||||
(check-valid-tx-status fsym)
|
||||
(let*-values ([(stmt result)
|
||||
(call-with-lock fsym
|
||||
(lambda ()
|
||||
(let* ([stmt (check-statement fsym stmt)]
|
||||
[stmt-type
|
||||
(cond [(statement-binding? stmt)
|
||||
(send (statement-binding-pst stmt) get-stmt-type)]
|
||||
[(string? stmt)
|
||||
(classify-my-sql stmt)])])
|
||||
(check-statement/tx fsym stmt-type)
|
||||
(values stmt (query1 fsym stmt #t)))))])
|
||||
(when #f ;; DISABLED---for some reason, *really* slow
|
||||
(statement:after-exec stmt))
|
||||
(let ([result
|
||||
(call-with-lock fsym
|
||||
(lambda ()
|
||||
(let* ([stmt (check-statement fsym stmt)]
|
||||
[stmt-type
|
||||
(cond [(statement-binding? stmt)
|
||||
(send (statement-binding-pst stmt) get-stmt-type)]
|
||||
[(string? stmt)
|
||||
(classify-my-sql stmt)])])
|
||||
(check-statement/tx fsym stmt-type)
|
||||
(begin0 (query1 fsym stmt #t)
|
||||
(when #f ;; DISABLED!
|
||||
;; For some reason, *really* slow; the concurrent tests slow
|
||||
;; down by over an order of magnitude when this is enabled.
|
||||
(statement:after-exec stmt #f))))))])
|
||||
(query1:process-result fsym result)))
|
||||
|
||||
;; query1 : symbol Statement -> QueryResult
|
||||
|
@ -390,16 +392,16 @@
|
|||
|
||||
(define/public (get-base) this)
|
||||
|
||||
(define/public (free-statement pst)
|
||||
(call-with-lock* 'free-statement
|
||||
(lambda ()
|
||||
(let ([id (send pst get-handle)])
|
||||
(when (and id outport) ;; outport = connected?
|
||||
(send pst set-handle #f)
|
||||
(fresh-exchange)
|
||||
(send-message (make-command:statement-packet 'statement-close id)))))
|
||||
void
|
||||
#f))
|
||||
(define/public (free-statement pst need-lock?)
|
||||
(define (do-free-statement)
|
||||
(let ([id (send pst get-handle)])
|
||||
(when (and id outport) ;; outport = connected?
|
||||
(send pst set-handle #f)
|
||||
(fresh-exchange)
|
||||
(send-message (make-command:statement-packet 'statement-close id)))))
|
||||
(if need-lock?
|
||||
(call-with-lock* 'free-statement do-free-statement void #f)
|
||||
(do-free-statement)))
|
||||
|
||||
;; == Warnings
|
||||
|
||||
|
|
|
@ -56,12 +56,11 @@
|
|||
(define/override (connected?) (and db #t))
|
||||
|
||||
(define/public (query fsym stmt)
|
||||
(let-values ([(stmt* dvecs rows)
|
||||
(let-values ([(dvecs rows)
|
||||
(call-with-lock fsym
|
||||
(lambda ()
|
||||
(check-valid-tx-status fsym)
|
||||
(query1 fsym stmt #t)))])
|
||||
(statement:after-exec stmt*)
|
||||
(cond [(pair? dvecs) (rows-result (map field-dvec->field-info dvecs) rows)]
|
||||
[else (simple-result '())])))
|
||||
|
||||
|
@ -80,8 +79,7 @@
|
|||
(let ([typeid (field-dvec->typeid dvec)])
|
||||
(unless (supported-typeid? typeid)
|
||||
(error/unsupported-type fsym typeid)))))
|
||||
(let-values ([(dvecs rows) (query1:inner fsym pst params)])
|
||||
(values stmt dvecs rows))))
|
||||
(query1:inner fsym pst params)))
|
||||
|
||||
(define/private (query1:inner fsym pst params)
|
||||
(let* ([db (get-db fsym)]
|
||||
|
@ -100,6 +98,7 @@
|
|||
(fetch* fsym stmt (map field-dvec->typeid result-dvecs)))])
|
||||
(handle-status fsym (SQLFreeStmt stmt SQL_CLOSE) stmt)
|
||||
(handle-status fsym (SQLFreeStmt stmt SQL_RESET_PARAMS) stmt)
|
||||
(send pst after-exec #f)
|
||||
(values result-dvecs rows))))
|
||||
|
||||
(define/private (load-param fsym db stmt i param typeid)
|
||||
|
@ -460,9 +459,11 @@
|
|||
|
||||
(define/public (get-base) this)
|
||||
|
||||
(define/public (free-statement pst)
|
||||
(define/public (free-statement pst need-lock?)
|
||||
(define (go) (free-statement* 'free-statement pst))
|
||||
(call-with-lock* 'free-statement go go #f))
|
||||
(if need-lock?
|
||||
(call-with-lock* 'free-statement go go #f)
|
||||
(go)))
|
||||
|
||||
(define/private (free-statement* fsym pst)
|
||||
(start-atomic)
|
||||
|
|
|
@ -241,22 +241,23 @@
|
|||
|
||||
;; query : symbol Statement -> QueryResult
|
||||
(define/public (query fsym stmt0)
|
||||
(let-values ([(stmt result)
|
||||
(call-with-lock fsym
|
||||
(lambda ()
|
||||
(check-valid-tx-status fsym)
|
||||
(let* ([stmt (check-statement fsym stmt0)]
|
||||
[stmt-type (send (statement-binding-pst stmt) get-stmt-type)])
|
||||
(check-statement/tx fsym stmt-type)
|
||||
(values stmt (query1 fsym stmt #f)))))])
|
||||
(statement:after-exec stmt)
|
||||
(let ([result
|
||||
(call-with-lock fsym
|
||||
(lambda ()
|
||||
(check-valid-tx-status fsym)
|
||||
(let* ([stmt (check-statement fsym stmt0)]
|
||||
[pst (statement-binding-pst stmt)]
|
||||
[stmt-type (send pst get-stmt-type)]
|
||||
[close-on-exec? (send pst get-close-on-exec?)])
|
||||
(check-statement/tx fsym stmt-type)
|
||||
(query1 fsym stmt #f close-on-exec?))))])
|
||||
(query1:process-result fsym result)))
|
||||
|
||||
(define/private (query1 fsym stmt simple?)
|
||||
(define/private (query1 fsym stmt simple? [close-on-exec? #f])
|
||||
;; if simple?: stmt must be string, no params, & results must be binary-readable
|
||||
(query1:enqueue stmt)
|
||||
(query1:enqueue stmt close-on-exec?)
|
||||
(send-message (make-Sync))
|
||||
(begin0 (query1:collect fsym simple?)
|
||||
(begin0 (query1:collect fsym simple? close-on-exec?)
|
||||
(check-ready-for-query fsym #f)
|
||||
(when DEBUG?
|
||||
(fprintf (current-error-port) " ** ~a\n" (tx-state->string)))))
|
||||
|
@ -276,7 +277,7 @@
|
|||
(send pst bind fsym null))]))
|
||||
|
||||
;; query1:enqueue : Statement -> void
|
||||
(define/private (query1:enqueue stmt)
|
||||
(define/private (query1:enqueue stmt close-on-exec?)
|
||||
(cond [(statement-binding? stmt)
|
||||
(let* ([pst (statement-binding-pst stmt)]
|
||||
[pst-name (send pst get-handle)]
|
||||
|
@ -284,15 +285,21 @@
|
|||
(buffer-message (make-Bind "" pst-name
|
||||
(map typeid->format (send pst get-param-typeids))
|
||||
params
|
||||
(map typeid->format (send pst get-result-typeids)))))]
|
||||
(map typeid->format (send pst get-result-typeids))))
|
||||
(buffer-message (make-Describe 'portal ""))
|
||||
(buffer-message (make-Execute "" 0))
|
||||
(buffer-message (make-Close 'portal ""))
|
||||
(when close-on-exec?
|
||||
(buffer-message (make-Close 'statement pst-name))
|
||||
(send pst set-handle #f)))]
|
||||
[(string? stmt)
|
||||
(buffer-message (make-Parse "" stmt '()))
|
||||
(buffer-message (make-Bind "" "" '() '() '(1)))])
|
||||
(buffer-message (make-Describe 'portal ""))
|
||||
(buffer-message (make-Execute "" 0))
|
||||
(buffer-message (make-Close 'portal "")))
|
||||
(buffer-message (make-Bind "" "" '() '() '(1)))
|
||||
(buffer-message (make-Describe 'portal ""))
|
||||
(buffer-message (make-Execute "" 0))
|
||||
(buffer-message (make-Close 'portal ""))]))
|
||||
|
||||
(define/private (query1:collect fsym simple?)
|
||||
(define/private (query1:collect fsym simple? close-on-exec?)
|
||||
(when simple?
|
||||
(match (recv-message fsym)
|
||||
[(struct ParseComplete ()) (void)]
|
||||
|
@ -303,11 +310,11 @@
|
|||
(match (recv-message fsym)
|
||||
[(struct RowDescription (field-dvecs))
|
||||
(let* ([rows (query1:data-loop fsym)])
|
||||
(query1:expect-close-complete fsym)
|
||||
(query1:expect-close-complete fsym close-on-exec?)
|
||||
(vector 'rows field-dvecs rows))]
|
||||
[(struct NoData ())
|
||||
(let* ([command (query1:expect-completion fsym)])
|
||||
(query1:expect-close-complete fsym)
|
||||
(query1:expect-close-complete fsym close-on-exec?)
|
||||
(vector 'command command))]
|
||||
[other-r (query1:error fsym other-r)]))
|
||||
|
||||
|
@ -324,9 +331,10 @@
|
|||
[(struct EmptyQueryResponse ()) '()]
|
||||
[other-r (query1:error fsym other-r)]))
|
||||
|
||||
(define/private (query1:expect-close-complete fsym)
|
||||
(define/private (query1:expect-close-complete fsym close-on-exec?)
|
||||
(match (recv-message fsym)
|
||||
[(struct CloseComplete ()) (void)]
|
||||
[(struct CloseComplete ())
|
||||
(when close-on-exec? (query1:expect-close-complete fsym #f))]
|
||||
[other-r (query1:error fsym other-r)]))
|
||||
|
||||
(define/private (query1:error fsym r)
|
||||
|
@ -421,20 +429,23 @@
|
|||
(define/public (get-base) this)
|
||||
|
||||
;; free-statement : prepared-statement -> void
|
||||
(define/public (free-statement pst)
|
||||
(call-with-lock* 'free-statement
|
||||
(lambda ()
|
||||
(let ([name (send pst get-handle)])
|
||||
(when (and name outport) ;; outport = connected?
|
||||
(send pst set-handle #f)
|
||||
(buffer-message (make-Close 'statement name))
|
||||
(buffer-message (make-Sync))
|
||||
(let ([r (recv-message 'free-statement)])
|
||||
(cond [(CloseComplete? r) (void)]
|
||||
[else (error/comm 'free-statement)])
|
||||
(check-ready-for-query 'free-statement #t)))))
|
||||
void
|
||||
#f))
|
||||
(define/public (free-statement pst need-lock?)
|
||||
(define (do-free-statement)
|
||||
(let ([name (send pst get-handle)])
|
||||
(when (and name outport) ;; outport = connected?
|
||||
(send pst set-handle #f)
|
||||
(buffer-message (make-Close 'statement name))
|
||||
(buffer-message (make-Sync))
|
||||
(let ([r (recv-message 'free-statement)])
|
||||
(cond [(CloseComplete? r) (void)]
|
||||
[else (error/comm 'free-statement)])
|
||||
(check-ready-for-query 'free-statement #t)))))
|
||||
(if need-lock?
|
||||
(call-with-lock* 'free-statement
|
||||
do-free-statement
|
||||
void
|
||||
#f)
|
||||
(do-free-statement)))
|
||||
|
||||
;; == Transactions
|
||||
|
||||
|
|
|
@ -39,13 +39,10 @@
|
|||
(define/override (connected?) (and -db #t))
|
||||
|
||||
(define/public (query fsym stmt)
|
||||
(let-values ([(stmt* result)
|
||||
(call-with-lock fsym
|
||||
(lambda ()
|
||||
(check-valid-tx-status fsym)
|
||||
(query1 fsym stmt #t)))])
|
||||
(statement:after-exec stmt)
|
||||
result))
|
||||
(call-with-lock fsym
|
||||
(lambda ()
|
||||
(check-valid-tx-status fsym)
|
||||
(query1 fsym stmt #t))))
|
||||
|
||||
(define/private (query1 fsym stmt check-tx?)
|
||||
(let* ([stmt (cond [(string? stmt)
|
||||
|
@ -73,11 +70,11 @@
|
|||
(HANDLE fsym (sqlite3_clear_bindings stmt))
|
||||
(unless (eq? tx-status 'invalid)
|
||||
(set! tx-status (get-tx-status)))
|
||||
(values stmt
|
||||
(cond [(pair? info)
|
||||
(rows-result info rows)]
|
||||
[else
|
||||
(simple-result '())]))))))
|
||||
(send pst after-exec #f)
|
||||
(cond [(pair? info)
|
||||
(rows-result info rows)]
|
||||
[else
|
||||
(simple-result '())])))))
|
||||
|
||||
(define/private (load-param fsym db stmt i param)
|
||||
(HANDLE fsym
|
||||
|
@ -174,9 +171,11 @@
|
|||
|
||||
(define/public (get-base) this)
|
||||
|
||||
(define/public (free-statement pst)
|
||||
(define/public (free-statement pst need-lock?)
|
||||
(define (go) (do-free-statement 'free-statement pst))
|
||||
(call-with-lock* 'free-statement go go #f))
|
||||
(if need-lock?
|
||||
(call-with-lock* 'free-statement go go #f)
|
||||
(go)))
|
||||
|
||||
(define/private (do-free-statement fsym pst)
|
||||
(start-atomic)
|
||||
|
@ -188,6 +187,10 @@
|
|||
(HANDLE fsym (sqlite3_finalize stmt))
|
||||
(void))))
|
||||
|
||||
;; Internal query
|
||||
|
||||
(define/private (internal-query1 fsym sql)
|
||||
(query1 fsym sql #f))
|
||||
|
||||
;; == Transactions
|
||||
|
||||
|
@ -202,25 +205,25 @@
|
|||
;; FIXME: modes are DEFERRED | IMMEDIATE | EXCLUSIVE
|
||||
(cond [(eq? isolation 'nested)
|
||||
(let ([savepoint (generate-name)])
|
||||
(query1 fsym (format "SAVEPOINT ~a" savepoint) #f)
|
||||
(internal-query1 fsym (format "SAVEPOINT ~a" savepoint))
|
||||
savepoint)]
|
||||
[else
|
||||
(query1 fsym "BEGIN TRANSACTION" #f)
|
||||
(internal-query1 fsym "BEGIN TRANSACTION")
|
||||
#f]))
|
||||
|
||||
(define/override (end-transaction* fsym mode savepoint)
|
||||
(case mode
|
||||
((commit)
|
||||
(cond [savepoint
|
||||
(query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #f)]
|
||||
(internal-query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint))]
|
||||
[else
|
||||
(query1 fsym "COMMIT TRANSACTION" #f)]))
|
||||
(internal-query1 fsym "COMMIT TRANSACTION")]))
|
||||
((rollback)
|
||||
(cond [savepoint
|
||||
(query1 fsym (format "ROLLBACK TO SAVEPOINT ~a" savepoint) #f)
|
||||
(query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint) #f)]
|
||||
(internal-query1 fsym (format "ROLLBACK TO SAVEPOINT ~a" savepoint))
|
||||
(internal-query1 fsym (format "RELEASE SAVEPOINT ~a" savepoint))]
|
||||
[else
|
||||
(query1 fsym "ROLLBACK TRANSACTION" #f)])
|
||||
(internal-query1 fsym "ROLLBACK TRANSACTION")])
|
||||
;; remove 'invalid status, if necessary
|
||||
(set! tx-status (get-tx-status))))
|
||||
(void))
|
||||
|
@ -241,10 +244,7 @@
|
|||
;; schema ignored, because sqlite doesn't support
|
||||
(string-append "SELECT tbl_name from sqlite_master "
|
||||
"WHERE type = 'table' or type = 'view'")])
|
||||
(let-values ([(stmt result)
|
||||
(call-with-lock fsym
|
||||
(lambda () (query1 fsym stmt #f)))])
|
||||
(statement:after-exec stmt)
|
||||
(let ([result (call-with-lock fsym (lambda () (internal-query1 fsym stmt)))])
|
||||
(for/list ([row (in-list (rows-result-rows result))])
|
||||
(vector-ref row 0)))))
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user