cs & thread: move places API to the thread layer

The Rumble layer provides a primitive `fork-place` operation and
`start-place` hook to tie in place initialization from other layers
(especially "io"), but the rest can live in the "thread" layer,
especially to handle place-channel synchronization.
This commit is contained in:
Matthew Flatt 2018-09-03 09:03:18 -06:00
parent f03d5c0076
commit 35af40d850
25 changed files with 272 additions and 82 deletions

View File

@ -488,7 +488,8 @@
(regexp-place-init!)
(expander-place-init!)
(initialize-place!)
(dynamic-require mod sym)))
(lambda ()
(dynamic-require mod sym))))
(when (getenv "PLT_STATS_ON_BREAK")
(keyboard-interrupt-handler

View File

@ -29,4 +29,6 @@
[record-mutator (known-constant)]
[unsafe-struct? (known-constant)]
[fork-place (known-procedure 1)]
[start-place (known-procedure 32)]
[make-pthread-parameter (known-procedure 2)])

View File

@ -570,12 +570,10 @@
unsafe-extflvector-length unsafe-extflvector-ref unsafe-extflvector-set!
install-start-place! ; not exported to Racket
place-enabled? place? place-channel? place-break
place-channel-get place-channel-put place-sleep
place-channel place-dead-evt place-kill place-message-allowed?
dynamic-place place-wait place-pumper-threads place-shared?
fork-place ; not exported to Racket
start-place ; not exported to Racket
place-enabled?
unsafe-get-place-table
unsafe-add-post-custodian-shutdown
unsafe-make-place-local unsafe-place-local-ref unsafe-place-local-set!
place-local-register-ref ; not exported to Racket
place-local-register-set! ; not exported to Racket

View File

@ -54,8 +54,6 @@
;; ----------------------------------------
(define-record place (thread))
(meta-cond
[(threaded?)
(define (place-enabled?) #f) ;; FIXME
@ -68,40 +66,9 @@
(define (place-enabled?) #f)
(define (fork-place thunk) #f)])
(define start-place void)
(define do-start-place void)
(define (install-start-place! proc)
(set! start-place proc))
(set! do-start-place proc))
(define (dynamic-place path sym in out err)
(make-place
(fork-place (lambda ()
(start-place path sym in out err)))))
(define (place-channel? v)
#f)
(define-syntax define-place-not-yet-available
(syntax-rules ()
[(_ id)
(define (id . args)
(error 'id "place API not yet supported"))]
[(_ id ...)
(begin (define-place-not-yet-available id) ...)]))
;; This operation adds shutdown thunks to a non-main place, so it's a
;; no-op for now:
(define (unsafe-add-post-custodian-shutdown proc)
(void))
(define-place-not-yet-available
place-break
place-channel-get
place-channel-put
place-sleep
place-channel
place-dead-evt
place-kill
place-message-allowed?
place-wait
place-pumper-threads
place-shared?)
(define (start-place path sym in out err)
(do-start-place path sym in out err))

View File

@ -26,6 +26,8 @@
[mutex-acquire rumble:mutex-acquire]
[mutex-release rumble:mutex-release]
[pthread? rumble:thread?]
[fork-place rumble:fork-place]
[start-place rumble:start-place]
[fork-pthread rumble:fork-thread]
[threaded? rumble:threaded?]
[get-thread-id rumble:get-thread-id]
@ -39,7 +41,8 @@
;; Special handling of `current-atomic`: use the last virtual register;
;; we rely on the fact that the register's default value is 0.
(define-syntax (define stx)
(syntax-case stx (current-atomic make-pthread-parameter)
(syntax-case stx (current-atomic make-pthread-parameter unsafe-make-place-local)
;; Recognize definition of `current-atomic`:
[(_ current-atomic (make-pthread-parameter 0))
(with-syntax ([(_ id _) stx]
[n (datum->syntax #'here (sub1 (virtual-register-count)))])
@ -47,6 +50,9 @@
(syntax-rules ()
[(_) (virtual-register n)]
[(_ v) (set-virtual-register! n v)])))]
;; Workaround for redirected access of `unsafe-make-place-local` from engine:
[(_ alias-id unsafe-make-place-local) #'(begin)]
;; Chain to place-register handling:
[(_ . rest) #'(place:define . rest)]))
(define (exit n)
@ -67,7 +73,13 @@
;; expander, and they need to be listed in
;; "primitives/internal.ss".
(hash
'make-pthread-parameter make-pthread-parameter)]
'make-pthread-parameter make-pthread-parameter
;; These are actually redirected by "place-register.ss", but
;; we list them here for compatibility with the bootstrapping
;; variant of `#%pthread`
'unsafe-make-place-local rumble:unsafe-make-place-local
'unsafe-place-local-ref rumble:unsafe-place-local-ref
'unsafe-place-local-set! rumble:unsafe-place-local-set!)]
[(|#%engine|)
(hash
'make-engine rumble:make-engine
@ -93,6 +105,8 @@
'poll-async-callbacks poll-async-callbacks
'disable-interrupts disable-interrupts
'enable-interrupts enable-interrupts
'fork-place rumble:fork-place
'start-place rumble:start-place
'fork-pthread rumble:fork-thread
'pthread? rumble:thread?
'get-thread-id rumble:get-thread-id

View File

@ -48,18 +48,18 @@
;; Linklet compilation on Chez Scheme
(for-each register-built-in-symbol!
'(let
letrec*
define
or
and
pariah
variable-set!
variable-ref
variable-ref/no-check
make-instance-variable-reference
annotation?
annotation-expression
#%app
#%call-with-values
make-pthread-parameter))))
'(or
and
let
letrec*
define
pariah
variable-set!
variable-ref
variable-ref/no-check
make-instance-variable-reference
annotation?
annotation-expression
#%app
#%call-with-values
make-pthread-parameter))))

View File

@ -1,6 +1,6 @@
#lang racket/base
(require racket/private/place-local
"engine.rkt"
(require "host.rkt"
"place-local.rkt"
"internal-error.rkt"
"debug.rkt")

View File

@ -140,9 +140,34 @@
[() x]
[(v) (set! x v)]))
(define place-local-table (make-thread-cell (make-hasheq)))
(define (unsafe-make-place-local v)
(define key (vector v 'place-locale))
(hash-set! (thread-cell-ref place-local-table) key v)
key)
(define (unsafe-place-local-ref key)
(hash-ref (thread-cell-ref place-local-table) key (vector-ref key 0)))
(define (unsafe-place-local-set! key val)
(hash-set! (thread-cell-ref place-local-table) key val))
(define (fork-place thunk)
(thread (lambda ()
(thread-cell-set! place-local-table (make-hasheq))
(thunk))))
(define (start-place mod sym in out err)
(lambda (finish)
(void)))
(primitive-table '#%pthread
(hash
'make-pthread-parameter make-pthread-parameter))
'make-pthread-parameter make-pthread-parameter
'unsafe-make-place-local unsafe-make-place-local
'unsafe-place-local-ref unsafe-place-local-ref
'unsafe-place-local-set! unsafe-place-local-set!))
(primitive-table '#%engine
(hash
'make-engine make-engine
@ -168,6 +193,8 @@
'poll-async-callbacks (lambda () null)
'disable-interrupts void
'enable-interrupts void
'fork-place fork-place
'start-place start-place
'fork-pthread (lambda args
(error "fork-pthread: not ready"))
'pthread? (lambda args

View File

@ -1,7 +1,7 @@
#lang racket/base
(require "check.rkt"
(submod "thread.rkt" scheduling)
"engine.rkt")
"host.rkt")
(provide continuation-marks)

View File

@ -1,8 +1,8 @@
#lang racket/base
(require racket/private/place-local
(require "place-local.rkt"
"check.rkt"
"atomic.rkt"
"engine.rkt"
"host.rkt"
"evt.rkt"
"semaphore.rkt")

View File

@ -500,6 +500,12 @@
(thread (lambda () (sync (system-idle-evt)) (collect-garbage)))
(check #t (symbol? (will-execute we)))
;; Check places, where the various export symbols passed to
;; `dynamic-place` are built into "bootstrap.rkt"
(define pl (dynamic-place 'dummy 'no-op #f #f #f))
(check #t (place? pl))
(sleep 1)
(set! done? #t)))
(unless done?

View File

@ -1,9 +1,9 @@
#lang racket/base
(require racket/private/place-local
(require "place-local.rkt"
"check.rkt"
"internal-error.rkt"
"engine.rkt"
"host.rkt"
"atomic.rkt"
"parameter.rkt"
"../common/queue.rkt"

View File

@ -24,7 +24,10 @@
;; This `#%pthread` table's entries are linked more directly
;; than `#%engine` entries:
(bounce #%pthread
make-pthread-parameter)
make-pthread-parameter
unsafe-make-place-local
unsafe-place-local-ref
unsafe-place-local-set!)
(bounce #%engine
make-engine
@ -63,6 +66,9 @@
[disable-interrupts host:disable-interrupts]
[enable-interrupts host:enable-interrupts]
[fork-place host:fork-place]
[start-place host:start-place]
fork-pthread
pthread?
[get-thread-id get-pthread-id]

View File

@ -25,6 +25,7 @@
"time.rkt"
"stats.rkt"
"stack-size.rkt"
"place.rkt"
"future.rkt"
"fsemaphore.rkt"
"os-thread.rkt")
@ -157,6 +158,24 @@
unsafe-custodian-register
unsafe-custodian-unregister
dynamic-place ; not the one from `racket/place`
place?
place-break
place-kill
place-wait
place-dead-evt
place-sleep
place-channel
place-channel?
place-channel-get
place-channel-put
place-message-allowed?
place-pumper-threads
place-shared?
unsafe-add-post-custodian-shutdown
futures-enabled?
processor-count
future

View File

@ -1,7 +1,7 @@
#lang racket/base
(require "check.rkt"
"atomic.rkt"
"engine.rkt"
"host.rkt"
"thread.rkt"
(except-in (submod "thread.rkt" scheduling)
thread

View File

@ -1,6 +1,6 @@
#lang racket/base
(require "check.rkt"
"engine.rkt"
"host.rkt"
"atomic.rkt")
(provide unsafe-os-thread-enabled?

View File

@ -1,5 +1,5 @@
#lang racket/base
(require "engine.rkt")
(require "host.rkt")
(provide current-thread)

View File

@ -0,0 +1,20 @@
#lang racket/base
(require (for-syntax racket/base)
"host.rkt")
(provide define-place-local)
;; Just like the one from `racket/private/place-local`, but using the
;; exports of "host.rkt" so we can test in bootstrapping mode.
(define-syntax-rule (define-place-local id v)
(begin
(define cell (unsafe-make-place-local v))
(define-syntax id
(make-set!-transformer
(lambda (stx)
(...
(syntax-case stx (set!)
[(set! _ r) #'(unsafe-place-local-set! cell r)]
[(_ e ...) #'((unsafe-place-local-ref cell) e ...)]
[_ #'(unsafe-place-local-ref cell)])))))))

130
racket/src/thread/place.rkt Normal file
View File

@ -0,0 +1,130 @@
#lang racket/base
(require (only-in '#%unsafe unsafe-abort-current-continuation/no-wind)
"place-local.rkt"
"check.rkt"
"host.rkt"
"schedule.rkt"
"atomic.rkt"
"thread.rkt"
"custodian.rkt"
"plumber.rkt"
"exit.rkt"
"sync.rkt"
"evt.rkt")
(provide dynamic-place
place?
place-break
place-kill
place-wait
place-dead-evt
place-sleep
place-channel
place-channel?
place-channel-get
place-channel-put
place-message-allowed?
place-pumper-threads
place-shared?
unsafe-add-post-custodian-shutdown)
;; ----------------------------------------
(struct place ([result #:mutable]
custodian
[post-shutdown #:mutable]
pumper-threads))
(define-place-local current-place #f)
(define place-prompt-tag (make-continuation-prompt-tag 'place))
(define (dynamic-place path sym in out err)
(define c (make-custodian))
(define new-place (place #f ; result
c
'() ; post-shutdown
(make-vector 3 #f))) ; pumper-threads
(define orig-plumber (make-plumber))
(define (default-exit v)
(plumber-flush-all orig-plumber)
(unsafe-abort-current-continuation/no-wind
place-prompt-tag
(if (byte? v) v 0)))
(host:fork-place
(lambda ()
(define finish (host:start-place path sym in out err))
(call-in-main-thread
(lambda ()
(define result
(call-with-continuation-prompt
(lambda ()
(set! current-place new-place)
(current-custodian c)
(current-plumber orig-plumber)
(exit-handler default-exit)
(finish
(lambda (in-th out-th err-th)
(vector-set! (place-pumper-threads place) 0 in-th)
(vector-set! (place-pumper-threads place) 1 out-th)
(vector-set! (place-pumper-threads place) 2 err-th)))
(default-exit 0))
place-prompt-tag
(lambda (v) v)))
(set-place-result! new-place result)))))
new-place)
(define/who (place-break p [kind #f])
(check who place? p)
(unless (or (not kind) (eq? kind 'hangup) (eq? kind 'terminate))
(raise-argument-error who "(or/c #f 'hangup 'terminate)" kind))
(void))
(define/who (place-kill p)
(check who place? p)
(void))
(define/who (place-wait p)
(check who place? p)
(sync never-evt))
(define/who (place-dead-evt p)
(check who place? p)
never-evt)
(define/who (place-sleep msecs)
(void))
;; ----------------------------------------
(struct pchannel ()
#:reflection-name 'place-channel)
(define (place-channel? v)
(pchannel? v))
(define (place-channel)
(values (pchannel)
(pchannel)))
(define (place-channel-get pch)
(sync never-evt))
(define (place-channel-put pch v)
(void))
(define (place-message-allowed? v)
#t)
;; ----------------------------------------
(define (place-shared? v)
#f)
(define (unsafe-add-post-custodian-shutdown proc)
(atomically
(set-place-post-shutdown! current-place
(cons proc
(place-post-shutdown current-place)))))

View File

@ -1,5 +1,5 @@
#lang racket/base
(require racket/private/place-local
(require "place-local.rkt"
"check.rkt"
"tree.rkt"
"internal-error.rkt"

View File

@ -1,7 +1,7 @@
#lang racket/base
(require racket/private/place-local
(require "place-local.rkt"
"atomic.rkt"
"engine.rkt"
"host.rkt"
"internal-error.rkt"
"sandman.rkt"
"parameter.rkt"

View File

@ -1,5 +1,5 @@
#lang racket/base
(require racket/private/place-local
(require "place-local.rkt"
"check.rkt"
"internal-error.rkt"
"atomic.rkt")

View File

@ -1,9 +1,9 @@
#lang racket/base
(require racket/private/place-local
"../common/queue.rkt"
(require "../common/queue.rkt"
"place-local.rkt"
"check.rkt"
"internal-error.rkt"
"engine.rkt"
"host.rkt"
"sandman.rkt"
"parameter.rkt"
"evt.rkt"

View File

@ -1,14 +1,14 @@
#lang racket/base
(require "check.rkt"
(submod "thread.rkt" scheduling)
(prefix-in engine: "engine.rkt"))
(prefix-in host: "host.rkt"))
(provide current-process-milliseconds
set-get-subprocesses-time!)
(define/who (current-process-milliseconds [scope #f])
(cond
[(not scope) (engine:current-process-milliseconds)]
[(not scope) (host:current-process-milliseconds)]
[(thread? scope) (thread-cpu-time scope)]
[(eq? scope 'subprocesses) (get-subprocesses-time)]
[else

View File

@ -1,7 +1,7 @@
#lang racket/base
(require "check.rkt"
"atomic.rkt"
"engine.rkt"
"host.rkt"
"evt.rkt"
"sync.rkt"
"semaphore.rkt")