From 594dfafdf4182a029d797b260518f41d40328d23 Mon Sep 17 00:00:00 2001 From: Ryan Culpepper Date: Sun, 14 Feb 2021 01:27:40 +0100 Subject: [PATCH] db: factor out ffi-common mixin --- .../db/private/generic/ffi-common.rkt | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 racket/collects/db/private/generic/ffi-common.rkt diff --git a/racket/collects/db/private/generic/ffi-common.rkt b/racket/collects/db/private/generic/ffi-common.rkt new file mode 100644 index 0000000000..bf38c7bae3 --- /dev/null +++ b/racket/collects/db/private/generic/ffi-common.rkt @@ -0,0 +1,115 @@ +#lang racket/base +(require racket/class + ffi/unsafe/atomic + ffi/unsafe/custodian + ffi/unsafe/os-thread + ffi/unsafe/os-async-channel + "interfaces.rkt") +(provide (protect-out + ffi-connection-mixin)) + +;; Convention: methods names starting with "-" usually indicate methods that +;; must be called in atomic mode. + +(define (ffi-connection-mixin %) + (class % + (inherit call-with-lock) + (super-new) + + ;; -get-db : -> DB/#f + (abstract -get-db) + + ;; -get-do-disconnect : -> (-> (-> Void)) + ;; Partially disconnect (ie, mark as disconnected) and return a closure that + ;; finishes disconnecting and returns a closure to report disconnection errors. + ;; The first closure result may be called in an OS thread. + ;; The second closure result is always called in a Racket thread (but maybe atomic). + (abstract -get-do-disconnect) + + ;; ---------------------------------------- + ;; Disconnect + + (define/override (disconnect* _politely?) + (super disconnect* _politely?) + (real-disconnect)) + + (define/public (real-disconnect) + (call-as-atomic + (lambda () + (when (-get-db) + ;; Partially disconnect + (define do-disconnect (-get-do-disconnect)) + ;; Finish disconnecting + (cond [os-req-chan + ;; OS thread might be using db, stmts + (define resp-chan (make-os-async-channel)) + (define (shutdown _db) + (define done (do-disconnect)) + (when resp-chan (os-async-channel-put resp-chan done))) + (log-db-debug "disconnect delayed to OS thread") + (os-async-channel-put os-req-chan (cons shutdown #f)) + (when resp-chan + (parameterize ((current-custodian (make-custodian-at-root))) + (thread + (lambda () + (define done (sync resp-chan)) + (log-db-debug "finished delayed disconnect") + (done))))) + (void)] + [else ((do-disconnect))]))))) + + ;; ---------------------------------------- + ;; OS Thread Support + + (define use-os-thread? #f) + (define os-req-chan #f) ;; #f or OS-Async-Channel + (define os-resp-chan #f) ;; #f or OS-Async-Channel + + (define/public (get-use-os-thread?) use-os-thread?) + + (define/public (use-os-thread use?) + (when use? + (unless (os-thread-enabled?) + (raise (exn:fail:unsupported "use-os-thread: not supported" + (current-continuation-marks))))) + (call-with-lock 'use-os-thread + (lambda () + (set! use-os-thread? (and use? #t)) + (when use? + (call-as-atomic + (lambda () + (unless os-req-chan + (define db (-get-db)) + (define req-chan (make-os-async-channel)) + (define resp-chan (make-os-async-channel)) + (call-in-os-thread + (lambda () + (let loop () + (define msg (os-async-channel-get req-chan)) + (define proc (car msg)) + (define loop? (cdr msg)) + (os-async-channel-put resp-chan (proc db)) + (when loop? (loop))))) + (set! os-req-chan req-chan) + (set! os-resp-chan resp-chan)))))))) + + ;; sync-call-in-os-thread : (DB/#f -> X) -> X + ;; Calls proc either normally or in an OS thread. If in OS thread, proc is + ;; passed the saved DB value (in case of partial disconnects); if called + ;; normally, proc is passed #f. + (define/public (sync-call-in-os-thread proc) + (start-atomic) + (cond [(-get-db) + (when os-req-chan + (os-async-channel-put os-req-chan (cons proc #t))) + (end-atomic)] + [else + (end-atomic) + (error)]) + (sync os-resp-chan)) + + ;; sync-call : (DB/#f -> X) -> X + (define/public (sync-call proc) + (cond [use-os-thread? (sync-call-in-os-thread proc)] + [else (proc #f)])) + ))