From 4e76091c6498ff470c0990eb6bfbaa6925424525 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Fri, 29 Jan 2021 12:53:20 -0700 Subject: [PATCH] add ffi/unsafe/os-async-channel --- .../scribblings/foreign/atomic.scrbl | 5 ++ .../scribblings/foreign/os-thread.scrbl | 61 +++++++++++++- .../tests/racket/os-async-channel.rkt | 41 ++++++++++ .../collects/ffi/unsafe/os-async-channel.rkt | 79 +++++++++++++++++++ racket/src/cs/thread.sls | 4 +- racket/src/thread/atomic.rkt | 5 ++ 6 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 pkgs/racket-test/tests/racket/os-async-channel.rkt create mode 100644 racket/collects/ffi/unsafe/os-async-channel.rkt diff --git a/pkgs/racket-doc/scribblings/foreign/atomic.scrbl b/pkgs/racket-doc/scribblings/foreign/atomic.scrbl index 23016f9e72..1336f87142 100644 --- a/pkgs/racket-doc/scribblings/foreign/atomic.scrbl +++ b/pkgs/racket-doc/scribblings/foreign/atomic.scrbl @@ -59,6 +59,11 @@ conversion handler with @racket[call-as-nonatomic]. The latter is safe for a particular atomic region, however, only if the region can be safely interrupted by a non-atomic exception construction. +Unlike @racket[call-as-atomic], @racket[start-atomic] and +@racket[end-atomic] can be called from any OS thread as supported by +@racketmodname[ffi/unsafe/os-thread], although the calls have no +effect in that case. + See also the caveat that @elemref["atomic-unsafe"]{atomic mode is unsafe}.} diff --git a/pkgs/racket-doc/scribblings/foreign/os-thread.scrbl b/pkgs/racket-doc/scribblings/foreign/os-thread.scrbl index 7ced9b72b2..177c7e358b 100644 --- a/pkgs/racket-doc/scribblings/foreign/os-thread.scrbl +++ b/pkgs/racket-doc/scribblings/foreign/os-thread.scrbl @@ -1,6 +1,7 @@ #lang scribble/doc @(require "utils.rkt" - (for-label ffi/unsafe/os-thread)) + (for-label ffi/unsafe/os-thread + ffi/unsafe/os-async-channel)) @title{Operating System Threads} @@ -61,3 +62,61 @@ by @racket[make-os-semaphore]. Waiting blocks the current thread; if the current thread is a Racket thread, then waiting also blocks all Racket threads.} +@; ---------------------------------------- + +@section{Operating System Asynchronous Channels} + +@defmodule[ffi/unsafe/os-async-channel]{The +@racketmodname[ffi/unsafe/os-async-channel] library provides an +asynchronous channels that work with operating-system threads, where +normal racket channels or place channels are not allowed. These +channels are typically used in combination with +@racketmodname[ffi/unsafe/os-thread].} + +An asynchronous operating-system channel is a @tech[#:doc +reference.scrbl]{synchronizable event}, so can it can be used with +@racket[sync] to receive a value in a Racket thread. Other threads +must use @racket[os-async-channel-try-get] or +@racket[os-async-channel-get]. + +When a thread is blocked on an otherwise inaccessible asynchronous +channel that was produced by @racket[make-os-async-channel], the +thread is @emph{not} available for garbage collection. That's +different from a thread is blocked on a regular Racket channel or a +place channel. + +@history[#:added "8.0.0.4"] + +@defproc[(make-os-async-channel) os-async-channel?]{ + +Creates a new, empty asynchronous channel for use with +operating-system threads.} + + +@defproc[(os-async-channel? [v any/c]) boolean?]{ + +Returns @racket[#t] if @racket[v] is an asynchronous channel produced +by @racket[make-os-async-channel], @racket[#f] otherwise.} + +@defproc[(os-async-channel-put [ch os-async-channel?] [v any/c]) void?]{ + +Enqueues @racket[v] in the asynchronous channel @racket[ch]. This +function can be called from a Racket thread or any operating-system +thread.} + +@defproc[(os-async-channel-try-get [ch os-async-channel?] [default-v any/c #f]) any/c]{ + +Dequeues a value from the the asynchronous channel @racket[ch] and +returns it, if a value is available. If no value is immediately +available in the channel, @racket[default-v] is returned. This +function can be called from a Racket thread or any operating-system +thread.} + +@defproc[(os-async-channel-get [ch os-async-channel?]) any/c]{ + +Dequeues a value from the the asynchronous channel @racket[ch] and +returns it, blocking until a value is available. This function can be +called from any non-Racket operating-system thread. This function +should @emph{not} be called from a Racket thread, since it blocks in a +way that will block all Racket threads within a place; in a Racket +thread, use @racket[sync], instead. } diff --git a/pkgs/racket-test/tests/racket/os-async-channel.rkt b/pkgs/racket-test/tests/racket/os-async-channel.rkt new file mode 100644 index 0000000000..5d91f676d7 --- /dev/null +++ b/pkgs/racket-test/tests/racket/os-async-channel.rkt @@ -0,0 +1,41 @@ +#lang racket/base +(require ffi/unsafe/os-thread + ffi/unsafe/os-async-channel) + +(when (os-thread-enabled?) + (for ([i 100]) + (printf "Try ~s\n" i) + (define ch (make-os-async-channel)) + (define N 4) + (define M 100) + ;; one 0 per thread: + (for ([j N]) + (os-async-channel-put ch 0)) + ;; each thread increments M times: + (for ([j N]) + (call-in-os-thread + (lambda () + (for ([k M]) + (define val + (cond + [(even? k) + ;; spins + (let loop () + (or (os-async-channel-try-get ch) + (loop)))] + [else + ;; blocks + (os-async-channel-get ch)])) + (os-async-channel-put ch (add1 val)))))) + ;; main thread loop; consume numbers from the queue + ;; until they add up to the number of threads time M; + ;; put a 0 back each time we're not done, yet + (let loop ([counter (* N M)] [steps 1]) + (define new-counter (- counter (sync ch))) + (cond + [(zero? new-counter) + (printf "Done in ~a gets\n" steps)] + [else + (os-async-channel-put ch 0) + (loop new-counter (add1 steps))])))) + diff --git a/racket/collects/ffi/unsafe/os-async-channel.rkt b/racket/collects/ffi/unsafe/os-async-channel.rkt new file mode 100644 index 0000000000..3208e730c7 --- /dev/null +++ b/racket/collects/ffi/unsafe/os-async-channel.rkt @@ -0,0 +1,79 @@ +#lang racket/base +(require "os-thread.rkt" + "schedule.rkt" + "atomic.rkt") + +(provide os-async-channel? + make-os-async-channel + os-async-channel-put + os-async-channel-try-get + os-async-channel-get) + +(struct os-async-channel (head tail lock ready signal) + #:mutable + #:property prop:evt (unsafe-poller + (lambda (self wakeups) + (define vals (try-get self (not wakeups))) + (if vals + (values vals #f) + (values #f self))))) + +;; In atomic mode, if in a Racket thread +(define (try-get ac dequeue?) + (os-semaphore-wait (os-async-channel-lock ac)) + (when (and (null? (os-async-channel-head ac)) + (pair? (os-async-channel-tail ac))) + (set-os-async-channel-head! ac (reverse (os-async-channel-tail ac))) + (set-os-async-channel-tail! ac null)) + (define head (os-async-channel-head ac)) + (when (and dequeue? (pair? head)) + (set-os-async-channel-head! ac (cdr head))) + (os-semaphore-post (os-async-channel-lock ac)) + (if (pair? head) + (list (car head)) + #f)) + +(define (make-os-async-channel) + (unless os-thread-enabled? + (raise (exn:fail:unsupported "make-os-async-channel: OS threads are not supported" + (current-continuation-marks)))) + (define sema (make-os-semaphore)) + (os-semaphore-post sema) + (os-async-channel null null sema (make-os-semaphore) (unsafe-make-signal-received))) + +;; Works in any thread: +(define (os-async-channel-put ac v) + (unless (os-async-channel? ac) + (raise-argument-error 'os-async-channel-put "os-async-channel?" ac)) + (start-atomic) ; allowed even in non-Racket threads + (os-semaphore-wait (os-async-channel-lock ac)) + (set-os-async-channel-tail! ac (cons v (os-async-channel-tail ac))) + (os-semaphore-post (os-async-channel-lock ac)) + ;; in case a non-Racket thread is waiting: + (os-semaphore-post (os-async-channel-ready ac)) + ;; in case scheduler is waiting in Racket thread: + ((os-async-channel-signal ac)) + (end-atomic)) ; allowed even in non-Racket threads + +;; Works in any thread: +(define (os-async-channel-try-get ac [default #f]) + (unless (os-async-channel? ac) + (raise-argument-error 'os-async-channel-try-get "os-async-channel?" ac)) + (define vals (try-get ac #t)) + (if vals + (car vals) + default)) + +;; Works only in a non-Racket thread: +(define (os-async-channel-get ac) + (unless (os-async-channel? ac) + ;; This is just a fancy way to crash, since raising exceptions in + ;; a non-Racket thread will not work: + (raise-argument-error 'os-async-channel-get "os-async-channel?" ac)) + (let loop () + (define vals (try-get ac #t)) + (cond + [vals (car vals)] + [else + (os-semaphore-wait (os-async-channel-ready ac)) + (loop)]))) diff --git a/racket/src/cs/thread.sls b/racket/src/cs/thread.sls index aa0b013f94..1dd23ce7f9 100644 --- a/racket/src/cs/thread.sls +++ b/racket/src/cs/thread.sls @@ -45,7 +45,9 @@ ;; Special handling of `current-atomic` to use the last virtual register, and ;; similarr for other. We rely on the fact that the register's default value is 0 ;; or the rumble layer installs a suitable default. Also, force inline a few - ;; functions and handle other special cases. + ;; functions and handle other special cases. Note that the implementation of + ;; `start-atomic` and `end-atomic` rely on some specific parameters being thread + ;; registers so that the functions can be safely called from any Scheme thread. (define-syntax (define stx) (let ([define-as-virtual-register (lambda (stx n) diff --git a/racket/src/thread/atomic.rkt b/racket/src/thread/atomic.rkt index 4df745c7e7..5212fd964d 100644 --- a/racket/src/thread/atomic.rkt +++ b/racket/src/thread/atomic.rkt @@ -57,11 +57,16 @@ ;; inlined in Chez Scheme embedding: (define (start-atomic) + ;; Althogh it's adjusting atomicity for the thread scheduler, + ;; this function is documented as working in any Scheme thread. + ;; The current implementation relies on parameters like + ;; `future-barrier` and `current-atomic` being virtual registers (future-barrier) (current-atomic (fx+ (current-atomic) 1))) ;; inlined in Chez Scheme embedding: (define (end-atomic) + ;; See `start-atomic` note on calls from any Scheme thread (define n (fx- (current-atomic) 1)) (cond [(fx= n 0)