add unsafe-poll-fd, unsafe-fd->evt (#2414)

The unsafe-fd->evt interface is based on unsafe-{file-descriptor,socket}->semaphore. 
The main differences are that these events are level-triggered, not edge-triggered, and 
they do not cooperate with ports created by unsafe-{file-descriptor,socket}->port.
This commit is contained in:
Ryan Culpepper 2019-02-20 13:53:11 +01:00 committed by GitHub
parent abb1ce71cc
commit d185257a75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 208 additions and 14 deletions

View File

@ -12,7 +12,7 @@
(define collection 'multi)
(define version "7.2.0.5")
(define version "7.2.0.6")
(define deps `("racket-lib"
["racket" #:version ,version]))

View File

@ -51,7 +51,7 @@ socket.
For any kind of result port, closing the resulting ports readies and
unregisters any semaphores for the file descriptor or socket that were
previously created with @racket[unsafe-file-descriptor->semaphore]
previously created with @racket[unsafe-file-descriptor->semaphore] or
@racket[unsafe-socket->semaphore].}
@ -75,13 +75,30 @@ a socket for @racket[port] if it has one, @racket[#f] otherwise.}
(or/c semaphore? #f)]
)]{
For @racket[mode] as @racket['read] or @racket['write], returns a
semaphore that becomes ready when @racket[fd] or @racket[socket]
becomes ready for reading or writing, respectively. The result is
@racket[#f] if a conversion to a semaphore is not supported for the
current platform or for the given file descriptor or socket.
Returns a semaphore that becomes ready when @racket[fd] or @racket[socket]
is ready for reading or writing, as selected by @racket[mode]. Specifically,
these functions provide a one-shot, @emph{edge-triggered} indicator; the
semaphore is posted the @emph{first time} any of the following cases holds:
The @racket['read-check] and @racket['write-check] modes are like
@itemlist[
@item{@racket[fd] or @racket[socket] is ready for reading or writing
(depending on @racket[mode]),}
@item{ports were created from @racket[fd] or @racket[socket] using
@racket[unsafe-file-descriptor->port] or @racket[unsafe-socket->port],
and those ports were closed, or}
@item{a subsequent call occurred with the same @racket[fd] or
@racket[socket] and with @racket['remove] for @racket[mode].}
]
The result is @racket[#f] if a conversion to a semaphore is not
supported for the current platform or for the given file descriptor or
socket.
The @racket['check-read] and @racket['check-write] modes are like
@racket['read] and @racket['write], but the result if @racket[#f] if a
semaphore is not already generated for the specified file descriptor
or socket in the specified mode.
@ -92,3 +109,42 @@ must be unregistered before the file descriptor or socket is closed.
Beware that closing a port from @racket[unsafe-file-descriptor->port]
or @racket[unsafe-socket->port] will also ready and unregister
semaphores.}
@defproc[(unsafe-fd->evt [fd exact-integer?]
[mode (or/c 'read 'write 'check-read 'check-write 'remove)]
[socket? any/c #t])
(or/c evt? #f)]{
Returns an event that is ready when @racket[fd] is ready for reading
or writing, as selected by @racket[mode]. Specifically, it returns a
multi-use, @emph{level-triggered} indicator; the event is ready
@emph{whenever} any of the following cases holds:
@itemlist[
@item{@racket[fd] is ready for reading or writing (depending on
@racket[mode]),}
@item{a subsequent call occurred with the same @racket[fd] and with
@racket['remove] for @racket[mode] (once removed, the event is
perpetually ready).}
]
The synchronization result of the event is the event itself.
The @racket['check-read] and @racket['check-write] modes are like
@racket['read] and @racket['write], but the result is @racket[#f] if
an event is not already generated for the specified file descriptor or
socketin the specified mode.
The @racket['remove] mode readies and unregisters any events
previously created for the given file descriptor or socket. Events
must be unregistered before the file descriptor or socket is
closed. Unlike @racket[unsafe-file-descriptor->semaphore] and
@racket[unsafe-socket->semaphore], closing a port from
@racket[unsafe-file-descriptor->port] or @racket[unsafe-socket->port]
does not unregister events.
@history[#:added "7.2.0.6"]}

View File

@ -46,6 +46,17 @@ value, @racket[poll] can call @racket[unsafe-poll-ctx-fd-wakeup],
register wakeup triggers.}
@defproc[(unsafe-poll-fd [fd exact-integer?]
[mode '(read write)]
[socket? any/c #t])
boolean?]{
Checks whether the given file descriptor or socket is currently ready
for reading or writing, as selected by @racket[mode].
@history[#:added "7.2.0.6"]}
@defproc[(unsafe-poll-ctx-fd-wakeup [wakeups any/c]
[fd fixnum?]
[mode '(read write error)])

View File

@ -6,4 +6,80 @@
unsafe-socket->port
unsafe-port->socket
unsafe-socket->semaphore))
(provide (all-from-out '#%unsafe))
(provide (all-from-out '#%unsafe)
unsafe-fd->evt)
(module fd-evt racket/base
(require (only-in '#%unsafe
unsafe-start-atomic
unsafe-end-atomic
unsafe-poller
unsafe-poll-fd
unsafe-poll-ctx-fd-wakeup))
(provide (protect-out unsafe-fd->evt))
(define socket-different?
(case (system-type 'os)
[(windows) #t]
[else #f]))
(struct fd-evt (sfd mode socket? [closed? #:mutable])
#:property prop:evt
(unsafe-poller
(lambda (self wakeups)
(define sfd (fd-evt-sfd self))
(define mode (fd-evt-mode self))
(define socket? (fd-evt-socket? self))
(cond
[(fd-evt-closed? self)
(values (list self) #f)]
[(unsafe-poll-fd sfd mode socket?)
(values (list self) #f)]
[else
(when wakeups
(unsafe-poll-ctx-fd-wakeup wakeups sfd mode #;socket?))
(values #f self)]))))
;; {file-descriptor,socket}=>{read,write}-evt : (Hasheqv Nat => fd-evt)
(define file-descriptor=>read-evt (make-hasheqv))
(define file-descriptor=>write-evt (make-hasheqv))
(define socket=>read-evt (if socket-different? (make-hasheqv) file-descriptor=>read-evt))
(define socket=>write-evt (if socket-different? (make-hasheqv) file-descriptor=>write-evt))
;; Differences between unsafe-fd->evt and unsafe-{file-descriptor,socket}->semaphore:
;; - level-triggered, not edge-triggered
;; - no cooperation with ports created by unsafe-{file-descriptor,socket}->port
(define (unsafe-fd->evt sfd mode [socket0? #t])
(define socket? (and socket0? #t))
(define sfd=>read-evt (if socket? socket=>read-evt file-descriptor=>read-evt))
(define sfd=>write-evt (if socket? socket=>write-evt file-descriptor=>write-evt))
(unless (exact-integer? sfd)
(raise-argument-error 'unsafe-fd->evt "handle-integer?" 0 sfd mode socket0?))
(unsafe-start-atomic)
(begin0
(case mode
[(read) (hash-ref! sfd=>read-evt sfd (lambda () (fd-evt sfd mode socket? #f)))]
[(write) (hash-ref! sfd=>write-evt sfd (lambda () (fd-evt sfd mode socket? #f)))]
[(check-read) (hash-ref sfd=>read-evt sfd #f)]
[(check-write) (hash-ref sfd=>write-evt sfd #f)]
[(remove)
(define (remove-and-close sfd=>evt)
(define evt (hash-ref sfd=>evt sfd #f))
(when evt
(hash-remove! sfd=>evt sfd)
(set-fd-evt-closed?! evt #t)))
(remove-and-close sfd=>read-evt)
(remove-and-close sfd=>write-evt)
#f]
[(internal-debug)
`((read ,(hash-keys sfd=>read-evt))
(write ,(hash-keys sfd=>write-evt)))]
[else
(unsafe-end-atomic)
(raise-argument-error 'unsafe-fd->evt
"(or/c 'read 'write 'check-read 'check-write 'remove)"
1 sfd mode socket0?)])
(unsafe-end-atomic))))
(require (submod "." fd-evt))

View File

@ -1,6 +1,7 @@
#lang racket/base
(require (only-in '#%unsafe
unsafe-poller
unsafe-poll-fd
unsafe-poll-ctx-fd-wakeup
unsafe-poll-ctx-eventmask-wakeup
unsafe-poll-ctx-milliseconds-wakeup

View File

@ -131,6 +131,7 @@
[unsafe-poll-ctx-eventmask-wakeup (known-procedure 4)]
[unsafe-poll-ctx-fd-wakeup (known-procedure 8)]
[unsafe-poll-ctx-milliseconds-wakeup (known-procedure 4)]
[unsafe-poll-fd (known-procedure 12)]
[unsafe-poller (known-constant)]
[unsafe-port->file-descriptor (known-procedure 2)]
[unsafe-port->socket (known-procedure 2)]

View File

@ -1,5 +1,6 @@
#lang racket/base
(require "../host/rktio.rkt"
"../host/thread.rkt"
"../string/convert.rkt"
"../port/fd-port.rkt"
"../network/tcp-port.rkt")
@ -7,10 +8,12 @@
(provide unsafe-file-descriptor->port
unsafe-port->file-descriptor
unsafe-file-descriptor->semaphore
unsafe-socket->port
unsafe-port->socket
unsafe-socket->semaphore)
unsafe-socket->semaphore
unsafe-poll-fd)
(define (unsafe-file-descriptor->port system-fd name mode)
(define read? (memq 'read mode))
@ -49,3 +52,14 @@
(define (unsafe-socket->semaphore system-fd mode)
#f)
(define (unsafe-poll-fd system-fd mode [socket? #t])
(atomically
(define fd (rktio_system_fd rktio system-fd (if socket? RKTIO_OPEN_SOCKET 0)))
(define ready?
(case mode
[(read) (eqv? (rktio_poll_read_ready rktio fd) RKTIO_POLL_READY)]
[(write) (eqv? (rktio_poll_write_ready rktio fd) RKTIO_POLL_READY)]
[else #f]))
(rktio_forget rktio fd)
ready?))

View File

@ -14,7 +14,7 @@
#define USE_COMPILED_STARTUP 1
#define EXPECTED_PRIM_COUNT 1450
#define EXPECTED_PRIM_COUNT 1451
#ifdef MZSCHEME_SOMETHING_OMITTED
# undef USE_COMPILED_STARTUP

View File

@ -13,12 +13,12 @@
consistently.)
*/
#define MZSCHEME_VERSION "7.2.0.5"
#define MZSCHEME_VERSION "7.2.0.6"
#define MZSCHEME_VERSION_X 7
#define MZSCHEME_VERSION_Y 2
#define MZSCHEME_VERSION_Z 0
#define MZSCHEME_VERSION_W 5
#define MZSCHEME_VERSION_W 6
#define MZSCHEME_VERSION_MAJOR ((MZSCHEME_VERSION_X * 100) + MZSCHEME_VERSION_Y)
#define MZSCHEME_VERSION_MINOR ((MZSCHEME_VERSION_Z * 1000) + MZSCHEME_VERSION_W)

View File

@ -390,6 +390,7 @@ static Scheme_Object *unsafe_start_breakable_atomic(int argc, Scheme_Object **ar
static Scheme_Object *unsafe_end_breakable_atomic(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_in_atomic_p(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_poll_fd(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_poll_ctx_fd_wakeup(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_poll_ctx_eventmask_wakeup(int argc, Scheme_Object **argv);
static Scheme_Object *unsafe_poll_ctx_time_wakeup(int argc, Scheme_Object **argv);
@ -658,6 +659,7 @@ scheme_init_unsafe_thread (Scheme_Startup_Env *env)
ADD_PRIM_W_ARITY("unsafe-add-global-finalizer", unsafe_add_global_finalizer, 2, 2, env);
scheme_addto_prim_instance("unsafe-poller", scheme_unsafe_poller_proc, env);
ADD_PRIM_W_ARITY("unsafe-poll-fd", unsafe_poll_fd, 2, 3, env);
ADD_PRIM_W_ARITY("unsafe-poll-ctx-fd-wakeup", unsafe_poll_ctx_fd_wakeup, 3, 3, env);
ADD_PRIM_W_ARITY("unsafe-poll-ctx-eventmask-wakeup", unsafe_poll_ctx_eventmask_wakeup, 2, 2, env);
ADD_PRIM_W_ARITY("unsafe-poll-ctx-milliseconds-wakeup", unsafe_poll_ctx_time_wakeup, 2, 2, env);
@ -5423,6 +5425,39 @@ sch_sleep(int argc, Scheme_Object *args[])
return scheme_void;
}
Scheme_Object *unsafe_poll_fd(int argc, Scheme_Object **argv)
{
intptr_t sfd = 0;
rktio_fd_t *rfd = NULL;
int mode = 0;
int ready = 0;
int is_socket = 1;
if (!scheme_get_int_val(argv[0], &sfd))
scheme_wrong_contract("unsafe-poll-fd", "handle-integer?", 0, argc, argv);
if (SAME_OBJ(argv[1], read_symbol))
mode = RKTIO_POLL_READ;
else if (SAME_OBJ(argv[1], write_symbol))
mode = RKTIO_POLL_WRITE;
else
scheme_wrong_contract("unsafe-poll-fd", "(or/c 'read 'write)", 1, argc, argv);
if (argc > 2) {
is_socket = SCHEME_TRUEP(argv[2]);
}
rfd = rktio_system_fd(scheme_rktio, sfd, (is_socket ? RKTIO_OPEN_SOCKET : 0));
if (mode == RKTIO_POLL_READ)
ready = rktio_poll_read_ready(scheme_rktio, rfd);
else if (mode == RKTIO_POLL_WRITE)
ready = rktio_poll_write_ready(scheme_rktio, rfd);
rktio_forget(scheme_rktio, rfd);
return (ready == RKTIO_POLL_READY) ? scheme_true : scheme_false;
}
Scheme_Object *unsafe_poll_ctx_fd_wakeup(int argc, Scheme_Object **argv)
{
if (SCHEME_TRUEP(argv[0])) {