diff --git a/racket/collects/db/private/sqlite3/connection.rkt b/racket/collects/db/private/sqlite3/connection.rkt index a2a44965bb..360a123176 100644 --- a/racket/collects/db/private/sqlite3/connection.rkt +++ b/racket/collects/db/private/sqlite3/connection.rkt @@ -4,6 +4,9 @@ ffi/unsafe ffi/unsafe/atomic ffi/unsafe/custodian + ffi/unsafe/os-thread + ffi/unsafe/vm + ffi/unsafe/schedule "../generic/interfaces.rkt" "../generic/common.rkt" "../generic/prepared.rkt" @@ -28,10 +31,13 @@ (init-private db-spec ;; #f or (list path mode) busy-retry-limit busy-retry-delay) + (super-new) + + (register-finalizer this shutdown-connection) (define -db db) (define saved-tx-status #f) ;; set by with-lock, only valid while locked - (define creg #f) ;; custodian registration + (define creg (register-custodian-shutdown this shutdown-connection)) (sqlite3_extended_result_codes db #t) @@ -106,7 +112,7 @@ (and (null? info) (A (sqlite3_total_changes db)))] [result (or cursor? - (step* fsym db stmt #f +inf.0 pst))]) + (get-rows fsym stmt #f +inf.0 pst))]) (unless (eq? (get-tx-status) 'invalid) (set-tx-status! fsym (read-tx-status))) (unless cursor? @@ -147,7 +153,7 @@ (cond [(unbox end-box) #f] [else (let ([stmt (send pst get-handle)]) - (begin0 (step* fsym (get-db fsym) stmt end-box fetch-size pst) + (begin0 (get-rows fsym stmt end-box fetch-size pst) (when (unbox end-box) (send pst after-exec #f))))]))))) @@ -178,47 +184,76 @@ [else (error/internal* fsym "bad parameter value" '("value" value) param)]))) - (define/private (step* fsym db stmt end-box fetch-limit pst) + (define/private (get-rows who stmt end-box fetch-limit pst) (with-handlers ([exn:fail? (lambda (e) (A (sqlite3_reset stmt) (sqlite3_clear_bindings stmt)) (raise e))]) - (let loop ([fetch-limit fetch-limit]) - (if (zero? fetch-limit) - null - (let ([c (step fsym db stmt pst)]) - (cond [c - (cons c (loop (sub1 fetch-limit)))] - [else - (A (sqlite3_reset stmt) - (sqlite3_clear_bindings stmt)) - (when end-box (set-box! end-box #t)) - null])))))) + (define result + (cond [use-os-thread? + (let ([timeout (inexact->exact (ceiling (* 1000 busy-retry-delay)))]) + (A (sqlite3_busy_timeout -db timeout))) + (begin0 (sync-call-in-os-thread + (lambda () + (get-rows* who stmt end-box fetch-limit pst #f))) + (A (sqlite3_busy_timeout -db 0)))] + [else (get-rows* who stmt end-box fetch-limit pst #t)])) + (if (procedure? result) (result) result))) - (define/private (step fsym db stmt pst) - (let ([s (HANDLE fsym (A (sqlite3_step stmt)) pst)]) - (cond [(= s SQLITE_DONE) #f] - [(= s SQLITE_ROW) - (let* ([column-count (A (sqlite3_column_count stmt))] - [vec (make-vector column-count)]) - (for ([i (in-range column-count)]) - (vector-set! vec i - (let ([type (A (sqlite3_column_type stmt i))]) - (cond [(= type SQLITE_NULL) - sql-null] - [(= type SQLITE_INTEGER) - (A (sqlite3_column_int64 stmt i))] - [(= type SQLITE_FLOAT) - (A (sqlite3_column_double stmt i))] - [(= type SQLITE_TEXT) - (A (sqlite3_column_text stmt i))] - [(= type SQLITE_BLOB) - (A (sqlite3_column_blob stmt i))] - [else - (error/internal* fsym "unknown column type" - "type" type)])))) - vec)]))) + (define/private (get-rows* who stmt end-box fetch-limit pst fine-atomic?) + (define (call-as-fine-atomic thunk) (if fine-atomic? (A (thunk)) (thunk))) + (define-syntax-rule (FA expr ...) (call-as-fine-atomic (lambda () expr ...))) + ;; step* : -> (U (Listof Vector) (-> Any)) + (define (step*) + (let loop ([fetch-limit fetch-limit]) + (cond [(zero? fetch-limit) null] + [(step) + => (lambda (c) + (cond [(procedure? c) c] + [else (cons c (loop (sub1 fetch-limit)))]))] + [else + (FA (sqlite3_reset stmt) (sqlite3_clear_bindings stmt)) + (when end-box (set-box! end-box #t)) + null]))) + ;; step : -> (U #f Vector (-> Any)) + (define (step) + (let loop ([iteration 0]) + (define full-s (FA (sqlite3_step stmt))) + (define s (simplify-status full-s)) + (cond [(= s SQLITE_DONE) #f] + [(= s SQLITE_ROW) (FA (-get-row who stmt))] + [(and (= s SQLITE_BUSY) (< iteration busy-retry-limit)) + ;; Normally, sleep and try again (cooperates w/ scheduler). + ;; In os-thread, can't sleep; see sqlite3_busy_timeout above. + (when fine-atomic? (sleep busy-retry-delay)) + (loop (add1 iteration))] + [else (lambda () (handle-status who full-s pst))]))) + (step*)) + + ;; -get-row : Symbol stmt -> (U Vector (-> Any)) + ;; PRE: in atomic mode + (define/private (-get-row fsym stmt) + (define column-count (sqlite3_column_count stmt)) + (define vec (make-vector column-count)) + (for ([i (in-range column-count)]) + (define val + (let ([type (sqlite3_column_type stmt i)]) + (cond [(= type SQLITE_NULL) + sql-null] + [(= type SQLITE_INTEGER) + (sqlite3_column_int64 stmt i)] + [(= type SQLITE_FLOAT) + (sqlite3_column_double stmt i)] + [(= type SQLITE_TEXT) + (sqlite3_column_text stmt i)] + [(= type SQLITE_BLOB) + (sqlite3_column_blob stmt i)] + [else + (lambda () + (error/internal* fsym "unknown column type" "type" type))]))) + (vector-set! vec i val)) + vec) (define/override (classify-stmt sql) (classify-sl-sql sql)) @@ -268,19 +303,35 @@ (call-as-atomic (lambda () (let ([db -db]) - (when db - (set! -db #f) - ;; Unregister custodian shutdown, unless called from custodian. - (unless from-custodian? (unregister-custodian-shutdown this creg)) - (set! creg #f) - ;; Free all of connection's prepared statements. This will leave - ;; pst objects with dangling foreign objects, so don't try to free - ;; them again---check that -db is not-#f. - (let ([stmts (hash-keys stmt-table)]) - (hash-clear! stmt-table) - (for-each sqlite3_finalize stmts)) - (HANDLE 'disconnect (sqlite3_close db)) - (void)))))) + (cond [(not db) + ;; Already disconnected + (void)] + [os-result-box + ;; OS thread is running; delay until finished + => (lambda (result-box) + (log-db-debug "disconnect delayed by OS thread") + (define dont-gc this%) + (parameterize ((current-custodian (make-custodian-at-root))) + (thread + (lambda () + (void (sync (box-evt result-box))) + (log-db-debug "continuing delayed disconnect") + (real-disconnect from-custodian?) + (void/reference-sink dont-gc)))))] + [else + ;; Normal disconnect + (set! -db #f) + ;; Unregister custodian shutdown, unless called from custodian. + (unless from-custodian? (unregister-custodian-shutdown this creg)) + (set! creg #f) + ;; Free all of connection's prepared statements. This will leave + ;; pst objects with dangling foreign objects, so don't try to free + ;; them again---check that -db is not-#f. + (let ([stmts (hash-keys stmt-table)]) + (hash-clear! stmt-table) + (for-each sqlite3_finalize stmts)) + (HANDLE 'disconnect (sqlite3_close db)) + (void)]))))) (define/public (get-base) this) @@ -445,18 +496,40 @@ (define/public (get-error-message) (A (sqlite3_errmsg -db))) - ;; ---- - (super-new) - (let ([shutdown - ;; Keep a reference to the class to keep all FFI callout objects - ;; (eg, sqlite3_close) used by its methods from being finalized. - (let ([dont-gc this%]) - (lambda (obj) - (send obj real-disconnect #t) - ;; Dummy result to prevent reference from being optimized away - dont-gc))]) - (register-finalizer this shutdown) - (set! creg (register-custodian-shutdown this shutdown))))) + ;; == OS Thread Support + + (define use-os-thread? #f) + (define os-result-box #f) ;; #f or box -- set if os thread is active + + (define/public (use-os-thread use?) + (when use? + (unless (os-thread-enabled?) + (raise (exn:fail:unsupported "use-os-thread: not supported" + (current-continuation-marks))))) + (call-with-lock 'use-os-thread + (lambda () (set! use-os-thread? (and use? #t))))) + + (define/private (sync-call-in-os-thread proc) + (define result-box (box #f)) + (set! os-result-box result-box) + (call-in-os-thread + (lambda () + (set-box! result-box (proc)) + (set! os-result-box #f) + (signal-received) + (void/reference-sink this))) + (void (sync (box-evt result-box))) + (unbox result-box)) + )) + +(define shutdown-connection + ;; Keep a reference to the class to keep all FFI callout objects + ;; (eg, sqlite3_close) used by its methods from being finalized. + (let ([dont-gc connection%]) + (lambda (obj) + (send obj real-disconnect #t) + ;; Dummy result to prevent reference from being optimized away + dont-gc))) ;; ---------------------------------------- @@ -564,3 +637,17 @@ SQLITE_IOERR SQLITE_IOERR_BLOCKED SQLITE_IOERR_LOCK SQLITE_CORRUPT SQLITE_NOTFOUND SQLITE_FULL SQLITE_CANTOPEN SQLITE_PROTOCOL SQLITE_EMPTY SQLITE_FORMAT SQLITE_NOTADB)) + +;; adapted from readline/rktrl.rkt +(struct box-evt (b) + #:property prop:evt + (unsafe-poller + (lambda (self wakeups) + (if (unbox (box-evt-b self)) + (values (list self) #f) + (values #f self))))) + +(define signal-received + (case (system-type 'vm) + [(chez-scheme) ((vm-primitive 'unsafe-make-signal-received))] + [else void])) diff --git a/racket/collects/db/private/sqlite3/ffi.rkt b/racket/collects/db/private/sqlite3/ffi.rkt index 65cd9aa8dc..f876eed5c8 100644 --- a/racket/collects/db/private/sqlite3/ffi.rkt +++ b/racket/collects/db/private/sqlite3/ffi.rkt @@ -58,6 +58,9 @@ (_fun _sqlite3_database -> _int)) +(define-sqlite sqlite3_busy_timeout + (_fun _sqlite3_database _int -> _int)) + ;; -- Stmt -- (define (trim-and-copy-buffer buffer)