dynamic-place now pipes standard io to standard out and error much like system.

This commit is contained in:
Kevin Tew 2011-09-07 14:14:49 -06:00
parent 039f9c10f3
commit b6972a3b7f
13 changed files with 567 additions and 174 deletions

View File

@ -30,40 +30,6 @@
;; ----------------------------------------
(define (copy-port src dest . dests)
(unless (input-port? src)
(raise-type-error 'copy-port "input-port" src))
(for-each
(lambda (dest)
(unless (output-port? dest)
(raise-type-error 'copy-port "output-port" dest)))
(cons dest dests))
(let ([s (make-bytes 4096)]
[dests (cons dest dests)])
(let loop ()
(let ([c (read-bytes-avail! s src)])
(cond
[(number? c)
(let loop ([dests dests])
(unless (null? dests)
(let loop ([start 0])
(unless (= start c)
(let ([c2 (write-bytes-avail s (car dests) start c)])
(loop (+ start c2)))))
(loop (cdr dests))))
(loop)]
[(procedure? c)
(let ([v (let-values ([(l col p) (port-next-location src)])
(c (object-name src) l col p))])
(let loop ([dests dests])
(unless (null? dests)
(write-special v (car dests))
(loop (cdr dests)))))
(loop)]
[else
;; Must be EOF
(void)])))))
(define merge-input
(case-lambda
[(a b) (merge-input a b 4096)]

View File

@ -1,85 +1,125 @@
#lang racket/base
;; A few simple port functions are needed in pretty.rkt, which is
;; used by contract.rkt, which is used by port.rkt --- so we
;; break the cycle with this module.
;;
;; copy-port is used by mzlib/private/streams.rkt, which is used by
;; racket/place.rkt, which we want to load without loading contracts
;; --- so copy port is place in this module.
(module port racket/base
(provide open-output-nowhere
relocate-output-port
transplant-output-port
transplant-to-relocate)
(provide copy-port
open-output-nowhere
relocate-output-port
transplant-output-port
transplant-to-relocate)
(define open-output-nowhere
(lambda ([name 'nowhere] [specials-ok? #t])
(make-output-port
name
always-evt
(lambda (s start end non-block? breakable?) (- end start))
void
(and specials-ok?
(lambda (special non-block? breakable?) #t))
(lambda (s start end) (wrap-evt
always-evt
(lambda (x)
(- end start))))
(and specials-ok?
(lambda (special)
(wrap-evt always-evt (lambda (x) #t)))))))
(define open-output-nowhere
(lambda ([name 'nowhere] [specials-ok? #t])
(make-output-port
name
always-evt
(lambda (s start end non-block? breakable?) (- end start))
void
(and specials-ok?
(lambda (special non-block? breakable?) #t))
(lambda (s start end) (wrap-evt
always-evt
(lambda (x)
(- end start))))
(and specials-ok?
(lambda (special)
(wrap-evt always-evt (lambda (x) #t)))))))
(define (transplant-to-relocate transplant p line col pos close?)
(let-values ([(init-l init-c init-p) (port-next-location p)])
(transplant
p
(lambda ()
(let-values ([(l c p) (port-next-location p)])
(values (and l init-l (+ l (- init-l) line))
(and c init-c (if (equal? l init-l)
(+ c (- init-c) col)
c))
(and p init-p (+ p (- init-p) pos)))))
pos
close?)))
(define (transplant-to-relocate transplant p line col pos close?)
(let-values ([(init-l init-c init-p) (port-next-location p)])
(transplant
p
(lambda ()
(let-values ([(l c p) (port-next-location p)])
(values (and l init-l (+ l (- init-l) line))
(and c init-c (if (equal? l init-l)
(+ c (- init-c) col)
c))
(and p init-p (+ p (- init-p) pos)))))
pos
close?)))
(define relocate-output-port
(lambda (p line col pos [close? #t])
(transplant-to-relocate
transplant-output-port
p line col pos close?)))
(define relocate-output-port
(lambda (p line col pos [close? #t])
(transplant-to-relocate
transplant-output-port
p line col pos close?)))
(define transplant-output-port
(lambda (p location-proc pos [close? #t] [count-lines!-proc void])
(make-output-port
(object-name p)
p
(lambda (s start end nonblock? breakable?)
(let ([v ((if nonblock?
write-bytes-avail*
(if breakable?
write-bytes-avail/enable-break
write-bytes-avail))
s p start end)])
(if (and (zero? v) (not (= start end)))
(wrap-evt p (lambda (x) #f))
v)))
(lambda ()
(when close?
(close-output-port p)))
(and (port-writes-special? p)
(lambda (special nonblock? breakable?)
((if nonblock?
write-special-avail*
(if breakable?
(lambda (spec p)
(parameterize-break #t
(write-special spec p)))
write-special))
special p)))
(and (port-writes-atomic? p)
(lambda (s start end)
(write-bytes-avail-evt s p start end)))
(and (port-writes-atomic? p)
(port-writes-special? p)
(lambda (spec)
(write-special-evt spec p)))
location-proc
count-lines!-proc
pos)))
(define (copy-port src dest . dests)
(unless (input-port? src)
(raise-type-error 'copy-port "input-port" src))
(for-each
(lambda (dest)
(unless (output-port? dest)
(raise-type-error 'copy-port "output-port" dest)))
(cons dest dests))
(let ([s (make-bytes 4096)]
[dests (cons dest dests)])
(let loop ()
(let ([c (read-bytes-avail! s src)])
(cond
[(number? c)
(let loop ([dests dests])
(unless (null? dests)
(let loop ([start 0])
(unless (= start c)
(let ([c2 (write-bytes-avail s (car dests) start c)])
(loop (+ start c2)))))
(loop (cdr dests))))
(loop)]
[(procedure? c)
(let ([v (let-values ([(l col p) (port-next-location src)])
(c (object-name src) l col p))])
(let loop ([dests dests])
(unless (null? dests)
(write-special v (car dests))
(loop (cdr dests)))))
(loop)]
[else
;; Must be EOF
(void)])))))
(define transplant-output-port
(lambda (p location-proc pos [close? #t] [count-lines!-proc void])
(make-output-port
(object-name p)
p
(lambda (s start end nonblock? breakable?)
(let ([v ((if nonblock?
write-bytes-avail*
(if breakable?
write-bytes-avail/enable-break
write-bytes-avail))
s p start end)])
(if (and (zero? v) (not (= start end)))
(wrap-evt p (lambda (x) #f))
v)))
(lambda ()
(when close?
(close-output-port p)))
(and (port-writes-special? p)
(lambda (special nonblock? breakable?)
((if nonblock?
write-special-avail*
(if breakable?
(lambda (spec p)
(parameterize-break #t
(write-special spec p)))
write-special))
special p)))
(and (port-writes-atomic? p)
(lambda (s start end)
(write-bytes-avail-evt s p start end)))
(and (port-writes-atomic? p)
(port-writes-special? p)
(lambda (spec)
(write-special-evt spec p)))
location-proc
count-lines!-proc
pos))))

View File

@ -0,0 +1,69 @@
#lang racket/base
(require "port.rkt")
(provide if-stream-out
if-stream-in
streamify-in
streamify-out
pump-ports)
(define (if-stream-out who p [sym-ok? #f])
(cond [(and sym-ok? (eq? p 'stdout)) p]
[(or (not p) (and (output-port? p) (file-stream-port? p))) p]
[(output-port? p) #f]
[else (raise-type-error who
(if sym-ok?
"output port, #f, or 'stdout"
"output port or #f")
p)]))
(define (if-stream-in who p)
(cond [(or (not p) (and (input-port? p) (file-stream-port? p))) p]
[(input-port? p) #f]
[else (raise-type-error who "input port or #f" p)]))
(define (streamify-in cin in ready-for-break)
(if (and cin (not (file-stream-port? cin)))
(thread (lambda ()
(dynamic-wind
void
(lambda ()
(with-handlers ([exn:break? void])
(ready-for-break #t)
(copy-port cin in)
(ready-for-break #f)))
(lambda () (close-output-port in)))
(ready-for-break #t)))
in))
(define (streamify-out cout out)
(if (and cout
(not (eq? cout 'stdout))
(not (file-stream-port? cout)))
(thread (lambda ()
(dynamic-wind
void
(lambda () (copy-port out cout))
(lambda () (close-input-port out)))))
out))
(define (pump-ports evt pin pout perr in out err)
(define who 'pump-ports)
(define it-ready (make-semaphore))
(define inpump (streamify-in in
(if-stream-out who pin)
(lambda (ok?)
(if ok?
(semaphore-post it-ready)
(semaphore-wait it-ready)))))
(define outpump (streamify-out out (if-stream-in who pout)))
(define errpump (streamify-out err (if-stream-in who perr)))
(when (thread? inpump)
;; Wait for place to end, then stop copying input:
(thread (lambda ()
(sync evt inpump)
(semaphore-wait it-ready)
(break-thread inpump))))
(values inpump outpump errpump))

View File

@ -8,7 +8,8 @@
system/exit-code
system*/exit-code)
(require mzlib/port)
(require mzlib/port
"private/streams.rkt")
;; Helpers: ----------------------------------------
@ -33,46 +34,6 @@
(format "~a: don't know what shell to use for platform: " who)
(system-type))]))
(define (if-stream-out who p [sym-ok? #f])
(cond [(and sym-ok? (eq? p 'stdout)) p]
[(or (not p) (and (output-port? p) (file-stream-port? p))) p]
[(output-port? p) #f]
[else (raise-type-error who
(if sym-ok?
"output port, #f, or 'stdout"
"output port or #f")
p)]))
(define (if-stream-in who p)
(cond [(or (not p) (and (input-port? p) (file-stream-port? p))) p]
[(input-port? p) #f]
[else (raise-type-error who "input port or #f" p)]))
(define (streamify-in cin in ready-for-break)
(if (and cin (not (file-stream-port? cin)))
(thread (lambda ()
(dynamic-wind
void
(lambda ()
(with-handlers ([exn:break? void])
(ready-for-break #t)
(copy-port cin in)
(ready-for-break #f)))
(lambda () (close-output-port in)))
(ready-for-break #t)))
in))
(define (streamify-out cout out)
(if (and cout
(not (eq? cout 'stdout))
(not (file-stream-port? cout)))
(thread (lambda ()
(dynamic-wind
void
(lambda () (copy-port out cout))
(lambda () (close-input-port out)))))
out))
(define (check-exe who exe)
(unless (path-string? exe)
(raise-type-error who "path or string" exe))

View File

@ -7,11 +7,13 @@
racket/fixnum
racket/flonum
racket/vector
"../mzlib/private/streams.rkt"
(for-syntax racket/base
racket/syntax))
(provide dynamic-place
dynamic-place*
place-sleep
place-wait
place-kill
@ -25,6 +27,7 @@
place-channel-put/get
processor-count
place
place*
(rename-out [pl-place-enabled? place-enabled?])
place-dead-evt)
@ -125,7 +128,6 @@
(define-syntax-rule (define-pl x p t) (define x (if (pl-place-enabled?) p t)))
(define-pl dynamic-place pl-dynamic-place th-dynamic-place)
(define-pl place-sleep pl-place-sleep th-place-sleep)
(define-pl place-wait pl-place-wait th-place-wait)
(define-pl place-kill pl-place-kill th-place-kill)
@ -138,33 +140,102 @@
(define-pl place-message-allowed? pl-place-message-allowed? th-place-message-allowed?)
(define-pl place-dead-evt pl-place-dead-evt th-place-dead-evt)
(define-syntax-rule (define-syntax-case (N a ...) b ...)
(define-syntax (N stx)
(syntax-case stx ()
[(_ a ...) b ...])))
(define (pump-place p pin pout perr in out err)
(cond
[(pl-place-enabled?)
(define-values (t-in t-out t-err) (pump-ports (place-dead-evt p) pin pout perr in out err))
(pl-place-pumper-threads p (vector t-in t-out t-err))]
[else (void)]))
(define-syntax (place stx)
(define (dynamic-place module-path function)
(define-values (p i o e) (dynamic-place* module-path
function
#:in #f
#:out (current-output-port)
#:err (current-error-port)))
(close-output-port i)
p)
(define (dynamic-place* module-path
function
#:in [in #f]
#:out [out (current-output-port)]
#:err [err (current-error-port)])
(cond
[(pl-place-enabled?)
(define-values (p pin pout perr)
(pl-dynamic-place module-path
function
(if-stream-in 'dynamic-place in)
(if-stream-out 'dynamic-place out)
(if-stream-out 'dynamic-place err)))
(pump-place p pin pout perr in out err)
(values p
(and (not in) pin)
(and (not out) pout)
(and (not err) perr))]
[else
(define-values (inr inw ) (if in (values #f #f) (make-pipe)))
(define-values (outr outw) (if out (values #f #f) (make-pipe)))
(define-values (errr errw) (if err (values #f #f) (make-pipe)))
(parameterize ([current-input-port (or in inr)]
[current-output-port (or out outw)]
[current-error-port (or err errw)])
(values (th-dynamic-place module-path function)
(and (not in ) inw )
(and (not out) outr)
(and (not err) errr)))]))
(define-for-syntax (place-form _in _out _err _dynamic-place-func stx)
(syntax-case stx ()
[(_ ch body1 body ...)
(begin
#;(when (in-module-expansion?)
(unless (syntax-transforming-module-expression?)
(raise-syntax-error #f "can only be used in a module" stx))
(unless (identifier? #'ch)
(raise-syntax-error #f "expected an indentifier" stx #'ch))
(raise-syntax-error #f "expected an identifier" stx #'ch))
(with-syntax ([internal-def-name
(syntax-local-lift-expression #'(lambda (ch) body1 body ...))]
[func-name (generate-temporary #'place/anon)])
[func-name (generate-temporary #'place/anon)]
[in _in]
[out _out]
[err _err]
[dynamic-place-func _dynamic-place-func])
(syntax-local-lift-provide #'(rename internal-def-name func-name))
#'(place/proc (#%variable-reference) 'func-name)))]
#'(place/proc (#%variable-reference) 'func-name dynamic-place-func #:in in #:out out #:err err)))]
[(_ ch)
(raise-syntax-error #f "expected at least one body expression" stx)]))
(define (place/proc vr func-name)
(define-syntax (place stx)
(place-form #'#f #'(current-output-port) #'(current-error-port) #'dynamic-place stx))
(define-syntax (place* stx)
(syntax-case stx ()
[(_ #:in in #:out out #:err err ch body ...) (place-form #'in #'out #'err #'dynamic-place* #'('place* ch body ...))]
[(_ #:in in #:out out ch body ...) (place-form #'in #'out #'#f #'dynamic-place* #'('place* ch body ...))]
[(_ #:out out #:err err ch body ...) (place-form #'#f #'out #'err #'dynamic-place* #'('place* ch body ...))]
[(_ #:in in #:err err ch body ...) (place-form #'in #'#f #'err #'dynamic-place* #'('place* ch body ...))]
[(_ #:in in ch body ...) (place-form #'in #'#f #'#f #'dynamic-place* #'('place* ch body ...))]
[(_ #:out out ch body ...) (place-form #'#f #'out #'#f #'dynamic-place* #'('place* ch body ...))]
[(_ #:err err ch body ...) (place-form #'#f #'#f #'err #'dynamic-place* #'('place* ch body ...))]
[(_ ch body ...) (place-form #'#f #'#f #'#f #'dynamic-place* #'('place* ch body ...))]
))
(define (place/proc vr
func-name
[dynamic-place-func dynamic-place]
#:in [in #f]
#:out [out (current-output-port)]
#:err [err (current-error-port)])
(define name
(resolved-module-path-name
(variable-reference->resolved-module-path
vr)))
(when (symbol? name)
(error 'place "the current module-path-name should be a path and not a symbol (if you are in DrRacket, save the file)"))
(dynamic-place name func-name))
(error 'place "the current module-path-name is not a file path"))
(if (eq? dynamic-place-func dynamic-place)
(dynamic-place-func name func-name)
(dynamic-place-func name func-name #:in in #:out out #:err err)))

View File

@ -104,7 +104,9 @@ are simulated using @racket[thread].}
}
@defproc[(dynamic-place [module-path module-path?] [start-proc symbol?]) place?]{
@defproc[(dynamic-place [module-path module-path?]
[start-proc symbol?]
place?]{
Creates a @tech{place} to run the procedure that is identified by
@racket[module-path] and @racket[start-proc]. The result is a
@ -132,6 +134,45 @@ are simulated using @racket[thread].}
@tech{completion value} @racket[0].}
@defproc[(dynamic-place* [module-path module-path?]
[start-proc symbol?]
[#:in in (or/c input-port? #f) #f]
[#:out out (or/c output-port? #f) (current-output-port)]
[#:err err (or/c output-port? #f) (current-error-port)]
(values place? (or/c input-port? #f) (or/c output-port? #f) (or/c output-port? #f)]{
The @racket[dynamic-place*] function behaves just like the
@racket[dynamic-place] but allows the user to specify the standard
in, out, and error ports for the new place. Upon execution of
@racket[dynamic-place*], the @racket[in], @racket[out], and
@racket[err] ports become the @racket[current-input-port],
@racket[current-output-port], and @racket[current-error-port] for the
@tech{place}. Any of the ports can be @racket[#f], in which case a
system pipe is created and returned by @racket[dynamic-place*]. The
@racket[stderr] argument can be @racket['stdout], in which case the
same file-stream port or system pipe that is supplied as standard
output is also used for standard error. For each port or
@racket['stdout] that is provided, no pipe is created and the
corresponding returned value is @racket[#f]
The @racket[dynamic-place*] procedure returns four values:
@itemize[
@item{a place descriptor value representing the created place;}
@item{an output port piped to the place's standard input, or
@racket[#f] if @racket[in] was a port;}
@item{an input port piped from the place's standard output, or
@racket[#f] if @racket[out] was a port;}
@item{an input port piped from the place's standard error, or
@racket[#f] if @racket[err] was a port or @racket['stdout].}
}
@defform[(place id body ...+)]{
Creates a place that evaluates @racket[body]
expressions with @racket[id] bound to a place channel. The
@ -142,6 +183,17 @@ are simulated using @racket[thread].}
like the result of @racket[dynamic-place].
}
@defform[(place* [#:in in #f]
[#:out out (current-output-port)]
[#:err err (current-error-port)]
id
body ...+)]{
Behaves like @racket[place] and allows the user to set
the @racket[current-input-port], @racket[current-output-port], and
@racket[current-error-port] for the @tech{place}. The result of a
@racket[place*] form is analogous to the result of @racket[dynamic-place*].
}
@defproc[(place-wait [p place?]) exact-integer?]{
Returns the @tech{completion value} of the place indicated by @racket[p],

View File

@ -3,6 +3,7 @@
(require racket/place
ffi/unsafe
racket/runtime-path
rackunit
(for-syntax racket/base))
(provide main)
@ -34,9 +35,13 @@
(define bn (BN_new))
(set-BN-j1! bn 1334)
(printf "BN-j1 ~a ~v\n" (BN-j1 bn) (cpointer-tag bn))
(check-equal? (BN-j1 bn) 1334)
(check-equal? (cpointer-tag bn) 'BN)
(check-equal? BN-tag 'BN)
(printf "BN tag ~v\n" BN-tag)
(define p (place ch
(define b (place-channel-get ch))
(printf "Got it ~a\n" (BN-j1 b))))
(printf "Got it ~a\n" (BN-j1 b))
(check-equal? (BN-j1 b) 1334)))
(place-channel-put p bn)
(place-wait p))

View File

@ -0,0 +1,50 @@
#lang racket/base
(require racket/place
racket/port
rackunit)
(provide main)
(define (main)
(place-wait (place ch (printf "Hello1\n")))
(place-wait (place ch (eprintf "Hello2\n")))
(place-wait (place ch (printf "~a\n" (read)))) ; #<eof>
(let-values ([(p pin pout perr) (place* ch (printf "Hello3\n"))])
(place-wait p))
(let-values ([(p pin pout perr) (place* ch (printf "Hello4\n"))])
(copy-port pout (current-output-port))
(place-wait p))
(let-values ([(p pin pout perr) (place* #:out (current-output-port) ch (printf "Hello5\n"))])
(place-wait p))
(let-values ([(p pin pout perr) (place* #:err (current-error-port) ch (eprintf "Hello6\n")
(flush-output (current-error-port)))])
(place-wait p))
(let-values ([(p pin pout perr) (place* #:out (current-output-port) ch (printf "Hello7 ~a\n" (read)))])
(write "Again" pin)
(flush-output pin)
(place-wait p))
(let-values ([(p pin pout perr) (place* ch (write "Hello8\n"))])
(check-equal? "Hello8\n" (read pout))
(place-wait p))
(let-values ([(p pin pout perr) (place* ch (write "Hello9\n" (current-error-port)))])
(check-equal? "Hello9\n" (read perr))
(place-wait p))
(let*-values ([(pipeout pipein) (make-pipe)]
[(p pin pout perr) (place* #:out pipein
ch (write "Hello10\n")
(close-output-port (current-output-port))
(close-output-port (current-error-port)))])
(place-wait p)
(thread (lambda ()
(sync (place-dead-evt p))
(close-output-port pipein)))
(copy-port pipeout (current-output-port))
(newline)
(flush-output)))

View File

@ -55,6 +55,7 @@ static int place_val_MARK(void *p, struct NewGC *gc) {
Scheme_Place *pr = (Scheme_Place *)p;
gcMARK2(pr->channel, gc);
gcMARK2(pr->mref, gc);
gcMARK2(pr->pumper_threads, gc);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place));
@ -64,6 +65,7 @@ static int place_val_FIXUP(void *p, struct NewGC *gc) {
Scheme_Place *pr = (Scheme_Place *)p;
gcFIXUP2(pr->channel, gc);
gcFIXUP2(pr->mref, gc);
gcFIXUP2(pr->pumper_threads, gc);
return
gcBYTES_TO_WORDS(sizeof(Scheme_Place));

View File

@ -1459,6 +1459,7 @@ place_val {
Scheme_Place *pr = (Scheme_Place *)p;
gcMARK2(pr->channel, gc);
gcMARK2(pr->mref, gc);
gcMARK2(pr->pumper_threads, gc);
size:
gcBYTES_TO_WORDS(sizeof(Scheme_Place));

View File

@ -44,6 +44,7 @@ SHARED_OK mz_proc_thread *scheme_master_proc_thread;
THREAD_LOCAL_DECL(static struct Scheme_Place_Object *place_object);
THREAD_LOCAL_DECL(static uintptr_t force_gc_for_place_accounting);
static Scheme_Object *scheme_place(int argc, Scheme_Object *args[]);
static Scheme_Object *place_pumper_threads(int argc, Scheme_Object *args[]);
static Scheme_Object *place_wait(int argc, Scheme_Object *args[]);
static Scheme_Object *place_kill(int argc, Scheme_Object *args[]);
static Scheme_Object *place_break(int argc, Scheme_Object *args[]);
@ -120,7 +121,8 @@ void scheme_init_place(Scheme_Env *env)
GLOBAL_PRIM_W_ARITY("place-enabled?", scheme_place_enabled, 0, 0, plenv);
GLOBAL_PRIM_W_ARITY("place-shared?", scheme_place_shared, 1, 1, plenv);
PLACE_PRIM_W_ARITY("dynamic-place", scheme_place, 2, 2, plenv);
PLACE_PRIM_W_ARITY("dynamic-place", scheme_place, 5, 5, plenv);
PLACE_PRIM_W_ARITY("place-pumper-threads", place_pumper_threads, 1, 2, plenv);
PLACE_PRIM_W_ARITY("place-sleep", place_sleep, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-wait", place_wait, 1, 1, plenv);
PLACE_PRIM_W_ARITY("place-kill", place_kill, 1, 1, plenv);
@ -177,6 +179,9 @@ typedef struct Place_Start_Data {
struct Scheme_Place_Object *place_obj; /* malloc'ed item */
struct NewGC *parent_gc;
Scheme_Object *cust_limit;
intptr_t in;
intptr_t out;
intptr_t err;
} Place_Start_Data;
static void null_out_runtime_globals() {
@ -206,6 +211,28 @@ Scheme_Object *scheme_make_place_object() {
return (Scheme_Object *)place_obj;
}
static void close_six_fds(int *rw) {
int i;
for (i=0; i<6; i++) { if (rw[i] >= 0) scheme_close_file_fd(rw[i]); }
}
Scheme_Object *place_pumper_threads(int argc, Scheme_Object *args[]) {
Scheme_Place *place;
Scheme_Object *tmp;
place = (Scheme_Place *) args[0];
if (!SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type))
scheme_wrong_type("place-pumper-threads", "place", 0, argc, args);
if (argc == 2) {
tmp = args[1];
if (!SCHEME_VECTORP(tmp) || SCHEME_VEC_SIZE(tmp) != 3)
scheme_wrong_type("place-pumper-threads", "vector of size 3", 1, argc, args);
place->pumper_threads = tmp;
}
return place->pumper_threads;
}
Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
Scheme_Place *place;
Place_Start_Data *place_data;
@ -216,6 +243,10 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
struct NewGC *parent_gc;
Scheme_Custodian *cust;
intptr_t mem_limit;
Scheme_Object *in_arg;
Scheme_Object *out_arg;
Scheme_Object *err_arg;
int rw[6] = {-1, -1, -1, -1, -1, -1};
/* To avoid runaway place creation, check for termination before continuing. */
scheme_thread_block(0.0);
@ -254,15 +285,29 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
{
Scheme_Object *so;
in_arg = args[2];
out_arg = args[3];
err_arg = args[4];
if (!scheme_is_module_path(args[0]) && !SCHEME_PATHP(args[0])) {
scheme_wrong_type("dynamic-place", "module-path or path", 0, argc, args);
}
if (SCHEME_PAIRP(args[0]) && SAME_OBJ(SCHEME_CAR(args[0]), quote_symbol)) {
scheme_wrong_type("dynamic-place", "non-interactively defined module-path", 0, argc, args);
}
if (!SCHEME_SYMBOLP(args[1])) {
scheme_wrong_type("dynamic-place", "symbol", 1, argc, args);
}
if (SCHEME_TRUEP(in_arg) && !SCHEME_TRUEP(scheme_file_stream_port_p(1, &in_arg))) {
scheme_wrong_type("dynamic-place", "file-stream-input-port or #f", 2, argc, args);
}
if (SCHEME_TRUEP(out_arg) && !SCHEME_TRUEP(scheme_file_stream_port_p(1, &out_arg))) {
scheme_wrong_type("dynamic-place", "file-stream-output-port or #f", 3, argc, args);
}
if (SCHEME_TRUEP(err_arg) && !SCHEME_TRUEP(scheme_file_stream_port_p(1, &err_arg))) {
scheme_wrong_type("dynamic-place", "file-stream-output-port or #f", 4, argc, args);
}
if (SCHEME_PAIRP(args[0]) && SAME_OBJ(SCHEME_CAR(args[0]), quote_symbol)) {
scheme_arg_mismatch("dynamic-place", "dynamic-place works on only on filesystem module-paths", args[0]);
}
so = places_deep_copy_to_master(args[0]);
place_data->module = so;
@ -290,6 +335,65 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
place_obj->memory_limit = mem_limit;
place_obj->parent_need_gc = &force_gc_for_place_accounting;
{
intptr_t tmpfd;
int errorno;
if (SCHEME_TRUEP(in_arg)) {
scheme_get_port_file_descriptor(in_arg, &tmpfd);
tmpfd = scheme_dup_file(tmpfd);
if (tmpfd == -1) {
errorno = scheme_errno();
close_six_fds(rw);
scheme_raise_exn(MZEXN_FAIL, "dup: error duplicating file descriptor(%e)", errorno);
}
rw[0] = tmpfd;
}
else if (pipe(rw)) {
errorno = scheme_errno();
close_six_fds(rw);
scheme_raise_exn(MZEXN_FAIL_FILESYSTEM, "pipe: error creating place standard input streams (%e)", errorno);
}
if (SCHEME_TRUEP(out_arg)) {
scheme_get_port_file_descriptor(out_arg, &tmpfd);
tmpfd = scheme_dup_file(tmpfd);
if (tmpfd == -1) {
errorno = scheme_errno();
close_six_fds(rw);
scheme_raise_exn(MZEXN_FAIL, "dup: error duplicating file descriptor(%e)", errorno);
}
rw[3] = tmpfd;
}
else if (pipe(rw + 2)) {
errorno = scheme_errno();
close_six_fds(rw);
scheme_raise_exn(MZEXN_FAIL_FILESYSTEM, "pipe: error creating place standard output streams (%e)", errorno);
}
if (SCHEME_TRUEP(err_arg)) {
scheme_get_port_file_descriptor(err_arg, &tmpfd);
tmpfd = scheme_dup_file(tmpfd);
if (tmpfd == -1) {
errorno = scheme_errno();
close_six_fds(rw);
scheme_raise_exn(MZEXN_FAIL, "dup: error duplicating file descriptor(%e)", errorno);
}
rw[5] = tmpfd;
}
else if (pipe(rw + 4)) {
errorno = scheme_errno();
close_six_fds(rw);
scheme_raise_exn(MZEXN_FAIL_FILESYSTEM, "pipe: error creating place standard error streams (%e)", errorno);
}
{
place_data->in = rw[0];
place_data->out = rw[3];
place_data->err = rw[5];
}
}
/* create new place */
proc_thread = mz_proc_thread_create(place_start_proc, place_data);
@ -323,7 +427,32 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) {
GC_register_new_thread(place, cust);
#endif
return (Scheme_Object*) place;
{
Scheme_Object *a[4];
Scheme_Object *tmpport;
a[0] = (Scheme_Object *) place;
if (rw[1] >= 0) {
tmpport = scheme_make_fd_output_port(rw[1], scheme_intern_symbol("place-in"), 1, 1, 0);
a[1] = tmpport;
}
else
a[1] = scheme_false;
if (rw[2] >= 0) {
tmpport = scheme_make_fd_input_port(rw[2], scheme_intern_symbol("place-out"), 1, 1);
a[2] = tmpport;
}
else
a[2] = scheme_false;
if (rw[4] >= 0) {
tmpport = scheme_make_fd_input_port(rw[4], scheme_intern_symbol("place-err"), 1, 1);
a[3] = tmpport;
}
else
a[3] = scheme_false;
return scheme_values(4, a);
}
}
static void do_place_kill(Scheme_Place *place)
@ -859,6 +988,7 @@ static int place_wait_ready(Scheme_Object *_p) {
if (done) {
do_place_kill(p); /* sets result, frees place */
/* wait for pumper threads to finish */
return 1;
}
@ -874,6 +1004,16 @@ static Scheme_Object *place_wait(int argc, Scheme_Object *args[]) {
scheme_block_until(place_wait_ready, NULL, (Scheme_Object*)place, 0);
if (SCHEME_VECTORP(place->pumper_threads)) {
int i;
for (i=0; i<3; i++) {
Scheme_Object *tmp;
tmp = SCHEME_VEC_ELS(place->pumper_threads)[i];
if (SCHEME_THREADP(tmp))
scheme_thread_wait(tmp);
}
}
return scheme_make_integer(place->result);
}
@ -2014,6 +2154,32 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
place_obj->signal_handle = signal_handle;
}
{
Scheme_Object *tmp;
if (place_data->in >= 0) {
tmp = scheme_make_fd_input_port (place_data->in, scheme_intern_symbol("place-in"), 1, 1);
if (scheme_orig_stdin_port) {
scheme_close_input_port(scheme_orig_stdin_port);
}
scheme_orig_stdin_port = tmp;
}
if (place_data->out >= 0) {
tmp = scheme_make_fd_output_port(place_data->out, scheme_intern_symbol("place-out"), 1, 1, 0);
if (scheme_orig_stdout_port) {
scheme_close_output_port(scheme_orig_stdout_port);
}
scheme_orig_stdout_port = tmp;
}
if (place_data->err >= 0) {
tmp = scheme_make_fd_output_port(place_data->err, scheme_intern_symbol("place-err"), 1, 1, 0);
if (scheme_orig_stderr_port) {
scheme_close_output_port(scheme_orig_stderr_port);
}
scheme_orig_stderr_port = tmp;
}
scheme_init_port_config();
}
mzrt_sema_post(place_data->ready);
place_data = NULL;
# ifdef MZ_PRECISE_GC
@ -2060,6 +2226,10 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) {
scheme_log(NULL, SCHEME_LOG_DEBUG, 0, "place %d: exiting", scheme_current_place_id);
scheme_close_input_port(scheme_orig_stdin_port);
scheme_close_output_port(scheme_orig_stdout_port);
scheme_close_output_port(scheme_orig_stderr_port);
/*printf("Leavin place: proc thread id%u\n", ptid);*/
scheme_place_instance_destroy(place_obj->die);

View File

@ -535,6 +535,7 @@ Scheme_Object *scheme_get_thread_suspend(Scheme_Thread *p);
void scheme_zero_unneeded_rands(Scheme_Thread *p);
int scheme_can_break(Scheme_Thread *p);
void scheme_thread_wait(Scheme_Object *thread);
# define DO_CHECK_FOR_BREAK(p, e) \
if (DECREMENT_FUEL(scheme_fuel_counter, 1) <= 0) { \
@ -3681,6 +3682,7 @@ typedef struct Scheme_Place {
#ifdef MZ_PRECISE_GC
struct GC_Thread_Info *gc_info; /* managed by the GC */
#endif
Scheme_Object *pumper_threads; /* Scheme_Vector of scheme threads */
} Scheme_Place;
typedef struct Scheme_Place_Object {

View File

@ -2993,6 +2993,10 @@ static Scheme_Object *thread_wait(int argc, Scheme_Object *args[])
return scheme_void;
}
void scheme_thread_wait(Scheme_Object *thread) {
thread_wait(1, &thread);
}
static void register_thread_sync()
{
scheme_add_evt(scheme_thread_type,