From 732c62b2a5a4e3a577f69ca091889c67335dcf0a Mon Sep 17 00:00:00 2001 From: Kevin Tew Date: Fri, 3 Sep 2010 16:38:28 -0600 Subject: [PATCH] Places: benchmarks --- collects/meta/props | 2 +- collects/racket/place.rkt | 4 +- .../benchmarks/places/place-channel.rktl | 45 ++++- .../benchmarks/places/place-launch.rktl | 70 +++++++ .../benchmarks/places/place-processes.rkt | 177 ++++++++++++++++++ .../racket/benchmarks/places/place-utils.rkt | 77 ++++++++ .../racket/benchmarks/places/symbols.rktl | 77 ++++++++ 7 files changed, 443 insertions(+), 9 deletions(-) create mode 100644 collects/tests/racket/benchmarks/places/place-launch.rktl create mode 100644 collects/tests/racket/benchmarks/places/place-processes.rkt create mode 100644 collects/tests/racket/benchmarks/places/place-utils.rkt create mode 100644 collects/tests/racket/benchmarks/places/symbols.rktl diff --git a/collects/meta/props b/collects/meta/props index f3106f870d..688076d2c2 100755 --- a/collects/meta/props +++ b/collects/meta/props @@ -1551,7 +1551,7 @@ path/s is either such a string or a list of them. "collects/tests/racket/benchmarks/mz/parsing.rktl" drdr:command-line (gracket "-f" *) "collects/tests/racket/benchmarks/mz/redsem.rktl" drdr:command-line (racket "-f" * "--" "--skip-struct-test") "collects/tests/racket/benchmarks/mz/ssax.rktl" drdr:command-line (racket "-f" *) -"collects/tests/racket/benchmarks/places/place-channel.rktl" drdr:command-line #f +"collects/tests/racket/benchmarks/places" drdr:command-line #f "collects/tests/racket/benchmarks/rx/auto.rkt" drdr:command-line (racket "-t" * "--" "racket" "simple") drdr:timeout 600 "collects/tests/racket/benchmarks/shootout/ackermann.rkt" drdr:command-line (racket "-t" * "--" "10") "collects/tests/racket/benchmarks/shootout/auto.rkt" drdr:command-line (racket "-qt" * "--" "hello") diff --git a/collects/racket/place.rkt b/collects/racket/place.rkt index 745d450945..4f39c4f82a 100644 --- a/collects/racket/place.rkt +++ b/collects/racket/place.rkt @@ -1,5 +1,6 @@ #lang scheme/base (require '#%place) +(require '#%futures) (define (place-channel-send/recv ch msg) (place-channel-send ch msg) @@ -13,4 +14,5 @@ place-channel-recv place-channel? place? - place-channel-send/recv) + place-channel-send/recv + processor-count) diff --git a/collects/tests/racket/benchmarks/places/place-channel.rktl b/collects/tests/racket/benchmarks/places/place-channel.rktl index c5b8ff95a6..61b78f7968 100644 --- a/collects/tests/racket/benchmarks/places/place-channel.rktl +++ b/collects/tests/racket/benchmarks/places/place-channel.rktl @@ -1,15 +1,47 @@ -#lang racket +#lang racket/base ;; stress tests for place-channels +(require (prefix-in pp: "place-processes.rkt")) +(require racket/place) (define (splat txt fn) (call-with-output-file fn #:exists 'replace (lambda (out) (fprintf out "~a" txt)))) +(define (print-out msg B/sE) + (displayln (list msg + (exact->inexact B/sE) + (exact->inexact (/ B/sE (* 1024 1024)))))) + +(define (processes-byte-message-test) + (let ([pl + (pp:place/base (bo ch) + (define message-size (* 4024 1024)) + (define count 10) + (define fourk-b-message (make-bytes message-size 66)) + (for ([i (in-range count)]) + (place-channel-recv ch) + (place-channel-send ch fourk-b-message)))]) + + (define message-size (* 4024 1024)) + (define four-k-message (make-bytes message-size 65)) + (define count 10) + (define-values (r t1 t2 t3) + (time-apply (lambda () + (for ([i (in-range count)]) + (pp:place-channel-send pl four-k-message) + (pp:place-channel-recv pl))) null)) + + + (print-out "processes" (/ (* 2 count message-size) (/ t2 1000))) + (pp:place-wait pl))) + + (define (byte-message-test) (splat #<inexact B/sE)) - (printf "MB/s ~a\n" (exact->inexact (/ B/sE (* 1024 1024)))) + (print-out "places" (/ (* 2 count message-size) (/ t2 1000))) (place-wait pl))) (define (cons-tree-test) (splat #<channel o) + (match o + [(? place-s? p) (place-s-ch p)] + [(? place-channel-s? p) p])) + +;; create a place-channel, should be called from children workers +(define (place-child-channel) (make-place-channel-s (current-input-port) (current-output-port))) + +;; send x on channel ch +(define (place-channel-send ch x) + (define out (place-channel-s-out (resolve->channel ch))) + (write (s-exp->fasl (serialize x)) out) + (flush-output out)) + +;; receives msg on channel ch +(define (place-channel-recv ch) + (deserialize (fasl->s-exp (read (place-channel-s-in (resolve->channel ch)))))) + +;; create a place given a module file path and a func-name to invoke +(define (place module-name func-name) + (define (send/msg x ch) + (write x ch) + (flush-output ch)) + (define (module-name->bytes name) + (cond + [(path? name) (path->bytes name)] + [(string? name) (string->bytes/locale name)] + [(bytes? name) name] + [else (raise 'module->path "expects a path or string")])) + (define (current-executable-path) + (parameterize ([current-directory (find-system-path 'orig-dir)]) + (find-executable-path (find-system-path 'exec-file) #f))) + (define (current-collects-path) + (let ([p (find-system-path 'collects-dir)]) + (if (complete-path? p) + p + (path->complete-path p (or (path-only (current-executable-path)) + (find-system-path 'orig-dir)))))) + (define worker-cmdline-list (list (current-executable-path) "-X" (path->string (current-collects-path)) "-e" "(eval(read))")) + (let-values ([(process-handle out in err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)]) + (send/msg `((dynamic-require (bytes->path ,(module-name->bytes module-name)) (quote ,func-name))) in) + (make-place-s (make-place-channel-s out in) process-handle err))) + +;; kill a place +(define (place-kill pl) (subprocess-kill (place-s-subprocess-obj pl) #t)) + +;; wait for a place to finish +(define (place-wait pl) + (let ((spo (place-s-subprocess-obj pl))) + (subprocess-wait spo) + (subprocess-status spo))) + +(define (place-channel-send/recv ch x) + (place-channel-send ch x) + (place-channel-recv ch)) + +;; splits lst into n equal pieces +(define (split-n n lst) + (match n + [(? (lambda (x) (x . < . 0))) (raise (format "split-n: n: ~a less than 0" n))] + [1 lst] + [else + (define splits (sub1 n)) + (define-values (q r) (quotient/remainder (length lst) n)) + (let loop ([lst-in lst] + [splits splits] + [left-overs r] + [result null]) + (define have-remainder (left-overs . > . 0)) + (define split-pos (if have-remainder (add1 q) q)) + (define-values (lst1 lst2) (split-at lst-in split-pos)) + (define new-result (cons lst1 result)) + (define new-splits (sub1 splits)) + (if (zero? new-splits) + (reverse (cons lst2 new-result)) + (loop + lst2 + new-splits + (sub1 left-overs) + new-result)))])) + +;; macro which lifts a place-worker body to module scope and provides it +;; (place/lambda (worker-name:identifier channel:identifier) body ...) +;; returns syntax that creates a place +(define-syntax (place/base stx) + (syntax-case stx () + [(_ (name ch) body ...) + (begin + (define (splat txt fn) + (call-with-output-file fn #:exists 'replace + (lambda (out) + (write txt out)))) + + (define module-path-prefix (make-temporary-file "place-benchmark~a.rkt" #f (current-directory))) + (define-values (base file-name isdir) (split-path module-path-prefix)) + (define worker-syntax + (with-syntax ([module-name (datum->syntax #'name (string->symbol (path->string (path-replace-suffix file-name ""))))]) + #'(module module-name racket/base + (require "place-processes.rkt") + (provide name) + (define (name) + (let ([ch (place-child-channel)]) + body ...))))) + (define module-path (path->string module-path-prefix)) + + (splat (syntax->datum worker-syntax) module-path) + + (define place-syntax #`(place #,module-path (quote name))) + ;(write (syntax->datum place-syntax)) + place-syntax)])) + +(define-syntax (place/lambda stx) + (syntax-case stx () + [(_ (name args ...) body ...) + (begin + (define (place/current-module-path funcname) + (with-syntax ([funcname funcname]) + #'(let ([module-path (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference)))]) + (place module-path (quote funcname))))) + (with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda () ((lambda (args ...) body ...) (place-child-channel))))]) + (syntax-local-lift-provide #'(rename interal-def-name name))) + (place/current-module-path #'name))])) +;; map-reduce/lambda +;; (map-reduce/lambda srclist (my-worker sublist) body ...) -> reduced-value +;; map-reduce/lambda divides srclist into (processor-count) pieces and maps them out to places to be reduced. +;; +;; === READ THIS === +;; map-reduce/lambda cannot be used in a script or main program file! It must be used in a module without top-level expressions. +;; map-reduce/lambda dynamic-requires the module in which it is defined in children worker processes. +;; reduce-body CANNOT close over any values it can only use the sublist-identifier and top level module defines +;; ================= +;; +;; unique-identifier: the body of map-reduce/lambda is lifted to module scope and defined as unique-identifier. The user does not need to reference unique-identifier, it just needs to be unique in the defining module. +;; sublist-identifier: the sublist map creates will be bound to sublist-identifier in map-reduce/lambda's body. +(define-syntax (map-reduce/lambda stx) + (syntax-case stx () + [(_ lst (name listvar) body ...) + #'(begin + (define places (for/list ([i (in-range (processor-count))]) + (place/lambda (name ch) + (place-channel-send ch ((lambda (listvar) body ...) (place-channel-recv ch)))))) + + (for ([p places] + [item (split-n (processor-count) lst)]) + (place-channel-send p item)) + (define result ((lambda (listvar) body ...) (map place-channel-recv places))) + (map place-wait places) + (map place-kill places) + result)])) diff --git a/collects/tests/racket/benchmarks/places/place-utils.rkt b/collects/tests/racket/benchmarks/places/place-utils.rkt new file mode 100644 index 0000000000..2177edbf60 --- /dev/null +++ b/collects/tests/racket/benchmarks/places/place-utils.rkt @@ -0,0 +1,77 @@ +#lang racket/base +(require racket/place + racket/file + (for-syntax racket/base + racket/file)) + +(provide splat + splat-tmp + barrier-m + barrier + places-wait + place/base + time-n) + + +(define (splat txt fn) + (call-with-output-file fn #:exists 'replace + (lambda (out) + (fprintf out "~a" txt)))) + +(define (splat-tmp txt) + (define fn (make-temporary-file "place-benchmark~a" #f (current-directory))) + (splat txt fn) + fn) + +(define (barrier-m pls) + (for ([ch pls]) (place-channel-recv ch)) + (for ([ch pls]) (place-channel-send ch 1))) + +(define (barrier ch) + (place-channel-send ch 0) + (place-channel-recv ch)) + +(define (places-wait pls) + (for ([p pls]) (place-wait p))) + +(define-syntax (place/base stx) + (syntax-case stx () + [(_ (name ch) body ...) + (begin + (define (splat txt fn) + (call-with-output-file fn #:exists 'replace + (lambda (out) + (write txt out)))) + + (define module-path-prefix (make-temporary-file "place-worker-~a.rkt" #f)) + (define-values (base file-name isdir) (split-path module-path-prefix)) + (define worker-syntax + (with-syntax ([module-name (datum->syntax #'name (string->symbol (path->string (path-replace-suffix file-name ""))))]) + #'(module module-name racket/base + (require racket/place) + (provide name) + (define (name ch) + body ...)))) + (define module-path (path->string module-path-prefix)) + + (splat (syntax->datum worker-syntax) module-path) + + (define place-syntax #`(place (make-resolved-module-path #,module-path) (quote name))) + ;(write (syntax->datum place-syntax)) + place-syntax)])) + +(define-syntax (time-n stx) + (syntax-case stx () + [(_ msg cnt body ...) + #'(let-values ([(r ct rt gct) (time-apply + (lambda () + body ... + ) + null)]) + (displayln (list msg cnt ct rt gct)) + (if (pair? r) (car r) r)) +#| + #'(time body ...) +|# +])) + diff --git a/collects/tests/racket/benchmarks/places/symbols.rktl b/collects/tests/racket/benchmarks/places/symbols.rktl new file mode 100644 index 0000000000..10c7e4274f --- /dev/null +++ b/collects/tests/racket/benchmarks/places/symbols.rktl @@ -0,0 +1,77 @@ +#lang racket/base +;; stress tests for place-channels communciating symbols +(require racket/place + racket/match + "place-utils.rkt") + +(define (symbol-test) + (splat + #<string id)) + (for ([j (in-range reps)]) + (define repstr (number->string j)) + (barrier ch) + (for ([i (in-range cnt)]) + (string->symbol (string-append ids "_" repstr "_" (number->string i)))) + (barrier ch) +)])) + ) +END + "pct1.ss") + + (define-values (plcnt reps symcnt) + (match (current-command-line-arguments) + [(vector) (values (processor-count) 10 1000000)] + [(vector a b c) (values a b c)])) + + (define pls (for/list ([i (in-range plcnt)]) (place "pct1.ss" 'place-main))) + + (for ([i (in-range plcnt)] + [pl pls]) + (place-channel-send pl (list i reps symcnt))) + (for ([j (in-range reps)]) + (time-n "1million-symbols" j + (barrier-m pls) + (barrier-m pls)))) + +(define (symbol-read-test) + (splat + #<