Parallel Locking

This commit is contained in:
Kevin Tew 2011-01-11 12:08:03 -07:00
parent e3d7ffbe82
commit e7a24a6b41
4 changed files with 80 additions and 48 deletions

View File

@ -20,13 +20,16 @@
(rename-out [trace manager-trace-handler]) (rename-out [trace manager-trace-handler])
get-file-sha1 get-file-sha1
get-compiled-file-sha1 get-compiled-file-sha1
with-compile-output) with-compile-output
parallel-lock-client)
(define manager-compile-notify-handler (make-parameter void)) (define manager-compile-notify-handler (make-parameter void))
(define trace (make-parameter void)) (define trace (make-parameter void))
(define indent (make-parameter "")) (define indent (make-parameter ""))
(define trust-existing-zos (make-parameter #f)) (define trust-existing-zos (make-parameter #f))
(define manager-skip-file-handler (make-parameter (λ (x) #f))) (define manager-skip-file-handler (make-parameter (λ (x) #f)))
(define depth (make-parameter 0))
(define parallel-lock-client (make-parameter #f))
(define (file-stamp-in-collection p) (define (file-stamp-in-collection p)
(file-stamp-in-paths p (current-library-collection-paths))) (file-stamp-in-paths p (current-library-collection-paths)))
@ -359,8 +362,6 @@
(verify-times path tmp-name) (verify-times path tmp-name)
(write-deps code mode path src-sha1 external-deps reader-deps up-to-date read-src-syntax))))) (write-deps code mode path src-sha1 external-deps reader-deps up-to-date read-src-syntax)))))
(define depth (make-parameter 0))
(define (actual-source-path path) (define (actual-source-path path)
(if (file-exists? path) (if (file-exists? path)
path path
@ -406,21 +407,31 @@
#f) #f)
((if sha1-only? values (lambda (build) (build) #f)) ((if sha1-only? values (lambda (build) (build) #f))
(lambda () (lambda ()
(when zo-exists? (try-delete-file zo-name #f)) (let* ([lc (parallel-lock-client)]
(log-info (format "cm: ~acompiling ~a" [locked? (and lc (lc 'lock zo-name))]
(build-string [ok-to-compile? (or (not lc) locked?)])
(depth) (dynamic-wind
(λ (x) (if (= 2 (modulo x 3)) #\| #\space))) (lambda () (void))
actual-path)) (lambda ()
(parameterize ([depth (+ (depth) 1)]) (when ok-to-compile?
(with-handlers (when zo-exists? (try-delete-file zo-name #f))
([exn:get-module-code? (log-info (format "cm: ~acompiling ~a"
(lambda (ex) (build-string
(compilation-failure mode path zo-name (depth)
(exn:get-module-code-path ex) (λ (x) (if (= 2 (modulo x 3)) #\| #\space)))
(exn-message ex)) actual-path))
(raise ex))]) (parameterize ([depth (+ (depth) 1)])
(compile-zo* mode path src-sha1 read-src-syntax zo-name up-to-date)))))))))) (with-handlers
([exn:get-module-code?
(lambda (ex)
(compilation-failure mode path zo-name
(exn:get-module-code-path ex)
(exn-message ex))
(raise ex))])
(compile-zo* mode path src-sha1 read-src-syntax zo-name up-to-date)))))
(lambda ()
(when locked?
(lc 'unlock zo-name))))))))))))
(unless sha1-only? (unless sha1-only?
(trace-printf "end compile: ~a" actual-path))))) (trace-printf "end compile: ~a" actual-path)))))

View File

@ -1,6 +1,8 @@
#lang racket/base #lang racket/base
(require compiler/cm) (require compiler/cm
(require racket/match) racket/match
racket/fasl
racket/serialize)
(define prev-uncaught-exception-handler (uncaught-exception-handler)) (define prev-uncaught-exception-handler (uncaught-exception-handler))
(uncaught-exception-handler (lambda (x) (uncaught-exception-handler (lambda (x)
@ -8,7 +10,7 @@
(prev-uncaught-exception-handler x))) (prev-uncaught-exception-handler x)))
(let ([cmc (make-caching-managed-compile-zo)] (let ([cmc (make-caching-managed-compile-zo)]
[worker-id (read)]) [worker-id (deserialize (fasl->s-exp (read)))])
(let loop () (let loop ()
(match (read) (match (read)
[(list 'DIE) void] [(list 'DIE) void]
@ -17,24 +19,37 @@
[file (bytes->path file)]) [file (bytes->path file)])
(let ([out-str-port (open-output-string)] (let ([out-str-port (open-output-string)]
[err-str-port (open-output-string)]) [err-str-port (open-output-string)])
(define (send/resp type) (let ([cip (current-input-port)]
(let ([msg (list type (get-output-string out-str-port) (get-output-string err-str-port))]) [cop (current-output-port)]
(write msg))) [cep (current-error-port)])
(let ([cep (current-error-port)]) (define (send/msg msg)
(write msg cop)
(flush-output cop))
(define (send/resp type)
(send/msg (list type (get-output-string out-str-port) (get-output-string err-str-port))))
(define (pp x) (define (pp x)
(fprintf cep "COMPILING ~a ~a ~a ~a\n" worker-id name file x)) (fprintf cep "COMPILING ~a ~a ~a ~a\n" worker-id name file x))
(with-handlers ([exn:fail? (lambda (x) (define (lock-client cmd fn)
(send/resp (list 'ERROR (exn-message x))))]) (match cmd
(parameterize ( ['lock
[current-namespace (make-base-empty-namespace)] (send/msg (list (list 'LOCK (path->bytes fn)) "" ""))
[current-directory dir] (match (read cip)
[current-load-relative-directory dir] [(list 'locked) #t]
[current-output-port out-str-port] [(list 'compiled) #f])]
[current-error-port err-str-port] ['unlock (send/msg (list (list 'UNLOCK (path->bytes fn)) "" ""))]))
;[manager-compile-notify-handler pp] (with-handlers ([exn:fail? (lambda (x)
) (send/resp (list 'ERROR (exn-message x))))])
(parameterize ([parallel-lock-client lock-client]
[current-namespace (make-base-empty-namespace)]
[current-directory dir]
[current-load-relative-directory dir]
[current-input-port (open-input-string "")]
[current-output-port out-str-port]
[current-error-port err-str-port]
;[manager-compile-notify-handler pp]
)
(cmc (build-path dir file))) (cmc (build-path dir file)))
(send/resp 'DONE)))) (send/resp 'DONE))))
(flush-output) (flush-output)
(loop))]))) (loop))])))

View File

@ -16,15 +16,21 @@
(if v (if v
(match v [(list w waitlst) (list w (append waitlst (list wrkr)))]) (match v [(list w waitlst) (list w (append waitlst (list wrkr)))])
(begin (begin
(send wrkr send/msg 'locked) (wrkr/send wrkr (list 'locked))
(list wrkr null)))) (list wrkr null))))
(not v))) (not v)))
(define/public (unlock fn) (define/public (unlock fn)
(for ([x (second (hash-ref locks fn))]) (match (hash-ref locks fn)
(wrkr/send x 'compiled)) [(list w waitlst)
(hash-remove! locks fn)) (for ([x (second (hash-ref locks fn))])
(wrkr/send x (list 'compiled)))
(hash-remove! locks fn)]))
(super-new))) (super-new)))
(define/class/generics Lock-Manager%
(lm/lock lock fn wrkr)
(lm/unlock unlock fn))
(provide parallel-compile (provide parallel-compile
parallel-build-worker) parallel-build-worker)
@ -43,14 +49,14 @@
[(list 'ERROR msg) [(list 'ERROR msg)
(append-error cc "making" (exn msg (current-continuation-marks)) out err "error") (append-error cc "making" (exn msg (current-continuation-marks)) out err "error")
#t] #t]
;[(list 'LOCK fn) (lock fn wrkr) #f] [(list 'LOCK fn) (lm/lock lock-mgr fn wrkr) #f]
;[(list 'UNLOCK fn) (unlock fn) #f] [(list 'UNLOCK fn) (lm/unlock lock-mgr fn) #f]
['DONE ['DONE
(define (string-!empty? s) (not (zero? (string-length s)))) (define (string-!empty? s) (not (zero? (string-length s))))
(when (ormap string-!empty? (list out err)) (when (ormap string-!empty? (list out err))
(append-error cc "making" null out err "output")) (append-error cc "making" null out err "output"))
#t]) (when last (printer (current-output-port) "made" "~a" (cc-name cc)))
(when last (printer (current-output-port) "made" "~a" (cc-name cc))))] #t]))]
[else [else
(match work (match work
[(list-rest (list cc file last) message) [(list-rest (list cc file last) message)

View File

@ -164,9 +164,9 @@
(begin (begin
(queue/work-done jobqueue node wrkr (string-append msg (port->string out))) (queue/work-done jobqueue node wrkr (string-append msg (port->string out)))
(kill/remove-dead-worker node-worker wrkr)))))))] (kill/remove-dead-worker node-worker wrkr)))))))]
[else [else
(eprintf "parallel-do-event-loop match node-worker failed.\n") (eprintf "parallel-do-event-loop match node-worker failed.\n")
(eprintf "trying to match:\n~a\n" node-worker)])))]))) (eprintf "trying to match:\n~a\n" node-worker)])))])))
(lambda () (lambda ()
(for ([p workers]) (with-handlers ([exn? void]) (wrkr/send p (list 'DIE)))) (for ([p workers]) (with-handlers ([exn? void]) (wrkr/send p (list 'DIE))))
(for ([p workers]) (send p wait))))) (for ([p workers]) (send p wait)))))