From b6972a3b7f867a187df24bba5718107621e3729f Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Wed, 7 Sep 2011 14:14:49 -0600 Subject: [PATCH] dynamic-place now pipes standard io to standard out and error much like system. --- collects/mzlib/port.rkt | 34 ---- collects/mzlib/private/port.rkt | 196 ++++++++++++-------- collects/mzlib/private/streams.rkt | 69 +++++++ collects/mzlib/process.rkt | 43 +---- collects/racket/place.rkt | 99 ++++++++-- collects/scribblings/reference/places.scrbl | 54 +++++- collects/tests/racket/place-channel-ffi.rkt | 7 +- collects/tests/racket/place-ports.rkt | 50 +++++ src/racket/src/mzmark_place.inc | 2 + src/racket/src/mzmarksrc.c | 1 + src/racket/src/place.c | 180 +++++++++++++++++- src/racket/src/schpriv.h | 2 + src/racket/src/thread.c | 4 + 13 files changed, 567 insertions(+), 174 deletions(-) create mode 100644 collects/mzlib/private/streams.rkt create mode 100644 collects/tests/racket/place-ports.rkt diff --git a/collects/mzlib/port.rkt b/collects/mzlib/port.rkt index 7f32e6b7d5..84fb89b6cc 100644 --- a/collects/mzlib/port.rkt +++ b/collects/mzlib/port.rkt @@ -30,40 +30,6 @@ ;; ---------------------------------------- -(define (copy-port src dest . dests) - (unless (input-port? src) - (raise-type-error 'copy-port "input-port" src)) - (for-each - (lambda (dest) - (unless (output-port? dest) - (raise-type-error 'copy-port "output-port" dest))) - (cons dest dests)) - (let ([s (make-bytes 4096)] - [dests (cons dest dests)]) - (let loop () - (let ([c (read-bytes-avail! s src)]) - (cond - [(number? c) - (let loop ([dests dests]) - (unless (null? dests) - (let loop ([start 0]) - (unless (= start c) - (let ([c2 (write-bytes-avail s (car dests) start c)]) - (loop (+ start c2))))) - (loop (cdr dests)))) - (loop)] - [(procedure? c) - (let ([v (let-values ([(l col p) (port-next-location src)]) - (c (object-name src) l col p))]) - (let loop ([dests dests]) - (unless (null? dests) - (write-special v (car dests)) - (loop (cdr dests))))) - (loop)] - [else - ;; Must be EOF - (void)]))))) - (define merge-input (case-lambda [(a b) (merge-input a b 4096)] diff --git a/collects/mzlib/private/port.rkt b/collects/mzlib/private/port.rkt index d7f4892bfb..de7546e822 100644 --- a/collects/mzlib/private/port.rkt +++ b/collects/mzlib/private/port.rkt @@ -1,85 +1,125 @@ +#lang racket/base ;; A few simple port functions are needed in pretty.rkt, which is ;; used by contract.rkt, which is used by port.rkt --- so we ;; break the cycle with this module. +;; +;; copy-port is used by mzlib/private/streams.rkt, which is used by +;; racket/place.rkt, which we want to load without loading contracts +;; --- so copy port is place in this module. -(module port racket/base - (provide open-output-nowhere - relocate-output-port - transplant-output-port - transplant-to-relocate) +(provide copy-port + open-output-nowhere + relocate-output-port + transplant-output-port + transplant-to-relocate) - (define open-output-nowhere - (lambda ([name 'nowhere] [specials-ok? #t]) - (make-output-port - name - always-evt - (lambda (s start end non-block? breakable?) (- end start)) - void - (and specials-ok? - (lambda (special non-block? breakable?) #t)) - (lambda (s start end) (wrap-evt - always-evt - (lambda (x) - (- end start)))) - (and specials-ok? - (lambda (special) - (wrap-evt always-evt (lambda (x) #t))))))) +(define open-output-nowhere + (lambda ([name 'nowhere] [specials-ok? #t]) + (make-output-port + name + always-evt + (lambda (s start end non-block? breakable?) (- end start)) + void + (and specials-ok? + (lambda (special non-block? breakable?) #t)) + (lambda (s start end) (wrap-evt + always-evt + (lambda (x) + (- end start)))) + (and specials-ok? + (lambda (special) + (wrap-evt always-evt (lambda (x) #t))))))) - (define (transplant-to-relocate transplant p line col pos close?) - (let-values ([(init-l init-c init-p) (port-next-location p)]) - (transplant - p - (lambda () - (let-values ([(l c p) (port-next-location p)]) - (values (and l init-l (+ l (- init-l) line)) - (and c init-c (if (equal? l init-l) - (+ c (- init-c) col) - c)) - (and p init-p (+ p (- init-p) pos))))) - pos - close?))) +(define (transplant-to-relocate transplant p line col pos close?) + (let-values ([(init-l init-c init-p) (port-next-location p)]) + (transplant + p + (lambda () + (let-values ([(l c p) (port-next-location p)]) + (values (and l init-l (+ l (- init-l) line)) + (and c init-c (if (equal? l init-l) + (+ c (- init-c) col) + c)) + (and p init-p (+ p (- init-p) pos))))) + pos + close?))) + +(define relocate-output-port + (lambda (p line col pos [close? #t]) + (transplant-to-relocate + transplant-output-port + p line col pos close?))) + +(define transplant-output-port + (lambda (p location-proc pos [close? #t] [count-lines!-proc void]) + (make-output-port + (object-name p) + p + (lambda (s start end nonblock? breakable?) + (let ([v ((if nonblock? + write-bytes-avail* + (if breakable? + write-bytes-avail/enable-break + write-bytes-avail)) + s p start end)]) + (if (and (zero? v) (not (= start end))) + (wrap-evt p (lambda (x) #f)) + v))) + (lambda () + (when close? + (close-output-port p))) + (and (port-writes-special? p) + (lambda (special nonblock? breakable?) + ((if nonblock? + write-special-avail* + (if breakable? + (lambda (spec p) + (parameterize-break #t + (write-special spec p))) + write-special)) + special p))) + (and (port-writes-atomic? p) + (lambda (s start end) + (write-bytes-avail-evt s p start end))) + (and (port-writes-atomic? p) + (port-writes-special? p) + (lambda (spec) + (write-special-evt spec p))) + location-proc + count-lines!-proc + pos))) + +(define (copy-port src dest . dests) + (unless (input-port? src) + (raise-type-error 'copy-port "input-port" src)) + (for-each + (lambda (dest) + (unless (output-port? dest) + (raise-type-error 'copy-port "output-port" dest))) + (cons dest dests)) + (let ([s (make-bytes 4096)] + [dests (cons dest dests)]) + (let loop () + (let ([c (read-bytes-avail! s src)]) + (cond + [(number? c) + (let loop ([dests dests]) + (unless (null? dests) + (let loop ([start 0]) + (unless (= start c) + (let ([c2 (write-bytes-avail s (car dests) start c)]) + (loop (+ start c2))))) + (loop (cdr dests)))) + (loop)] + [(procedure? c) + (let ([v (let-values ([(l col p) (port-next-location src)]) + (c (object-name src) l col p))]) + (let loop ([dests dests]) + (unless (null? dests) + (write-special v (car dests)) + (loop (cdr dests))))) + (loop)] + [else + ;; Must be EOF + (void)]))))) - (define relocate-output-port - (lambda (p line col pos [close? #t]) - (transplant-to-relocate - transplant-output-port - p line col pos close?))) - - (define transplant-output-port - (lambda (p location-proc pos [close? #t] [count-lines!-proc void]) - (make-output-port - (object-name p) - p - (lambda (s start end nonblock? breakable?) - (let ([v ((if nonblock? - write-bytes-avail* - (if breakable? - write-bytes-avail/enable-break - write-bytes-avail)) - s p start end)]) - (if (and (zero? v) (not (= start end))) - (wrap-evt p (lambda (x) #f)) - v))) - (lambda () - (when close? - (close-output-port p))) - (and (port-writes-special? p) - (lambda (special nonblock? breakable?) - ((if nonblock? - write-special-avail* - (if breakable? - (lambda (spec p) - (parameterize-break #t - (write-special spec p))) - write-special)) - special p))) - (and (port-writes-atomic? p) - (lambda (s start end) - (write-bytes-avail-evt s p start end))) - (and (port-writes-atomic? p) - (port-writes-special? p) - (lambda (spec) - (write-special-evt spec p))) - location-proc - count-lines!-proc - pos)))) diff --git a/collects/mzlib/private/streams.rkt b/collects/mzlib/private/streams.rkt new file mode 100644 index 0000000000..694cc6473d --- /dev/null +++ b/collects/mzlib/private/streams.rkt @@ -0,0 +1,69 @@ +#lang racket/base + +(require "port.rkt") + +(provide if-stream-out + if-stream-in + streamify-in + streamify-out + pump-ports) + +(define (if-stream-out who p [sym-ok? #f]) + (cond [(and sym-ok? (eq? p 'stdout)) p] + [(or (not p) (and (output-port? p) (file-stream-port? p))) p] + [(output-port? p) #f] + [else (raise-type-error who + (if sym-ok? + "output port, #f, or 'stdout" + "output port or #f") + p)])) + +(define (if-stream-in who p) + (cond [(or (not p) (and (input-port? p) (file-stream-port? p))) p] + [(input-port? p) #f] + [else (raise-type-error who "input port or #f" p)])) + +(define (streamify-in cin in ready-for-break) + (if (and cin (not (file-stream-port? cin))) + (thread (lambda () + (dynamic-wind + void + (lambda () + (with-handlers ([exn:break? void]) + (ready-for-break #t) + (copy-port cin in) + (ready-for-break #f))) + (lambda () (close-output-port in))) + (ready-for-break #t))) + in)) + +(define (streamify-out cout out) + (if (and cout + (not (eq? cout 'stdout)) + (not (file-stream-port? cout))) + (thread (lambda () + (dynamic-wind + void + (lambda () (copy-port out cout)) + (lambda () (close-input-port out))))) + out)) + +(define (pump-ports evt pin pout perr in out err) + (define who 'pump-ports) + (define it-ready (make-semaphore)) + (define inpump (streamify-in in + (if-stream-out who pin) + (lambda (ok?) + (if ok? + (semaphore-post it-ready) + (semaphore-wait it-ready))))) + (define outpump (streamify-out out (if-stream-in who pout))) + (define errpump (streamify-out err (if-stream-in who perr))) + (when (thread? inpump) + ;; Wait for place to end, then stop copying input: + (thread (lambda () + (sync evt inpump) + (semaphore-wait it-ready) + (break-thread inpump)))) + + (values inpump outpump errpump)) diff --git a/collects/mzlib/process.rkt b/collects/mzlib/process.rkt index 259b91a38b..b897927f97 100644 --- a/collects/mzlib/process.rkt +++ b/collects/mzlib/process.rkt @@ -8,7 +8,8 @@ system/exit-code system*/exit-code) -(require mzlib/port) +(require mzlib/port + "private/streams.rkt") ;; Helpers: ---------------------------------------- @@ -33,46 +34,6 @@ (format "~a: don't know what shell to use for platform: " who) (system-type))])) -(define (if-stream-out who p [sym-ok? #f]) - (cond [(and sym-ok? (eq? p 'stdout)) p] - [(or (not p) (and (output-port? p) (file-stream-port? p))) p] - [(output-port? p) #f] - [else (raise-type-error who - (if sym-ok? - "output port, #f, or 'stdout" - "output port or #f") - p)])) - -(define (if-stream-in who p) - (cond [(or (not p) (and (input-port? p) (file-stream-port? p))) p] - [(input-port? p) #f] - [else (raise-type-error who "input port or #f" p)])) - -(define (streamify-in cin in ready-for-break) - (if (and cin (not (file-stream-port? cin))) - (thread (lambda () - (dynamic-wind - void - (lambda () - (with-handlers ([exn:break? void]) - (ready-for-break #t) - (copy-port cin in) - (ready-for-break #f))) - (lambda () (close-output-port in))) - (ready-for-break #t))) - in)) - -(define (streamify-out cout out) - (if (and cout - (not (eq? cout 'stdout)) - (not (file-stream-port? cout))) - (thread (lambda () - (dynamic-wind - void - (lambda () (copy-port out cout)) - (lambda () (close-input-port out))))) - out)) - (define (check-exe who exe) (unless (path-string? exe) (raise-type-error who "path or string" exe)) diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index a8bd6f8604..de20cab617 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -7,11 +7,13 @@ racket/fixnum racket/flonum racket/vector + "../mzlib/private/streams.rkt" (for-syntax racket/base racket/syntax)) (provide dynamic-place + dynamic-place* place-sleep place-wait place-kill @@ -25,6 +27,7 @@ place-channel-put/get processor-count place + place* (rename-out [pl-place-enabled? place-enabled?]) place-dead-evt) @@ -125,7 +128,6 @@ (define-syntax-rule (define-pl x p t) (define x (if (pl-place-enabled?) p t))) -(define-pl dynamic-place pl-dynamic-place th-dynamic-place) (define-pl place-sleep pl-place-sleep th-place-sleep) (define-pl place-wait pl-place-wait th-place-wait) (define-pl place-kill pl-place-kill th-place-kill) @@ -138,33 +140,102 @@ (define-pl place-message-allowed? pl-place-message-allowed? th-place-message-allowed?) (define-pl place-dead-evt pl-place-dead-evt th-place-dead-evt) -(define-syntax-rule (define-syntax-case (N a ...) b ...) - (define-syntax (N stx) - (syntax-case stx () - [(_ a ...) b ...]))) +(define (pump-place p pin pout perr in out err) + (cond + [(pl-place-enabled?) + (define-values (t-in t-out t-err) (pump-ports (place-dead-evt p) pin pout perr in out err)) + (pl-place-pumper-threads p (vector t-in t-out t-err))] + [else (void)])) -(define-syntax (place stx) +(define (dynamic-place module-path function) + (define-values (p i o e) (dynamic-place* module-path + function + #:in #f + #:out (current-output-port) + #:err (current-error-port))) + (close-output-port i) + p) + +(define (dynamic-place* module-path + function + #:in [in #f] + #:out [out (current-output-port)] + #:err [err (current-error-port)]) + (cond + [(pl-place-enabled?) + (define-values (p pin pout perr) + (pl-dynamic-place module-path + function + (if-stream-in 'dynamic-place in) + (if-stream-out 'dynamic-place out) + (if-stream-out 'dynamic-place err))) + + (pump-place p pin pout perr in out err) + (values p + (and (not in) pin) + (and (not out) pout) + (and (not err) perr))] + + [else + (define-values (inr inw ) (if in (values #f #f) (make-pipe))) + (define-values (outr outw) (if out (values #f #f) (make-pipe))) + (define-values (errr errw) (if err (values #f #f) (make-pipe))) + + (parameterize ([current-input-port (or in inr)] + [current-output-port (or out outw)] + [current-error-port (or err errw)]) + (values (th-dynamic-place module-path function) + (and (not in ) inw ) + (and (not out) outr) + (and (not err) errr)))])) + +(define-for-syntax (place-form _in _out _err _dynamic-place-func stx) (syntax-case stx () [(_ ch body1 body ...) (begin - #;(when (in-module-expansion?) + (unless (syntax-transforming-module-expression?) (raise-syntax-error #f "can only be used in a module" stx)) (unless (identifier? #'ch) - (raise-syntax-error #f "expected an indentifier" stx #'ch)) + (raise-syntax-error #f "expected an identifier" stx #'ch)) (with-syntax ([internal-def-name (syntax-local-lift-expression #'(lambda (ch) body1 body ...))] - [func-name (generate-temporary #'place/anon)]) + [func-name (generate-temporary #'place/anon)] + [in _in] + [out _out] + [err _err] + [dynamic-place-func _dynamic-place-func]) (syntax-local-lift-provide #'(rename internal-def-name func-name)) - #'(place/proc (#%variable-reference) 'func-name)))] + #'(place/proc (#%variable-reference) 'func-name dynamic-place-func #:in in #:out out #:err err)))] [(_ ch) (raise-syntax-error #f "expected at least one body expression" stx)])) -(define (place/proc vr func-name) +(define-syntax (place stx) + (place-form #'#f #'(current-output-port) #'(current-error-port) #'dynamic-place stx)) + +(define-syntax (place* stx) + (syntax-case stx () + [(_ #:in in #:out out #:err err ch body ...) (place-form #'in #'out #'err #'dynamic-place* #'('place* ch body ...))] + [(_ #:in in #:out out ch body ...) (place-form #'in #'out #'#f #'dynamic-place* #'('place* ch body ...))] + [(_ #:out out #:err err ch body ...) (place-form #'#f #'out #'err #'dynamic-place* #'('place* ch body ...))] + [(_ #:in in #:err err ch body ...) (place-form #'in #'#f #'err #'dynamic-place* #'('place* ch body ...))] + [(_ #:in in ch body ...) (place-form #'in #'#f #'#f #'dynamic-place* #'('place* ch body ...))] + [(_ #:out out ch body ...) (place-form #'#f #'out #'#f #'dynamic-place* #'('place* ch body ...))] + [(_ #:err err ch body ...) (place-form #'#f #'#f #'err #'dynamic-place* #'('place* ch body ...))] + [(_ ch body ...) (place-form #'#f #'#f #'#f #'dynamic-place* #'('place* ch body ...))] +)) + +(define (place/proc vr + func-name + [dynamic-place-func dynamic-place] + #:in [in #f] + #:out [out (current-output-port)] + #:err [err (current-error-port)]) (define name (resolved-module-path-name (variable-reference->resolved-module-path vr))) (when (symbol? name) - (error 'place "the current module-path-name should be a path and not a symbol (if you are in DrRacket, save the file)")) - (dynamic-place name func-name)) - + (error 'place "the current module-path-name is not a file path")) + (if (eq? dynamic-place-func dynamic-place) + (dynamic-place-func name func-name) + (dynamic-place-func name func-name #:in in #:out out #:err err))) diff --git a/collects/scribblings/reference/places.scrbl b/collects/scribblings/reference/places.scrbl index 7f134b56ed..41783c728a 100644 --- a/collects/scribblings/reference/places.scrbl +++ b/collects/scribblings/reference/places.scrbl @@ -104,7 +104,9 @@ are simulated using @racket[thread].} } -@defproc[(dynamic-place [module-path module-path?] [start-proc symbol?]) place?]{ +@defproc[(dynamic-place [module-path module-path?] + [start-proc symbol?] + place?]{ Creates a @tech{place} to run the procedure that is identified by @racket[module-path] and @racket[start-proc]. The result is a @@ -132,6 +134,45 @@ are simulated using @racket[thread].} @tech{completion value} @racket[0].} +@defproc[(dynamic-place* [module-path module-path?] + [start-proc symbol?] + [#:in in (or/c input-port? #f) #f] + [#:out out (or/c output-port? #f) (current-output-port)] + [#:err err (or/c output-port? #f) (current-error-port)] + (values place? (or/c input-port? #f) (or/c output-port? #f) (or/c output-port? #f)]{ + + The @racket[dynamic-place*] function behaves just like the + @racket[dynamic-place] but allows the user to specify the standard + in, out, and error ports for the new place. Upon execution of + @racket[dynamic-place*], the @racket[in], @racket[out], and + @racket[err] ports become the @racket[current-input-port], + @racket[current-output-port], and @racket[current-error-port] for the + @tech{place}. Any of the ports can be @racket[#f], in which case a + system pipe is created and returned by @racket[dynamic-place*]. The + @racket[stderr] argument can be @racket['stdout], in which case the + same file-stream port or system pipe that is supplied as standard + output is also used for standard error. For each port or + @racket['stdout] that is provided, no pipe is created and the + corresponding returned value is @racket[#f] + +The @racket[dynamic-place*] procedure returns four values: + +@itemize[ + + @item{a place descriptor value representing the created place;} + + @item{an output port piped to the place's standard input, or + @racket[#f] if @racket[in] was a port;} + + @item{an input port piped from the place's standard output, or + @racket[#f] if @racket[out] was a port;} + + @item{an input port piped from the place's standard error, or + @racket[#f] if @racket[err] was a port or @racket['stdout].} + + +} + @defform[(place id body ...+)]{ Creates a place that evaluates @racket[body] expressions with @racket[id] bound to a place channel. The @@ -142,6 +183,17 @@ are simulated using @racket[thread].} like the result of @racket[dynamic-place]. } +@defform[(place* [#:in in #f] + [#:out out (current-output-port)] + [#:err err (current-error-port)] + id + body ...+)]{ + Behaves like @racket[place] and allows the user to set + the @racket[current-input-port], @racket[current-output-port], and + @racket[current-error-port] for the @tech{place}. The result of a + @racket[place*] form is analogous to the result of @racket[dynamic-place*]. + } + @defproc[(place-wait [p place?]) exact-integer?]{ Returns the @tech{completion value} of the place indicated by @racket[p], diff --git a/collects/tests/racket/place-channel-ffi.rkt b/collects/tests/racket/place-channel-ffi.rkt index c503d32a6f..1eda9f99ca 100644 --- a/collects/tests/racket/place-channel-ffi.rkt +++ b/collects/tests/racket/place-channel-ffi.rkt @@ -3,6 +3,7 @@ (require racket/place ffi/unsafe racket/runtime-path + rackunit (for-syntax racket/base)) (provide main) @@ -34,9 +35,13 @@ (define bn (BN_new)) (set-BN-j1! bn 1334) (printf "BN-j1 ~a ~v\n" (BN-j1 bn) (cpointer-tag bn)) + (check-equal? (BN-j1 bn) 1334) + (check-equal? (cpointer-tag bn) 'BN) + (check-equal? BN-tag 'BN) (printf "BN tag ~v\n" BN-tag) (define p (place ch (define b (place-channel-get ch)) - (printf "Got it ~a\n" (BN-j1 b)))) + (printf "Got it ~a\n" (BN-j1 b)) + (check-equal? (BN-j1 b) 1334))) (place-channel-put p bn) (place-wait p)) diff --git a/collects/tests/racket/place-ports.rkt b/collects/tests/racket/place-ports.rkt new file mode 100644 index 0000000000..e4a9063f09 --- /dev/null +++ b/collects/tests/racket/place-ports.rkt @@ -0,0 +1,50 @@ +#lang racket/base +(require racket/place + racket/port + rackunit) + +(provide main) + +(define (main) + + (place-wait (place ch (printf "Hello1\n"))) + (place-wait (place ch (eprintf "Hello2\n"))) + (place-wait (place ch (printf "~a\n" (read)))) ; # + + (let-values ([(p pin pout perr) (place* ch (printf "Hello3\n"))]) + (place-wait p)) + + (let-values ([(p pin pout perr) (place* ch (printf "Hello4\n"))]) + (copy-port pout (current-output-port)) + (place-wait p)) + (let-values ([(p pin pout perr) (place* #:out (current-output-port) ch (printf "Hello5\n"))]) + (place-wait p)) + + (let-values ([(p pin pout perr) (place* #:err (current-error-port) ch (eprintf "Hello6\n") + (flush-output (current-error-port)))]) + (place-wait p)) + (let-values ([(p pin pout perr) (place* #:out (current-output-port) ch (printf "Hello7 ~a\n" (read)))]) + (write "Again" pin) + (flush-output pin) + (place-wait p)) + + (let-values ([(p pin pout perr) (place* ch (write "Hello8\n"))]) + (check-equal? "Hello8\n" (read pout)) + (place-wait p)) + + (let-values ([(p pin pout perr) (place* ch (write "Hello9\n" (current-error-port)))]) + (check-equal? "Hello9\n" (read perr)) + (place-wait p)) + + (let*-values ([(pipeout pipein) (make-pipe)] + [(p pin pout perr) (place* #:out pipein + ch (write "Hello10\n") + (close-output-port (current-output-port)) + (close-output-port (current-error-port)))]) + (place-wait p) + (thread (lambda () + (sync (place-dead-evt p)) + (close-output-port pipein))) + (copy-port pipeout (current-output-port)) + (newline) + (flush-output))) diff --git a/src/racket/src/mzmark_place.inc b/src/racket/src/mzmark_place.inc index fe45335c73..2811f23c5f 100644 --- a/src/racket/src/mzmark_place.inc +++ b/src/racket/src/mzmark_place.inc @@ -55,6 +55,7 @@ static int place_val_MARK(void *p, struct NewGC *gc) { Scheme_Place *pr = (Scheme_Place *)p; gcMARK2(pr->channel, gc); gcMARK2(pr->mref, gc); + gcMARK2(pr->pumper_threads, gc); return gcBYTES_TO_WORDS(sizeof(Scheme_Place)); @@ -64,6 +65,7 @@ static int place_val_FIXUP(void *p, struct NewGC *gc) { Scheme_Place *pr = (Scheme_Place *)p; gcFIXUP2(pr->channel, gc); gcFIXUP2(pr->mref, gc); + gcFIXUP2(pr->pumper_threads, gc); return gcBYTES_TO_WORDS(sizeof(Scheme_Place)); diff --git a/src/racket/src/mzmarksrc.c b/src/racket/src/mzmarksrc.c index d20e9ca2fa..9a26cb5463 100644 --- a/src/racket/src/mzmarksrc.c +++ b/src/racket/src/mzmarksrc.c @@ -1459,6 +1459,7 @@ place_val { Scheme_Place *pr = (Scheme_Place *)p; gcMARK2(pr->channel, gc); gcMARK2(pr->mref, gc); + gcMARK2(pr->pumper_threads, gc); size: gcBYTES_TO_WORDS(sizeof(Scheme_Place)); diff --git a/src/racket/src/place.c b/src/racket/src/place.c index 174c258dcc..8619ef9f0d 100644 --- a/src/racket/src/place.c +++ b/src/racket/src/place.c @@ -44,6 +44,7 @@ SHARED_OK mz_proc_thread *scheme_master_proc_thread; THREAD_LOCAL_DECL(static struct Scheme_Place_Object *place_object); THREAD_LOCAL_DECL(static uintptr_t force_gc_for_place_accounting); static Scheme_Object *scheme_place(int argc, Scheme_Object *args[]); +static Scheme_Object *place_pumper_threads(int argc, Scheme_Object *args[]); static Scheme_Object *place_wait(int argc, Scheme_Object *args[]); static Scheme_Object *place_kill(int argc, Scheme_Object *args[]); static Scheme_Object *place_break(int argc, Scheme_Object *args[]); @@ -120,7 +121,8 @@ void scheme_init_place(Scheme_Env *env) GLOBAL_PRIM_W_ARITY("place-enabled?", scheme_place_enabled, 0, 0, plenv); GLOBAL_PRIM_W_ARITY("place-shared?", scheme_place_shared, 1, 1, plenv); - PLACE_PRIM_W_ARITY("dynamic-place", scheme_place, 2, 2, plenv); + PLACE_PRIM_W_ARITY("dynamic-place", scheme_place, 5, 5, plenv); + PLACE_PRIM_W_ARITY("place-pumper-threads", place_pumper_threads, 1, 2, plenv); PLACE_PRIM_W_ARITY("place-sleep", place_sleep, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-wait", place_wait, 1, 1, plenv); PLACE_PRIM_W_ARITY("place-kill", place_kill, 1, 1, plenv); @@ -177,6 +179,9 @@ typedef struct Place_Start_Data { struct Scheme_Place_Object *place_obj; /* malloc'ed item */ struct NewGC *parent_gc; Scheme_Object *cust_limit; + intptr_t in; + intptr_t out; + intptr_t err; } Place_Start_Data; static void null_out_runtime_globals() { @@ -206,6 +211,28 @@ Scheme_Object *scheme_make_place_object() { return (Scheme_Object *)place_obj; } +static void close_six_fds(int *rw) { + int i; + for (i=0; i<6; i++) { if (rw[i] >= 0) scheme_close_file_fd(rw[i]); } +} + +Scheme_Object *place_pumper_threads(int argc, Scheme_Object *args[]) { + Scheme_Place *place; + Scheme_Object *tmp; + + place = (Scheme_Place *) args[0]; + if (!SAME_TYPE(SCHEME_TYPE(args[0]), scheme_place_type)) + scheme_wrong_type("place-pumper-threads", "place", 0, argc, args); + + if (argc == 2) { + tmp = args[1]; + if (!SCHEME_VECTORP(tmp) || SCHEME_VEC_SIZE(tmp) != 3) + scheme_wrong_type("place-pumper-threads", "vector of size 3", 1, argc, args); + place->pumper_threads = tmp; + } + return place->pumper_threads; +} + Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { Scheme_Place *place; Place_Start_Data *place_data; @@ -216,6 +243,10 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { struct NewGC *parent_gc; Scheme_Custodian *cust; intptr_t mem_limit; + Scheme_Object *in_arg; + Scheme_Object *out_arg; + Scheme_Object *err_arg; + int rw[6] = {-1, -1, -1, -1, -1, -1}; /* To avoid runaway place creation, check for termination before continuing. */ scheme_thread_block(0.0); @@ -254,15 +285,29 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { { Scheme_Object *so; + in_arg = args[2]; + out_arg = args[3]; + err_arg = args[4]; + if (!scheme_is_module_path(args[0]) && !SCHEME_PATHP(args[0])) { scheme_wrong_type("dynamic-place", "module-path or path", 0, argc, args); } - if (SCHEME_PAIRP(args[0]) && SAME_OBJ(SCHEME_CAR(args[0]), quote_symbol)) { - scheme_wrong_type("dynamic-place", "non-interactively defined module-path", 0, argc, args); - } if (!SCHEME_SYMBOLP(args[1])) { scheme_wrong_type("dynamic-place", "symbol", 1, argc, args); } + if (SCHEME_TRUEP(in_arg) && !SCHEME_TRUEP(scheme_file_stream_port_p(1, &in_arg))) { + scheme_wrong_type("dynamic-place", "file-stream-input-port or #f", 2, argc, args); + } + if (SCHEME_TRUEP(out_arg) && !SCHEME_TRUEP(scheme_file_stream_port_p(1, &out_arg))) { + scheme_wrong_type("dynamic-place", "file-stream-output-port or #f", 3, argc, args); + } + if (SCHEME_TRUEP(err_arg) && !SCHEME_TRUEP(scheme_file_stream_port_p(1, &err_arg))) { + scheme_wrong_type("dynamic-place", "file-stream-output-port or #f", 4, argc, args); + } + + if (SCHEME_PAIRP(args[0]) && SAME_OBJ(SCHEME_CAR(args[0]), quote_symbol)) { + scheme_arg_mismatch("dynamic-place", "dynamic-place works on only on filesystem module-paths", args[0]); + } so = places_deep_copy_to_master(args[0]); place_data->module = so; @@ -290,6 +335,65 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { place_obj->memory_limit = mem_limit; place_obj->parent_need_gc = &force_gc_for_place_accounting; + { + intptr_t tmpfd; + int errorno; + + if (SCHEME_TRUEP(in_arg)) { + scheme_get_port_file_descriptor(in_arg, &tmpfd); + tmpfd = scheme_dup_file(tmpfd); + if (tmpfd == -1) { + errorno = scheme_errno(); + close_six_fds(rw); + scheme_raise_exn(MZEXN_FAIL, "dup: error duplicating file descriptor(%e)", errorno); + } + rw[0] = tmpfd; + } + else if (pipe(rw)) { + errorno = scheme_errno(); + close_six_fds(rw); + scheme_raise_exn(MZEXN_FAIL_FILESYSTEM, "pipe: error creating place standard input streams (%e)", errorno); + } + + if (SCHEME_TRUEP(out_arg)) { + scheme_get_port_file_descriptor(out_arg, &tmpfd); + tmpfd = scheme_dup_file(tmpfd); + if (tmpfd == -1) { + errorno = scheme_errno(); + close_six_fds(rw); + scheme_raise_exn(MZEXN_FAIL, "dup: error duplicating file descriptor(%e)", errorno); + } + rw[3] = tmpfd; + } + else if (pipe(rw + 2)) { + errorno = scheme_errno(); + close_six_fds(rw); + scheme_raise_exn(MZEXN_FAIL_FILESYSTEM, "pipe: error creating place standard output streams (%e)", errorno); + } + + if (SCHEME_TRUEP(err_arg)) { + scheme_get_port_file_descriptor(err_arg, &tmpfd); + tmpfd = scheme_dup_file(tmpfd); + if (tmpfd == -1) { + errorno = scheme_errno(); + close_six_fds(rw); + scheme_raise_exn(MZEXN_FAIL, "dup: error duplicating file descriptor(%e)", errorno); + } + rw[5] = tmpfd; + } + else if (pipe(rw + 4)) { + errorno = scheme_errno(); + close_six_fds(rw); + scheme_raise_exn(MZEXN_FAIL_FILESYSTEM, "pipe: error creating place standard error streams (%e)", errorno); + } + + { + place_data->in = rw[0]; + place_data->out = rw[3]; + place_data->err = rw[5]; + } + } + /* create new place */ proc_thread = mz_proc_thread_create(place_start_proc, place_data); @@ -323,7 +427,32 @@ Scheme_Object *scheme_place(int argc, Scheme_Object *args[]) { GC_register_new_thread(place, cust); #endif - return (Scheme_Object*) place; + { + Scheme_Object *a[4]; + Scheme_Object *tmpport; + a[0] = (Scheme_Object *) place; + if (rw[1] >= 0) { + tmpport = scheme_make_fd_output_port(rw[1], scheme_intern_symbol("place-in"), 1, 1, 0); + a[1] = tmpport; + } + else + a[1] = scheme_false; + + if (rw[2] >= 0) { + tmpport = scheme_make_fd_input_port(rw[2], scheme_intern_symbol("place-out"), 1, 1); + a[2] = tmpport; + } + else + a[2] = scheme_false; + + if (rw[4] >= 0) { + tmpport = scheme_make_fd_input_port(rw[4], scheme_intern_symbol("place-err"), 1, 1); + a[3] = tmpport; + } + else + a[3] = scheme_false; + return scheme_values(4, a); + } } static void do_place_kill(Scheme_Place *place) @@ -859,6 +988,7 @@ static int place_wait_ready(Scheme_Object *_p) { if (done) { do_place_kill(p); /* sets result, frees place */ + /* wait for pumper threads to finish */ return 1; } @@ -874,6 +1004,16 @@ static Scheme_Object *place_wait(int argc, Scheme_Object *args[]) { scheme_block_until(place_wait_ready, NULL, (Scheme_Object*)place, 0); + if (SCHEME_VECTORP(place->pumper_threads)) { + int i; + for (i=0; i<3; i++) { + Scheme_Object *tmp; + tmp = SCHEME_VEC_ELS(place->pumper_threads)[i]; + if (SCHEME_THREADP(tmp)) + scheme_thread_wait(tmp); + } + } + return scheme_make_integer(place->result); } @@ -2014,6 +2154,32 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { place_obj->signal_handle = signal_handle; } + { + Scheme_Object *tmp; + if (place_data->in >= 0) { + tmp = scheme_make_fd_input_port (place_data->in, scheme_intern_symbol("place-in"), 1, 1); + if (scheme_orig_stdin_port) { + scheme_close_input_port(scheme_orig_stdin_port); + } + scheme_orig_stdin_port = tmp; + } + if (place_data->out >= 0) { + tmp = scheme_make_fd_output_port(place_data->out, scheme_intern_symbol("place-out"), 1, 1, 0); + if (scheme_orig_stdout_port) { + scheme_close_output_port(scheme_orig_stdout_port); + } + scheme_orig_stdout_port = tmp; + } + if (place_data->err >= 0) { + tmp = scheme_make_fd_output_port(place_data->err, scheme_intern_symbol("place-err"), 1, 1, 0); + if (scheme_orig_stderr_port) { + scheme_close_output_port(scheme_orig_stderr_port); + } + scheme_orig_stderr_port = tmp; + } + scheme_init_port_config(); + } + mzrt_sema_post(place_data->ready); place_data = NULL; # ifdef MZ_PRECISE_GC @@ -2060,6 +2226,10 @@ static void *place_start_proc_after_stack(void *data_arg, void *stack_base) { scheme_log(NULL, SCHEME_LOG_DEBUG, 0, "place %d: exiting", scheme_current_place_id); + scheme_close_input_port(scheme_orig_stdin_port); + scheme_close_output_port(scheme_orig_stdout_port); + scheme_close_output_port(scheme_orig_stderr_port); + /*printf("Leavin place: proc thread id%u\n", ptid);*/ scheme_place_instance_destroy(place_obj->die); diff --git a/src/racket/src/schpriv.h b/src/racket/src/schpriv.h index 6933356cd5..0c204d67fe 100644 --- a/src/racket/src/schpriv.h +++ b/src/racket/src/schpriv.h @@ -535,6 +535,7 @@ Scheme_Object *scheme_get_thread_suspend(Scheme_Thread *p); void scheme_zero_unneeded_rands(Scheme_Thread *p); int scheme_can_break(Scheme_Thread *p); +void scheme_thread_wait(Scheme_Object *thread); # define DO_CHECK_FOR_BREAK(p, e) \ if (DECREMENT_FUEL(scheme_fuel_counter, 1) <= 0) { \ @@ -3681,6 +3682,7 @@ typedef struct Scheme_Place { #ifdef MZ_PRECISE_GC struct GC_Thread_Info *gc_info; /* managed by the GC */ #endif + Scheme_Object *pumper_threads; /* Scheme_Vector of scheme threads */ } Scheme_Place; typedef struct Scheme_Place_Object { diff --git a/src/racket/src/thread.c b/src/racket/src/thread.c index 362f6ce958..1e0901e44f 100644 --- a/src/racket/src/thread.c +++ b/src/racket/src/thread.c @@ -2993,6 +2993,10 @@ static Scheme_Object *thread_wait(int argc, Scheme_Object *args[]) return scheme_void; } +void scheme_thread_wait(Scheme_Object *thread) { + thread_wait(1, &thread); +} + static void register_thread_sync() { scheme_add_evt(scheme_thread_type,