diff --git a/collects/mzlib/process.ss b/collects/mzlib/process.ss new file mode 100644 index 0000000..af2117c --- /dev/null +++ b/collects/mzlib/process.ss @@ -0,0 +1,112 @@ + +(module process mzscheme + (provide process + process* + process/ports + process*/ports + system + system*) + + (require (lib "thread.ss")) + + ;; Helpers: ---------------------------------------- + + (define (shell-path/args) + (case (system-type) + ((unix) '("/bin/sh" "-c")) + ((windows) (let ([d (find-system-path 'sys-dir)]) + (let ([cmd (build-path d "cmd.exe")]) + (if (file-exists? cmd) + cmd + (build-path d "command.com"))))) + (else (error "don't know what shell to use for ~e." (system-type))))) + + (define (if-stream-out p) + (if (file-stream-port? p) + p + (if (output-port? p) + #f + (raise-type-error + 'subprocess + "output port" + p)))) + + (define (if-stream-in p) + (if (file-stream-port? p) + p + (if (input-port? p) + #f + (raise-type-error + 'subprocess + "input port" + p)))) + + (define (streamify-in cin in) + (if (and cin (not (file-stream-port? cin))) + (begin + (thread (lambda () + (copy-port cin in) + (close-output-port in))) + #f) + in)) + + (define (streamify-out cout out) + (if (and cout (not (file-stream-port? cout))) + (begin + (thread (lambda () (copy-port out cout))) + #f) + out)) + + ;; Old-style functions: ---------------------------------------- + + (define (process*/ports cout cin cerr exe . args) + (let-values ([(subp out in err pid) (apply subprocess + (if-stream-out cout) + (if-stream-in cin) + (if-stream-out cerr) + exe args)]) + (list (streamify-out cout out) + (streamify-in cin in) + pid + (streamify-out cerr err) + (letrec ((control + (lambda (m) + (case m + ((status) (let ((s (subprocess-status subp))) + (cond ((not (integer? s)) s) + ((zero? s) 'done-ok) + (else 'done-error)))) + ((wait) (subprocess-wait subp)) + (else + (raise-type-error 'control-process "'status or 'wait" m)))))) + control)))) + + (define (process/ports out in err str) + (apply process*/ports out in err (append (shell-path/args) (list str)))) + + (define (process* exe . args) + (apply process*/ports #f #f #f exe args)) + + (define (process str) + (apply process* (append (shell-path/args) (list str)))) + + ;; Note: these always use current ports + (define (system* exe . args) + (let ([cout (current-output-port)] + [cin (current-input-port)] + [cerr (current-error-port)]) + (let-values ([(subp out in err pid) + (apply + subprocess + (if-stream-out cout) + (if-stream-in cin) + (if-stream-out cerr) + exe args)]) + (streamify-out cout out) + (streamify-in cin in) + (streamify-out cerr err) + (subprocess-wait subp) + (zero? (subprocess-status subp))))) + + (define (system str) + (apply system* (append (shell-path/args) (list str))))) diff --git a/collects/mzlib/thread.ss b/collects/mzlib/thread.ss index 7f5a7c5..32b0919 100644 --- a/collects/mzlib/thread.ss +++ b/collects/mzlib/thread.ss @@ -3,13 +3,15 @@ (require "spidey.ss") (provide consumer-thread - merge-input with-semaphore semaphore-wait-multiple dynamic-disable-break dynamic-enable-break - make-single-threader) + make-single-threader + + merge-input + copy-port) #| t accepts a function, f, and creates a thread. It returns the thread and a @@ -65,51 +67,6 @@ (semaphore-post protect) (semaphore-post sema))))])) - - (define (merge-input a b) - (or (input-port? a) - (raise-type-error 'merge-input "input-port" a)) - (or (input-port? b) - (raise-type-error 'merge-input "input-port" b)) - (let-values ([(rd wt) (make-pipe)]) - (let* ([copy1-sema (make-semaphore 500)] - [copy2-sema (make-semaphore 500)] - [ready1-sema (make-semaphore)] - [ready2-sema (make-semaphore)] - [check-first? #t] - [close-sema (make-semaphore)] - [mk-copy (lambda (from to copy-sema ready-sema) - (lambda () - (let loop () - (semaphore-wait copy-sema) - (let ([c (read-char from)]) - (unless (eof-object? c) - (semaphore-post ready-sema) - (write-char c to) - (loop)))) - (semaphore-post close-sema)))]) - (thread (mk-copy a wt copy1-sema ready1-sema)) - (thread (mk-copy b wt copy2-sema ready2-sema)) - (thread (lambda () - (semaphore-wait close-sema) - (semaphore-wait close-sema) - (close-output-port wt))) - (make-input-port - (lambda () (let ([c (read-char rd)]) - (unless (eof-object? c) - (if (and check-first? (semaphore-try-wait? ready1-sema)) - (semaphore-post copy1-sema) - (if (not (semaphore-try-wait? ready2-sema)) - ; check-first? must be #f - (if (semaphore-try-wait? ready1-sema) - (semaphore-post copy1-sema) - (error 'join "internal error: char from nowhere!")) - (semaphore-post copy2-sema))) - (set! check-first? (not check-first?))) - c)) - (lambda () (char-ready? rd)) - (lambda () (close-input-port rd)))))) - (define with-semaphore (lambda (s f) (semaphore-wait s) @@ -196,5 +153,42 @@ (lambda () (semaphore-wait sema)) thunk (lambda () (semaphore-post sema)))))))) + + (define (copy-port src dest) + (let ([s (make-string 4096)]) + (let loop () + (let ([c (read-string-avail! s src)]) + (unless (eof-object? c) + (let loop ([start 0]) + (unless (= start c) + (let ([c2 (write-string-avail s dest start c)]) + (loop (+ start c2))))) + (loop)))))) + + (define merge-input + (case-lambda + [(a b) (merge-input a b #f)] + [(a b limit) + (or (input-port? a) + (raise-type-error 'merge-input "input-port" a)) + (or (input-port? b) + (raise-type-error 'merge-input "input-port" b)) + (or (not limit) + (and (number? limit) (positive? limit) (exact? limit) (integer? limit)) + (raise-type-error 'merge-input "positive exact integer or #f" limit)) + (let-values ([(rd wt) (make-pipe limit)] + [(other-done?) #f] + [(sema) (make-semaphore)]) + (let ([copy + (lambda (from) + (copy-port from wt) + (semaphore-wait sema) + (if other-done? + (close-output-port wt) + (set! other-done? #t)) + (semaphore-post sema))]) + (copy a) + (copy b) + rd))])) )