adjust the threading protocol for compilings files to be kill safe
This commit is contained in:
parent
8ea1487eea
commit
61aaf584c5
|
@ -675,43 +675,128 @@
|
|||
|
||||
(define (make-compile-lock)
|
||||
(define-values (manager-side-chan build-side-chan) (place-channel))
|
||||
(struct pending (response-chan bytes))
|
||||
(struct pending (response-chan zo-path died-chan-manager-side) #:transparent)
|
||||
(struct running (zo-path died-chan-manager-side) #:transparent)
|
||||
|
||||
(define currently-locked-files (make-hash))
|
||||
(define pending-requests '())
|
||||
(define running-compiles '())
|
||||
|
||||
(thread
|
||||
(λ ()
|
||||
(let loop ()
|
||||
(define req (place-channel-get manager-side-chan))
|
||||
(define command (list-ref req 0))
|
||||
(define bytes (list-ref req 1))
|
||||
(define response-manager-side (list-ref req 2))
|
||||
(cond
|
||||
[(eq? command 'lock)
|
||||
(cond
|
||||
[(hash-ref currently-locked-files bytes #f)
|
||||
(set! pending-requests (cons (pending response-manager-side bytes)
|
||||
pending-requests))
|
||||
(loop)]
|
||||
[else
|
||||
(hash-set! currently-locked-files bytes #t)
|
||||
(place-channel-put response-manager-side #t)
|
||||
(loop)])]
|
||||
[(eq? command 'unlock)
|
||||
(define (same-bytes? pending) (equal? (pending-bytes pending) bytes))
|
||||
(define to-unlock (filter same-bytes? pending-requests))
|
||||
(set! pending-requests (filter (compose not same-bytes?) pending-requests))
|
||||
(for ([pending (in-list to-unlock)])
|
||||
(place-channel-put (pending-response-chan pending) #f))
|
||||
(hash-remove! currently-locked-files bytes)
|
||||
(loop)]))))
|
||||
(apply
|
||||
sync
|
||||
(handle-evt
|
||||
manager-side-chan
|
||||
(λ (req)
|
||||
(define command (list-ref req 0))
|
||||
(define zo-path (list-ref req 1))
|
||||
(define response-manager-side (list-ref req 2))
|
||||
(define died-chan-manager-side (list-ref req 3))
|
||||
(case command
|
||||
[(lock)
|
||||
(cond
|
||||
[(hash-ref currently-locked-files zo-path #f)
|
||||
(set! pending-requests (cons (pending response-manager-side zo-path died-chan-manager-side)
|
||||
pending-requests))
|
||||
(loop)]
|
||||
[else
|
||||
(hash-set! currently-locked-files zo-path #t)
|
||||
(place-channel-put response-manager-side #t)
|
||||
(set! running-compiles (cons (running zo-path died-chan-manager-side) running-compiles))
|
||||
(loop)])]
|
||||
[(unlock)
|
||||
(define (same-bytes? pending) (equal? (pending-zo-path pending) zo-path))
|
||||
(define to-unlock (filter same-bytes? pending-requests))
|
||||
(set! pending-requests (filter (compose not same-bytes?) pending-requests))
|
||||
(for ([pending (in-list to-unlock)])
|
||||
(place-channel-put (pending-response-chan pending) #f))
|
||||
(hash-remove! currently-locked-files zo-path)
|
||||
(loop)])))
|
||||
(for/list ([running-compile (in-list running-compiles)])
|
||||
(handle-evt
|
||||
(running-died-chan-manager-side running-compile)
|
||||
(λ (_)
|
||||
(define zo-path (running-zo-path running-compile))
|
||||
(set! running-compiles (remove running-compile running-compiles))
|
||||
(define same-zo-pending
|
||||
(filter (λ (pending) (equal? zo-path (pending-zo-path pending)))
|
||||
pending-requests))
|
||||
(cond
|
||||
[(null? same-zo-pending)
|
||||
(hash-set! currently-locked-files zo-path #f)
|
||||
(loop)]
|
||||
[else
|
||||
(define to-be-running (car same-zo-pending))
|
||||
(set! pending-requests (remq to-be-running pending-requests))
|
||||
(place-channel-put (pending-response-chan to-be-running) #t)
|
||||
(set! running-compiles
|
||||
(cons (running zo-path (pending-died-chan-manager-side to-be-running))
|
||||
running-compiles))
|
||||
(loop)]))))))))
|
||||
|
||||
build-side-chan)
|
||||
|
||||
(define (compile-lock->parallel-lock-client build-side-chan)
|
||||
(define (compile-lock->parallel-lock-client build-side-chan [custodian #f])
|
||||
(define monitor-threads (make-hash))
|
||||
(define add-monitor-chan (make-channel))
|
||||
(define kill-monitor-chan (make-channel))
|
||||
|
||||
(define (clean-up-hash)
|
||||
(for ([key+val (in-list (hash-map monitor-threads list))])
|
||||
(define key (list-ref key+val 0))
|
||||
(define val (list-ref key+val 1))
|
||||
(unless (weak-box-value val)
|
||||
(hash-remove! monitor-threads key))))
|
||||
|
||||
(when custodian
|
||||
(parameterize ([current-custodian custodian])
|
||||
(thread
|
||||
(λ ()
|
||||
(let loop ()
|
||||
(sync
|
||||
(if (zero? (hash-count monitor-threads))
|
||||
never-evt
|
||||
(handle-evt (alarm-evt (+ (current-inexact-milliseconds) 500))
|
||||
(λ (arg)
|
||||
(clean-up-hash)
|
||||
(loop))))
|
||||
(handle-evt add-monitor-chan
|
||||
(λ (arg)
|
||||
(define-values (zo-path monitor-thread) (apply values arg))
|
||||
(hash-set! monitor-threads zo-path (make-weak-box monitor-thread))
|
||||
(clean-up-hash)
|
||||
(loop)))
|
||||
(handle-evt kill-monitor-chan
|
||||
(λ (zo-path)
|
||||
(define thd/f (weak-box-value (hash-ref monitor-threads zo-path)))
|
||||
(when thd/f (kill-thread thd/f))
|
||||
(hash-remove! monitor-threads zo-path)
|
||||
(clean-up-hash)
|
||||
(loop)))))))))
|
||||
|
||||
(λ (command zo-path)
|
||||
(define-values (response-builder-side response-manager-side) (place-channel))
|
||||
(place-channel-put build-side-chan (list command zo-path response-manager-side))
|
||||
(when (eq? command 'lock)
|
||||
(place-channel-get response-builder-side))))
|
||||
(define-values (died-chan-compiling-side died-chan-manager-side) (place-channel))
|
||||
(place-channel-put build-side-chan (list command zo-path response-manager-side died-chan-manager-side))
|
||||
(define compiling-thread (current-thread))
|
||||
(cond
|
||||
[(eq? command 'lock)
|
||||
(define monitor-thread
|
||||
(and custodian
|
||||
(parameterize ([current-custodian custodian])
|
||||
(thread
|
||||
(λ ()
|
||||
(thread-wait compiling-thread)
|
||||
(place-channel-put died-chan-compiling-side 'dead))))))
|
||||
(when monitor-thread (channel-put add-monitor-chan (list zo-path monitor-thread)))
|
||||
(define res (place-channel-get response-builder-side))
|
||||
(when monitor-thread
|
||||
(unless res ;; someone else finished compilation for us; kill the monitor
|
||||
(channel-put kill-monitor-chan zo-path)))
|
||||
res]
|
||||
[(eq? command 'unlock)
|
||||
(when custodian
|
||||
;; we finished the compilation; kill the monitor
|
||||
(channel-put kill-monitor-chan zo-path))])))
|
||||
|
|
|
@ -17,7 +17,8 @@
|
|||
;; get the module-language-compile-lock in the initial message
|
||||
(set! module-language-parallel-lock-client
|
||||
(compile-lock->parallel-lock-client
|
||||
(place-channel-get p)))
|
||||
(place-channel-get p)
|
||||
(current-custodian)))
|
||||
|
||||
;; get the handlers in a second message
|
||||
(set! handlers (for/list ([lst (place-channel-get p)])
|
||||
|
|
|
@ -1396,7 +1396,8 @@
|
|||
|
||||
(define module-language-parallel-lock-client
|
||||
(compile-lock->parallel-lock-client
|
||||
module-language-compile-lock))
|
||||
module-language-compile-lock
|
||||
(current-custodian)))
|
||||
|
||||
;; in-module-language : top-level-window<%> -> module-language-settings or #f
|
||||
(define (in-module-language tlw)
|
||||
|
|
|
@ -389,12 +389,20 @@ result will not call @racket[proc] with @racket['unlock].)
|
|||
]
|
||||
}
|
||||
|
||||
@defproc[(compile-lock->parallel-lock-client [pc place-channel?])
|
||||
@defproc[(compile-lock->parallel-lock-client [pc place-channel?] [cust (or/c #f custodian?) #f])
|
||||
(-> (or/c 'lock 'unlock) bytes? boolean?)]{
|
||||
|
||||
Returns a function that follows the @racket[parallel-lock-client]
|
||||
by communicating over @racket[pc]. The argument must have
|
||||
be the result of @racket[make-compile-lock].
|
||||
|
||||
This communication protocol implementation is not kill safe. To make it kill safe,
|
||||
it needs a sufficiently powerful custodian, i.e., one that is not subject to
|
||||
termination (unless all of the participants in the compilation are also terminated).
|
||||
It uses this custodian to create a thread that monitors the threads that are
|
||||
doing the compilation. If one of them is terminated, the presence of the
|
||||
custodian lets another one continue. (The custodian is also used to create
|
||||
a thread that manages a thread safe table.)
|
||||
}
|
||||
|
||||
@defproc[(make-compile-lock) place-channel?]{
|
||||
|
|
|
@ -146,6 +146,82 @@
|
|||
(parameterize ([current-load/use-compiled (make-compilation-manager-load/use-compiled-handler)])
|
||||
(test (void) dynamic-require 'compiler/cm #f))))
|
||||
|
||||
;; ----------------------------------------
|
||||
;; test for make-compile-lock
|
||||
|
||||
(let ()
|
||||
#|
|
||||
|
||||
This test creates a file to compile that, during compilation, conditionally
|
||||
freezes forever. It first creates a thread to compile the file in freeze-forever
|
||||
mode, and then, when the thread is stuck, creates a second thread to compile
|
||||
the file and kills the first thread. The second compile should complete properly
|
||||
and the test makes sure that it does and that the first thread doesn't complete.
|
||||
|
||||
|#
|
||||
|
||||
(define (sexps=>file file #:lang [lang #f] . sexps)
|
||||
(call-with-output-file file
|
||||
(λ (port)
|
||||
(when lang (fprintf port "~a\n" lang))
|
||||
(for ([x (in-list sexps)]) (fprintf port "~s\n" x)))
|
||||
#:exists 'truncate))
|
||||
|
||||
(define (poll-file file for)
|
||||
(let loop ([n 100])
|
||||
(when (zero? n)
|
||||
(error 'compiler/cm::poll-file "never found ~s in ~s" for file))
|
||||
(define now (call-with-input-file file (λ (port) (read-line port))))
|
||||
(unless (equal? now for)
|
||||
(sleep .1)
|
||||
(loop (- n 1)))))
|
||||
|
||||
(define file-to-compile (make-temporary-file "cmtest-file-to-compile~a.rkt"))
|
||||
(define control-file (make-temporary-file "cmtest-control-file-~a.rktd"))
|
||||
(define about-to-get-stuck-file (make-temporary-file "cmtest-about-to-get-stuck-file-~a.rktd"))
|
||||
|
||||
(sexps=>file file-to-compile #:lang "#lang racket"
|
||||
`(define-syntax (m stx)
|
||||
(call-with-output-file ,(path->string about-to-get-stuck-file)
|
||||
(λ (port) (fprintf port "about\n"))
|
||||
#:exists 'truncate)
|
||||
(if (call-with-input-file ,(path->string control-file) read)
|
||||
(semaphore-wait (make-semaphore 0))
|
||||
#'1))
|
||||
'(void (m)))
|
||||
(sexps=>file control-file #t)
|
||||
|
||||
(define p-l-c (compile-lock->parallel-lock-client (make-compile-lock) (current-custodian)))
|
||||
(define t1-finished? #f)
|
||||
(parameterize ([parallel-lock-client p-l-c]
|
||||
[current-load/use-compiled (make-compilation-manager-load/use-compiled-handler)])
|
||||
(define finished (make-channel))
|
||||
(define t1 (thread (λ () (dynamic-require file-to-compile #f) (set! t1-finished? #t))))
|
||||
(poll-file about-to-get-stuck-file "about")
|
||||
(sexps=>file control-file #f)
|
||||
(define t2 (thread (λ () (dynamic-require file-to-compile #f) (channel-put finished #t))))
|
||||
(sleep .1) ;; give thread t2 time to get stuck waiting for t1 to compile
|
||||
(kill-thread t1)
|
||||
(channel-get finished)
|
||||
|
||||
(test #f 't1-finished? t1-finished?)
|
||||
|
||||
(test #t
|
||||
'compile-lock::compiled-file-exists
|
||||
(file-exists?
|
||||
(let-values ([(base name dir?) (split-path file-to-compile)])
|
||||
(build-path base
|
||||
"compiled"
|
||||
(bytes->path (regexp-replace #rx"[.]rkt" (path->bytes name) "_rkt.zo"))))))
|
||||
|
||||
(define compiled-dir
|
||||
(let-values ([(base name dir?) (split-path file-to-compile)])
|
||||
(build-path base "compiled")))
|
||||
(delete-file file-to-compile)
|
||||
(delete-file control-file)
|
||||
(delete-file about-to-get-stuck-file)
|
||||
(delete-directory/files compiled-dir)))
|
||||
|
||||
;; ----------------------------------------
|
||||
|
||||
(report-errs)
|
||||
|
|
Loading…
Reference in New Issue
Block a user