diff --git a/pkgs/racket-test/tests/racket/ffi-orig-place.rkt b/pkgs/racket-test/tests/racket/ffi-orig-place.rkt index 3df68bd295..fcf07d8ac9 100644 --- a/pkgs/racket-test/tests/racket/ffi-orig-place.rkt +++ b/pkgs/racket-test/tests/racket/ffi-orig-place.rkt @@ -2,6 +2,7 @@ (require ffi/unsafe) (module+ test + (self) (main)) ;; Make sure that `#:in-original-place?' doesn't lead to deadlock: @@ -18,7 +19,7 @@ (define p (place pch (define j (place-channel-get pch)) - ;; Start a thread that keep having to wait on the original place: + ;; Start a thread that keeps having to wait on the original place: (thread (lambda () (let loop () @@ -39,3 +40,17 @@ (for ([i 5]) (printf "iter ~a\n" i) (x-main))) + +(define pthread_self + (get-ffi-obj 'pthread_self #f (_fun #:in-original-place? #t -> _pointer) + (lambda () #f))) + +(define (self) + (when pthread_self + (define here (cast (pthread_self) _pointer _intptr)) + (define pl + (place pch + (place-channel-put pch (cast (pthread_self) _pointer _intptr)))) + (define from-there (place-channel-get pl)) + (unless (equal? here from-there) + (error "didn't run in main place")))) diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index 7c998e0ef0..d2020a47e6 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -803,6 +803,7 @@ (set-base-exception-handler!) (init-place-locals!) (register-as-place-main!) + (remember-original-place!) (set-collect-handler!) (set-primitive-applicables!) (set-continuation-applicables!) diff --git a/racket/src/cs/rumble/async-callback.ss b/racket/src/cs/rumble/async-callback.ss index 7bd343ecc8..0b0b0a5aa2 100644 --- a/racket/src/cs/rumble/async-callback.ss +++ b/racket/src/cs/rumble/async-callback.ss @@ -5,20 +5,21 @@ (place-async-callback-queue)) (define (async-callback-place-init!) - (place-async-callback-queue (make-async-callback-queue (make-mutex) + (place-async-callback-queue (make-async-callback-queue (make-mutex) ; ordered *before* `interrupts-disable`-as-lock (make-condition) '() (make-async-callback-poll-wakeup)))) (define (call-as-asynchronous-callback thunk) - (with-interrupts-disabled* - (async-callback-queue-call (current-async-callback-queue) thunk #t))) + (async-callback-queue-call (current-async-callback-queue) thunk #f #t)) -(define (async-callback-queue-call async-callback-queue thunk need-interrupts?) +(define (async-callback-queue-call async-callback-queue thunk interrupts-disabled? need-atomic?) (let* ([result-done? (box #f)] [result #f] [q async-callback-queue] [m (async-callback-queue-lock q)]) + (when interrupts-disabled? (enable-interrupts)) ; interrupt "lock" ordered after mutex + (when need-atomic? (scheduler-start-atomic)) ; don't abandon engine after mutex is acquired (mutex-acquire m) (set-async-callback-queue-in! q (cons (lambda () (set! result (thunk)) @@ -30,21 +31,22 @@ ((async-callback-queue-wakeup q)) (let loop () (unless (unbox result-done?) - (when need-interrupts? - ;; Enable interrupts so that the thread is deactivated - ;; when we wait on the condition - (enable-interrupts)) + ;; Interrupts must be enabled so that the thread is deactivated + ;; when we wait on the condition (condition-wait (async-callback-queue-condition q) m) - (when need-interrupts? (disable-interrupts)) (loop))) (mutex-release m) + (when need-atomic? (scheduler-end-atomic)) + (when interrupts-disabled? (enable-interrupts)) result)) (define make-async-callback-poll-wakeup (lambda () void)) (define (set-make-async-callback-poll-wakeup! make-wakeup) - (set! make-async-callback-poll-wakeup make-wakeup)) + (set! make-async-callback-poll-wakeup make-wakeup) + (set-async-callback-queue-wakeup! (current-async-callback-queue) (make-wakeup))) -;; Returns callbacks to run in atomic mode +;; Returns callbacks to run in atomic mode. Interrupts must not be disabled +;; when ths function is called. (define (poll-async-callbacks) (let ([q (current-async-callback-queue)]) (mutex-acquire (async-callback-queue-lock q)) diff --git a/racket/src/cs/rumble/foreign.ss b/racket/src/cs/rumble/foreign.ss index 49176c7022..54e8a0d4b9 100644 --- a/racket/src/cs/rumble/foreign.ss +++ b/racket/src/cs/rumble/foreign.ss @@ -1510,7 +1510,7 @@ in-types) (check who ctype? out-type) (check who string? :or-false lock-name) - ((ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? #f #f) p)])) + ((ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? orig-place? #f #f) p)])) (define/who ffi-call-maker (case-lambda @@ -1532,7 +1532,7 @@ in-types) (check who ctype? out-type) (check who string? :or-false lock-name) - (ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? #f #f)])) + (ffi-call/callable #t in-types out-type abi save-errno lock-name blocking? orig-place? #f #f)])) ;; For sanity checking of callbacks during a blocking callout: (define-virtual-register currently-blocking? #f) @@ -1547,7 +1547,7 @@ (define call-locks (make-eq-hashtable)) -(define (ffi-call/callable call? in-types out-type abi save-errno lock-name blocking? atomic? async-apply) +(define (ffi-call/callable call? in-types out-type abi save-errno lock-name blocking? orig-place? atomic? async-apply) (let* ([conv (case abi [(stdcall) '__stdcall] [(sysv) '__cdecl] @@ -1643,6 +1643,7 @@ (cond [(and (not ret-id) (not blocking?) + (not orig-place?) (not save-errno) (not lock) (#%andmap (lambda (in-type) @@ -1743,35 +1744,40 @@ [r (let ([ret-ptr (and ret-id ;; result is a struct type; need to allocate space for it (normalized-malloc ret-size ret-malloc-mode))]) - (when lock (mutex-acquire lock)) - (with-interrupts-disabled* - (when blocking? (currently-blocking? #t)) - (retain - orig-args - (let ([r (#%apply (gen-proc (cpointer-address proc-p)) - (append - (if ret-ptr - (list (ret-maker (cpointer-address ret-ptr))) - '()) - (map (lambda (arg in-type maker) - (let ([host-rep (array-rep-to-pointer-rep - (ctype-host-rep in-type))]) - (case host-rep - [(void*) (cpointer-address arg)] - [(struct union) - (maker (cpointer-address arg))] - [else arg]))) - args in-types arg-makers)))]) - (when lock (mutex-release lock)) - (when blocking? (currently-blocking? #f)) - (case save-errno - [(posix) (thread-cell-set! errno-cell (get-errno))] - [(windows) (thread-cell-set! errno-cell (get-last-error))]) - (cond - [ret-ptr ret-ptr] - [(eq? (ctype-our-rep out-type) 'gcpointer) - (addr->gcpointer-memory r)] - [else r])))))]) + (let ([go (lambda () + (when lock (mutex-acquire lock)) + (with-interrupts-disabled* + (when blocking? (currently-blocking? #t)) + (retain + orig-args + (let ([r (#%apply (gen-proc (cpointer-address proc-p)) + (append + (if ret-ptr + (list (ret-maker (cpointer-address ret-ptr))) + '()) + (map (lambda (arg in-type maker) + (let ([host-rep (array-rep-to-pointer-rep + (ctype-host-rep in-type))]) + (case host-rep + [(void*) (cpointer-address arg)] + [(struct union) + (maker (cpointer-address arg))] + [else arg]))) + args in-types arg-makers)))]) + (when lock (mutex-release lock)) + (when blocking? (currently-blocking? #f)) + (case save-errno + [(posix) (thread-cell-set! errno-cell (get-errno))] + [(windows) (thread-cell-set! errno-cell (get-last-error))]) + (cond + [ret-ptr ret-ptr] + [(eq? (ctype-our-rep out-type) 'gcpointer) + (addr->gcpointer-memory r)] + [else r])))))]) + (if (and orig-place? + (not (eqv? 0 (get-thread-id)))) + (async-callback-queue-call orig-place-async-callback-queue (lambda () (go)) #f #t) + (go))))]) (c->s out-type r))) (fxsll 1 (length in-types)) (cpointer->name proc-p))))])] @@ -1850,6 +1856,10 @@ (place-thread-category PLACE-MAIN-THREAD) (async-callback-place-init!)) +(define orig-place-async-callback-queue #f) +(define (remember-original-place!) + (set! orig-place-async-callback-queue (current-async-callback-queue))) + ;; Can be called in any Scheme thread (define (call-as-atomic-callback thunk atomic? async-apply async-callback-queue) (cond @@ -1876,12 +1886,16 @@ [else ;; Not in a place's main thread; queue an async callback ;; and wait for the response - (async-callback-queue-call async-callback-queue - (lambda () (|#%app| async-apply thunk)) - ;; If we created this thread by `fork-pthread`, we must - ;; have gotten here by a foreign call, so interrupts are - ;; currently disabled - (eqv? (place-thread-category) PLACE-KNOWN-THREAD))])) + (let ([known-thread? (eqv? (place-thread-category) PLACE-KNOWN-THREAD)]) + (async-callback-queue-call async-callback-queue + (lambda () (|#%app| async-apply thunk)) + ;; If we created this thread by `fork-pthread`, we must + ;; have gotten here by a foreign call, so interrupts are + ;; currently disabled + known-thread? + ;; In a thread created by `fork-pthread`, we'll have to tell + ;; the scheduler to be in atomic mode: + known-thread?))])) (define scheduler-start-atomic void) (define scheduler-end-atomic void) @@ -1927,7 +1941,7 @@ :contract "(listof ctype?)" in-types) (check who ctype? out-type) - (let ([make-code (ffi-call/callable #f in-types out-type abi #f #f #f (and atomic? #t) async-apply)]) + (let ([make-code (ffi-call/callable #f in-types out-type abi #f #f #f #f (and atomic? #t) async-apply)]) (lambda (proc) (check 'make-ffi-callback procedure? proc) (let* ([code (make-code proc)]