db: fix for disconnect during query in OS thread
This change allows a query that is run in an OS thread to succeed even when the connection is custodian-disconnected. Previously, the part in the OS thread would complete, and then the operations needed to package the result would fail. This fix moves some of those operations to the OS thread and makes read-tx-status work when disconnected.
This commit is contained in:
parent
a5b14d74b7
commit
ee43d982e4
|
@ -70,6 +70,8 @@
|
|||
(begin (start-atomic)
|
||||
(unless -db (end-atomic) (error/disconnect-in-lock 'sqlite3))
|
||||
(begin0 (let () e ...) (end-atomic))))
|
||||
(define-syntax-rule (A* e ...)
|
||||
(begin (start-atomic) (begin0 (let () e ...) (end-atomic))))
|
||||
|
||||
(define/private (get-db fsym)
|
||||
(or -db (error/not-connected fsym)))
|
||||
|
@ -108,10 +110,11 @@
|
|||
[saved-last-insert-rowid
|
||||
(and (null? info) (A (sqlite3_last_insert_rowid db)))]
|
||||
[saved-total-changes
|
||||
(and (null? info) (A (sqlite3_total_changes db)))]
|
||||
[result
|
||||
(or cursor?
|
||||
(get-rows fsym stmt #f +inf.0 pst))])
|
||||
(and (null? info) (A (sqlite3_total_changes db)))])
|
||||
(define-values (result last-insert-rowid total-changes changes)
|
||||
(if cursor?
|
||||
(values #t #f #f #f)
|
||||
(get-result fsym stmt #f +inf.0 pst (not (pair? info)))))
|
||||
(unless (eq? (get-tx-status) 'invalid)
|
||||
(set-tx-status! fsym (read-tx-status)))
|
||||
(unless cursor?
|
||||
|
@ -122,10 +125,10 @@
|
|||
(cursor-result info pst (box #f))]
|
||||
[else
|
||||
(simple-result
|
||||
(let ([last-insert-rowid (A (sqlite3_last_insert_rowid db))])
|
||||
(let ()
|
||||
;; Not all statements clear last_insert_rowid, changes; so
|
||||
;; extra guards to make sure results are relevant.
|
||||
(define changes? (> (A (sqlite3_total_changes db)) saved-total-changes))
|
||||
(define changes? (> total-changes saved-total-changes))
|
||||
`((insert-id
|
||||
;; We want to report insert-id if statement was a *successful* INSERT,
|
||||
;; but we can't check that directly. Instead, check if either
|
||||
|
@ -141,7 +144,7 @@
|
|||
. ,(and (or (not (= last-insert-rowid saved-last-insert-rowid))
|
||||
(and changes? (eq? (send pst get-stmt-type) 'insert)))
|
||||
last-insert-rowid))
|
||||
(affected-rows . ,(if changes? (A (sqlite3_changes db)) 0)))))])))))
|
||||
(affected-rows . ,(if changes? changes 0)))))])))))
|
||||
|
||||
(define/public (fetch/cursor fsym cursor fetch-size)
|
||||
(let ([pst (cursor-result-pst cursor)]
|
||||
|
@ -152,9 +155,11 @@
|
|||
(cond [(unbox end-box) #f]
|
||||
[else
|
||||
(let ([stmt (send pst get-handle)])
|
||||
(begin0 (get-rows fsym stmt end-box fetch-size pst)
|
||||
(when (unbox end-box)
|
||||
(send pst after-exec #f))))])))))
|
||||
(define-values (rows _lii _tc _c)
|
||||
(get-result fsym stmt end-box fetch-size pst #f))
|
||||
(when (unbox end-box)
|
||||
(send pst after-exec #f))
|
||||
rows)])))))
|
||||
|
||||
(define/private (check-statement fsym stmt cursor?)
|
||||
(cond [(statement-binding? stmt)
|
||||
|
@ -183,39 +188,38 @@
|
|||
[else
|
||||
(error/internal* fsym "bad parameter value" '("value" value) param)])))
|
||||
|
||||
(define/private (get-rows who stmt end-box fetch-limit pst)
|
||||
(define/private (get-result who stmt end-box fetch-limit pst changes?)
|
||||
(with-handlers ([exn:fail?
|
||||
(lambda (e)
|
||||
(A (sqlite3_reset stmt)
|
||||
(sqlite3_clear_bindings stmt))
|
||||
(A* (when -db (sqlite3_reset stmt) (sqlite3_clear_bindings stmt)))
|
||||
(raise e))])
|
||||
(define result
|
||||
(cond [use-os-thread?
|
||||
(define timeout (inexact->exact (ceiling (* 1000 busy-retry-delay))))
|
||||
(sync-call-in-os-thread
|
||||
(lambda (db)
|
||||
(sqlite3_busy_timeout db timeout)
|
||||
(begin0 (get-rows* who stmt end-box fetch-limit pst #f)
|
||||
(sqlite3_busy_timeout db 0))))]
|
||||
[else (get-rows* who stmt end-box fetch-limit pst #t)]))
|
||||
(if (procedure? result) (result) result)))
|
||||
((cond [use-os-thread?
|
||||
(define timeout (inexact->exact (ceiling (* 1000 busy-retry-delay))))
|
||||
(sync-call-in-os-thread
|
||||
(lambda (db)
|
||||
(sqlite3_busy_timeout db timeout)
|
||||
(begin0 (get-result* who db stmt end-box fetch-limit pst changes?)
|
||||
(sqlite3_busy_timeout db 0))))]
|
||||
[else (get-result* who #f stmt end-box fetch-limit pst changes?)]))))
|
||||
|
||||
(define/private (get-rows* who stmt end-box fetch-limit pst fine-atomic?)
|
||||
(define (call-as-fine-atomic thunk) (if fine-atomic? (A (thunk)) (thunk)))
|
||||
(define/private (get-result* who os-db stmt end-box fetch-limit pst changes?)
|
||||
;; os-db is sqlite3_database if in OS thread, #f if in Racket thread
|
||||
(define (call-as-fine-atomic thunk) (if os-db (thunk) (A (thunk))))
|
||||
(define-syntax-rule (FA expr ...) (call-as-fine-atomic (lambda () expr ...)))
|
||||
;; step* : -> (U (Listof Vector) (-> Any))
|
||||
;; step* : -> (-> (values (U Vector (-> Error)) Int/#f Nat/#f Nat/#f))
|
||||
(define (step*)
|
||||
(let loop ([fetch-limit fetch-limit])
|
||||
(cond [(zero? fetch-limit) null]
|
||||
(let loop ([fetch-limit fetch-limit] [acc null])
|
||||
(cond [(zero? fetch-limit)
|
||||
(return (reverse acc))]
|
||||
[(step)
|
||||
=> (lambda (c)
|
||||
(cond [(procedure? c) c]
|
||||
[else (cons c (loop (sub1 fetch-limit)))]))]
|
||||
[else (loop (sub1 fetch-limit) (cons c acc))]))]
|
||||
[else
|
||||
(FA (sqlite3_reset stmt) (sqlite3_clear_bindings stmt))
|
||||
(when end-box (set-box! end-box #t))
|
||||
null])))
|
||||
;; step : -> (U #f Vector (-> Any))
|
||||
(return (reverse acc))])))
|
||||
;; step : -> (U #f Vector (-> Error))
|
||||
(define (step)
|
||||
(let loop ([iteration 0])
|
||||
(define full-s (FA (sqlite3_step stmt)))
|
||||
|
@ -225,12 +229,21 @@
|
|||
[(and (= s SQLITE_BUSY) (< iteration busy-retry-limit))
|
||||
;; Normally, sleep and try again (cooperates w/ scheduler).
|
||||
;; In os-thread, can't sleep; see sqlite3_busy_timeout above.
|
||||
(when fine-atomic? (sleep busy-retry-delay))
|
||||
(unless os-db (sleep busy-retry-delay))
|
||||
(loop (add1 iteration))]
|
||||
[else (lambda () (handle-status who full-s pst))])))
|
||||
;; return : X -> (-> (values X Int/#f Nat/#f Nat/#f))
|
||||
(define (return rows)
|
||||
(define-values (last-insert-rowid total-changes changes)
|
||||
(if changes?
|
||||
(FA (values (sqlite3_last_insert_rowid (or os-db -db))
|
||||
(sqlite3_total_changes (or os-db -db))
|
||||
(sqlite3_changes (or os-db -db))))
|
||||
(values #f #f #f)))
|
||||
(lambda () (values rows last-insert-rowid total-changes changes)))
|
||||
(step*))
|
||||
|
||||
;; -get-row : Symbol stmt -> (U Vector (-> Any))
|
||||
;; -get-row : Symbol stmt -> (U Vector (-> Error))
|
||||
;; PRE: in atomic mode
|
||||
(define/private (-get-row fsym stmt)
|
||||
(define column-count (sqlite3_column_count stmt))
|
||||
|
@ -367,7 +380,9 @@
|
|||
;; http://www.sqlite.org/lang_transaction.html
|
||||
|
||||
(define/private (read-tx-status)
|
||||
(not (A (sqlite3_get_autocommit -db))))
|
||||
;; Allow this to be called after custodian-disconnect so that in-progress
|
||||
;; query can complete.
|
||||
(not (A* (if -db (sqlite3_get_autocommit -db) #t))))
|
||||
|
||||
(define/override (start-transaction* fsym isolation option)
|
||||
;; Isolation level can be set to READ UNCOMMITTED via pragma, but
|
||||
|
|
Loading…
Reference in New Issue
Block a user