cs: implement unsafe-{file-descriptor,socket}->semaphore

Use `unsafe-{file-descriptor,socket}->semaphore` internally to
make scheduling scalable for threads blocked on sockets, like
traditional Racket.
This commit is contained in:
Matthew Flatt 2019-04-25 19:22:55 -06:00
parent 05fe76f49a
commit 776996fe24
14 changed files with 140 additions and 45 deletions

View File

@ -108,7 +108,8 @@ previously created for the given file descriptor or socket. Semaphores
must be unregistered before the file descriptor or socket is closed.
Beware that closing a port from @racket[unsafe-file-descriptor->port]
or @racket[unsafe-socket->port] will also ready and unregister
semaphores.}
semaphores. In all of those cases, however, the semaphore is made
ready asynchronously, so there may be a detectable delay.}
@defproc[(unsafe-fd->evt [fd exact-integer?]
@ -142,9 +143,9 @@ socketin the specified mode.
The @racket['remove] mode readies and unregisters any events
previously created for the given file descriptor or socket. Events
must be unregistered before the file descriptor or socket is
closed. Unlike @racket[unsafe-file-descriptor->semaphore] and
@racket[unsafe-socket->semaphore], closing a port from
@racket[unsafe-file-descriptor->port] or @racket[unsafe-socket->port]
does not unregister events.
closed. Unlike the semaphore result of @racket[unsafe-file-descriptor->semaphore] and
@racket[unsafe-socket->semaphore], the event result of
@racket[unsafe-fd->evt] is not triggered or unregistered by closing a port---not
even a port from @racket[unsafe-file-descriptor->port] or @racket[unsafe-socket->port].
@history[#:added "7.2.0.6"]}

View File

@ -336,7 +336,7 @@ Safety and Debugging Mode
-------------------------
If you make changes to files in "rumble", you should turn off
`[RUMBLE_]UNSAFE_COMP` in the makefile.
`UNSAFE_COMP` in the makefile.
You may want to turn on `DEBUG_COMP` in the makefile, so that
backtraces provide expression-specific source locations instead of
@ -465,36 +465,6 @@ atomic regions:
It may be tempting to use that flag for other purposes, as a
cheap way to disable thread swaps. For now, don't do that.
Status and Thoughts on Various Racket Subsystems
------------------------------------------------
* Applicable structs work by adding an indirection to each function
call when the target is not obviously a plain procedure; with the
analysis in "../schemify/schemify.rkt", the indirection is not
needed often in a typical program, and the overhead appears to be
light when it is needed.
* The "rktio" library fills the gap between Racket and Chez Scheme's
native I/O. The "rktio" library provides a minimal, non-blocking,
non-GCed interface to OS-specific functionality. Its' compiled to a
shared library and loadied into Chez Scheme, and then Racket's I/O
API is implemented in Racket by calling rktio as a kind of foreign
library.
* The Racket and Chez Scheme numeric systems likely differ in some
ways, and I don't know how much work that will be.
* For futures, Chez Scheme exposes OS-level threads with limited
safety guarantees. An implementation of futures can probably take
advantage of threads with thread-unsafe primitives wrapped to
divert to a barrier when called in a future.
* GC-based memory accounting requires new support from Chez Scheme.
* Extflonums will probably exist only on the traditional Racket
implementation for a long while.
Performance Notes
-----------------

View File

@ -17,7 +17,9 @@
;; Remapped to place-local register operations:
[unsafe-make-place-local rumble:unsafe-make-place-local]
[unsafe-place-local-ref rumble:unsafe-place-local-ref]
[unsafe-place-local-set! rumble:unsafe-place-local-set!])
[unsafe-place-local-set! rumble:unsafe-place-local-set!]
[immobile-cell->address rumble:immobile-cell->address]
[address->immobile-cell rumble:address->immobile-cell])
(thread))
(include "place-register.ss")
@ -44,7 +46,7 @@
;; ----------------------------------------
(module (|#%rktio-instance| ptr->address)
(module (|#%rktio-instance| ptr->address address->ptr)
(meta define (convert-type t)
(syntax-case t (ref *ref rktio_bool_t rktio_const_string_t)
[(ref . _) #'uptr]
@ -360,6 +362,12 @@
form ...)]))
(include "../rktio/rktio.rktl"))))
(define (immobile-cell->address p)
(address->ptr (rumble:immobile-cell->address p)))
(define (address->immobile-cell p)
(rumble:address->immobile-cell (ptr->address p)))
;; ----------------------------------------
(define format

View File

@ -614,6 +614,9 @@
ptr-set! saved-errno set-cpointer-tag! set-ptr-offset! vector->cpointer
unsafe-register-process-global unsafe-add-global-finalizer
(rename [ffi-lib* ffi-lib])
immobile-cell-ref ; not exported to Racket
immobile-cell->address ; not exported to Racket
address->immobile-cell ; not exported to Racket
set-ffi-get-lib-and-obj! ; not exported to Racket
poll-async-callbacks ; not exported to Racket
set-make-async-callback-poll-wakeup! ; not exported to Racket

View File

@ -144,7 +144,10 @@
;; assumption that the address is the payload of a byte
;; string:
(define (addr->gcpointer-memory v) ; call with GC disabled
(#%$address->object v (- bytevector-content-offset)))
(#%$address->object v bytevector-content-offset))
(define (addr->vector v) ; call with GC disabled or when reuslt is locked
(#%$address->object v vector-content-offset))
;; Converts a primitive cpointer (normally the result of
;; `unwrap-cpointer`) to a raw foreign address. The
@ -1351,6 +1354,15 @@
(define (free-immobile-cell b)
(unlock-object (cpointer-memory b)))
(define (immobile-cell-ref b)
(#%vector-ref (cpointer-memory b) 0))
(define (immobile-cell->address b)
(vector->addr (cpointer-memory b)))
(define (address->immobile-cell a)
(make-cpointer/cell (addr->vector a) #f))
(define (malloc-mode? v)
(chez:memq v '(raw atomic nonatomic tagged
atomic-interior interior

View File

@ -5,6 +5,11 @@
unsafe-custodian-unregister)
"../../thread/current-sandman.rkt"
ffi/unsafe/atomic
(only-in ffi/unsafe
malloc-immobile-cell
free-immobile-cell
ptr-ref
_racket)
"bootstrap-rktio.rkt")
;; Approximate scheduler cooperation where `async-evt` can be used
@ -83,7 +88,12 @@
'unsafe-place-local-set! set-box!
'unsafe-add-global-finalizer (lambda (v proc) (void))
'unsafe-strip-impersonator (lambda (v) v)
'prop:unsafe-authentic-override prop:unsafe-authentic-override))
'prop:unsafe-authentic-override prop:unsafe-authentic-override
'malloc-immobile-cell malloc-immobile-cell
'free-immobile-cell free-immobile-cell
'immobile-cell-ref (lambda (ib) (ptr-ref ib _racket))
'immobile-cell->address (lambda (b) b)
'address->immobile-cell (lambda (b) b)))
(primitive-table '#%thread
(hasheq 'thread thread
@ -93,6 +103,8 @@
'thread-resume thread-resume
'make-semaphore make-semaphore
'semaphore-post semaphore-post
'semaphore-post-all (lambda (s) (for ([i (in-range 100)])
(semaphore-post s)))
'semaphore-wait semaphore-wait
'semaphore-peek-evt semaphore-peek-evt
'make-channel make-channel

View File

@ -26,4 +26,9 @@
unsafe-add-global-finalizer
unsafe-strip-impersonator
prop:unsafe-authentic-override
set-fs-change-properties!)
set-fs-change-properties!
malloc-immobile-cell
free-immobile-cell
immobile-cell-ref
immobile-cell->address
address->immobile-cell)

View File

@ -37,6 +37,7 @@
thread-resume
make-semaphore
semaphore-post
semaphore-post-all
semaphore-wait
semaphore-peek-evt
make-channel

View File

@ -6,6 +6,7 @@
"../host/thread.rkt"
"../host/pthread.rkt"
"../sandman/main.rkt"
"../sandman/ltps.rkt"
"../file/error.rkt"
"port.rkt"
"input-port.rkt"
@ -32,6 +33,7 @@
(define (fd-close fd fd-refcount)
(set-box! fd-refcount (sub1 (unbox fd-refcount)))
(when (zero? (unbox fd-refcount))
(fd-semaphore-update! fd 'remove)
(define v (rktio_close rktio fd))
(when (rktio-error? v)
(end-atomic)
@ -400,6 +402,16 @@
(cond
[ready?
(values (list fde) #f)]
;; If the called is going to block (i.e., not just polling), then
;; try to get a semaphore to represent the file descriptor, because
;; that can be more scalable (especially for lots of TCP sockets)
[(and (not (sandman-poll-ctx-poll? ctx))
(fd-semaphore-update! (fd-evt-fd fde)
(if (eqv? RKTIO_POLL_READ (bitwise-and mode RKTIO_POLL_READ))
'read
'write)))
=> (lambda (s) ; got a semaphore
(values #f (wrap-evt s (lambda (s) fde))))]
[else
;; If `sched-info` in `poll-ctx` is not #f, then we can register this file
;; descriptor so that if no thread is able to make progress,

View File

@ -1,14 +1,22 @@
#lang racket/base
(require "../host/rktio.rkt"
"../host/thread.rkt"
"../host/pthread.rkt"
"../host/place-local.rkt")
(provide shared-ltps
shared-ltps-place-init!)
shared-ltps-place-init!
fd-semaphore-update!
fd-semaphore-poll-ready)
(define (make-ltps)
(define ltps (rktio_ltps_open rktio))
(unless (rktio-error? ltps)
;; Rely on module ordering to ensure that this
;; custodian registration precedes any port (or other)
;; registrations that will need to be removed before
;; the ltps is closed
(unsafe-custodian-register (current-custodian)
ltps
;; in atomic mode
@ -24,3 +32,48 @@
(define (shared-ltps-place-init!)
(make-ltps))
;; ----------------------------------------
;; in atomic mode
(define (fd-semaphore-update! fd mode)
(cond
[(eq? shared-ltps rktio_NULL) #f]
[else
(define h (rktio_ltps_add rktio shared-ltps fd (case mode
[(read) RKTIO_LTPS_CREATE_READ]
[(write) RKTIO_LTPS_CREATE_WRITE]
[(check-read) RKTIO_LTPS_CHECK_READ]
[(check-write) RKTIO_LTPS_CHECK_WRITE]
[else RKTIO_LTPS_REMOVE])))
(cond
[(rktio-error? h)
;; We could log failures that are not RKTIO_ERROR_LTPS_REMOVED or RKTIO_ERROR_LTPS_NOT_FOUND
#f]
[else
(define ib (rktio_ltps_handle_get_data rktio h))
(cond
[(not (eq? ib rktio_NULL))
(immobile-cell-ref (address->immobile-cell ib))]
[else
(define s (make-semaphore))
(define ib (malloc-immobile-cell s))
(rktio_ltps_handle_set_data rktio h (immobile-cell->address ib))
s])])]))
;; in atomic mode
(define (fd-semaphore-poll-ready)
(unless (eq? shared-ltps rktio_NULL)
(rktio_ltps_poll rktio shared-ltps)
(let loop ()
(define h (rktio_ltps_get_signaled_handle rktio shared-ltps))
(cond
[(rktio-error? h)
;; Could log an error that isn't RKTIO_ERROR_LTPS_NOT_FOUND
(void)]
[else
(define ib (address->immobile-cell (rktio_ltps_handle_get_data rktio h)))
(semaphore-post-all (immobile-cell-ref ib))
(free-immobile-cell ib)
(rktio_free h)
(loop)]))))

View File

@ -22,6 +22,7 @@
sandman-poll-ctx-add-poll-set-adder!
sandman-poll-ctx-merge-timeout
sandman-set-background-sleep!
sandman-poll-ctx-poll?
sandman-place-init!)
(struct exts (timeout-at fd-adders))
@ -46,6 +47,8 @@
(schedule-info-current-exts sched-info)
timeout))))
(define (sandman-poll-ctx-poll? poll-ctx)
(poll-ctx-poll? poll-ctx))
(define-place-local background-sleep #f)
(define-place-local background-sleep-fd #f)
@ -104,6 +107,7 @@
[(eqv? v RKTIO_OS_SIGNAL_TERM) 'terminate]
[else 'break]))
(check-signals)))
(fd-semaphore-poll-ready)
((sandman-do-poll timeout-sandman) mode wakeup))
;; get-wakeup

View File

@ -3,7 +3,8 @@
"../host/thread.rkt"
"../string/convert.rkt"
"../port/fd-port.rkt"
"../network/tcp-port.rkt")
"../network/tcp-port.rkt"
"../sandman/ltps.rkt")
(provide unsafe-file-descriptor->port
unsafe-port->file-descriptor
@ -47,11 +48,22 @@
(and (tcp-port? p)
(unsafe-port->file-descriptor p)))
(define (unsafe-fd->semaphore system-fd mode socket?)
(start-atomic)
(define fd (rktio_system_fd rktio system-fd
(bitwise-ior RKTIO_OPEN_READ
RKTIO_OPEN_WRITE
(if socket? RKTIO_OPEN_SOCKET 0))))
(define sema (fd-semaphore-update! fd mode))
(rktio_forget rktio fd)
(end-atomic)
sema)
(define (unsafe-file-descriptor->semaphore system-fd mode)
#f)
(unsafe-fd->semaphore system-fd mode #f))
(define (unsafe-socket->semaphore system-fd mode)
#f)
(unsafe-fd->semaphore system-fd mode #t))
(define (unsafe-poll-fd system-fd mode [socket? #t])
(atomically

View File

@ -27,6 +27,7 @@
'thread-resume thread-resume
'make-semaphore make-semaphore
'semaphore-post semaphore-post
'semaphore-post-all semaphore-post-all
'semaphore-wait semaphore-wait
'semaphore-peek-evt semaphore-peek-evt
'make-channel make-channel

View File

@ -63,6 +63,7 @@
make-semaphore
semaphore-post
semaphore-post-all
semaphore-wait
semaphore-try-wait?
semaphore?