add ffi/unsafe/os-async-channel

This commit is contained in:
Matthew Flatt 2021-01-29 12:53:20 -07:00
parent 24f539087f
commit 4e76091c64
6 changed files with 193 additions and 2 deletions

View File

@ -59,6 +59,11 @@ conversion handler with @racket[call-as-nonatomic]. The latter is safe
for a particular atomic region, however, only if the region can be
safely interrupted by a non-atomic exception construction.
Unlike @racket[call-as-atomic], @racket[start-atomic] and
@racket[end-atomic] can be called from any OS thread as supported by
@racketmodname[ffi/unsafe/os-thread], although the calls have no
effect in that case.
See also the caveat that @elemref["atomic-unsafe"]{atomic mode is unsafe}.}

View File

@ -1,6 +1,7 @@
#lang scribble/doc
@(require "utils.rkt"
(for-label ffi/unsafe/os-thread))
(for-label ffi/unsafe/os-thread
ffi/unsafe/os-async-channel))
@title{Operating System Threads}
@ -61,3 +62,61 @@ by @racket[make-os-semaphore]. Waiting blocks the current thread; if
the current thread is a Racket thread, then waiting also blocks all
Racket threads.}
@; ----------------------------------------
@section{Operating System Asynchronous Channels}
@defmodule[ffi/unsafe/os-async-channel]{The
@racketmodname[ffi/unsafe/os-async-channel] library provides an
asynchronous channels that work with operating-system threads, where
normal racket channels or place channels are not allowed. These
channels are typically used in combination with
@racketmodname[ffi/unsafe/os-thread].}
An asynchronous operating-system channel is a @tech[#:doc
reference.scrbl]{synchronizable event}, so can it can be used with
@racket[sync] to receive a value in a Racket thread. Other threads
must use @racket[os-async-channel-try-get] or
@racket[os-async-channel-get].
When a thread is blocked on an otherwise inaccessible asynchronous
channel that was produced by @racket[make-os-async-channel], the
thread is @emph{not} available for garbage collection. That's
different from a thread is blocked on a regular Racket channel or a
place channel.
@history[#:added "8.0.0.4"]
@defproc[(make-os-async-channel) os-async-channel?]{
Creates a new, empty asynchronous channel for use with
operating-system threads.}
@defproc[(os-async-channel? [v any/c]) boolean?]{
Returns @racket[#t] if @racket[v] is an asynchronous channel produced
by @racket[make-os-async-channel], @racket[#f] otherwise.}
@defproc[(os-async-channel-put [ch os-async-channel?] [v any/c]) void?]{
Enqueues @racket[v] in the asynchronous channel @racket[ch]. This
function can be called from a Racket thread or any operating-system
thread.}
@defproc[(os-async-channel-try-get [ch os-async-channel?] [default-v any/c #f]) any/c]{
Dequeues a value from the the asynchronous channel @racket[ch] and
returns it, if a value is available. If no value is immediately
available in the channel, @racket[default-v] is returned. This
function can be called from a Racket thread or any operating-system
thread.}
@defproc[(os-async-channel-get [ch os-async-channel?]) any/c]{
Dequeues a value from the the asynchronous channel @racket[ch] and
returns it, blocking until a value is available. This function can be
called from any non-Racket operating-system thread. This function
should @emph{not} be called from a Racket thread, since it blocks in a
way that will block all Racket threads within a place; in a Racket
thread, use @racket[sync], instead. }

View File

@ -0,0 +1,41 @@
#lang racket/base
(require ffi/unsafe/os-thread
ffi/unsafe/os-async-channel)
(when (os-thread-enabled?)
(for ([i 100])
(printf "Try ~s\n" i)
(define ch (make-os-async-channel))
(define N 4)
(define M 100)
;; one 0 per thread:
(for ([j N])
(os-async-channel-put ch 0))
;; each thread increments M times:
(for ([j N])
(call-in-os-thread
(lambda ()
(for ([k M])
(define val
(cond
[(even? k)
;; spins
(let loop ()
(or (os-async-channel-try-get ch)
(loop)))]
[else
;; blocks
(os-async-channel-get ch)]))
(os-async-channel-put ch (add1 val))))))
;; main thread loop; consume numbers from the queue
;; until they add up to the number of threads time M;
;; put a 0 back each time we're not done, yet
(let loop ([counter (* N M)] [steps 1])
(define new-counter (- counter (sync ch)))
(cond
[(zero? new-counter)
(printf "Done in ~a gets\n" steps)]
[else
(os-async-channel-put ch 0)
(loop new-counter (add1 steps))]))))

View File

@ -0,0 +1,79 @@
#lang racket/base
(require "os-thread.rkt"
"schedule.rkt"
"atomic.rkt")
(provide os-async-channel?
make-os-async-channel
os-async-channel-put
os-async-channel-try-get
os-async-channel-get)
(struct os-async-channel (head tail lock ready signal)
#:mutable
#:property prop:evt (unsafe-poller
(lambda (self wakeups)
(define vals (try-get self (not wakeups)))
(if vals
(values vals #f)
(values #f self)))))
;; In atomic mode, if in a Racket thread
(define (try-get ac dequeue?)
(os-semaphore-wait (os-async-channel-lock ac))
(when (and (null? (os-async-channel-head ac))
(pair? (os-async-channel-tail ac)))
(set-os-async-channel-head! ac (reverse (os-async-channel-tail ac)))
(set-os-async-channel-tail! ac null))
(define head (os-async-channel-head ac))
(when (and dequeue? (pair? head))
(set-os-async-channel-head! ac (cdr head)))
(os-semaphore-post (os-async-channel-lock ac))
(if (pair? head)
(list (car head))
#f))
(define (make-os-async-channel)
(unless os-thread-enabled?
(raise (exn:fail:unsupported "make-os-async-channel: OS threads are not supported"
(current-continuation-marks))))
(define sema (make-os-semaphore))
(os-semaphore-post sema)
(os-async-channel null null sema (make-os-semaphore) (unsafe-make-signal-received)))
;; Works in any thread:
(define (os-async-channel-put ac v)
(unless (os-async-channel? ac)
(raise-argument-error 'os-async-channel-put "os-async-channel?" ac))
(start-atomic) ; allowed even in non-Racket threads
(os-semaphore-wait (os-async-channel-lock ac))
(set-os-async-channel-tail! ac (cons v (os-async-channel-tail ac)))
(os-semaphore-post (os-async-channel-lock ac))
;; in case a non-Racket thread is waiting:
(os-semaphore-post (os-async-channel-ready ac))
;; in case scheduler is waiting in Racket thread:
((os-async-channel-signal ac))
(end-atomic)) ; allowed even in non-Racket threads
;; Works in any thread:
(define (os-async-channel-try-get ac [default #f])
(unless (os-async-channel? ac)
(raise-argument-error 'os-async-channel-try-get "os-async-channel?" ac))
(define vals (try-get ac #t))
(if vals
(car vals)
default))
;; Works only in a non-Racket thread:
(define (os-async-channel-get ac)
(unless (os-async-channel? ac)
;; This is just a fancy way to crash, since raising exceptions in
;; a non-Racket thread will not work:
(raise-argument-error 'os-async-channel-get "os-async-channel?" ac))
(let loop ()
(define vals (try-get ac #t))
(cond
[vals (car vals)]
[else
(os-semaphore-wait (os-async-channel-ready ac))
(loop)])))

View File

@ -45,7 +45,9 @@
;; Special handling of `current-atomic` to use the last virtual register, and
;; similarr for other. We rely on the fact that the register's default value is 0
;; or the rumble layer installs a suitable default. Also, force inline a few
;; functions and handle other special cases.
;; functions and handle other special cases. Note that the implementation of
;; `start-atomic` and `end-atomic` rely on some specific parameters being thread
;; registers so that the functions can be safely called from any Scheme thread.
(define-syntax (define stx)
(let ([define-as-virtual-register
(lambda (stx n)

View File

@ -57,11 +57,16 @@
;; inlined in Chez Scheme embedding:
(define (start-atomic)
;; Althogh it's adjusting atomicity for the thread scheduler,
;; this function is documented as working in any Scheme thread.
;; The current implementation relies on parameters like
;; `future-barrier` and `current-atomic` being virtual registers
(future-barrier)
(current-atomic (fx+ (current-atomic) 1)))
;; inlined in Chez Scheme embedding:
(define (end-atomic)
;; See `start-atomic` note on calls from any Scheme thread
(define n (fx- (current-atomic) 1))
(cond
[(fx= n 0)