Converting to manager/async-channels for mailboxes
svn: r15267
This commit is contained in:
parent
fab1ce34d9
commit
b8970a439b
|
@ -1,405 +1,89 @@
|
|||
(module erl mzscheme
|
||||
|
||||
(require mzlib/list
|
||||
mzlib/thread
|
||||
mzlib/etc
|
||||
net/dns
|
||||
scheme/match)
|
||||
|
||||
(define (with-semaphore s thunk)
|
||||
(semaphore-wait s)
|
||||
(let ([result (thunk)])
|
||||
(semaphore-post s)
|
||||
result))
|
||||
|
||||
#|
|
||||
(define free-cons-cells
|
||||
(box empty))
|
||||
|
||||
(define alloc-sem (make-semaphore 1))
|
||||
|
||||
(define (Mcons a d)
|
||||
(with-semaphore
|
||||
alloc-sem
|
||||
(lambda ()
|
||||
(let ([start (unbox free-cons-cells)])
|
||||
(if (empty? start)
|
||||
(cons a d)
|
||||
(begin
|
||||
(set-box! free-cons-cells (rest start))
|
||||
(set-car! start a)
|
||||
(set-cdr! start d)
|
||||
start))))))
|
||||
|
||||
(define (release c)
|
||||
(set-cdr! c (unbox free-cons-cells))
|
||||
(set-box! free-cons-cells c))
|
||||
|#
|
||||
#lang scheme
|
||||
(require "mailbox.ss")
|
||||
|
||||
(define Mcons mcons)
|
||||
(define release void)
|
||||
|
||||
; for thread ids, port is the TCP port number (not to be confused with MzScheme ports)
|
||||
(define-values (listener port)
|
||||
; for the time being, disable distribution
|
||||
(values never-evt 1178))
|
||||
|
||||
(define (find-listener/port num-retries)
|
||||
(parameterize ([current-pseudo-random-generator (make-pseudo-random-generator)])
|
||||
(let loop ([i 0])
|
||||
(with-handlers* ([(lambda (exn)
|
||||
(and (exn:fail:network? exn)
|
||||
(< i num-retries)))
|
||||
(lambda (_) (loop (add1 i)))])
|
||||
(let ([port (+ 1024 (random 64000))])
|
||||
(values (tcp-listen port) port))))))
|
||||
|
||||
(define ip-address '127.0.0.1
|
||||
#;(let*-values
|
||||
([(sub-proc in-p dummy1 dummy2) (subprocess #f #f #f "/bin/hostname" "-i")]
|
||||
[(ip-address) (read in-p)])
|
||||
(subprocess-wait sub-proc)
|
||||
(if (eof-object? ip-address)
|
||||
'127.0.0.1
|
||||
ip-address)))
|
||||
|
||||
(define my-ip:port
|
||||
(string->symbol (format "~a:~a" ip-address port)))
|
||||
|
||||
(define dns #f
|
||||
#;(dns-find-nameserver))
|
||||
|
||||
(define ip-regexp
|
||||
(regexp "[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?"))
|
||||
|
||||
; a tid is a (vector 'tid symbol(ip:port) symbol(local-id))
|
||||
|
||||
(define make-tid
|
||||
(case-lambda
|
||||
[(thr) (vector 'tid my-ip:port thr)]
|
||||
[(port thr) (vector 'tid (string->symbol (format "~a:~a" ip-address port)) thr)]
|
||||
[(host port thr) (vector 'tid
|
||||
(string->symbol
|
||||
(format
|
||||
"~a:~a"
|
||||
(if (regexp-match ip-regexp (symbol->string host))
|
||||
host
|
||||
(string->symbol (dns-get-address dns (symbol->string host))))
|
||||
port))
|
||||
thr)]))
|
||||
|
||||
(define (tid-ip tid)
|
||||
(vector-ref tid 1))
|
||||
|
||||
(define (tid-lid tid)
|
||||
(vector-ref tid 2))
|
||||
|
||||
(define (tid? x)
|
||||
(and (vector? x)
|
||||
(= (vector-length x) 3)
|
||||
(eq? (vector-ref x 0) 'tid)
|
||||
(symbol? (vector-ref x 1))
|
||||
(symbol? (vector-ref x 2))))
|
||||
|
||||
; We need a mapping from MzScheme's tids to our tids (just for `self')
|
||||
; and a mapping from symbols to mailboxes (for local threads).
|
||||
; A special thread is responsible for all communication with external threads.
|
||||
; All processes spawned on a node have same ip-address.
|
||||
|
||||
(define tids
|
||||
(make-hash-table 'weak))
|
||||
|
||||
(define mailboxes
|
||||
(make-hash-table))
|
||||
|
||||
(define-struct mailbox (old-head old-last head tail sem-count sem-space lock-enqueue))
|
||||
|
||||
; XXX This should be removed in preference to just an exception
|
||||
(define match-fail
|
||||
(let ()
|
||||
(define-struct match-failure ())
|
||||
(make-match-failure)))
|
||||
(define (matcher->matcher/fail m)
|
||||
(lambda (x)
|
||||
(with-handlers ([exn:misc:match?
|
||||
(lambda (x)
|
||||
match-fail)])
|
||||
(m x))))
|
||||
|
||||
(define (try-extract m l)
|
||||
(let loop ([prev l] [cur (mcdr l)])
|
||||
(if (empty? (mcdr cur))
|
||||
match-fail
|
||||
(let ([v (m (mcar cur))])
|
||||
(if (eq? v match-fail)
|
||||
(loop cur (mcdr cur))
|
||||
(begin
|
||||
(set-mcdr! prev (mcdr cur))
|
||||
(release cur)
|
||||
v))))))
|
||||
|
||||
(define (receive-help timeout timeout-thunk matcher)
|
||||
;(if (and timeout (negative? timeout))
|
||||
;(timeout-thunk)
|
||||
(let* ([start-time (current-inexact-milliseconds)]
|
||||
[mb (hash-table-get mailboxes (tid-lid (self)))]
|
||||
[val (try-extract matcher (mailbox-old-head mb))])
|
||||
(if (eq? val match-fail)
|
||||
(let loop ()
|
||||
(let* ([elapsed (- (current-inexact-milliseconds) start-time)]
|
||||
[wait-time (cond
|
||||
[(not timeout) false]
|
||||
[(> elapsed timeout) 0]
|
||||
[else (/ (- timeout elapsed) 1000.0)])]
|
||||
[val (sync/timeout wait-time (mailbox-sem-count mb))])
|
||||
(if val
|
||||
(let* ([oldhead (mailbox-head mb)]
|
||||
[msg (mcar oldhead)]
|
||||
[val (begin
|
||||
(set-mailbox-head! mb (mcdr oldhead))
|
||||
(release oldhead)
|
||||
(semaphore-post (mailbox-sem-space mb))
|
||||
(matcher msg))])
|
||||
(if (eq? val match-fail)
|
||||
(let ([new-last (Mcons empty empty)]
|
||||
[old-last (mailbox-old-last mb)])
|
||||
(set-mcar! old-last msg)
|
||||
(set-mcdr! old-last new-last)
|
||||
(set-mailbox-old-last! mb new-last)
|
||||
(loop))
|
||||
(val)))
|
||||
(timeout-thunk))))
|
||||
(val))));)
|
||||
|
||||
(define-syntax receive
|
||||
(syntax-rules (after)
|
||||
[(_ (after timeout to-expr ...) (pat expr ...) ...)
|
||||
(let* ([matcher (match-lambda (pat (lambda () expr ...)) ...)]
|
||||
[timeout-thunk (lambda () to-expr ...)])
|
||||
(receive-help timeout timeout-thunk (matcher->matcher/fail matcher)))]
|
||||
[(_ clause ...) (receive (after false (void)) clause ...)]))
|
||||
|
||||
; must ensure name not already taken
|
||||
(define (spawn/name-help thunk name)
|
||||
(if (hash-table-get mailboxes name (lambda () #f))
|
||||
#f
|
||||
(let ([new-tid (make-tid name)]
|
||||
[parent-tid (self)])
|
||||
(thread
|
||||
(lambda ()
|
||||
(hash-table-put! tids (current-thread) new-tid)
|
||||
(hash-table-put! mailboxes name (new-mailbox))
|
||||
(! parent-tid new-tid)
|
||||
(thunk)))
|
||||
(receive [(? (lambda (m) (equal? m new-tid))) new-tid]))))
|
||||
|
||||
(define last-thread 1)
|
||||
|
||||
(define next-thread
|
||||
(let ([lock (make-semaphore 1)])
|
||||
(lambda ()
|
||||
(with-semaphore
|
||||
lock
|
||||
(define-struct tid (lid) #:prefab)
|
||||
(define (create-tid thr) (make-tid thr))
|
||||
|
||||
; We need a mapping from MzScheme's tids to our tids (just for `self')
|
||||
; and a mapping from symbols to mailboxes (for local threads).
|
||||
|
||||
(define tids (make-weak-hash))
|
||||
(define mailboxes (make-hash))
|
||||
|
||||
(define (do-receive timeout timeout-thunk matcher)
|
||||
(define mb (hash-ref mailboxes (tid-lid (self))))
|
||||
(define timeout-evt
|
||||
(if timeout
|
||||
(alarm-evt (+ (current-inexact-milliseconds) timeout))
|
||||
never-evt))
|
||||
(define val-thunk (mailbox-receive mb timeout-evt timeout-thunk matcher))
|
||||
(val-thunk))
|
||||
|
||||
(define-syntax receive
|
||||
(syntax-rules (after)
|
||||
[(_ (after timeout to-expr ...) (pat expr ...) ...)
|
||||
(do-receive
|
||||
timeout
|
||||
(lambda () to-expr ...)
|
||||
(match-lambda (pat (lambda () expr ...)) ...))]
|
||||
[(_ clause ...) (receive (after false (void)) clause ...)]))
|
||||
|
||||
; must ensure name not already taken
|
||||
(define (spawn/name-help thunk name)
|
||||
(if (hash-ref mailboxes name (lambda () #f))
|
||||
#f
|
||||
(let ([new-tid (create-tid name)]
|
||||
[parent-tid (self)])
|
||||
(thread
|
||||
(lambda ()
|
||||
(begin0
|
||||
last-thread
|
||||
(set! last-thread (add1 last-thread))))))))
|
||||
|
||||
(define-syntax spawn
|
||||
(syntax-rules ()
|
||||
[(_ expr ...) (spawn/name-help (lambda () expr ...)
|
||||
(string->symbol
|
||||
(string-append "thread" (number->string (next-thread)))))]))
|
||||
|
||||
(define-syntax spawn/name
|
||||
(syntax-rules ()
|
||||
[(_ name expr ...) (spawn/name-help (lambda () expr ...) name)]))
|
||||
|
||||
(define (new-mailbox)
|
||||
(let* ([sentinel (Mcons empty empty)]
|
||||
[old-sentinel (Mcons empty empty)]
|
||||
[old-head (Mcons empty old-sentinel)])
|
||||
(make-mailbox old-head
|
||||
old-sentinel
|
||||
sentinel
|
||||
sentinel
|
||||
(make-semaphore)
|
||||
(make-semaphore 1000)
|
||||
(make-semaphore 1))))
|
||||
|
||||
(define main (make-tid 'main))
|
||||
(hash-table-put! tids (current-thread) main)
|
||||
(hash-table-put! mailboxes (tid-lid main) (new-mailbox))
|
||||
|
||||
(define forward-mailbox (new-mailbox))
|
||||
|
||||
(define (split-string-at str c)
|
||||
(let loop ([i 0])
|
||||
(if (char=? (string-ref str i) c)
|
||||
(values (substring str 0 i) (substring str (add1 i)))
|
||||
(loop (add1 i)))))
|
||||
(hash-set! tids (current-thread) new-tid)
|
||||
(hash-set! mailboxes name (new-mailbox))
|
||||
(! parent-tid new-tid)
|
||||
(thunk)))
|
||||
(receive [(? (lambda (m) (equal? m new-tid))) new-tid]))))
|
||||
|
||||
(define (report-exn exn)
|
||||
(fprintf (current-error-port) "erl.ss: ~a (~a)~n" exn (exn-message exn)))
|
||||
|
||||
; forwarder for remote communication
|
||||
(thread
|
||||
(define next-thread
|
||||
(let ([last-thread 1]
|
||||
[lock (make-semaphore 1)])
|
||||
(lambda ()
|
||||
(call-with-semaphore
|
||||
lock
|
||||
(lambda ()
|
||||
(begin0
|
||||
last-thread
|
||||
(set! last-thread (add1 last-thread))))))))
|
||||
(define (next-thread-name)
|
||||
(string->symbol
|
||||
(string-append "thread" (number->string (next-thread)))))
|
||||
|
||||
(define-syntax spawn/name
|
||||
(syntax-rules ()
|
||||
[(_ name expr ...)
|
||||
(spawn/name-help
|
||||
(lambda () expr ...)
|
||||
name)]))
|
||||
|
||||
(define (! tid msg)
|
||||
(define mb (hash-ref mailboxes (tid-lid tid) (lambda () false)))
|
||||
(when mb
|
||||
(send-msg mb msg)))
|
||||
|
||||
(define (send-msg mbox msg)
|
||||
(mailbox-send! mbox msg))
|
||||
|
||||
(define (self)
|
||||
(hash-ref!
|
||||
tids (current-thread)
|
||||
; allows thread not created by spawn to receive messages
|
||||
(lambda ()
|
||||
(let* ([in-ports (make-hash-table)] ; set of input ports
|
||||
[out-ports (make-hash-table)] ; symbol(ip:port) -> output port
|
||||
[mk-wait-set (lambda () (apply choice-evt
|
||||
(hash-table-map in-ports (lambda (key val) key))))]
|
||||
[try-connect (lambda (ip:port)
|
||||
(with-handlers ([exn? (lambda (exn) (report-exn exn) false)])
|
||||
(let*-values ([(ip-str port-str) (split-string-at
|
||||
(symbol->string ip:port)
|
||||
#\:)]
|
||||
[(in-p out-p)
|
||||
(tcp-connect ip-str (string->number port-str))])
|
||||
(hash-table-put! in-ports in-p ip:port)
|
||||
(hash-table-put! out-ports ip:port out-p)
|
||||
(write (list my-ip:port) out-p)
|
||||
out-p)))])
|
||||
(let loop ([wait-set (mk-wait-set)])
|
||||
;(printf "have connections to ~a~n" (hash-table-map in-ports (lambda (k v) k)))
|
||||
(let ([val (sync (mailbox-sem-count forward-mailbox)
|
||||
listener wait-set)])
|
||||
(cond
|
||||
[(tcp-listener? val)
|
||||
(with-handlers ([exn? (lambda (exn) (loop wait-set))])
|
||||
(let*-values ([(in-p out-p) (tcp-accept listener)]
|
||||
[(remote-ip:port) (first (read in-p))])
|
||||
(hash-table-put! out-ports remote-ip:port out-p)
|
||||
(hash-table-put! in-ports in-p remote-ip:port))
|
||||
(loop (mk-wait-set)))]
|
||||
[(input-port? val)
|
||||
(match (with-handlers ([exn? (lambda (exn) (report-exn exn) eof)])
|
||||
(read val))
|
||||
[(list lid msg)
|
||||
; forward to local mailbox
|
||||
(let ([mb (hash-table-get mailboxes lid (lambda () false))])
|
||||
(when mb (send-msg mb msg)))
|
||||
(loop wait-set)]
|
||||
[(? eof-object?)
|
||||
; close input port, remove from hash table
|
||||
(close-input-port val)
|
||||
(hash-table-remove! in-ports val)
|
||||
(loop (mk-wait-set))])]
|
||||
[else ; val was the mailbox semaphore
|
||||
(match (mcar (mailbox-head forward-mailbox))
|
||||
;['quit (void)]
|
||||
[(list (vector 'tid ip:port lid) msg)
|
||||
(let inner ([out-p (hash-table-get
|
||||
out-ports ip:port
|
||||
(lambda ()
|
||||
(begin0
|
||||
(try-connect ip:port)
|
||||
(set! wait-set (mk-wait-set)))))])
|
||||
(when out-p
|
||||
; need to deal with closed ports here too
|
||||
(with-handlers ([exn:fail?
|
||||
(lambda (_)
|
||||
(hash-table-remove! out-ports ip:port)
|
||||
(let ([res (try-connect ip:port)])
|
||||
(set! wait-set (mk-wait-set))
|
||||
(inner res)))])
|
||||
(write (list lid msg) out-p))))
|
||||
(set-mailbox-head! forward-mailbox (mcdr (mailbox-head forward-mailbox)))
|
||||
(semaphore-post (mailbox-sem-space forward-mailbox))
|
||||
(loop wait-set)])]))))))
|
||||
|
||||
#|
|
||||
(define (stop-network)
|
||||
(when network-up?
|
||||
(send-msg forward-mailbox 'quit)
|
||||
(set! network-up? #f)))
|
||||
|#
|
||||
|
||||
(define (local? tid)
|
||||
(symbol=? (tid-ip tid) my-ip:port))
|
||||
|
||||
(define (! tid msg)
|
||||
(if (local? tid)
|
||||
(let ([mb (hash-table-get mailboxes (tid-lid tid) (lambda () false))])
|
||||
(when mb
|
||||
(send-msg mb msg)))
|
||||
(send-msg forward-mailbox (list tid msg)))) ; forward via special thread
|
||||
|
||||
(define (send-msg mbox msg)
|
||||
(with-semaphore
|
||||
(mailbox-lock-enqueue mbox)
|
||||
(lambda ()
|
||||
(let ([newtail (Mcons empty empty)]
|
||||
[oldtail (mailbox-tail mbox)])
|
||||
(set-mcar! oldtail msg)
|
||||
(set-mcdr! oldtail newtail)
|
||||
(set-mailbox-tail! mbox newtail)
|
||||
(semaphore-wait (mailbox-sem-space mbox))
|
||||
(semaphore-post (mailbox-sem-count mbox))))))
|
||||
|
||||
(define (self)
|
||||
(hash-table-get tids (current-thread)
|
||||
; allows thread not created by spawn to receive messages
|
||||
(lambda ()
|
||||
(let* ([name (string->symbol
|
||||
(string-append "thread" (number->string (next-thread))))]
|
||||
[new-tid (make-tid name)])
|
||||
(hash-table-put! tids (current-thread) new-tid)
|
||||
(hash-table-put! mailboxes name (new-mailbox))
|
||||
new-tid))))
|
||||
|
||||
(define (!! msg)
|
||||
(let ([mb (hash-table-get mailboxes (tid-lid (self)) (lambda () false))])
|
||||
(if mb
|
||||
(let ([new-last (Mcons empty empty)]
|
||||
[old-last (mailbox-old-last mb)])
|
||||
(set-mcar! old-last msg)
|
||||
(set-mcdr! old-last new-last)
|
||||
(set-mailbox-old-last! mb new-last)))))
|
||||
|
||||
(define (mybox)
|
||||
(hash-table-get mailboxes (self)))
|
||||
|
||||
(provide
|
||||
; mailboxes
|
||||
; mybox
|
||||
; (struct mailbox (old-head old-last channel))
|
||||
; allocations
|
||||
; free-cons-cells
|
||||
; my-ip:port
|
||||
make-tid
|
||||
tid?
|
||||
spawn
|
||||
spawn/name
|
||||
!
|
||||
!!
|
||||
receive
|
||||
self))
|
||||
(define name (next-thread-name))
|
||||
(define new-tid (create-tid name))
|
||||
(hash-set! mailboxes name (new-mailbox))
|
||||
new-tid)))
|
||||
|
||||
#|
|
||||
(require erl)
|
||||
|
||||
(define (send-loop n)
|
||||
(let ([me (self)])
|
||||
(let loop ([i 0])
|
||||
(if (>= i n)
|
||||
void
|
||||
(begin
|
||||
(! me true)
|
||||
(loop (+ i 1)))))))
|
||||
|
||||
(define (send-loop2 n)
|
||||
(let loop ([i 0])
|
||||
(if (>= i n)
|
||||
void
|
||||
(begin
|
||||
(!! true)
|
||||
(loop (+ i 1))))))
|
||||
|
||||
(define (flush-queue)
|
||||
(let recur ()
|
||||
(receive [after 0 void]
|
||||
[_ (recur)])))
|
||||
|
||||
(define (mybox) (hash-table-get mailboxes (self)))
|
||||
|#
|
||||
(provide
|
||||
spawn/name
|
||||
receive)
|
||||
(provide/contract
|
||||
[! (tid? any/c . -> . void)]
|
||||
[self (-> tid?)])
|
77
collects/frtime/mailbox.ss
Normal file
77
collects/frtime/mailbox.ss
Normal file
|
@ -0,0 +1,77 @@
|
|||
#lang scheme
|
||||
(require scheme/async-channel)
|
||||
|
||||
; XXX More efficient structure
|
||||
(define (snoc x l) (append l (list x)))
|
||||
|
||||
; Define mailboxes
|
||||
(define-struct mailbox (manager control msgs))
|
||||
(define (new-mailbox)
|
||||
(define control-ch (make-channel))
|
||||
(define msgs-ch (make-async-channel))
|
||||
; Try to match one message
|
||||
(define (try-to-match req msg)
|
||||
(match req
|
||||
[(struct receive (reply-ch _ _ matcher))
|
||||
(with-handlers ([exn:misc:match? (lambda (x) #f)])
|
||||
(define the-match-thunk (matcher msg))
|
||||
; XXX Handle partner's death
|
||||
(channel-put reply-ch the-match-thunk)
|
||||
#t)]))
|
||||
; Try to match a list of messages
|
||||
(define (try-to-match* req msgs)
|
||||
(match msgs
|
||||
[(list) (error 'try-to-match* "No matches")]
|
||||
[(list-rest msg msgs)
|
||||
(if (try-to-match req msg)
|
||||
msgs
|
||||
(list* msg (try-to-match* req msgs)))]))
|
||||
; Accept new messages until we need to match one
|
||||
(define (not-on-receive msgs)
|
||||
(sync (handle-evt msgs-ch
|
||||
(lambda (new-msg)
|
||||
(not-on-receive (snoc new-msg msgs))))
|
||||
(handle-evt control-ch
|
||||
(lambda (req)
|
||||
(with-handlers ([exn? (lambda (x) (waiting-for-matching req msgs))])
|
||||
(define new-msgs (try-to-match* req msgs))
|
||||
; One worked
|
||||
(not-on-receive new-msgs))))))
|
||||
; Waiting for a message that will match
|
||||
(define (waiting-for-matching req msgs)
|
||||
(match req
|
||||
[(struct receive (reply-ch timeout-evt timeout-thunk _))
|
||||
(sync (handle-evt timeout-evt
|
||||
(lambda (_)
|
||||
(channel-put reply-ch timeout-thunk)
|
||||
(not-on-receive msgs)))
|
||||
(handle-evt msgs-ch
|
||||
(lambda (new-msg)
|
||||
(if (try-to-match req new-msg)
|
||||
(not-on-receive msgs)
|
||||
(waiting-for-matching req (snoc new-msg msgs))))))]))
|
||||
(define manager
|
||||
(thread
|
||||
(lambda ()
|
||||
(not-on-receive empty))))
|
||||
(make-mailbox manager control-ch msgs-ch))
|
||||
|
||||
(define-struct receive (reply-ch timeout timeout-thunk matcher))
|
||||
(define (mailbox-send! mb msg)
|
||||
(match mb
|
||||
[(struct mailbox (thd _ msgs))
|
||||
(thread-resume thd)
|
||||
(async-channel-put msgs msg)]))
|
||||
(define (mailbox-receive mb timeout-evt timeout-thunk matcher)
|
||||
(match mb
|
||||
[(struct mailbox (thd control _))
|
||||
(define reply-ch (make-channel))
|
||||
(thread-resume thd)
|
||||
(channel-put control (make-receive reply-ch timeout-evt timeout-thunk matcher))
|
||||
(channel-get reply-ch)]))
|
||||
|
||||
(provide/contract
|
||||
[mailbox? (any/c . -> . boolean?)]
|
||||
[new-mailbox (-> mailbox?)]
|
||||
[mailbox-send! (mailbox? any/c . -> . void)]
|
||||
[mailbox-receive (mailbox? evt? (-> any) (any/c . -> . (-> any)) . -> . (-> any))])
|
Loading…
Reference in New Issue
Block a user