db: simplify locking
This commit is contained in:
parent
f1a5dab4c7
commit
f502cf3b4e
|
@ -99,34 +99,107 @@
|
|||
|
||||
;; ----------------------------------------
|
||||
|
||||
(module lock racket/base
|
||||
(require ffi/unsafe/atomic)
|
||||
(provide lock?
|
||||
make-lock
|
||||
lock-acquire
|
||||
lock-release
|
||||
call-with-lock
|
||||
(protect-out
|
||||
lock-acquire/start-atomic))
|
||||
|
||||
;; Goal: we would like to be able to detect if a thread has
|
||||
;; acquired the lock and then died, leaving the connection
|
||||
;; permanently locked.
|
||||
|
||||
;; A lock has two states, and state changes are atomic.
|
||||
;; - locked: sema = 0, owner = thread-dead-evt of owner
|
||||
;; - unlocked: sema = 1, owner = never-evt
|
||||
|
||||
(struct lock (sema sema-peek [owner #:mutable]))
|
||||
|
||||
(define (make-lock)
|
||||
(define sema (make-semaphore 1))
|
||||
(lock sema (semaphore-peek-evt sema) never-evt))
|
||||
|
||||
;; PRE: not in atomic mode
|
||||
;; Warning: If hopeless callback can return normally (especially if it returns
|
||||
;; #<void>), then it is ambiguous whether lock was acquired.
|
||||
(define (lock-acquire lk [hopeless #f] #:enable-break? [enable-break? #f])
|
||||
(lock-acquire/start-atomic lk hopeless end-atomic #:enable-break? enable-break?))
|
||||
|
||||
;; PRE: not in atomic mode
|
||||
;; Warning: If hopeless callback can return normally (especially #<void>), it
|
||||
;; is ambiguous whether lock was acquired (and whether still in atomic mode).
|
||||
(define (lock-acquire/start-atomic lk [hopeless #f] [acquired #f]
|
||||
#:enable-break? [enable-break? #f])
|
||||
(unless (lock? lk) (raise-argument-error 'lock-acquire "lock?" lk))
|
||||
(unless (or (eq? hopeless #f) (procedure? hopeless) (symbol? hopeless))
|
||||
(raise-argument-error 'lock-acquire "(or/c #f procedure? symbol?)" hopeless))
|
||||
(unless (or (eq? acquired #f) (procedure? acquired))
|
||||
(raise-argument-error 'lock-acquire "(or/c #f procedure?)" acquired))
|
||||
(define me (thread-dead-evt (current-thread)))
|
||||
(define sema (lock-sema lk))
|
||||
(define sema-peek (lock-sema-peek lk))
|
||||
(let loop ()
|
||||
(define result
|
||||
(cond [enable-break? (sync/enable-break sema-peek (lock-owner lk))]
|
||||
[else (sync sema-peek (lock-owner lk))]))
|
||||
(cond [(eq? result sema-peek)
|
||||
;; Got past outer stage
|
||||
(start-atomic)
|
||||
(cond [(eq? (lock-owner lk) never-evt)
|
||||
;; Currently unlocked => acquire
|
||||
(set-lock-owner! lk me)
|
||||
(semaphore-wait sema)
|
||||
;; Still in atomic mode!
|
||||
(if acquired (acquired) (void))]
|
||||
[else
|
||||
;; Other thread got here first => retry
|
||||
(end-atomic)
|
||||
(loop)])]
|
||||
[(eq? result (lock-owner lk))
|
||||
;; Thread holding lock is dead
|
||||
(if (procedure? hopeless)
|
||||
(hopeless)
|
||||
(error (or hopeless 'lock-acquire) "the thread owning the lock is dead"))]
|
||||
[(eq? result me)
|
||||
;; Attempt to recursively acquire lock
|
||||
(error 'lock-acquire "attempted to recursively acquire lock")]
|
||||
[else
|
||||
;; Owner was stale => retry
|
||||
;; This can happen if the thread holding the lock releases
|
||||
;; it and then immediately dies.
|
||||
(loop)])))
|
||||
|
||||
;; safe to call in atomic mode
|
||||
(define (lock-release lk)
|
||||
(unless (lock? lk) (raise-argument-error 'lock-release "lock?" lk))
|
||||
(start-atomic)
|
||||
(set-lock-owner! lk never-evt)
|
||||
(semaphore-post (lock-sema lk))
|
||||
(end-atomic))
|
||||
|
||||
;; PRE: not in atomic mode
|
||||
(define (call-with-lock lk proc [hopeless #f]
|
||||
#:enable-break? [enable-break? #f])
|
||||
(lock-acquire/start-atomic lk hopeless #:enable-break? enable-break?)
|
||||
(with-handlers ([(lambda (e) #t)
|
||||
(lambda (e) (lock-release lk) (raise e))])
|
||||
(end-atomic)
|
||||
(begin0 (proc)
|
||||
(lock-release lk)))))
|
||||
|
||||
(require (rename-in (submod "." lock)
|
||||
[call-with-lock lock:call-with-lock]))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
(define locking%
|
||||
(class debugging%
|
||||
|
||||
;; == Communication locking
|
||||
|
||||
;; Goal: we would like to be able to detect if a thread has
|
||||
;; acquired the lock and then died, leaving the connection
|
||||
;; permanently locked.
|
||||
;;
|
||||
;; lock-holder=(thread-dead-evt thd) iff thd has acquired inner-lock
|
||||
;; - lock-holder, inner-lock always modified together within
|
||||
;; atomic block
|
||||
;;
|
||||
;; Thus if (thread-dead-evt thd) is ready, thd died holding
|
||||
;; inner-lock, so hopelessly locked.
|
||||
;;
|
||||
;; outer-sema = inner-lock
|
||||
;; - outer-sema, inner-lock always modified together within atomic
|
||||
;;
|
||||
;; The outer-lock just prevents threads from spinning polling
|
||||
;; inner-lock. If a thread gets past outer-lock and dies before
|
||||
;; acquiring inner-lock, ok, because outer-lock still open at that
|
||||
;; point, so other threads can enter outer-lock and acquire inner-lock.
|
||||
|
||||
(define outer-sema (make-semaphore 1))
|
||||
(define outer-lock (semaphore-peek-evt outer-sema))
|
||||
(define inner-lock (make-semaphore 1))
|
||||
(define lock-holder never-evt)
|
||||
(define lock (make-lock))
|
||||
|
||||
;; Delay async calls (eg, notice handler) until unlock
|
||||
(define delayed-async-calls null)
|
||||
|
@ -139,57 +212,20 @@
|
|||
|
||||
;; LOCKING: requires unlocked
|
||||
(define/public-final (call-with-lock* who proc hopeless require-connected?)
|
||||
(let ([me (thread-dead-evt (current-thread))]
|
||||
[eb? (break-enabled)]
|
||||
[result (sync outer-lock lock-holder)])
|
||||
(cond [(eq? result outer-lock)
|
||||
;; Got past outer stage
|
||||
(break-enabled #f)
|
||||
(let ([proceed?
|
||||
(begin (start-atomic)
|
||||
(let ([proceed? (semaphore-try-wait? inner-lock)])
|
||||
(when proceed?
|
||||
(set! lock-holder me)
|
||||
(semaphore-wait outer-sema))
|
||||
(end-atomic)
|
||||
proceed?))])
|
||||
(cond [proceed?
|
||||
;; Acquired lock
|
||||
;; - lock-holder = me, and outer-lock is closed again
|
||||
(when (and require-connected? (not (connected?)))
|
||||
(break-enabled eb?)
|
||||
(unlock #f)
|
||||
(error/not-connected who))
|
||||
(with-handlers ([(lambda (e) #t)
|
||||
(lambda (e)
|
||||
(when (exn:break? e) (on-break-within-lock))
|
||||
(unlock #f)
|
||||
(raise e))])
|
||||
(break-enabled eb?)
|
||||
(begin0 (proc) (unlock #t)))]
|
||||
[else
|
||||
;; Didn't acquire lock; retry
|
||||
(break-enabled eb?)
|
||||
(call-with-lock* who proc hopeless require-connected?)]))]
|
||||
[(eq? result lock-holder)
|
||||
;; Thread holding lock is dead
|
||||
(if hopeless (hopeless) (error/hopeless who))]
|
||||
[(eq? me lock-holder)
|
||||
(error/internal who "attempted to recursively acquire lock")]
|
||||
[else
|
||||
;; lock-holder was stale; retry
|
||||
(call-with-lock* who proc hopeless require-connected?)])))
|
||||
|
||||
(define/private (unlock run-async-calls?)
|
||||
(let ([async-calls (reverse delayed-async-calls)])
|
||||
(set! delayed-async-calls null)
|
||||
(start-atomic)
|
||||
(set! lock-holder never-evt)
|
||||
(semaphore-post inner-lock)
|
||||
(semaphore-post outer-sema)
|
||||
(end-atomic)
|
||||
(when run-async-calls?
|
||||
(for-each call-with-continuation-barrier async-calls))))
|
||||
(define async-calls null) ;; mutated
|
||||
(begin0 (lock:call-with-lock lock
|
||||
(lambda ()
|
||||
(when (and require-connected? (not (connected?)))
|
||||
(error/not-connected who))
|
||||
(with-handlers ([exn:break?
|
||||
(lambda (e)
|
||||
(on-break-within-lock)
|
||||
(raise e))])
|
||||
(begin0 (proc)
|
||||
(set! async-calls delayed-async-calls)
|
||||
(set! delayed-async-calls null))))
|
||||
(or hopeless who))
|
||||
(for-each call-with-continuation-barrier (reverse async-calls))))
|
||||
|
||||
;; needs overriding
|
||||
;; LOCKING: must not block, must not acquire lock
|
||||
|
|
Loading…
Reference in New Issue
Block a user