394 lines
14 KiB
Scheme
394 lines
14 KiB
Scheme
(module erl mzscheme
|
|
|
|
(require mzlib/list
|
|
mzlib/thread
|
|
mzlib/etc
|
|
net/dns
|
|
"mymatch.ss")
|
|
|
|
(define (with-semaphore s thunk)
|
|
(semaphore-wait s)
|
|
(let ([result (thunk)])
|
|
(semaphore-post s)
|
|
result))
|
|
|
|
#|
|
|
(define free-cons-cells
|
|
(box empty))
|
|
|
|
(define alloc-sem (make-semaphore 1))
|
|
|
|
(define (Mcons a d)
|
|
(with-semaphore
|
|
alloc-sem
|
|
(lambda ()
|
|
(let ([start (unbox free-cons-cells)])
|
|
(if (empty? start)
|
|
(cons a d)
|
|
(begin
|
|
(set-box! free-cons-cells (rest start))
|
|
(set-car! start a)
|
|
(set-cdr! start d)
|
|
start))))))
|
|
|
|
(define (release c)
|
|
(set-cdr! c (unbox free-cons-cells))
|
|
(set-box! free-cons-cells c))
|
|
|#
|
|
|
|
(define Mcons mcons)
|
|
(define release void)
|
|
|
|
; for thread ids, port is the TCP port number (not to be confused with MzScheme ports)
|
|
(define-values (listener port)
|
|
; for the time being, disable distribution
|
|
(values never-evt 1178))
|
|
|
|
(define (find-listener/port num-retries)
|
|
(parameterize ([current-pseudo-random-generator (make-pseudo-random-generator)])
|
|
(let loop ([i 0])
|
|
(with-handlers* ([(lambda (exn)
|
|
(and (exn:fail:network? exn)
|
|
(< i num-retries)))
|
|
(lambda (_) (loop (add1 i)))])
|
|
(let ([port (+ 1024 (random 64000))])
|
|
(values (tcp-listen port) port))))))
|
|
|
|
(define ip-address '127.0.0.1
|
|
#;(let*-values
|
|
([(sub-proc in-p dummy1 dummy2) (subprocess #f #f #f "/bin/hostname" "-i")]
|
|
[(ip-address) (read in-p)])
|
|
(subprocess-wait sub-proc)
|
|
(if (eof-object? ip-address)
|
|
'127.0.0.1
|
|
ip-address)))
|
|
|
|
(define my-ip:port
|
|
(string->symbol (format "~a:~a" ip-address port)))
|
|
|
|
(define dns #f
|
|
#;(dns-find-nameserver))
|
|
|
|
(define ip-regexp
|
|
(regexp "[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?"))
|
|
|
|
; a tid is a (vector 'tid symbol(ip:port) symbol(local-id))
|
|
|
|
(define make-tid
|
|
(case-lambda
|
|
[(thr) (vector 'tid my-ip:port thr)]
|
|
[(port thr) (vector 'tid (string->symbol (format "~a:~a" ip-address port)) thr)]
|
|
[(host port thr) (vector 'tid
|
|
(string->symbol
|
|
(format
|
|
"~a:~a"
|
|
(if (regexp-match ip-regexp (symbol->string host))
|
|
host
|
|
(string->symbol (dns-get-address dns (symbol->string host))))
|
|
port))
|
|
thr)]))
|
|
|
|
(define (tid-ip tid)
|
|
(vector-ref tid 1))
|
|
|
|
(define (tid-lid tid)
|
|
(vector-ref tid 2))
|
|
|
|
(define (tid? x)
|
|
(and (vector? x)
|
|
(= (vector-length x) 3)
|
|
(eq? (vector-ref x 0) 'tid)
|
|
(symbol? (vector-ref x 1))
|
|
(symbol? (vector-ref x 2))))
|
|
|
|
; We need a mapping from MzScheme's tids to our tids (just for `self')
|
|
; and a mapping from symbols to mailboxes (for local threads).
|
|
; A special thread is responsible for all communication with external threads.
|
|
; All processes spawned on a node have same ip-address.
|
|
|
|
(define tids
|
|
(make-hash-table 'weak))
|
|
|
|
(define mailboxes
|
|
(make-hash-table))
|
|
|
|
(define-struct mailbox (old-head old-last head tail sem-count sem-space lock-enqueue))
|
|
|
|
(define (try-extract m l)
|
|
(let loop ([prev l] [cur (mcdr l)])
|
|
(if (empty? (mcdr cur))
|
|
match-fail
|
|
(let ([v (m (mcar cur))])
|
|
(if (eq? v match-fail)
|
|
(loop cur (mcdr cur))
|
|
(begin
|
|
(set-mcdr! prev (mcdr cur))
|
|
(release cur)
|
|
v))))))
|
|
|
|
(define (receive-help timeout timeout-thunk matcher)
|
|
;(if (and timeout (negative? timeout))
|
|
;(timeout-thunk)
|
|
(let* ([start-time (current-inexact-milliseconds)]
|
|
[mb (hash-table-get mailboxes (tid-lid (self)))]
|
|
[val (try-extract matcher (mailbox-old-head mb))])
|
|
(if (eq? val match-fail)
|
|
(let loop ()
|
|
(let* ([elapsed (- (current-inexact-milliseconds) start-time)]
|
|
[wait-time (cond
|
|
[(not timeout) false]
|
|
[(> elapsed timeout) 0]
|
|
[else (/ (- timeout elapsed) 1000.0)])]
|
|
[val (sync/timeout wait-time (mailbox-sem-count mb))])
|
|
(if val
|
|
(let* ([oldhead (mailbox-head mb)]
|
|
[msg (mcar oldhead)]
|
|
[val (begin
|
|
(set-mailbox-head! mb (mcdr oldhead))
|
|
(release oldhead)
|
|
(semaphore-post (mailbox-sem-space mb))
|
|
(matcher msg))])
|
|
(if (eq? val match-fail)
|
|
(let ([new-last (Mcons empty empty)]
|
|
[old-last (mailbox-old-last mb)])
|
|
(set-mcar! old-last msg)
|
|
(set-mcdr! old-last new-last)
|
|
(set-mailbox-old-last! mb new-last)
|
|
(loop))
|
|
(val)))
|
|
(timeout-thunk))))
|
|
(val))));)
|
|
|
|
(define-syntax receive
|
|
(syntax-rules (after)
|
|
[(_ (after timeout to-expr ...) (pat expr ...) ...)
|
|
(let* ([matcher (match-lambda (pat (lambda () expr ...)) ...)]
|
|
[timeout-thunk (lambda () to-expr ...)])
|
|
(receive-help timeout timeout-thunk matcher))]
|
|
[(_ clause ...) (receive (after false (void)) clause ...)]))
|
|
|
|
; must ensure name not already taken
|
|
(define (spawn/name-help thunk name)
|
|
(if (hash-table-get mailboxes name (lambda () #f))
|
|
#f
|
|
(let ([new-tid (make-tid name)]
|
|
[parent-tid (self)])
|
|
(thread
|
|
(lambda ()
|
|
(hash-table-put! tids (current-thread) new-tid)
|
|
(hash-table-put! mailboxes name (new-mailbox))
|
|
(! parent-tid new-tid)
|
|
(thunk)))
|
|
(receive [(? (lambda (m) (equal? m new-tid))) new-tid]))))
|
|
|
|
(define last-thread 1)
|
|
|
|
(define next-thread
|
|
(let ([lock (make-semaphore 1)])
|
|
(lambda ()
|
|
(with-semaphore
|
|
lock
|
|
(lambda ()
|
|
(begin0
|
|
last-thread
|
|
(set! last-thread (add1 last-thread))))))))
|
|
|
|
(define-syntax spawn
|
|
(syntax-rules ()
|
|
[(_ expr ...) (spawn/name-help (lambda () expr ...)
|
|
(string->symbol
|
|
(string-append "thread" (number->string (next-thread)))))]))
|
|
|
|
(define-syntax spawn/name
|
|
(syntax-rules ()
|
|
[(_ name expr ...) (spawn/name-help (lambda () expr ...) name)]))
|
|
|
|
(define (new-mailbox)
|
|
(let* ([sentinel (Mcons empty empty)]
|
|
[old-sentinel (Mcons empty empty)]
|
|
[old-head (Mcons empty old-sentinel)])
|
|
(make-mailbox old-head
|
|
old-sentinel
|
|
sentinel
|
|
sentinel
|
|
(make-semaphore)
|
|
(make-semaphore 1000)
|
|
(make-semaphore 1))))
|
|
|
|
(define main (make-tid 'main))
|
|
(hash-table-put! tids (current-thread) main)
|
|
(hash-table-put! mailboxes (tid-lid main) (new-mailbox))
|
|
|
|
(define forward-mailbox (new-mailbox))
|
|
|
|
(define (split-string-at str c)
|
|
(let loop ([i 0])
|
|
(if (char=? (string-ref str i) c)
|
|
(values (substring str 0 i) (substring str (add1 i)))
|
|
(loop (add1 i)))))
|
|
|
|
(define (report-exn exn)
|
|
(fprintf (current-error-port) "erl.ss: ~a (~a)~n" exn (exn-message exn)))
|
|
|
|
; forwarder for remote communication
|
|
(thread
|
|
(lambda ()
|
|
(let* ([in-ports (make-hash-table)] ; set of input ports
|
|
[out-ports (make-hash-table)] ; symbol(ip:port) -> output port
|
|
[mk-wait-set (lambda () (apply choice-evt
|
|
(hash-table-map in-ports (lambda (key val) key))))]
|
|
[try-connect (lambda (ip:port)
|
|
(with-handlers ([exn? (lambda (exn) (report-exn exn) false)])
|
|
(let*-values ([(ip-str port-str) (split-string-at
|
|
(symbol->string ip:port)
|
|
#\:)]
|
|
[(in-p out-p)
|
|
(tcp-connect ip-str (string->number port-str))])
|
|
(hash-table-put! in-ports in-p ip:port)
|
|
(hash-table-put! out-ports ip:port out-p)
|
|
(write (list my-ip:port) out-p)
|
|
out-p)))])
|
|
(let loop ([wait-set (mk-wait-set)])
|
|
;(printf "have connections to ~a~n" (hash-table-map in-ports (lambda (k v) k)))
|
|
(let ([val (sync (mailbox-sem-count forward-mailbox)
|
|
listener wait-set)])
|
|
(cond
|
|
[(tcp-listener? val)
|
|
(with-handlers ([exn? (lambda (exn) (loop wait-set))])
|
|
(let*-values ([(in-p out-p) (tcp-accept listener)]
|
|
[(remote-ip:port) (first (read in-p))])
|
|
(hash-table-put! out-ports remote-ip:port out-p)
|
|
(hash-table-put! in-ports in-p remote-ip:port))
|
|
(loop (mk-wait-set)))]
|
|
[(input-port? val)
|
|
(match (with-handlers ([exn? (lambda (exn) (report-exn exn) eof)])
|
|
(read val))
|
|
[(lid msg)
|
|
; forward to local mailbox
|
|
(let ([mb (hash-table-get mailboxes lid (lambda () false))])
|
|
(when mb (send-msg mb msg)))
|
|
(loop wait-set)]
|
|
[(? eof-object?)
|
|
; close input port, remove from hash table
|
|
(close-input-port val)
|
|
(hash-table-remove! in-ports val)
|
|
(loop (mk-wait-set))])]
|
|
[else ; val was the mailbox semaphore
|
|
(match (mcar (mailbox-head forward-mailbox))
|
|
;['quit (void)]
|
|
[(#('tid ip:port lid) msg)
|
|
(let inner ([out-p (hash-table-get
|
|
out-ports ip:port
|
|
(lambda ()
|
|
(begin0
|
|
(try-connect ip:port)
|
|
(set! wait-set (mk-wait-set)))))])
|
|
(when out-p
|
|
; need to deal with closed ports here too
|
|
(with-handlers ([exn:fail?
|
|
(lambda (_)
|
|
(hash-table-remove! out-ports ip:port)
|
|
(let ([res (try-connect ip:port)])
|
|
(set! wait-set (mk-wait-set))
|
|
(inner res)))])
|
|
(write (list lid msg) out-p))))
|
|
(set-mailbox-head! forward-mailbox (mcdr (mailbox-head forward-mailbox)))
|
|
(semaphore-post (mailbox-sem-space forward-mailbox))
|
|
(loop wait-set)])]))))))
|
|
|
|
#|
|
|
(define (stop-network)
|
|
(when network-up?
|
|
(send-msg forward-mailbox 'quit)
|
|
(set! network-up? #f)))
|
|
|#
|
|
|
|
(define (local? tid)
|
|
(symbol=? (tid-ip tid) my-ip:port))
|
|
|
|
(define (! tid msg)
|
|
(if (local? tid)
|
|
(let ([mb (hash-table-get mailboxes (tid-lid tid) (lambda () false))])
|
|
(when mb
|
|
(send-msg mb msg)))
|
|
(send-msg forward-mailbox (list tid msg)))) ; forward via special thread
|
|
|
|
(define (send-msg mbox msg)
|
|
(with-semaphore
|
|
(mailbox-lock-enqueue mbox)
|
|
(lambda ()
|
|
(let ([newtail (Mcons empty empty)]
|
|
[oldtail (mailbox-tail mbox)])
|
|
(set-mcar! oldtail msg)
|
|
(set-mcdr! oldtail newtail)
|
|
(set-mailbox-tail! mbox newtail)
|
|
(semaphore-wait (mailbox-sem-space mbox))
|
|
(semaphore-post (mailbox-sem-count mbox))))))
|
|
|
|
(define (self)
|
|
(hash-table-get tids (current-thread)
|
|
; allows thread not created by spawn to receive messages
|
|
(lambda ()
|
|
(let* ([name (string->symbol
|
|
(string-append "thread" (number->string (next-thread))))]
|
|
[new-tid (make-tid name)])
|
|
(hash-table-put! tids (current-thread) new-tid)
|
|
(hash-table-put! mailboxes name (new-mailbox))
|
|
new-tid))))
|
|
|
|
(define (!! msg)
|
|
(let ([mb (hash-table-get mailboxes (tid-lid (self)) (lambda () false))])
|
|
(if mb
|
|
(let ([new-last (Mcons empty empty)]
|
|
[old-last (mailbox-old-last mb)])
|
|
(set-mcar! old-last msg)
|
|
(set-mcdr! old-last new-last)
|
|
(set-mailbox-old-last! mb new-last)))))
|
|
|
|
(define (mybox)
|
|
(hash-table-get mailboxes (self)))
|
|
|
|
(provide
|
|
; mailboxes
|
|
; mybox
|
|
; (struct mailbox (old-head old-last channel))
|
|
; allocations
|
|
; free-cons-cells
|
|
; my-ip:port
|
|
make-tid
|
|
tid?
|
|
spawn
|
|
spawn/name
|
|
!
|
|
!!
|
|
receive
|
|
self))
|
|
|
|
#|
|
|
(require erl)
|
|
|
|
(define (send-loop n)
|
|
(let ([me (self)])
|
|
(let loop ([i 0])
|
|
(if (>= i n)
|
|
void
|
|
(begin
|
|
(! me true)
|
|
(loop (+ i 1)))))))
|
|
|
|
(define (send-loop2 n)
|
|
(let loop ([i 0])
|
|
(if (>= i n)
|
|
void
|
|
(begin
|
|
(!! true)
|
|
(loop (+ i 1))))))
|
|
|
|
(define (flush-queue)
|
|
(let recur ()
|
|
(receive [after 0 void]
|
|
[_ (recur)])))
|
|
|
|
(define (mybox) (hash-table-get mailboxes (self)))
|
|
|#
|