cs: implement #:in-original-place? #t
for foreign calls
The `math` library relies on this working right, since MPFR is normally not compiled as thread-safe. Also, fix some locking/interrupt/atomicity problems with async callbacks generally.
This commit is contained in:
parent
f812fd9e67
commit
8ad13a472f
|
@ -2,6 +2,7 @@
|
|||
(require ffi/unsafe)
|
||||
|
||||
(module+ test
|
||||
(self)
|
||||
(main))
|
||||
|
||||
;; Make sure that `#:in-original-place?' doesn't lead to deadlock:
|
||||
|
@ -18,7 +19,7 @@
|
|||
(define p
|
||||
(place pch
|
||||
(define j (place-channel-get pch))
|
||||
;; Start a thread that keep having to wait on the original place:
|
||||
;; Start a thread that keeps having to wait on the original place:
|
||||
(thread
|
||||
(lambda ()
|
||||
(let loop ()
|
||||
|
@ -39,3 +40,17 @@
|
|||
(for ([i 5])
|
||||
(printf "iter ~a\n" i)
|
||||
(x-main)))
|
||||
|
||||
(define pthread_self
|
||||
(get-ffi-obj 'pthread_self #f (_fun #:in-original-place? #t -> _pointer)
|
||||
(lambda () #f)))
|
||||
|
||||
(define (self)
|
||||
(when pthread_self
|
||||
(define here (cast (pthread_self) _pointer _intptr))
|
||||
(define pl
|
||||
(place pch
|
||||
(place-channel-put pch (cast (pthread_self) _pointer _intptr))))
|
||||
(define from-there (place-channel-get pl))
|
||||
(unless (equal? here from-there)
|
||||
(error "didn't run in main place"))))
|
||||
|
|
|
@ -803,6 +803,7 @@
|
|||
(set-base-exception-handler!)
|
||||
(init-place-locals!)
|
||||
(register-as-place-main!)
|
||||
(remember-original-place!)
|
||||
(set-collect-handler!)
|
||||
(set-primitive-applicables!)
|
||||
(set-continuation-applicables!)
|
||||
|
|
|
@ -5,20 +5,21 @@
|
|||
(place-async-callback-queue))
|
||||
|
||||
(define (async-callback-place-init!)
|
||||
(place-async-callback-queue (make-async-callback-queue (make-mutex)
|
||||
(place-async-callback-queue (make-async-callback-queue (make-mutex) ; ordered *before* `interrupts-disable`-as-lock
|
||||
(make-condition)
|
||||
'()
|
||||
(make-async-callback-poll-wakeup))))
|
||||
|
||||
(define (call-as-asynchronous-callback thunk)
|
||||
(with-interrupts-disabled*
|
||||
(async-callback-queue-call (current-async-callback-queue) thunk #t)))
|
||||
(async-callback-queue-call (current-async-callback-queue) thunk #f #t))
|
||||
|
||||
(define (async-callback-queue-call async-callback-queue thunk need-interrupts?)
|
||||
(define (async-callback-queue-call async-callback-queue thunk interrupts-disabled? need-atomic?)
|
||||
(let* ([result-done? (box #f)]
|
||||
[result #f]
|
||||
[q async-callback-queue]
|
||||
[m (async-callback-queue-lock q)])
|
||||
(when interrupts-disabled? (enable-interrupts)) ; interrupt "lock" ordered after mutex
|
||||
(when need-atomic? (scheduler-start-atomic)) ; don't abandon engine after mutex is acquired
|
||||
(mutex-acquire m)
|
||||
(set-async-callback-queue-in! q (cons (lambda ()
|
||||
(set! result (thunk))
|
||||
|
@ -30,21 +31,22 @@
|
|||
((async-callback-queue-wakeup q))
|
||||
(let loop ()
|
||||
(unless (unbox result-done?)
|
||||
(when need-interrupts?
|
||||
;; Enable interrupts so that the thread is deactivated
|
||||
;; when we wait on the condition
|
||||
(enable-interrupts))
|
||||
;; Interrupts must be enabled so that the thread is deactivated
|
||||
;; when we wait on the condition
|
||||
(condition-wait (async-callback-queue-condition q) m)
|
||||
(when need-interrupts? (disable-interrupts))
|
||||
(loop)))
|
||||
(mutex-release m)
|
||||
(when need-atomic? (scheduler-end-atomic))
|
||||
(when interrupts-disabled? (enable-interrupts))
|
||||
result))
|
||||
|
||||
(define make-async-callback-poll-wakeup (lambda () void))
|
||||
(define (set-make-async-callback-poll-wakeup! make-wakeup)
|
||||
(set! make-async-callback-poll-wakeup make-wakeup))
|
||||
(set! make-async-callback-poll-wakeup make-wakeup)
|
||||
(set-async-callback-queue-wakeup! (current-async-callback-queue) (make-wakeup)))
|
||||
|
||||
;; Returns callbacks to run in atomic mode
|
||||
;; Returns callbacks to run in atomic mode. Interrupts must not be disabled
|
||||
;; when ths function is called.
|
||||
(define (poll-async-callbacks)
|
||||
(let ([q (current-async-callback-queue)])
|
||||
(mutex-acquire (async-callback-queue-lock q))
|
||||
|
|
|
@ -1510,7 +1510,7 @@
|
|||
in-types)
|
||||
(check who ctype? out-type)
|
||||
(check who string? :or-false lock-name)
|
||||
((ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? #f #f) p)]))
|
||||
((ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? orig-place? #f #f) p)]))
|
||||
|
||||
(define/who ffi-call-maker
|
||||
(case-lambda
|
||||
|
@ -1532,7 +1532,7 @@
|
|||
in-types)
|
||||
(check who ctype? out-type)
|
||||
(check who string? :or-false lock-name)
|
||||
(ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? #f #f)]))
|
||||
(ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? orig-place? #f #f)]))
|
||||
|
||||
;; For sanity checking of callbacks during a blocking callout:
|
||||
(define-virtual-register currently-blocking? #f)
|
||||
|
@ -1547,7 +1547,7 @@
|
|||
|
||||
(define call-locks (make-eq-hashtable))
|
||||
|
||||
(define (ffi-call/callable call? in-types out-type abi save-errno lock-name blocking? atomic? async-apply)
|
||||
(define (ffi-call/callable call? in-types out-type abi save-errno lock-name blocking? orig-place? atomic? async-apply)
|
||||
(let* ([conv (case abi
|
||||
[(stdcall) '__stdcall]
|
||||
[(sysv) '__cdecl]
|
||||
|
@ -1643,6 +1643,7 @@
|
|||
(cond
|
||||
[(and (not ret-id)
|
||||
(not blocking?)
|
||||
(not orig-place?)
|
||||
(not save-errno)
|
||||
(not lock)
|
||||
(#%andmap (lambda (in-type)
|
||||
|
@ -1743,35 +1744,40 @@
|
|||
[r (let ([ret-ptr (and ret-id
|
||||
;; result is a struct type; need to allocate space for it
|
||||
(normalized-malloc ret-size ret-malloc-mode))])
|
||||
(when lock (mutex-acquire lock))
|
||||
(with-interrupts-disabled*
|
||||
(when blocking? (currently-blocking? #t))
|
||||
(retain
|
||||
orig-args
|
||||
(let ([r (#%apply (gen-proc (cpointer-address proc-p))
|
||||
(append
|
||||
(if ret-ptr
|
||||
(list (ret-maker (cpointer-address ret-ptr)))
|
||||
'())
|
||||
(map (lambda (arg in-type maker)
|
||||
(let ([host-rep (array-rep-to-pointer-rep
|
||||
(ctype-host-rep in-type))])
|
||||
(case host-rep
|
||||
[(void*) (cpointer-address arg)]
|
||||
[(struct union)
|
||||
(maker (cpointer-address arg))]
|
||||
[else arg])))
|
||||
args in-types arg-makers)))])
|
||||
(when lock (mutex-release lock))
|
||||
(when blocking? (currently-blocking? #f))
|
||||
(case save-errno
|
||||
[(posix) (thread-cell-set! errno-cell (get-errno))]
|
||||
[(windows) (thread-cell-set! errno-cell (get-last-error))])
|
||||
(cond
|
||||
[ret-ptr ret-ptr]
|
||||
[(eq? (ctype-our-rep out-type) 'gcpointer)
|
||||
(addr->gcpointer-memory r)]
|
||||
[else r])))))])
|
||||
(let ([go (lambda ()
|
||||
(when lock (mutex-acquire lock))
|
||||
(with-interrupts-disabled*
|
||||
(when blocking? (currently-blocking? #t))
|
||||
(retain
|
||||
orig-args
|
||||
(let ([r (#%apply (gen-proc (cpointer-address proc-p))
|
||||
(append
|
||||
(if ret-ptr
|
||||
(list (ret-maker (cpointer-address ret-ptr)))
|
||||
'())
|
||||
(map (lambda (arg in-type maker)
|
||||
(let ([host-rep (array-rep-to-pointer-rep
|
||||
(ctype-host-rep in-type))])
|
||||
(case host-rep
|
||||
[(void*) (cpointer-address arg)]
|
||||
[(struct union)
|
||||
(maker (cpointer-address arg))]
|
||||
[else arg])))
|
||||
args in-types arg-makers)))])
|
||||
(when lock (mutex-release lock))
|
||||
(when blocking? (currently-blocking? #f))
|
||||
(case save-errno
|
||||
[(posix) (thread-cell-set! errno-cell (get-errno))]
|
||||
[(windows) (thread-cell-set! errno-cell (get-last-error))])
|
||||
(cond
|
||||
[ret-ptr ret-ptr]
|
||||
[(eq? (ctype-our-rep out-type) 'gcpointer)
|
||||
(addr->gcpointer-memory r)]
|
||||
[else r])))))])
|
||||
(if (and orig-place?
|
||||
(not (eqv? 0 (get-thread-id))))
|
||||
(async-callback-queue-call orig-place-async-callback-queue (lambda () (go)) #f #t)
|
||||
(go))))])
|
||||
(c->s out-type r)))
|
||||
(fxsll 1 (length in-types))
|
||||
(cpointer->name proc-p))))])]
|
||||
|
@ -1850,6 +1856,10 @@
|
|||
(place-thread-category PLACE-MAIN-THREAD)
|
||||
(async-callback-place-init!))
|
||||
|
||||
(define orig-place-async-callback-queue #f)
|
||||
(define (remember-original-place!)
|
||||
(set! orig-place-async-callback-queue (current-async-callback-queue)))
|
||||
|
||||
;; Can be called in any Scheme thread
|
||||
(define (call-as-atomic-callback thunk atomic? async-apply async-callback-queue)
|
||||
(cond
|
||||
|
@ -1876,12 +1886,16 @@
|
|||
[else
|
||||
;; Not in a place's main thread; queue an async callback
|
||||
;; and wait for the response
|
||||
(async-callback-queue-call async-callback-queue
|
||||
(lambda () (|#%app| async-apply thunk))
|
||||
;; If we created this thread by `fork-pthread`, we must
|
||||
;; have gotten here by a foreign call, so interrupts are
|
||||
;; currently disabled
|
||||
(eqv? (place-thread-category) PLACE-KNOWN-THREAD))]))
|
||||
(let ([known-thread? (eqv? (place-thread-category) PLACE-KNOWN-THREAD)])
|
||||
(async-callback-queue-call async-callback-queue
|
||||
(lambda () (|#%app| async-apply thunk))
|
||||
;; If we created this thread by `fork-pthread`, we must
|
||||
;; have gotten here by a foreign call, so interrupts are
|
||||
;; currently disabled
|
||||
known-thread?
|
||||
;; In a thread created by `fork-pthread`, we'll have to tell
|
||||
;; the scheduler to be in atomic mode:
|
||||
known-thread?))]))
|
||||
|
||||
(define scheduler-start-atomic void)
|
||||
(define scheduler-end-atomic void)
|
||||
|
@ -1927,7 +1941,7 @@
|
|||
:contract "(listof ctype?)"
|
||||
in-types)
|
||||
(check who ctype? out-type)
|
||||
(let ([make-code (ffi-call/callable #f in-types out-type abi #f #f #f (and atomic? #t) async-apply)])
|
||||
(let ([make-code (ffi-call/callable #f in-types out-type abi #f #f #f #f (and atomic? #t) async-apply)])
|
||||
(lambda (proc)
|
||||
(check 'make-ffi-callback procedure? proc)
|
||||
(let* ([code (make-code proc)]
|
||||
|
|
Loading…
Reference in New Issue
Block a user