racket/collects/db/private/generic/connect-util.rkt
2012-12-05 12:19:16 -05:00

380 lines
12 KiB
Racket

#lang racket/base
(require racket/class
"interfaces.rkt"
"common.rkt")
(provide kill-safe-connection
virtual-connection
connection-pool
connection-pool?
connection-pool-lease)
;; manager% implements kill-safe manager thread w/ request channel
(define manager%
(class object%
;; other-evt : (-> evt)
;; generates other evt to sync on besides req-channel, eg timeouts
(init-field (other-evt (lambda () never-evt))
(alt-enabled? (lambda () #t)))
(super-new)
(define req-channel (make-channel))
(define alt-req-channel (make-channel))
(define mthread
(thread/suspend-to-kill
(lambda ()
(let loop ()
(sync (wrap-evt req-channel (lambda (p) (p)))
(if (alt-enabled?)
(wrap-evt alt-req-channel (lambda (p) (p)))
never-evt)
(other-evt))
(loop)))))
(define/public (call proc)
(call* proc req-channel #f))
(define/public (alt-call-evt proc)
(call* proc alt-req-channel #t))
(define/private (call* proc chan as-evt?)
(thread-resume mthread (current-thread))
(let* ([result #f]
[sema (make-semaphore 0)]
[proc (lambda ()
(set! result
(with-handlers ([(lambda (e) #t)
(lambda (e) (cons 'exn e))])
(cons 'values (call-with-values proc list))))
(semaphore-post sema))]
[handler
(lambda (_evt)
(semaphore-wait sema)
(case (car result)
((values) (apply values (cdr result)))
((exn) (raise (cdr result)))))])
(if as-evt?
(wrap-evt (channel-put-evt chan proc) handler)
(begin (channel-put chan proc)
(handler #f)))))))
;; ----
;; Kill-safe wrapper
;; Note: wrapper protects against kill-thread, but not from
;; custodian-shutdown of ports, etc.
(define kill-safe-connection%
(class* object% (connection<%>)
(init-private connection)
(define mgr (new manager%))
(define last-connected? #t)
(define-syntax-rule (define-forward (method arg ...) ...)
(begin
(define/public (method arg ...)
(send mgr call (lambda ()
(begin0
(send connection method arg ...)
(set! last-connected? (send connection connected?))))))
...))
(define/public (connected?) last-connected?)
(define-forward
(disconnect)
(get-dbsystem)
(query fsym stmt cursor?)
(prepare fsym stmt close-on-exec?)
(fetch/cursor fsym cursor fetch-size)
(get-base)
(free-statement stmt need-lock?)
(transaction-status fsym)
(start-transaction fsym isolation option cwt?)
(end-transaction fsym mode cwt?)
(list-tables fsym schema))
(super-new)))
;; ----
(define (kill-safe-connection connection)
(new kill-safe-connection%
(connection connection)))
;; ========================================
;; Virtual connection
(define virtual-connection%
(class* object% (connection<%>)
(init-private connector ;; called from client thread
get-key) ;; called from client thread
(super-new)
(define custodian (current-custodian))
;; == methods called in manager thread ==
;; key=>conn : hasheq[key => connection]
(define key=>conn (make-hasheq))
(define/private (get key) ;; also called by client thread for connected?
(hash-ref key=>conn key #f))
(define/private (put! key c)
(hash-set! key=>conn key c))
(define/private (remove! key)
(let ([c (get key)])
(when c
(hash-remove! key=>conn key)
(send c disconnect))))
(define mgr
(new manager%
(other-evt
(lambda ()
(choice-evt
(let ([keys (hash-map key=>conn (lambda (k v) k))])
(handle-evt (apply choice-evt keys)
;; Assignment to key has expired
(lambda (key)
(log-db-debug "virtual-connection: key expiration: ~e" key)
(remove! key)))))))))
;; == methods called in client thread ==
(define/private (get-connection create?)
(let* ([key (get-key)]
[c (send mgr call (lambda () (get key)))])
(cond [(and c (send c connected?)) c]
[create?
(let ([c* (parameterize ((current-custodian custodian))
(connector))])
(send mgr call
(lambda ()
(when c (remove! key))
(put! key c*)))
c*)]
[else
(when c ;; got a disconnected connection
(send mgr call (lambda () (remove! key))))
#f])))
;; ----
(define-syntax-rule (define-forward (req-con? no-con (method arg ...)) ...)
(begin (define/public (method arg ...)
(let ([c (get-connection req-con?)])
(if c
(send c method arg ...)
no-con)))
...))
(define-forward
(#t '_ (get-dbsystem))
(#t '_ (query fsym stmt cursor?))
(#t '_ (fetch/cursor fsym stmt fetch-size))
(#t '_ (start-transaction fsym isolation option cwt?))
(#f (void) (end-transaction fsym mode cwt?))
(#f #f (transaction-status fsym))
(#t '_ (list-tables fsym schema)))
(define/public (get-base)
(get-connection #t))
(define/public (connected?)
(let ([c (get (get-key))])
(and c (send c connected?))))
(define/public (disconnect)
(let ([c (get-connection #f)]
[key (get-key)])
(when c
(send c disconnect)
(send mgr call (lambda () (remove! key)))))
(void))
(define/public (prepare fsym stmt close-on-exec?)
;; FIXME: hacky way of supporting virtual-statement
(unless (or close-on-exec? (eq? fsym 'virtual-statement))
(error fsym "cannot prepare statement with virtual connection"))
(send (get-connection #t) prepare fsym stmt close-on-exec?))
(define/public (free-statement stmt need-lock?)
(error/internal 'free-statement "virtual connection does not own statements"))))
;; ----
(define (virtual-connection connector)
(let ([connector
(cond [(connection-pool? connector)
(lambda () (connection-pool-lease connector))]
[else connector])]
[get-key (lambda () (thread-dead-evt (current-thread)))])
(new virtual-connection%
(connector connector)
(get-key get-key))))
;; ========================================
;; Connection pool
(define connection-pool%
(class* object% ()
(init-private connector ;; called from manager thread
max-connections
max-idle-connections)
(super-new)
(define proxy-counter 1) ;; for debugging
(define actual-counter 1) ;; for debugging
(define actual=>number (make-weak-hasheq))
;; == methods called in manager thread ==
;; assigned-connections : nat
(define assigned-connections 0)
;; proxy=>evt : hasheq[proxy-connection => evt]
(define proxy=>evt (make-hasheq))
;; idle-list : (listof raw-connection)
(define idle-list null)
(define/private (lease* key)
(let* ([take-idle? (pair? idle-list)]
[raw-c
(cond [take-idle?
(begin0 (car idle-list)
(set! idle-list (cdr idle-list)))]
[else (new-connection)])]
[proxy-number (begin0 proxy-counter (set! proxy-counter (add1 proxy-counter)))]
[c (new proxy-connection% (pool this) (connection raw-c) (number proxy-number))])
(log-db-debug "connection-pool: leasing connection #~a (~a @~a)"
proxy-number
(if take-idle? "idle" "new")
(hash-ref actual=>number raw-c "???"))
(hash-set! proxy=>evt c (wrap-evt key (lambda (_e) c)))
(set! assigned-connections (add1 assigned-connections))
c))
(define/private (release* proxy raw-c why)
(log-db-debug "connection-pool: releasing connection #~a (~a, ~a)"
(send proxy get-number)
(cond [(not raw-c) "no-op"]
[(< (length idle-list) max-idle-connections) "idle"]
[else "disconnect"])
why)
(hash-remove! proxy=>evt proxy)
(when raw-c
(with-handlers ([exn:fail? void])
;; If in tx, just disconnect (for simplicity; else must loop for nested txs)
(when (send raw-c transaction-status 'connection-pool)
(send raw-c disconnect)))
(cond [(and (< (length idle-list) max-idle-connections)
(send raw-c connected?))
(set! idle-list (cons raw-c idle-list))]
[else (send raw-c disconnect)])
(set! assigned-connections (sub1 assigned-connections))))
(define/private (new-connection)
(let ([c (connector)]
[actual-number
(begin0 actual-counter
(set! actual-counter (add1 actual-counter)))])
(when (or (hash-ref proxy=>evt c #f) (memq c idle-list))
(error 'connection-pool "connect function did not produce a fresh connection"))
(hash-set! actual=>number c actual-number)
c))
(define mgr
(new manager%
(other-evt
(lambda ()
(let ([evts (hash-values proxy=>evt)])
(handle-evt (apply choice-evt evts)
(lambda (proxy)
(release* proxy
(send proxy release-connection)
"release-evt"))))))
(alt-enabled? (lambda () (< assigned-connections max-connections)))))
;; == methods called in client thread ==
(define/public (lease-evt key)
(send mgr alt-call-evt (lambda () (lease* key))))
(define/public (release proxy)
(let ([raw-c (send proxy release-connection)])
(send mgr call (lambda () (release* proxy raw-c "proxy disconnect"))))
(void))))
;; --
(define proxy-connection%
(class* locking% (connection<%>)
(init-private connection
pool
number)
(inherit call-with-lock)
(super-new)
(define-syntax-rule (define-forward defmethod (method arg ...) ...)
(begin
(defmethod (method arg ...)
(call-with-lock 'method
(lambda ()
(let ([c connection])
(unless c (error/not-connected 'method))
(send c method arg ...)))))
...))
(define-forward define/public
(get-dbsystem)
(query fsym stmt cursor?)
(prepare fsym stmt close-on-exec?)
(fetch/cursor fsym stmt fetch-size)
(get-base)
(free-statement stmt need-lock?)
(transaction-status fsym)
(start-transaction fsym isolation option cwt?)
(end-transaction fsym mode cwt?)
(list-tables fsym schema))
(define/override (connected?) (and connection (send connection connected?)))
(define/public (disconnect)
(send pool release this))
(define/public (get-number) number)
(define/public (release-connection)
(begin0 connection
(set! connection #f)))))
;; ----
(define (connection-pool connector
#:max-connections [max-connections +inf.0]
#:max-idle-connections [max-idle-connections 10])
(new connection-pool%
(connector connector)
(max-connections max-connections)
(max-idle-connections max-idle-connections)))
(define (connection-pool? x)
(is-a? x connection-pool%))
(define (connection-pool-lease pool [key (current-thread)])
(let* ([key
(cond [(thread? key) (thread-dead-evt key)]
[(custodian? key) (make-custodian-box key #t)]
[else key])]
[result (sync/timeout 0.1 (send pool lease-evt key))])
(unless result
(error 'connection-pool-lease "connection pool limit reached"))
result))