From 776996fe241f81b40e54d312be48bfe142da9e67 Mon Sep 17 00:00:00 2001 From: Matthew Flatt Date: Thu, 25 Apr 2019 19:22:55 -0600 Subject: [PATCH] cs: implement `unsafe-{file-descriptor,socket}->semaphore` Use `unsafe-{file-descriptor,socket}->semaphore` internally to make scheduling scalable for threads blocked on sockets, like traditional Racket. --- .../racket-doc/scribblings/foreign/port.scrbl | 11 ++-- racket/src/cs/README.txt | 32 +---------- racket/src/cs/io.sls | 12 +++- racket/src/cs/rumble.sls | 3 + racket/src/cs/rumble/foreign.ss | 14 ++++- racket/src/io/host/bootstrap.rkt | 14 ++++- racket/src/io/host/pthread.rkt | 7 ++- racket/src/io/host/thread.rkt | 1 + racket/src/io/port/fd-port.rkt | 12 ++++ racket/src/io/sandman/ltps.rkt | 55 ++++++++++++++++++- racket/src/io/sandman/main.rkt | 4 ++ racket/src/io/unsafe/port.rkt | 18 +++++- racket/src/thread/instance.rkt | 1 + racket/src/thread/main.rkt | 1 + 14 files changed, 140 insertions(+), 45 deletions(-) diff --git a/pkgs/racket-doc/scribblings/foreign/port.scrbl b/pkgs/racket-doc/scribblings/foreign/port.scrbl index 1113639e50..2a78b5218e 100644 --- a/pkgs/racket-doc/scribblings/foreign/port.scrbl +++ b/pkgs/racket-doc/scribblings/foreign/port.scrbl @@ -108,7 +108,8 @@ previously created for the given file descriptor or socket. Semaphores must be unregistered before the file descriptor or socket is closed. Beware that closing a port from @racket[unsafe-file-descriptor->port] or @racket[unsafe-socket->port] will also ready and unregister -semaphores.} +semaphores. In all of those cases, however, the semaphore is made +ready asynchronously, so there may be a detectable delay.} @defproc[(unsafe-fd->evt [fd exact-integer?] @@ -142,9 +143,9 @@ socketin the specified mode. The @racket['remove] mode readies and unregisters any events previously created for the given file descriptor or socket. Events must be unregistered before the file descriptor or socket is -closed. 
Unlike @racket[unsafe-file-descriptor->semaphore] and -@racket[unsafe-socket->semaphore], closing a port from -@racket[unsafe-file-descriptor->port] or @racket[unsafe-socket->port] -does not unregister events. +closed. Unlike the semaphore result of @racket[unsafe-file-descriptor->semaphore] and +@racket[unsafe-socket->semaphore], the event result of +@racket[unsafe-fd->evt] is not triggered or unregistered by closing a port---not +even a port from @racket[unsafe-file-descriptor->port] or @racket[unsafe-socket->port]. @history[#:added "7.2.0.6"]} diff --git a/racket/src/cs/README.txt b/racket/src/cs/README.txt index 9dc59d0bcf..9faaea88ba 100644 --- a/racket/src/cs/README.txt +++ b/racket/src/cs/README.txt @@ -336,7 +336,7 @@ Safety and Debugging Mode ------------------------- If you make changes to files in "rumble", you should turn off -`[RUMBLE_]UNSAFE_COMP` in the makefile. +`UNSAFE_COMP` in the makefile. You may want to turn on `DEBUG_COMP` in the makefile, so that backtraces provide expression-specific source locations instead of @@ -465,36 +465,6 @@ atomic regions: It may be tempting to use that flag for other purposes, as a cheap way to disable thread swaps. For now, don't do that. - -Status and Thoughts on Various Racket Subsystems ------------------------------------------------- - - * Applicable structs work by adding an indirection to each function - call when the target is not obviously a plain procedure; with the - analysis in "../schemify/schemify.rkt", the indirection is not - needed often in a typical program, and the overhead appears to be - light when it is needed. - - * The "rktio" library fills the gap between Racket and Chez Scheme's - native I/O. The "rktio" library provides a minimal, non-blocking, - non-GCed interface to OS-specific functionality. Its' compiled to a - shared library and loadied into Chez Scheme, and then Racket's I/O - API is implemented in Racket by calling rktio as a kind of foreign - library. 
- - * The Racket and Chez Scheme numeric systems likely differ in some - ways, and I don't know how much work that will be. - - * For futures, Chez Scheme exposes OS-level threads with limited - safety guarantees. An implementation of futures can probably take - advantage of threads with thread-unsafe primitives wrapped to - divert to a barrier when called in a future. - - * GC-based memory accounting requires new support from Chez Scheme. - - * Extflonums will probably exist only on the traditional Racket - implementation for a long while. - Performance Notes ----------------- diff --git a/racket/src/cs/io.sls b/racket/src/cs/io.sls index af4d0038f2..fd5d8433df 100644 --- a/racket/src/cs/io.sls +++ b/racket/src/cs/io.sls @@ -17,7 +17,9 @@ ;; Remapped to place-local register operations: [unsafe-make-place-local rumble:unsafe-make-place-local] [unsafe-place-local-ref rumble:unsafe-place-local-ref] - [unsafe-place-local-set! rumble:unsafe-place-local-set!]) + [unsafe-place-local-set! rumble:unsafe-place-local-set!] + [immobile-cell->address rumble:immobile-cell->address] + [address->immobile-cell rumble:address->immobile-cell]) (thread)) (include "place-register.ss") @@ -44,7 +46,7 @@ ;; ---------------------------------------- - (module (|#%rktio-instance| ptr->address) + (module (|#%rktio-instance| ptr->address address->ptr) (meta define (convert-type t) (syntax-case t (ref *ref rktio_bool_t rktio_const_string_t) [(ref . _) #'uptr] @@ -360,6 +362,12 @@ form ...)])) (include "../rktio/rktio.rktl")))) + (define (immobile-cell->address p) + (address->ptr (rumble:immobile-cell->address p))) + + (define (address->immobile-cell p) + (rumble:address->immobile-cell (ptr->address p))) + ;; ---------------------------------------- (define format diff --git a/racket/src/cs/rumble.sls b/racket/src/cs/rumble.sls index c23a5f589e..d5ef8c9536 100644 --- a/racket/src/cs/rumble.sls +++ b/racket/src/cs/rumble.sls @@ -614,6 +614,9 @@ ptr-set! saved-errno set-cpointer-tag! 
set-ptr-offset! vector->cpointer unsafe-register-process-global unsafe-add-global-finalizer (rename [ffi-lib* ffi-lib]) + immobile-cell-ref ; not exported to Racket + immobile-cell->address ; not exported to Racket + address->immobile-cell ; not exported to Racket set-ffi-get-lib-and-obj! ; not exported to Racket poll-async-callbacks ; not exported to Racket set-make-async-callback-poll-wakeup! ; not exported to Racket diff --git a/racket/src/cs/rumble/foreign.ss b/racket/src/cs/rumble/foreign.ss index d651ac3218..7ea924c82c 100644 --- a/racket/src/cs/rumble/foreign.ss +++ b/racket/src/cs/rumble/foreign.ss @@ -144,7 +144,10 @@ ;; assumption that the address is the payload of a byte ;; string: (define (addr->gcpointer-memory v) ; call with GC disabled - (#%$address->object v (- bytevector-content-offset))) + (#%$address->object v bytevector-content-offset)) + +(define (addr->vector v) ; call with GC disabled or when result is locked + (#%$address->object v vector-content-offset)) ;; Converts a primitive cpointer (normally the result of ;; `unwrap-cpointer`) to a raw foreign address. The @@ -1351,6 +1354,15 @@ (define (free-immobile-cell b) (unlock-object (cpointer-memory b))) +(define (immobile-cell-ref b) + (#%vector-ref (cpointer-memory b) 0)) + +(define (immobile-cell->address b) + (vector->addr (cpointer-memory b))) + +(define (address->immobile-cell a) + (make-cpointer/cell (addr->vector a) #f)) + (define (malloc-mode? 
v) (chez:memq v '(raw atomic nonatomic tagged atomic-interior interior diff --git a/racket/src/io/host/bootstrap.rkt b/racket/src/io/host/bootstrap.rkt index 410965d402..e22b2f1e72 100644 --- a/racket/src/io/host/bootstrap.rkt +++ b/racket/src/io/host/bootstrap.rkt @@ -5,6 +5,11 @@ unsafe-custodian-unregister) "../../thread/current-sandman.rkt" ffi/unsafe/atomic + (only-in ffi/unsafe + malloc-immobile-cell + free-immobile-cell + ptr-ref + _racket) "bootstrap-rktio.rkt") ;; Approximate scheduler cooperation where `async-evt` can be used @@ -83,7 +88,12 @@ 'unsafe-place-local-set! set-box! 'unsafe-add-global-finalizer (lambda (v proc) (void)) 'unsafe-strip-impersonator (lambda (v) v) - 'prop:unsafe-authentic-override prop:unsafe-authentic-override)) + 'prop:unsafe-authentic-override prop:unsafe-authentic-override + 'malloc-immobile-cell malloc-immobile-cell + 'free-immobile-cell free-immobile-cell + 'immobile-cell-ref (lambda (ib) (ptr-ref ib _racket)) + 'immobile-cell->address (lambda (b) b) + 'address->immobile-cell (lambda (b) b))) (primitive-table '#%thread (hasheq 'thread thread @@ -93,6 +103,8 @@ 'thread-resume thread-resume 'make-semaphore make-semaphore 'semaphore-post semaphore-post + 'semaphore-post-all (lambda (s) (for ([i (in-range 100)]) + (semaphore-post s))) 'semaphore-wait semaphore-wait 'semaphore-peek-evt semaphore-peek-evt 'make-channel make-channel diff --git a/racket/src/io/host/pthread.rkt b/racket/src/io/host/pthread.rkt index 027141ceb7..4a1fc38956 100644 --- a/racket/src/io/host/pthread.rkt +++ b/racket/src/io/host/pthread.rkt @@ -26,4 +26,9 @@ unsafe-add-global-finalizer unsafe-strip-impersonator prop:unsafe-authentic-override - set-fs-change-properties!) + set-fs-change-properties! 
+ malloc-immobile-cell + free-immobile-cell + immobile-cell-ref + immobile-cell->address + address->immobile-cell) diff --git a/racket/src/io/host/thread.rkt b/racket/src/io/host/thread.rkt index acde4ee5bc..823ad48edc 100644 --- a/racket/src/io/host/thread.rkt +++ b/racket/src/io/host/thread.rkt @@ -37,6 +37,7 @@ thread-resume make-semaphore semaphore-post + semaphore-post-all semaphore-wait semaphore-peek-evt make-channel diff --git a/racket/src/io/port/fd-port.rkt b/racket/src/io/port/fd-port.rkt index ce493b47f4..bb5f4d430c 100644 --- a/racket/src/io/port/fd-port.rkt +++ b/racket/src/io/port/fd-port.rkt @@ -6,6 +6,7 @@ "../host/thread.rkt" "../host/pthread.rkt" "../sandman/main.rkt" + "../sandman/ltps.rkt" "../file/error.rkt" "port.rkt" "input-port.rkt" @@ -32,6 +33,7 @@ (define (fd-close fd fd-refcount) (set-box! fd-refcount (sub1 (unbox fd-refcount))) (when (zero? (unbox fd-refcount)) + (fd-semaphore-update! fd 'remove) (define v (rktio_close rktio fd)) (when (rktio-error? v) (end-atomic) @@ -400,6 +402,16 @@ (cond [ready? (values (list fde) #f)] + ;; If the caller is going to block (i.e., not just polling), then + ;; try to get a semaphore to represent the file descriptor, because + ;; that can be more scalable (especially for lots of TCP sockets) + [(and (not (sandman-poll-ctx-poll? ctx)) + (fd-semaphore-update! (fd-evt-fd fde) + (if (eqv? 
RKTIO_POLL_READ (bitwise-and mode RKTIO_POLL_READ)) + 'read + 'write))) + => (lambda (s) ; got a semaphore + (values #f (wrap-evt s (lambda (s) fde))))] [else ;; If `sched-info` in `poll-ctx` is not #f, then we can register this file ;; descriptor so that if no thread is able to make progress, diff --git a/racket/src/io/sandman/ltps.rkt b/racket/src/io/sandman/ltps.rkt index 4c02624d04..5c9a38704a 100644 --- a/racket/src/io/sandman/ltps.rkt +++ b/racket/src/io/sandman/ltps.rkt @@ -1,14 +1,22 @@ #lang racket/base (require "../host/rktio.rkt" "../host/thread.rkt" + "../host/pthread.rkt" "../host/place-local.rkt") (provide shared-ltps - shared-ltps-place-init!) + shared-ltps-place-init! + + fd-semaphore-update! + fd-semaphore-poll-ready) (define (make-ltps) (define ltps (rktio_ltps_open rktio)) (unless (rktio-error? ltps) + ;; Rely on module ordering to ensure that this + ;; custodian registration precedes any port (or other) + ;; registrations that will need to be removed before + ;; the ltps is closed (unsafe-custodian-register (current-custodian) ltps ;; in atomic mode @@ -24,3 +32,48 @@ (define (shared-ltps-place-init!) (make-ltps)) + +;; ---------------------------------------- + +;; in atomic mode +(define (fd-semaphore-update! fd mode) + (cond + [(eq? shared-ltps rktio_NULL) #f] + [else + (define h (rktio_ltps_add rktio shared-ltps fd (case mode + [(read) RKTIO_LTPS_CREATE_READ] + [(write) RKTIO_LTPS_CREATE_WRITE] + [(check-read) RKTIO_LTPS_CHECK_READ] + [(check-write) RKTIO_LTPS_CHECK_WRITE] + [else RKTIO_LTPS_REMOVE]))) + (cond + [(rktio-error? h) + ;; We could log failures that are not RKTIO_ERROR_LTPS_REMOVED or RKTIO_ERROR_LTPS_NOT_FOUND + #f] + [else + (define ib (rktio_ltps_handle_get_data rktio h)) + (cond + [(not (eq? 
ib rktio_NULL)) + (immobile-cell-ref (address->immobile-cell ib))] + [else + (define s (make-semaphore)) + (define ib (malloc-immobile-cell s)) + (rktio_ltps_handle_set_data rktio h (immobile-cell->address ib)) + s])])])) + +;; in atomic mode +(define (fd-semaphore-poll-ready) + (unless (eq? shared-ltps rktio_NULL) + (rktio_ltps_poll rktio shared-ltps) + (let loop () + (define h (rktio_ltps_get_signaled_handle rktio shared-ltps)) + (cond + [(rktio-error? h) + ;; Could log an error that isn't RKTIO_ERROR_LTPS_NOT_FOUND + (void)] + [else + (define ib (address->immobile-cell (rktio_ltps_handle_get_data rktio h))) + (semaphore-post-all (immobile-cell-ref ib)) + (free-immobile-cell ib) + (rktio_free h) + (loop)])))) diff --git a/racket/src/io/sandman/main.rkt b/racket/src/io/sandman/main.rkt index c07881ce71..50c9ee027e 100644 --- a/racket/src/io/sandman/main.rkt +++ b/racket/src/io/sandman/main.rkt @@ -22,6 +22,7 @@ sandman-poll-ctx-add-poll-set-adder! sandman-poll-ctx-merge-timeout sandman-set-background-sleep! + sandman-poll-ctx-poll? sandman-place-init!) (struct exts (timeout-at fd-adders)) @@ -46,6 +47,8 @@ (schedule-info-current-exts sched-info) timeout)))) +(define (sandman-poll-ctx-poll? poll-ctx) + (poll-ctx-poll? poll-ctx)) (define-place-local background-sleep #f) (define-place-local background-sleep-fd #f) @@ -104,6 +107,7 @@ [(eqv? v RKTIO_OS_SIGNAL_TERM) 'terminate] [else 'break])) (check-signals))) + (fd-semaphore-poll-ready) ((sandman-do-poll timeout-sandman) mode wakeup)) ;; get-wakeup diff --git a/racket/src/io/unsafe/port.rkt b/racket/src/io/unsafe/port.rkt index 73d7a2dcd2..de566e4b0e 100644 --- a/racket/src/io/unsafe/port.rkt +++ b/racket/src/io/unsafe/port.rkt @@ -3,7 +3,8 @@ "../host/thread.rkt" "../string/convert.rkt" "../port/fd-port.rkt" - "../network/tcp-port.rkt") + "../network/tcp-port.rkt" + "../sandman/ltps.rkt") (provide unsafe-file-descriptor->port unsafe-port->file-descriptor @@ -47,11 +48,22 @@ (and (tcp-port? 
p) (unsafe-port->file-descriptor p))) +(define (unsafe-fd->semaphore system-fd mode socket?) + (start-atomic) + (define fd (rktio_system_fd rktio system-fd + (bitwise-ior RKTIO_OPEN_READ + RKTIO_OPEN_WRITE + (if socket? RKTIO_OPEN_SOCKET 0)))) + (define sema (fd-semaphore-update! fd mode)) + (rktio_forget rktio fd) + (end-atomic) + sema) + (define (unsafe-file-descriptor->semaphore system-fd mode) - #f) + (unsafe-fd->semaphore system-fd mode #f)) (define (unsafe-socket->semaphore system-fd mode) - #f) + (unsafe-fd->semaphore system-fd mode #t)) (define (unsafe-poll-fd system-fd mode [socket? #t]) (atomically diff --git a/racket/src/thread/instance.rkt b/racket/src/thread/instance.rkt index 6ab32d91bd..be1afe3a47 100644 --- a/racket/src/thread/instance.rkt +++ b/racket/src/thread/instance.rkt @@ -27,6 +27,7 @@ 'thread-resume thread-resume 'make-semaphore make-semaphore 'semaphore-post semaphore-post + 'semaphore-post-all semaphore-post-all 'semaphore-wait semaphore-wait 'semaphore-peek-evt semaphore-peek-evt 'make-channel make-channel diff --git a/racket/src/thread/main.rkt b/racket/src/thread/main.rkt index 9a9c8615d8..1a04f596a1 100644 --- a/racket/src/thread/main.rkt +++ b/racket/src/thread/main.rkt @@ -63,6 +63,7 @@ make-semaphore semaphore-post + semaphore-post-all semaphore-wait semaphore-try-wait? semaphore?