original commit: fadba081443f887e44302ab0bcbc469af0554673
This commit is contained in:
Matthew Flatt 2001-04-06 22:35:43 +00:00
parent 32b27e5ac2
commit b7bda087b1
2 changed files with 153 additions and 47 deletions

112
collects/mzlib/process.ss Normal file
View File

@ -0,0 +1,112 @@
(module process mzscheme
(provide process
process*
process/ports
process*/ports
system
system*)
(require (lib "thread.ss"))
;; Helpers: ----------------------------------------
(define (shell-path/args)
(case (system-type)
((unix) '("/bin/sh" "-c"))
((windows) (let ([d (find-system-path 'sys-dir)])
(let ([cmd (build-path d "cmd.exe")])
(if (file-exists? cmd)
cmd
(build-path d "command.com")))))
(else (error "don't know what shell to use for ~e." (system-type)))))
(define (if-stream-out p)
(if (file-stream-port? p)
p
(if (output-port? p)
#f
(raise-type-error
'subprocess
"output port"
p))))
(define (if-stream-in p)
(if (file-stream-port? p)
p
(if (input-port? p)
#f
(raise-type-error
'subprocess
"input port"
p))))
(define (streamify-in cin in)
(if (and cin (not (file-stream-port? cin)))
(begin
(thread (lambda ()
(copy-port cin in)
(close-output-port in)))
#f)
in))
(define (streamify-out cout out)
(if (and cout (not (file-stream-port? cout)))
(begin
(thread (lambda () (copy-port out cout)))
#f)
out))
;; Old-style functions: ----------------------------------------
(define (process*/ports cout cin cerr exe . args)
(let-values ([(subp out in err pid) (apply subprocess
(if-stream-out cout)
(if-stream-in cin)
(if-stream-out cerr)
exe args)])
(list (streamify-out cout out)
(streamify-in cin in)
pid
(streamify-out cerr err)
(letrec ((control
(lambda (m)
(case m
((status) (let ((s (subprocess-status subp)))
(cond ((not (integer? s)) s)
((zero? s) 'done-ok)
(else 'done-error))))
((wait) (subprocess-wait subp))
(else
(raise-type-error 'control-process "'status or 'wait" m))))))
control))))
(define (process/ports out in err str)
(apply process*/ports out in err (append (shell-path/args) (list str))))
(define (process* exe . args)
(apply process*/ports #f #f #f exe args))
(define (process str)
(apply process* (append (shell-path/args) (list str))))
;; Note: these always use current ports
(define (system* exe . args)
(let ([cout (current-output-port)]
[cin (current-input-port)]
[cerr (current-error-port)])
(let-values ([(subp out in err pid)
(apply
subprocess
(if-stream-out cout)
(if-stream-in cin)
(if-stream-out cerr)
exe args)])
(streamify-out cout out)
(streamify-in cin in)
(streamify-out cerr err)
(subprocess-wait subp)
(zero? (subprocess-status subp)))))
(define (system str)
(apply system* (append (shell-path/args) (list str)))))

View File

@ -3,13 +3,15 @@
(require "spidey.ss")
(provide consumer-thread
merge-input
with-semaphore
semaphore-wait-multiple
dynamic-disable-break
dynamic-enable-break
make-single-threader)
make-single-threader
merge-input
copy-port)
#|
t accepts a function, f, and creates a thread. It returns the thread and a
@ -65,51 +67,6 @@
(semaphore-post protect)
(semaphore-post sema))))]))
(define (merge-input a b)
(or (input-port? a)
(raise-type-error 'merge-input "input-port" a))
(or (input-port? b)
(raise-type-error 'merge-input "input-port" b))
(let-values ([(rd wt) (make-pipe)])
(let* ([copy1-sema (make-semaphore 500)]
[copy2-sema (make-semaphore 500)]
[ready1-sema (make-semaphore)]
[ready2-sema (make-semaphore)]
[check-first? #t]
[close-sema (make-semaphore)]
[mk-copy (lambda (from to copy-sema ready-sema)
(lambda ()
(let loop ()
(semaphore-wait copy-sema)
(let ([c (read-char from)])
(unless (eof-object? c)
(semaphore-post ready-sema)
(write-char c to)
(loop))))
(semaphore-post close-sema)))])
(thread (mk-copy a wt copy1-sema ready1-sema))
(thread (mk-copy b wt copy2-sema ready2-sema))
(thread (lambda ()
(semaphore-wait close-sema)
(semaphore-wait close-sema)
(close-output-port wt)))
(make-input-port
(lambda () (let ([c (read-char rd)])
(unless (eof-object? c)
(if (and check-first? (semaphore-try-wait? ready1-sema))
(semaphore-post copy1-sema)
(if (not (semaphore-try-wait? ready2-sema))
; check-first? must be #f
(if (semaphore-try-wait? ready1-sema)
(semaphore-post copy1-sema)
(error 'join "internal error: char from nowhere!"))
(semaphore-post copy2-sema)))
(set! check-first? (not check-first?)))
c))
(lambda () (char-ready? rd))
(lambda () (close-input-port rd))))))
(define with-semaphore
(lambda (s f)
(semaphore-wait s)
@ -196,5 +153,42 @@
(lambda () (semaphore-wait sema))
thunk
(lambda () (semaphore-post sema))))))))
(define (copy-port src dest)
(let ([s (make-string 4096)])
(let loop ()
(let ([c (read-string-avail! s src)])
(unless (eof-object? c)
(let loop ([start 0])
(unless (= start c)
(let ([c2 (write-string-avail s dest start c)])
(loop (+ start c2)))))
(loop))))))
(define merge-input
(case-lambda
[(a b) (merge-input a b #f)]
[(a b limit)
(or (input-port? a)
(raise-type-error 'merge-input "input-port" a))
(or (input-port? b)
(raise-type-error 'merge-input "input-port" b))
(or (not limit)
(and (number? limit) (positive? limit) (exact? limit) (integer? limit))
(raise-type-error 'merge-input "positive exact integer or #f" limit))
(let-values ([(rd wt) (make-pipe limit)]
[(other-done?) #f]
[(sema) (make-semaphore)])
(let ([copy
(lambda (from)
(copy-port from wt)
(semaphore-wait sema)
(if other-done?
(close-output-port wt)
(set! other-done? #t))
(semaphore-post sema))])
(copy a)
(copy b)
rd))]))
)