db: use ffi/unsafe/os-async-channel
Also: - use one OS thread per connection instead of one per query - when using OS thread, finish disconnect in OS thread
This commit is contained in:
parent
4e76091c64
commit
c9861931ad
|
@ -5,8 +5,7 @@
|
||||||
ffi/unsafe/atomic
|
ffi/unsafe/atomic
|
||||||
ffi/unsafe/custodian
|
ffi/unsafe/custodian
|
||||||
ffi/unsafe/os-thread
|
ffi/unsafe/os-thread
|
||||||
ffi/unsafe/vm
|
ffi/unsafe/os-async-channel
|
||||||
ffi/unsafe/schedule
|
|
||||||
"../generic/interfaces.rkt"
|
"../generic/interfaces.rkt"
|
||||||
"../generic/common.rkt"
|
"../generic/common.rkt"
|
||||||
"../generic/prepared.rkt"
|
"../generic/prepared.rkt"
|
||||||
|
@ -192,12 +191,12 @@
|
||||||
(raise e))])
|
(raise e))])
|
||||||
(define result
|
(define result
|
||||||
(cond [use-os-thread?
|
(cond [use-os-thread?
|
||||||
(let ([timeout (inexact->exact (ceiling (* 1000 busy-retry-delay)))])
|
(define timeout (inexact->exact (ceiling (* 1000 busy-retry-delay))))
|
||||||
(A (sqlite3_busy_timeout -db timeout)))
|
(sync-call-in-os-thread
|
||||||
(begin0 (sync-call-in-os-thread
|
(lambda (db)
|
||||||
(lambda ()
|
(sqlite3_busy_timeout db timeout)
|
||||||
(get-rows* who stmt end-box fetch-limit pst #f)))
|
(begin0 (get-rows* who stmt end-box fetch-limit pst #f)
|
||||||
(A (sqlite3_busy_timeout -db 0)))]
|
(sqlite3_busy_timeout db 0))))]
|
||||||
[else (get-rows* who stmt end-box fetch-limit pst #t)]))
|
[else (get-rows* who stmt end-box fetch-limit pst #t)]))
|
||||||
(if (procedure? result) (result) result)))
|
(if (procedure? result) (result) result)))
|
||||||
|
|
||||||
|
@ -302,36 +301,43 @@
|
||||||
(define/public (real-disconnect from-custodian?)
|
(define/public (real-disconnect from-custodian?)
|
||||||
(call-as-atomic
|
(call-as-atomic
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(let ([db -db])
|
(when -db
|
||||||
(cond [(not db)
|
;; Save and clear fields
|
||||||
;; Already disconnected
|
(define dont-gc this%)
|
||||||
|
(define db -db)
|
||||||
|
(define stmts (hash-keys stmt-table))
|
||||||
|
(set! -db #f)
|
||||||
|
(hash-clear! stmt-table)
|
||||||
|
;; Unregister custodian shutdown, unless called from custodian.
|
||||||
|
(unless from-custodian? (unregister-custodian-shutdown this creg))
|
||||||
|
(set! creg #f)
|
||||||
|
;; Close db connection
|
||||||
|
(cond [os-req-chan
|
||||||
|
;; OS thread might be using db, stmts
|
||||||
|
(define resp-chan (make-os-async-channel))
|
||||||
|
(define (shutdown _db)
|
||||||
|
(finish-disconnect db stmts)
|
||||||
|
(when resp-chan (os-async-channel-put resp-chan 'done))
|
||||||
|
(void/reference-sink dont-gc))
|
||||||
|
(log-db-debug "disconnect delayed to OS thread")
|
||||||
|
(os-async-channel-put os-req-chan (cons shutdown #f))
|
||||||
|
(when resp-chan
|
||||||
|
(parameterize ((current-custodian (make-custodian-at-root)))
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(void (sync resp-chan))
|
||||||
|
(log-db-debug "finished delayed disconnect")))))
|
||||||
(void)]
|
(void)]
|
||||||
[os-result-box
|
|
||||||
;; OS thread is running; delay until finished
|
|
||||||
=> (lambda (result-box)
|
|
||||||
(log-db-debug "disconnect delayed by OS thread")
|
|
||||||
(define dont-gc this%)
|
|
||||||
(parameterize ((current-custodian (make-custodian-at-root)))
|
|
||||||
(thread
|
|
||||||
(lambda ()
|
|
||||||
(void (sync (box-evt result-box)))
|
|
||||||
(log-db-debug "continuing delayed disconnect")
|
|
||||||
(real-disconnect from-custodian?)
|
|
||||||
(void/reference-sink dont-gc)))))]
|
|
||||||
[else
|
[else
|
||||||
;; Normal disconnect
|
(finish-disconnect db stmts)
|
||||||
(set! -db #f)
|
(void/reference-sink dont-gc)])))))
|
||||||
;; Unregister custodian shutdown, unless called from custodian.
|
|
||||||
(unless from-custodian? (unregister-custodian-shutdown this creg))
|
(define/private (finish-disconnect db stmts) ;; PRE: in atomic mode
|
||||||
(set! creg #f)
|
;; Free all of connection's prepared statements. This will leave
|
||||||
;; Free all of connection's prepared statements. This will leave
|
;; pst objects with dangling foreign objects, so don't try to free
|
||||||
;; pst objects with dangling foreign objects, so don't try to free
|
;; them again---check that -db is not-#f.
|
||||||
;; them again---check that -db is not-#f.
|
(for-each sqlite3_finalize stmts)
|
||||||
(let ([stmts (hash-keys stmt-table)])
|
(HANDLE 'disconnect (sqlite3_close db)))
|
||||||
(hash-clear! stmt-table)
|
|
||||||
(for-each sqlite3_finalize stmts))
|
|
||||||
(HANDLE 'disconnect (sqlite3_close db))
|
|
||||||
(void)])))))
|
|
||||||
|
|
||||||
(define/public (get-base) this)
|
(define/public (get-base) this)
|
||||||
|
|
||||||
|
@ -499,7 +505,8 @@
|
||||||
;; == OS Thread Support
|
;; == OS Thread Support
|
||||||
|
|
||||||
(define use-os-thread? #f)
|
(define use-os-thread? #f)
|
||||||
(define os-result-box #f) ;; #f or box -- set if os thread is active
|
(define os-req-chan #f) ;; #f or OS-Async-Channel
|
||||||
|
(define os-resp-chan #f) ;; #f or OS-Async-Channel
|
||||||
|
|
||||||
(define/public (use-os-thread use?)
|
(define/public (use-os-thread use?)
|
||||||
(when use?
|
(when use?
|
||||||
|
@ -507,19 +514,30 @@
|
||||||
(raise (exn:fail:unsupported "use-os-thread: not supported"
|
(raise (exn:fail:unsupported "use-os-thread: not supported"
|
||||||
(current-continuation-marks)))))
|
(current-continuation-marks)))))
|
||||||
(call-with-lock 'use-os-thread
|
(call-with-lock 'use-os-thread
|
||||||
(lambda () (set! use-os-thread? (and use? #t)))))
|
(lambda ()
|
||||||
|
(set! use-os-thread? (and use? #t))
|
||||||
|
(when use?
|
||||||
|
(call-as-atomic
|
||||||
|
(lambda ()
|
||||||
|
(unless os-req-chan
|
||||||
|
(define db -db)
|
||||||
|
(define req-chan (make-os-async-channel))
|
||||||
|
(define resp-chan (make-os-async-channel))
|
||||||
|
(call-in-os-thread
|
||||||
|
(lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(define msg (os-async-channel-get req-chan))
|
||||||
|
(define proc (car msg))
|
||||||
|
(define loop? (cdr msg))
|
||||||
|
(os-async-channel-put resp-chan (proc db))
|
||||||
|
(when loop? (loop)))))
|
||||||
|
(set! os-req-chan req-chan)
|
||||||
|
(set! os-resp-chan resp-chan))))))))
|
||||||
|
|
||||||
(define/private (sync-call-in-os-thread proc)
|
(define/private (sync-call-in-os-thread proc)
|
||||||
(define result-box (box #f))
|
(A (when os-req-chan
|
||||||
(set! os-result-box result-box)
|
(os-async-channel-put os-req-chan (cons proc #t))))
|
||||||
(call-in-os-thread
|
(sync os-resp-chan))
|
||||||
(lambda ()
|
|
||||||
(set-box! result-box (proc))
|
|
||||||
(set! os-result-box #f)
|
|
||||||
(signal-received)
|
|
||||||
(void/reference-sink this)))
|
|
||||||
(void (sync (box-evt result-box)))
|
|
||||||
(unbox result-box))
|
|
||||||
))
|
))
|
||||||
|
|
||||||
(define shutdown-connection
|
(define shutdown-connection
|
||||||
|
@ -637,17 +655,3 @@
|
||||||
SQLITE_IOERR SQLITE_IOERR_BLOCKED SQLITE_IOERR_LOCK SQLITE_CORRUPT
|
SQLITE_IOERR SQLITE_IOERR_BLOCKED SQLITE_IOERR_LOCK SQLITE_CORRUPT
|
||||||
SQLITE_NOTFOUND SQLITE_FULL SQLITE_CANTOPEN SQLITE_PROTOCOL SQLITE_EMPTY
|
SQLITE_NOTFOUND SQLITE_FULL SQLITE_CANTOPEN SQLITE_PROTOCOL SQLITE_EMPTY
|
||||||
SQLITE_FORMAT SQLITE_NOTADB))
|
SQLITE_FORMAT SQLITE_NOTADB))
|
||||||
|
|
||||||
;; adapted from readline/rktrl.rkt
|
|
||||||
(struct box-evt (b)
|
|
||||||
#:property prop:evt
|
|
||||||
(unsafe-poller
|
|
||||||
(lambda (self wakeups)
|
|
||||||
(if (unbox (box-evt-b self))
|
|
||||||
(values (list self) #f)
|
|
||||||
(values #f self)))))
|
|
||||||
|
|
||||||
(define signal-received
|
|
||||||
(case (system-type 'vm)
|
|
||||||
[(chez-scheme) ((vm-primitive 'unsafe-make-signal-received))]
|
|
||||||
[else void]))
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user