db: improved locking
Tests suggest new locking is faster, but primary benefit is detecting when thread holding lock is killed.
This commit is contained in:
parent
19b1ff101c
commit
8611435269
|
@ -1,5 +1,6 @@
|
||||||
#lang racket/base
|
#lang racket/base
|
||||||
(require racket/class)
|
(require racket/class
|
||||||
|
ffi/unsafe/atomic)
|
||||||
(provide connection<%>
|
(provide connection<%>
|
||||||
dbsystem<%>
|
dbsystem<%>
|
||||||
prepared-statement<%>
|
prepared-statement<%>
|
||||||
|
@ -188,27 +189,33 @@
|
||||||
|
|
||||||
;; Connection base class (locking)
|
;; Connection base class (locking)
|
||||||
|
|
||||||
;; Disabled for now, because this is an 80% solution. Unfortunately, I
|
|
||||||
;; think a 100% solution would require an auxiliary kill-safe thread
|
|
||||||
;; with multiple thread switches *per lock acquisition*. At that
|
|
||||||
;; point, might as well just use kill-safe connection.
|
|
||||||
(define USE-LOCK-HOLDER? #f)
|
|
||||||
|
|
||||||
(define locking%
|
(define locking%
|
||||||
(class object%
|
(class object%
|
||||||
|
|
||||||
;; == Communication locking
|
;; == Communication locking
|
||||||
|
|
||||||
(define lock (make-semaphore 1))
|
;; Goal: we would like to be able to detect if a thread has
|
||||||
|
|
||||||
;; Ideally, we would like to be able to detect if a thread has
|
|
||||||
;; acquired the lock and then died, leaving the connection
|
;; acquired the lock and then died, leaving the connection
|
||||||
;; permanently locked. Roughly, we would like this: if lock is
|
;; permanently locked.
|
||||||
;; held by thread th, then lock-holder = (thread-dead-evt th),
|
;;
|
||||||
;; and if lock is not held, then lock-holder = never-evt.
|
;; lock-holder=(thread-dead-evt thd) iff thd has acquired inner-lock
|
||||||
;; Unfortunately, there are intervals when this is not true.
|
;; - lock-holder, inner-lock always modified together within
|
||||||
;; Also, since lock-holder changes, reference might be stale, so
|
;; atomic block
|
||||||
;; need to double-check.
|
;;
|
||||||
|
;; Thus if (thread-dead-evt thd) is ready, thd died holding
|
||||||
|
;; inner-lock, so hopelessly locked.
|
||||||
|
;;
|
||||||
|
;; outer-sema = inner-lock
|
||||||
|
;; - outer-sema, inner-lock always modified together within atomic
|
||||||
|
;;
|
||||||
|
;; The outer-lock just prevents threads from spinning polling
|
||||||
|
;; inner-lock. If a thread gets past outer-lock and dies before
|
||||||
|
;; acquiring inner-lock, ok, because outer-lock still open at that
|
||||||
|
;; point, so other threads can enter outer-lock and acquire inner-lock.
|
||||||
|
|
||||||
|
(define outer-sema (make-semaphore 1))
|
||||||
|
(define outer-lock (semaphore-peek-evt outer-sema))
|
||||||
|
(define inner-lock (make-semaphore 1))
|
||||||
(define lock-holder never-evt)
|
(define lock-holder never-evt)
|
||||||
|
|
||||||
;; Delay async calls (eg, notice handler) until unlock
|
;; Delay async calls (eg, notice handler) until unlock
|
||||||
|
@ -220,21 +227,32 @@
|
||||||
(call-with-lock* who proc #f #t))
|
(call-with-lock* who proc #f #t))
|
||||||
|
|
||||||
(define/public-final (call-with-lock* who proc hopeless require-connected?)
|
(define/public-final (call-with-lock* who proc hopeless require-connected?)
|
||||||
(let* ([me (thread-dead-evt (current-thread))]
|
(let ([me (thread-dead-evt (current-thread))]
|
||||||
[result (sync lock lock-holder)])
|
[result (sync outer-lock lock-holder)])
|
||||||
(cond [(eq? result lock)
|
(cond [(eq? result outer-lock)
|
||||||
|
;; Got past outer stage
|
||||||
|
(let ([proceed?
|
||||||
|
(begin (start-atomic)
|
||||||
|
(let ([proceed? (semaphore-try-wait? inner-lock)])
|
||||||
|
(when proceed?
|
||||||
|
(set! lock-holder me)
|
||||||
|
(semaphore-wait outer-sema))
|
||||||
|
(end-atomic)
|
||||||
|
proceed?))])
|
||||||
|
(cond [proceed?
|
||||||
;; Acquired lock
|
;; Acquired lock
|
||||||
(when USE-LOCK-HOLDER? (set! lock-holder me))
|
;; - lock-holder = me, and outer-lock is closed again
|
||||||
(when (and require-connected? (not (connected?)))
|
(when (and require-connected? (not (connected?)))
|
||||||
(semaphore-post lock)
|
(unlock)
|
||||||
(error/not-connected who))
|
(error/not-connected who))
|
||||||
(with-handlers ([values (lambda (e) (unlock) (raise e))])
|
(with-handlers ([values (lambda (e) (unlock) (raise e))])
|
||||||
(begin0 (proc) (unlock)))]
|
(begin0 (proc) (unlock)))]
|
||||||
|
[else
|
||||||
|
;; Didn't acquire lock; retry
|
||||||
|
(call-with-lock* who proc hopeless require-connected?)]))]
|
||||||
[(eq? result lock-holder)
|
[(eq? result lock-holder)
|
||||||
;; Thread holding lock is dead
|
;; Thread holding lock is dead
|
||||||
(if hopeless
|
(if hopeless (hopeless) (error/hopeless who))]
|
||||||
(hopeless)
|
|
||||||
(error/hopeless who))]
|
|
||||||
[else
|
[else
|
||||||
;; lock-holder was stale; retry
|
;; lock-holder was stale; retry
|
||||||
(call-with-lock* who proc hopeless require-connected?)])))
|
(call-with-lock* who proc hopeless require-connected?)])))
|
||||||
|
@ -242,8 +260,11 @@
|
||||||
(define/private (unlock)
|
(define/private (unlock)
|
||||||
(let ([async-calls (reverse delayed-async-calls)])
|
(let ([async-calls (reverse delayed-async-calls)])
|
||||||
(set! delayed-async-calls null)
|
(set! delayed-async-calls null)
|
||||||
(when USE-LOCK-HOLDER? (set! lock-holder never-evt))
|
(start-atomic)
|
||||||
(semaphore-post lock)
|
(set! lock-holder never-evt)
|
||||||
|
(semaphore-post inner-lock)
|
||||||
|
(semaphore-post outer-sema)
|
||||||
|
(end-atomic)
|
||||||
(for-each call-with-continuation-barrier async-calls)))
|
(for-each call-with-continuation-barrier async-calls)))
|
||||||
|
|
||||||
;; needs overriding
|
;; needs overriding
|
||||||
|
|
Loading…
Reference in New Issue
Block a user