Create racket/place/dynamic to reduce dependencies.

Also adjust implementation of th-places slightly to avoid startup
time dependencies.
This commit is contained in:
Sam Tobin-Hochstadt 2020-11-24 14:44:23 -05:00 committed by Matthew Flatt
parent ef96d98b04
commit 1173006212
7 changed files with 277 additions and 224 deletions

View File

@ -33,23 +33,33 @@
(provide note-lib)
(define-syntax note-lib
(syntax-rules ()
[(_ lib #:use-sources (src ...) . more)
[(_ lib #:more-libs (lib+ ...) #:use-sources (src ...) . more)
(begin
(declare-exporting lib racket #:use-sources (src ...))
(defmodule*/no-declare (lib)
(declare-exporting lib lib+ ... racket #:use-sources (src ...))
(defmodule*/no-declare (lib lib+ ...)
(t (make-collect-element
#f null
(lambda (ci)
(collect-put! ci `(racket-extra-lib ,'lib) (racketmodname lib))))
"The bindings documented in this section are provided by the "
(racketmodname lib)
" and "
(combine-library-names (racketmodname lib) (racketmodname lib+) ...) ; includes trailing space
"and "
(racketmodname racket)
" libraries, but not " (racketmodname racket/base)
"."
. more)))]
[(_ lib #:more-libs (lib+ ...) . more)
(note-lib lib #:more-libs (lib+ ...) #:use-sources () . more)]
[(_ lib #:use-sources (src ...) . more)
(note-lib lib #:more-libs () #:use-sources (src ...) . more)]
[(_ lib . more)
(note-lib lib #:use-sources () . more)]))
(note-lib lib #:more-libs () #:use-sources () . more)]))
(define (combine-library-names lib . lib+s)
(if (null? lib+s)
(list lib " ")
(for/list ([lib (in-list (cons lib lib+s))])
(list lib ", "))))
(provide note-init-lib)
(define-syntax note-init-lib

View File

@ -18,7 +18,7 @@
@guideintro["effective-places"]{places}
@note-lib[racket/place]
@note-lib[racket/place #:more-libs (racket/place/dynamic) #:use-sources(racket/place)]
@tech{Places} enable the development of parallel programs that
take advantage of machines with multiple processors, cores, or
@ -246,46 +246,6 @@ The @racket[dynamic-place*] procedure returns four values:
The @racket[dynamic-place*] binding is protected in the same way as
@racket[dynamic-place].}
@defform[(place id body ...+)]{
Creates a place that evaluates @racket[body]
expressions with @racket[id] bound to a place channel. The
@racket[body]s close only over @racket[id] plus the top-level
bindings of the enclosing module, because the
@racket[body]s are lifted to a submodule.
The result of @racket[place] is a place descriptor,
like the result of @racket[dynamic-place].
The generated submodule has the name @racketidfont{place-body-@racket[_n]}
for an integer @racket[_n], and the submodule exports a @racket[main]
function that takes a place channel for the new place. The submodule
is not intended for use, however, except by the expansion of the
@racket[place] form.
The @racket[place] binding is protected in the same way as
@racket[dynamic-place].}
@defform/subs[(place* maybe-port ...
id
body ...+)
([maybe-port code:blank
(code:line #:in in-expr)
(code:line #:out out-expr)
(code:line #:err err-expr)])]{
Like @racket[place], but supports optional @racket[#:in], @racket[#:out],
and @racket[#:err] expressions (at most one of each) to specify ports in the same way and
with the same defaults as @racket[dynamic-place*]. The result of
a @racket[place*] form is also the same as for @racket[dynamic-place*].
The @racket[place*] binding is protected in the same way as
@racket[dynamic-place].}
@defform[(place/context id body ...+)]{
Like @racket[place], but @racket[body ...] may have free lexical
variables, which are automatically sent to the newly-created place.
Note that these variables must have values accepted by
@racket[place-message-allowed?], otherwise an @exnraise[exn:fail:contract].
}
@defproc[(place-wait [p place?]) exact-integer?]{
Returns the @tech{completion value} of the place indicated by @racket[p],
@ -322,7 +282,6 @@ If any pumping threads were created to connect a non-@tech{file-stream
Sends the main thread of place @racket[p] a break; see @secref["breakhandler"].
}
@defproc[(place-channel) (values place-channel? place-channel?)]{
Returns two @tech{place channels}. Data sent through the first
@ -427,6 +386,56 @@ A @tech{place location} can be passed as the @racket[#:at] argument to
A distributed places note created with @racket[create-place-node]
is an example of a @tech{place location}.}
@section[#:tag "places-syntax"]{Syntactic Support for Using Places}
@declare-exporting[racket/place]
The bindings in this section are @emph{not} provided by
@racketmodname[racket/place/dynamic].
@defform[(place id body ...+)]{
Creates a place that evaluates @racket[body]
expressions with @racket[id] bound to a place channel. The
@racket[body]s close only over @racket[id] plus the top-level
bindings of the enclosing module, because the
@racket[body]s are lifted to a submodule.
The result of @racket[place] is a place descriptor,
like the result of @racket[dynamic-place].
The generated submodule has the name @racketidfont{place-body-@racket[_n]}
for an integer @racket[_n], and the submodule exports a @racket[main]
function that takes a place channel for the new place. The submodule
is not intended for use, however, except by the expansion of the
@racket[place] form.
The @racket[place] binding is protected in the same way as
@racket[dynamic-place].}
@defform/subs[(place* maybe-port ...
id
body ...+)
([maybe-port code:blank
(code:line #:in in-expr)
(code:line #:out out-expr)
(code:line #:err err-expr)])]{
Like @racket[place], but supports optional @racket[#:in], @racket[#:out],
and @racket[#:err] expressions (at most one of each) to specify ports in the same way and
with the same defaults as @racket[dynamic-place*]. The result of
a @racket[place*] form is also the same as for @racket[dynamic-place*].
The @racket[place*] binding is protected in the same way as
@racket[dynamic-place].}
@defform[(place/context id body ...+)]{
Like @racket[place], but @racket[body ...] may have free lexical
variables, which are automatically sent to the newly-created place.
Note that these variables must have values accepted by
@racket[place-message-allowed?], otherwise an @exnraise[exn:fail:contract].
}
@;------------------------------------------------------------------------
@include-section["places-logging.scrbl"]

View File

@ -1,171 +1,19 @@
#lang racket/base
(require (prefix-in pl- '#%place)
(only-in '#%paramz parameterization-key)
(only-in '#%futures processor-count)
'#%place-struct
racket/fixnum
racket/flonum
racket/vector
racket/place/private/th-place
racket/place/private/prop
racket/private/streams
racket/match
racket/runtime-path
racket/place/dynamic
(only-in "private/place.rkt"
start-place
start-place*)
(for-syntax racket/base
racket/syntax
syntax/parse
syntax/free-vars))
(provide (protect-out dynamic-place
dynamic-place*)
place-wait
place-kill
(rename-out [place-break/opt place-break])
place-channel
place-channel-put
place-channel-get
place-channel?
place?
place-message-allowed?
place-channel-put/get
processor-count
(protect-out place
place*)
place/context
(rename-out [pl-place-enabled? place-enabled?])
place-dead-evt
place-location?
prop:place-location)
(provide (all-from-out racket/place/dynamic)
(protect-out place place*)
place/context)
(define-syntax (define-pl-func stx)
(syntax-case stx ()
[(_ func p args ...)
(with-syntax [(func-sym #'(quote func))
(pl-func (string->symbol (string-append "pl-" (symbol->string (syntax->datum #'func)))))
(th-func (string->symbol (string-append "th-" (symbol->string (syntax->datum #'func)))))]
#'(define (func p args ...)
(cond
[(prop:place? p) ((prop:place-ref p) func-sym p args ...)]
[(pl-place-enabled?) (pl-func p args ...)]
[else (th-func p args ...)])))]))
(define (place-channel-put/get ch msg)
(place-channel-put ch msg)
(place-channel-get ch))
(define place-channel (if (pl-place-enabled?) pl-place-channel th-place-channel))
(define-pl-func place-wait p)
(define-pl-func place-kill p)
(define-pl-func place-break p kind)
(define-pl-func place-channel-put p msg)
(define-pl-func place-channel-get p)
(define-pl-func place-channel? p)
(define-pl-func place? p)
(define-pl-func place-message-allowed? p)
(define-pl-func place-dead-evt p)
(define-values (prop:place-location place-location? place-location-ref)
(make-struct-type-property 'place-location
(lambda (v info)
(unless (and (procedure? v)
(procedure-arity-includes? v 4))
(raise-argument-error 'guard-for-prop:place-location
"(procedure-arity-includes/c 4)"
v))
v)))
(define place-break/opt
(let ([place-break (lambda (p [kind #f]) (place-break p kind))])
place-break))
(define (pump-place p pin pout perr in out err)
(cond
[(pl-place-enabled?)
(define-values (t-in t-out t-err) (pump-ports (place-dead-evt p) pin pout perr in out err))
(pl-place-pumper-threads p (vector t-in t-out t-err))]
[else (void)]))
(define (dynamic-place module-path function #:at [node #f] #:named [named #f])
(cond
[node
(unless (place-location? node)
(raise-argument-error 'dynamic-place "(or/c place-location? #f)" node))
((place-location-ref node) node module-path function named)]
[else
(start-place 'dynamic-place module-path function
#f (current-output-port) (current-error-port))]))
(define (dynamic-place* module-path
function
#:in [in #f]
#:out [out (current-output-port)]
#:err [err (current-error-port)])
(start-place* 'dynamic-place* module-path function in out err))
(define (start-place who module-path function in out err)
(define-values (p i o e) (start-place* who
module-path
function
in
out
err))
(close-output-port i)
p)
(define (start-place* who module-path function in out err)
;; Duplicate checks in that are in the primitive `pl-dynamic-place',
;; unfortunately, but we want these checks before we start making
;; stream-pumping threads, etc.
(unless (or (module-path? module-path) (path? module-path))
(raise-argument-error who "(or/c module-path? path?)" module-path))
(unless (symbol? function)
(raise-argument-error who "symbol?" function))
(unless (or (not in) (input-port? in))
(raise-argument-error who "(or/c input-port? #f)" in))
(unless (or (not out) (output-port? out))
(raise-argument-error who "(or/c output-port? #f)" out))
(unless (or (not err) (output-port? err) (eq? err 'stdout))
(raise-argument-error who "(or/c output-port? #f 'stdout)" err))
(when (and (pair? module-path) (eq? (car module-path) 'quote)
(not (module-predefined? module-path)))
(raise-arguments-error who "not a filesystem or predefined module path"
"module path" module-path))
(when (and (input-port? in) (port-closed? in))
(raise-arguments-error who "input port is closed" "port" in))
(when (and (output-port? out) (port-closed? out))
(raise-arguments-error who "output port is closed" "port" out))
(when (and (output-port? err) (port-closed? err))
(raise-arguments-error who "error port is closed" "port" err))
(cond
[(pl-place-enabled?)
(define-values (p pin pout perr)
(pl-dynamic-place module-path
function
(if-stream-in who in)
(if-stream-out who out)
(if-stream-out who err)))
(pump-place p pin pout perr in out err)
(values p
(and (not in) pin)
(and (not out) pout)
(and (not err) perr))]
[else
(define-values (inr inw ) (if in (values #f #f) (make-pipe)))
(define-values (outr outw) (if out (values #f #f) (make-pipe)))
(define-values (errr errw) (if err (values #f #f) (make-pipe)))
(parameterize ([current-input-port (or in inr)]
[current-output-port (or out outw)]
[current-error-port (or err errw)])
(values (th-dynamic-place module-path function)
(and (not in ) inw )
(and (not out) outr)
(and (not err) errr)))]))
(define-for-syntax place-body-counter 0)

View File

@ -0,0 +1,6 @@
#lang racket/base
(require "../private/place.rkt")
(provide (except-out (all-from-out "../private/place.rkt")
start-place
start-place*))

View File

@ -1,8 +1,8 @@
#lang racket/base
#lang s-exp racket/kernel
(provide prop:place
prop:place?
prop:place-ref)
(#%provide prop:place
prop:place?
prop:place-ref)
(define-values (prop:place prop:place? prop:place-ref)
(make-struct-type-property 'place))

View File

@ -2,13 +2,11 @@
(require (prefix-in pl- '#%place)
'#%boot
(only-in '#%paramz parameterization-key)
(only-in '#%unsafe unsafe-make-custodian-at-root)
'#%place-struct
racket/fixnum
racket/flonum
racket/vector
(only-in ffi/unsafe cpointer?)
racket/tcp)
'#%flfxnum
(only-in '#%unsafe unsafe-make-custodian-at-root)
(only-in '#%foreign cpointer?)
(only-in '#%network tcp-port? tcp-listener?))
(provide th-dynamic-place
;th-dynamic-place*
@ -25,6 +23,17 @@
th-place-dead-evt
)
(define-syntax-rule (copiers fXvector-copy! fXvector-set! fXvector-ref)
(define (fXvector-copy! vec dest-start flv start end)
(let ([len (- end start)])
(for ([i (in-range len)])
(fXvector-set! vec (+ i dest-start)
(fXvector-ref flv (+ i start)))))))
(copiers fxvector-copy! fxvector-set! fxvector-ref)
(copiers flvector-copy! flvector-set! flvector-ref)
(define-struct TH-place (th ch cust cust-box result-box)
#:property prop:evt (lambda (x) (TH-place-channel-in (TH-place-ch x))))
@ -111,19 +120,33 @@
[(cond
[(path-for-some-system? o) o]
[(bytes? o) (if (pl-place-shared? o) o (record o (bytes-copy o)))]
[(fxvector? o) (if (pl-place-shared? o) o (record o (fxvector-copy o)))]
[(flvector? o) (if (pl-place-shared? o) o (record o (flvector-copy o)))]
[(fxvector? o) (if (pl-place-shared? o)
o
(let* ([c (make-fxvector (fxvector-length o))])
(fxvector-copy! c 0 o 0 (fxvector-length o))
(record o c)))]
[(flvector? o) (if (pl-place-shared? o)
o
(let* ([c (make-flvector (flvector-length o))])
(flvector-copy! c 0 o 0 (flvector-length o))
(record o c)))]
[else #f])
=> values]
[(TH-place? o) (dcw (TH-place-ch o))]
[(pair? o)
[(pair? o)
(with-placeholder
o
(lambda ()
(cons (dcw (car o)) (dcw (cdr o)))))]
[(vector? o)
(vector-map! dcw (record o (vector-copy o)))]
[(hash? o)
[(vector? o)
(define new-v (make-vector (vector-length o)))
(vector-copy! new-v 0 o)
(define r (record o new-v))
(for ([i (in-naturals)]
[v (in-vector r)])
(vector-set! new-v i (dcw v)))
r]
[(hash? o)
(with-placeholder
o
(lambda ()
@ -183,7 +206,7 @@
[(ormap (lambda (x) (x o)) (list number? char? boolean? null? void? string? symbol? keyword? TH-place-channel?
path? bytes? fxvector? flvector? TH-place?)) #t]
[(pair? o) (and (dcw (car o)) (dcw (cdr o)))]
[(vector? o)
[(vector? o)
(for/fold ([nh #t]) ([i (in-vector o)])
(and nh (dcw i)))]
[(hash? o)

View File

@ -0,0 +1,157 @@
#lang racket/base
(require (prefix-in pl- '#%place)
(only-in '#%futures processor-count)
racket/place/private/th-place
racket/place/private/prop
racket/private/streams
(for-syntax racket/base racket/syntax))
;; This module is mostly re-exported by `racket/place/dynamic`
(provide (protect-out dynamic-place
dynamic-place*
start-place
start-place*)
place-wait
place-kill
(rename-out [place-break/opt place-break])
place-channel
place-channel-put
place-channel-get
place-channel?
place?
place-message-allowed?
place-channel-put/get
processor-count
(rename-out [pl-place-enabled? place-enabled?])
place-dead-evt
place-location?
prop:place-location)
(define-syntax (define-pl-func stx)
(syntax-case stx ()
[(_ func p args ...)
(with-syntax [(func-sym #'(quote func))
(pl-func (format-id #'here "pl-~a" #'func))
(th-func (format-id #'here "th-~a" #'func))]
#'(define (func p args ...)
(cond
[(prop:place? p) ((prop:place-ref p) func-sym p args ...)]
[(pl-place-enabled?) (pl-func p args ...)]
[else (th-func p args ...)])))]))
(define (place-channel-put/get ch msg)
(place-channel-put ch msg)
(place-channel-get ch))
(define place-channel (if (pl-place-enabled?) pl-place-channel th-place-channel))
(define-pl-func place-wait p)
(define-pl-func place-kill p)
(define-pl-func place-break p kind)
(define-pl-func place-channel-put p msg)
(define-pl-func place-channel-get p)
(define-pl-func place-channel? p)
(define-pl-func place? p)
(define-pl-func place-message-allowed? p)
(define-pl-func place-dead-evt p)
(define-values (prop:place-location place-location? place-location-ref)
(make-struct-type-property 'place-location
(lambda (v info)
(unless (and (procedure? v)
(procedure-arity-includes? v 4))
(raise-argument-error 'guard-for-prop:place-location
"(procedure-arity-includes/c 4)"
v))
v)))
(define place-break/opt
(let ([place-break (lambda (p [kind #f]) (place-break p kind))])
place-break))
(define (pump-place p pin pout perr in out err)
(cond
[(pl-place-enabled?)
(define-values (t-in t-out t-err) (pump-ports (place-dead-evt p) pin pout perr in out err))
(pl-place-pumper-threads p (vector t-in t-out t-err))]
[else (void)]))
(define (dynamic-place module-path function #:at [node #f] #:named [named #f])
(cond
[node
(unless (place-location? node)
(raise-argument-error 'dynamic-place "(or/c place-location? #f)" node))
((place-location-ref node) node module-path function named)]
[else
(start-place 'dynamic-place module-path function
#f (current-output-port) (current-error-port))]))
(define (dynamic-place* module-path
function
#:in [in #f]
#:out [out (current-output-port)]
#:err [err (current-error-port)])
(start-place* 'dynamic-place* module-path function in out err))
(define (start-place who module-path function in out err)
(define-values (p i o e) (start-place* who
module-path
function
in
out
err))
(close-output-port i)
p)
(define (start-place* who module-path function in out err)
;; Duplicate checks in that are in the primitive `pl-dynamic-place',
;; unfortunately, but we want these checks before we start making
;; stream-pumping threads, etc.
(unless (or (module-path? module-path) (path? module-path))
(raise-argument-error who "(or/c module-path? path?)" module-path))
(unless (symbol? function)
(raise-argument-error who "symbol?" function))
(unless (or (not in) (input-port? in))
(raise-argument-error who "(or/c input-port? #f)" in))
(unless (or (not out) (output-port? out))
(raise-argument-error who "(or/c output-port? #f)" out))
(unless (or (not err) (output-port? err) (eq? err 'stdout))
(raise-argument-error who "(or/c output-port? #f 'stdout)" err))
(when (and (pair? module-path) (eq? (car module-path) 'quote)
(not (module-predefined? module-path)))
(raise-arguments-error who "not a filesystem or predefined module path"
"module path" module-path))
(when (and (input-port? in) (port-closed? in))
(raise-arguments-error who "input port is closed" "port" in))
(when (and (output-port? out) (port-closed? out))
(raise-arguments-error who "output port is closed" "port" out))
(when (and (output-port? err) (port-closed? err))
(raise-arguments-error who "error port is closed" "port" err))
(cond
[(pl-place-enabled?)
(define-values (p pin pout perr)
(pl-dynamic-place module-path
function
(if-stream-in who in)
(if-stream-out who out)
(if-stream-out who err)))
(pump-place p pin pout perr in out err)
(values p
(and (not in) pin)
(and (not out) pout)
(and (not err) perr))]
[else
(define-values (inr inw ) (if in (values #f #f) (make-pipe)))
(define-values (outr outw) (if out (values #f #f) (make-pipe)))
(define-values (errr errw) (if err (values #f #f) (make-pipe)))
(parameterize ([current-input-port (or in inr)]
[current-output-port (or out outw)]
[current-error-port (or err errw)])
(values (th-dynamic-place module-path function)
(and (not in ) inw )
(and (not out) outr)
(and (not err) errr)))]))