db: add option to run query in separate OS thread

This commit is contained in:
Ryan Culpepper 2021-01-26 08:26:34 +01:00
parent e110b07cc8
commit ad6bbe9809
2 changed files with 153 additions and 63 deletions

View File

@ -4,6 +4,9 @@
ffi/unsafe ffi/unsafe
ffi/unsafe/atomic ffi/unsafe/atomic
ffi/unsafe/custodian ffi/unsafe/custodian
ffi/unsafe/os-thread
ffi/unsafe/vm
ffi/unsafe/schedule
"../generic/interfaces.rkt" "../generic/interfaces.rkt"
"../generic/common.rkt" "../generic/common.rkt"
"../generic/prepared.rkt" "../generic/prepared.rkt"
@ -28,10 +31,13 @@
(init-private db-spec ;; #f or (list path mode) (init-private db-spec ;; #f or (list path mode)
busy-retry-limit busy-retry-limit
busy-retry-delay) busy-retry-delay)
(super-new)
(register-finalizer this shutdown-connection)
(define -db db) (define -db db)
(define saved-tx-status #f) ;; set by with-lock, only valid while locked (define saved-tx-status #f) ;; set by with-lock, only valid while locked
(define creg #f) ;; custodian registration (define creg (register-custodian-shutdown this shutdown-connection))
(sqlite3_extended_result_codes db #t) (sqlite3_extended_result_codes db #t)
@ -106,7 +112,7 @@
(and (null? info) (A (sqlite3_total_changes db)))] (and (null? info) (A (sqlite3_total_changes db)))]
[result [result
(or cursor? (or cursor?
(step* fsym db stmt #f +inf.0 pst))]) (get-rows fsym stmt #f +inf.0 pst))])
(unless (eq? (get-tx-status) 'invalid) (unless (eq? (get-tx-status) 'invalid)
(set-tx-status! fsym (read-tx-status))) (set-tx-status! fsym (read-tx-status)))
(unless cursor? (unless cursor?
@ -147,7 +153,7 @@
(cond [(unbox end-box) #f] (cond [(unbox end-box) #f]
[else [else
(let ([stmt (send pst get-handle)]) (let ([stmt (send pst get-handle)])
(begin0 (step* fsym (get-db fsym) stmt end-box fetch-size pst) (begin0 (get-rows fsym stmt end-box fetch-size pst)
(when (unbox end-box) (when (unbox end-box)
(send pst after-exec #f))))]))))) (send pst after-exec #f))))])))))
@ -178,47 +184,76 @@
[else [else
(error/internal* fsym "bad parameter value" '("value" value) param)]))) (error/internal* fsym "bad parameter value" '("value" value) param)])))
(define/private (step* fsym db stmt end-box fetch-limit pst) (define/private (get-rows who stmt end-box fetch-limit pst)
(with-handlers ([exn:fail? (with-handlers ([exn:fail?
(lambda (e) (lambda (e)
(A (sqlite3_reset stmt) (A (sqlite3_reset stmt)
(sqlite3_clear_bindings stmt)) (sqlite3_clear_bindings stmt))
(raise e))]) (raise e))])
(let loop ([fetch-limit fetch-limit]) (define result
(if (zero? fetch-limit) (cond [use-os-thread?
null (let ([timeout (inexact->exact (ceiling (* 1000 busy-retry-delay)))])
(let ([c (step fsym db stmt pst)]) (A (sqlite3_busy_timeout -db timeout)))
(cond [c (begin0 (sync-call-in-os-thread
(cons c (loop (sub1 fetch-limit)))] (lambda ()
[else (get-rows* who stmt end-box fetch-limit pst #f)))
(A (sqlite3_reset stmt) (A (sqlite3_busy_timeout -db 0)))]
(sqlite3_clear_bindings stmt)) [else (get-rows* who stmt end-box fetch-limit pst #t)]))
(when end-box (set-box! end-box #t)) (if (procedure? result) (result) result)))
null]))))))
(define/private (step fsym db stmt pst) (define/private (get-rows* who stmt end-box fetch-limit pst fine-atomic?)
(let ([s (HANDLE fsym (A (sqlite3_step stmt)) pst)]) (define (call-as-fine-atomic thunk) (if fine-atomic? (A (thunk)) (thunk)))
(cond [(= s SQLITE_DONE) #f] (define-syntax-rule (FA expr ...) (call-as-fine-atomic (lambda () expr ...)))
[(= s SQLITE_ROW) ;; step* : -> (U (Listof Vector) (-> Any))
(let* ([column-count (A (sqlite3_column_count stmt))] (define (step*)
[vec (make-vector column-count)]) (let loop ([fetch-limit fetch-limit])
(for ([i (in-range column-count)]) (cond [(zero? fetch-limit) null]
(vector-set! vec i [(step)
(let ([type (A (sqlite3_column_type stmt i))]) => (lambda (c)
(cond [(= type SQLITE_NULL) (cond [(procedure? c) c]
sql-null] [else (cons c (loop (sub1 fetch-limit)))]))]
[(= type SQLITE_INTEGER) [else
(A (sqlite3_column_int64 stmt i))] (FA (sqlite3_reset stmt) (sqlite3_clear_bindings stmt))
[(= type SQLITE_FLOAT) (when end-box (set-box! end-box #t))
(A (sqlite3_column_double stmt i))] null])))
[(= type SQLITE_TEXT) ;; step : -> (U #f Vector (-> Any))
(A (sqlite3_column_text stmt i))] (define (step)
[(= type SQLITE_BLOB) (let loop ([iteration 0])
(A (sqlite3_column_blob stmt i))] (define full-s (FA (sqlite3_step stmt)))
[else (define s (simplify-status full-s))
(error/internal* fsym "unknown column type" (cond [(= s SQLITE_DONE) #f]
"type" type)])))) [(= s SQLITE_ROW) (FA (-get-row who stmt))]
vec)]))) [(and (= s SQLITE_BUSY) (< iteration busy-retry-limit))
;; Normally, sleep and try again (cooperates w/ scheduler).
;; In os-thread, can't sleep; see sqlite3_busy_timeout above.
(when fine-atomic? (sleep busy-retry-delay))
(loop (add1 iteration))]
[else (lambda () (handle-status who full-s pst))])))
(step*))
;; -get-row : Symbol stmt -> (U Vector (-> Any))
;; PRE: in atomic mode
(define/private (-get-row fsym stmt)
(define column-count (sqlite3_column_count stmt))
(define vec (make-vector column-count))
(for ([i (in-range column-count)])
(define val
(let ([type (sqlite3_column_type stmt i)])
(cond [(= type SQLITE_NULL)
sql-null]
[(= type SQLITE_INTEGER)
(sqlite3_column_int64 stmt i)]
[(= type SQLITE_FLOAT)
(sqlite3_column_double stmt i)]
[(= type SQLITE_TEXT)
(sqlite3_column_text stmt i)]
[(= type SQLITE_BLOB)
(sqlite3_column_blob stmt i)]
[else
(lambda ()
(error/internal* fsym "unknown column type" "type" type))])))
(vector-set! vec i val))
vec)
(define/override (classify-stmt sql) (classify-sl-sql sql)) (define/override (classify-stmt sql) (classify-sl-sql sql))
@ -268,19 +303,35 @@
(call-as-atomic (call-as-atomic
(lambda () (lambda ()
(let ([db -db]) (let ([db -db])
(when db (cond [(not db)
(set! -db #f) ;; Already disconnected
;; Unregister custodian shutdown, unless called from custodian. (void)]
(unless from-custodian? (unregister-custodian-shutdown this creg)) [os-result-box
(set! creg #f) ;; OS thread is running; delay until finished
;; Free all of connection's prepared statements. This will leave => (lambda (result-box)
;; pst objects with dangling foreign objects, so don't try to free (log-db-debug "disconnect delayed by OS thread")
;; them again---check that -db is not-#f. (define dont-gc this%)
(let ([stmts (hash-keys stmt-table)]) (parameterize ((current-custodian (make-custodian-at-root)))
(hash-clear! stmt-table) (thread
(for-each sqlite3_finalize stmts)) (lambda ()
(HANDLE 'disconnect (sqlite3_close db)) (void (sync (box-evt result-box)))
(void)))))) (log-db-debug "continuing delayed disconnect")
(real-disconnect from-custodian?)
(void/reference-sink dont-gc)))))]
[else
;; Normal disconnect
(set! -db #f)
;; Unregister custodian shutdown, unless called from custodian.
(unless from-custodian? (unregister-custodian-shutdown this creg))
(set! creg #f)
;; Free all of connection's prepared statements. This will leave
;; pst objects with dangling foreign objects, so don't try to free
;; them again---check that -db is not-#f.
(let ([stmts (hash-keys stmt-table)])
(hash-clear! stmt-table)
(for-each sqlite3_finalize stmts))
(HANDLE 'disconnect (sqlite3_close db))
(void)])))))
(define/public (get-base) this) (define/public (get-base) this)
@ -445,18 +496,40 @@
(define/public (get-error-message) (define/public (get-error-message)
(A (sqlite3_errmsg -db))) (A (sqlite3_errmsg -db)))
;; ---- ;; == OS Thread Support
(super-new)
(let ([shutdown (define use-os-thread? #f)
;; Keep a reference to the class to keep all FFI callout objects (define os-result-box #f) ;; #f or box -- set if os thread is active
;; (eg, sqlite3_close) used by its methods from being finalized.
(let ([dont-gc this%]) (define/public (use-os-thread use?)
(lambda (obj) (when use?
(send obj real-disconnect #t) (unless (os-thread-enabled?)
;; Dummy result to prevent reference from being optimized away (raise (exn:fail:unsupported "use-os-thread: not supported"
dont-gc))]) (current-continuation-marks)))))
(register-finalizer this shutdown) (call-with-lock 'use-os-thread
(set! creg (register-custodian-shutdown this shutdown))))) (lambda () (set! use-os-thread? (and use? #t)))))
(define/private (sync-call-in-os-thread proc)
(define result-box (box #f))
(set! os-result-box result-box)
(call-in-os-thread
(lambda ()
(set-box! result-box (proc))
(set! os-result-box #f)
(signal-received)
(void/reference-sink this)))
(void (sync (box-evt result-box)))
(unbox result-box))
))
(define shutdown-connection
;; Keep a reference to the class to keep all FFI callout objects
;; (eg, sqlite3_close) used by its methods from being finalized.
(let ([dont-gc connection%])
(lambda (obj)
(send obj real-disconnect #t)
;; Dummy result to prevent reference from being optimized away
dont-gc)))
;; ---------------------------------------- ;; ----------------------------------------
@ -564,3 +637,17 @@
SQLITE_IOERR SQLITE_IOERR_BLOCKED SQLITE_IOERR_LOCK SQLITE_CORRUPT SQLITE_IOERR SQLITE_IOERR_BLOCKED SQLITE_IOERR_LOCK SQLITE_CORRUPT
SQLITE_NOTFOUND SQLITE_FULL SQLITE_CANTOPEN SQLITE_PROTOCOL SQLITE_EMPTY SQLITE_NOTFOUND SQLITE_FULL SQLITE_CANTOPEN SQLITE_PROTOCOL SQLITE_EMPTY
SQLITE_FORMAT SQLITE_NOTADB)) SQLITE_FORMAT SQLITE_NOTADB))
;; adapted from readline/rktrl.rkt
(struct box-evt (b)
#:property prop:evt
(unsafe-poller
(lambda (self wakeups)
(if (unbox (box-evt-b self))
(values (list self) #f)
(values #f self)))))
(define signal-received
(case (system-type 'vm)
[(chez-scheme) ((vm-primitive 'unsafe-make-signal-received))]
[else void]))

View File

@ -58,6 +58,9 @@
(_fun _sqlite3_database (_fun _sqlite3_database
-> _int)) -> _int))
(define-sqlite sqlite3_busy_timeout
(_fun _sqlite3_database _int -> _int))
;; -- Stmt -- ;; -- Stmt --
(define (trim-and-copy-buffer buffer) (define (trim-and-copy-buffer buffer)