From f502cf3b4e15f87d41b3bacf5b04ca2915fd284a Mon Sep 17 00:00:00 2001 From: Ryan Culpepper Date: Tue, 16 Feb 2021 01:15:06 +0100 Subject: [PATCH] db: simplify locking --- racket/collects/db/private/generic/common.rkt | 188 +++++++++++------- 1 file changed, 112 insertions(+), 76 deletions(-) diff --git a/racket/collects/db/private/generic/common.rkt b/racket/collects/db/private/generic/common.rkt index 8d69f33734..aab5c1cd0b 100644 --- a/racket/collects/db/private/generic/common.rkt +++ b/racket/collects/db/private/generic/common.rkt @@ -99,34 +99,107 @@ ;; ---------------------------------------- +(module lock racket/base + (require ffi/unsafe/atomic) + (provide lock? + make-lock + lock-acquire + lock-release + call-with-lock + (protect-out + lock-acquire/start-atomic)) + + ;; Goal: we would like to be able to detect if a thread has + ;; acquired the lock and then died, leaving the connection + ;; permanently locked. + + ;; A lock has two states, and state changes are atomic. + ;; - locked: sema = 0, owner = thread-dead-evt of owner + ;; - unlocked: sema = 1, owner = never-evt + + (struct lock (sema sema-peek [owner #:mutable])) + + (define (make-lock) + (define sema (make-semaphore 1)) + (lock sema (semaphore-peek-evt sema) never-evt)) + + ;; PRE: not in atomic mode + ;; Warning: If hopeless callback can return normally (especially if it returns + ;; #), then it is ambiguous whether lock was acquired. + (define (lock-acquire lk [hopeless #f] #:enable-break? [enable-break? #f]) + (lock-acquire/start-atomic lk hopeless end-atomic #:enable-break? enable-break?)) + + ;; PRE: not in atomic mode + ;; Warning: If hopeless callback can return normally (especially #), it + ;; is ambiguous whether lock was acquired (and whether still in atomic mode). + (define (lock-acquire/start-atomic lk [hopeless #f] [acquired #f] + #:enable-break? [enable-break? #f]) + (unless (lock? lk) (raise-argument-error 'lock-acquire "lock?" lk)) + (unless (or (eq? hopeless #f) (procedure? hopeless) (symbol? hopeless)) + (raise-argument-error 'lock-acquire "(or/c #f procedure? symbol?)" hopeless)) + (unless (or (eq? acquired #f) (procedure? acquired)) + (raise-argument-error 'lock-acquire "(or/c #f procedure?)" acquired)) + (define me (thread-dead-evt (current-thread))) + (define sema (lock-sema lk)) + (define sema-peek (lock-sema-peek lk)) + (let loop () + (define result + (cond [enable-break? (sync/enable-break sema-peek (lock-owner lk))] + [else (sync sema-peek (lock-owner lk))])) + (cond [(eq? result sema-peek) + ;; Got past outer stage + (start-atomic) + (cond [(eq? (lock-owner lk) never-evt) + ;; Currently unlocked => acquire + (set-lock-owner! lk me) + (semaphore-wait sema) + ;; Still in atomic mode! + (if acquired (acquired) (void))] + [else + ;; Other thread got here first => retry + (end-atomic) + (loop)])] + [(eq? result (lock-owner lk)) + ;; Thread holding lock is dead + (if (procedure? hopeless) + (hopeless) + (error (or hopeless 'lock-acquire) "the thread owning the lock is dead"))] + [(eq? result me) + ;; Attempt to recursively acquire lock + (error 'lock-acquire "attempted to recursively acquire lock")] + [else + ;; Owner was stale => retry + ;; This can happen if the thread holding the lock releases + ;; it and then immediately dies. + (loop)]))) + + ;; safe to call in atomic mode + (define (lock-release lk) + (unless (lock? lk) (raise-argument-error 'lock-release "lock?" lk)) + (start-atomic) + (set-lock-owner! lk never-evt) + (semaphore-post (lock-sema lk)) + (end-atomic)) + + ;; PRE: not in atomic mode + (define (call-with-lock lk proc [hopeless #f] + #:enable-break? [enable-break? #f]) + (lock-acquire/start-atomic lk hopeless #:enable-break? enable-break?) + (with-handlers ([(lambda (e) #t) + (lambda (e) (lock-release lk) (raise e))]) + (end-atomic) + (begin0 (proc) + (lock-release lk))))) + +(require (rename-in (submod "." lock) + [call-with-lock lock:call-with-lock])) + +;; ---------------------------------------- + (define locking% (class debugging% - ;; == Communication locking - - ;; Goal: we would like to be able to detect if a thread has - ;; acquired the lock and then died, leaving the connection - ;; permanently locked. - ;; - ;; lock-holder=(thread-dead-evt thd) iff thd has acquired inner-lock - ;; - lock-holder, inner-lock always modified together within - ;; atomic block - ;; - ;; Thus if (thread-dead-evt thd) is ready, thd died holding - ;; inner-lock, so hopelessly locked. - ;; - ;; outer-sema = inner-lock - ;; - outer-sema, inner-lock always modified together within atomic - ;; - ;; The outer-lock just prevents threads from spinning polling - ;; inner-lock. If a thread gets past outer-lock and dies before - ;; acquiring inner-lock, ok, because outer-lock still open at that - ;; point, so other threads can enter outer-lock and acquire inner-lock. - - (define outer-sema (make-semaphore 1)) - (define outer-lock (semaphore-peek-evt outer-sema)) - (define inner-lock (make-semaphore 1)) - (define lock-holder never-evt) + (define lock (make-lock)) ;; Delay async calls (eg, notice handler) until unlock (define delayed-async-calls null) @@ -139,57 +212,20 @@ ;; LOCKING: requires unlocked (define/public-final (call-with-lock* who proc hopeless require-connected?) - (let ([me (thread-dead-evt (current-thread))] - [eb? (break-enabled)] - [result (sync outer-lock lock-holder)]) - (cond [(eq? result outer-lock) - ;; Got past outer stage - (break-enabled #f) - (let ([proceed? - (begin (start-atomic) - (let ([proceed? (semaphore-try-wait? inner-lock)]) - (when proceed? - (set! lock-holder me) - (semaphore-wait outer-sema)) - (end-atomic) - proceed?))]) - (cond [proceed? - ;; Acquired lock - ;; - lock-holder = me, and outer-lock is closed again - (when (and require-connected? (not (connected?))) - (break-enabled eb?) - (unlock #f) - (error/not-connected who)) - (with-handlers ([(lambda (e) #t) - (lambda (e) - (when (exn:break? e) (on-break-within-lock)) - (unlock #f) - (raise e))]) - (break-enabled eb?) - (begin0 (proc) (unlock #t)))] - [else - ;; Didn't acquire lock; retry - (break-enabled eb?) - (call-with-lock* who proc hopeless require-connected?)]))] - [(eq? result lock-holder) - ;; Thread holding lock is dead - (if hopeless (hopeless) (error/hopeless who))] - [(eq? me lock-holder) - (error/internal who "attempted to recursively acquire lock")] - [else - ;; lock-holder was stale; retry - (call-with-lock* who proc hopeless require-connected?)]))) - - (define/private (unlock run-async-calls?) - (let ([async-calls (reverse delayed-async-calls)]) - (set! delayed-async-calls null) - (start-atomic) - (set! lock-holder never-evt) - (semaphore-post inner-lock) - (semaphore-post outer-sema) - (end-atomic) - (when run-async-calls? - (for-each call-with-continuation-barrier async-calls)))) + (define async-calls null) ;; mutated + (begin0 (lock:call-with-lock lock + (lambda () + (when (and require-connected? (not (connected?))) + (error/not-connected who)) + (with-handlers ([exn:break? + (lambda (e) + (on-break-within-lock) + (raise e))]) + (begin0 (proc) + (set! async-calls delayed-async-calls) + (set! delayed-async-calls null)))) + (or hopeless who)) + (for-each call-with-continuation-barrier (reverse async-calls)))) ;; needs overriding ;; LOCKING: must not block, must not acquire lock