From b8970a439bb15f9ea715b9f48e03bd9a4d763e3f Mon Sep 17 00:00:00 2001 From: Jay McCarthy Date: Thu, 25 Jun 2009 15:19:20 +0000 Subject: [PATCH] Converting to manager/async-channels for mailboxes svn: r15267 --- collects/frtime/erl.ss | 484 +++++++------------------------------ collects/frtime/mailbox.ss | 77 ++++++ 2 files changed, 161 insertions(+), 400 deletions(-) create mode 100644 collects/frtime/mailbox.ss diff --git a/collects/frtime/erl.ss b/collects/frtime/erl.ss index fbf6e16949..cda1ddb621 100644 --- a/collects/frtime/erl.ss +++ b/collects/frtime/erl.ss @@ -1,405 +1,89 @@ -(module erl mzscheme - - (require mzlib/list - mzlib/thread - mzlib/etc - net/dns - scheme/match) - - (define (with-semaphore s thunk) - (semaphore-wait s) - (let ([result (thunk)]) - (semaphore-post s) - result)) - -#| - (define free-cons-cells - (box empty)) - - (define alloc-sem (make-semaphore 1)) - - (define (Mcons a d) - (with-semaphore - alloc-sem - (lambda () - (let ([start (unbox free-cons-cells)]) - (if (empty? start) - (cons a d) - (begin - (set-box! free-cons-cells (rest start)) - (set-car! start a) - (set-cdr! start d) - start)))))) - - (define (release c) - (set-cdr! c (unbox free-cons-cells)) - (set-box! free-cons-cells c)) -|# +#lang scheme +(require "mailbox.ss") - (define Mcons mcons) - (define release void) - - ; for thread ids, port is the TCP port number (not to be confused with MzScheme ports) - (define-values (listener port) - ; for the time being, disable distribution - (values never-evt 1178)) - - (define (find-listener/port num-retries) - (parameterize ([current-pseudo-random-generator (make-pseudo-random-generator)]) - (let loop ([i 0]) - (with-handlers* ([(lambda (exn) - (and (exn:fail:network? exn) - (< i num-retries))) - (lambda (_) (loop (add1 i)))]) - (let ([port (+ 1024 (random 64000))]) - (values (tcp-listen port) port)))))) - - (define ip-address '127.0.0.1 - #;(let*-values - ([(sub-proc in-p dummy1 dummy2) (subprocess #f #f #f "/bin/hostname" "-i")] - [(ip-address) (read in-p)]) - (subprocess-wait sub-proc) - (if (eof-object? ip-address) - '127.0.0.1 - ip-address))) - - (define my-ip:port - (string->symbol (format "~a:~a" ip-address port))) - - (define dns #f - #;(dns-find-nameserver)) - - (define ip-regexp - (regexp "[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?\\.[0-9][0-9]?[0-9]?")) - - ; a tid is a (vector 'tid symbol(ip:port) symbol(local-id)) - - (define make-tid - (case-lambda - [(thr) (vector 'tid my-ip:port thr)] - [(port thr) (vector 'tid (string->symbol (format "~a:~a" ip-address port)) thr)] - [(host port thr) (vector 'tid - (string->symbol - (format - "~a:~a" - (if (regexp-match ip-regexp (symbol->string host)) - host - (string->symbol (dns-get-address dns (symbol->string host)))) - port)) - thr)])) - - (define (tid-ip tid) - (vector-ref tid 1)) - - (define (tid-lid tid) - (vector-ref tid 2)) - - (define (tid? x) - (and (vector? x) - (= (vector-length x) 3) - (eq? (vector-ref x 0) 'tid) - (symbol? (vector-ref x 1)) - (symbol? (vector-ref x 2)))) - - ; We need a mapping from MzScheme's tids to our tids (just for `self') - ; and a mapping from symbols to mailboxes (for local threads). - ; A special thread is responsible for all communication with external threads. - ; All processes spawned on a node have same ip-address. - - (define tids - (make-hash-table 'weak)) - - (define mailboxes - (make-hash-table)) - - (define-struct mailbox (old-head old-last head tail sem-count sem-space lock-enqueue)) - - ; XXX This should be removed in preference to just an exception - (define match-fail - (let () - (define-struct match-failure ()) - (make-match-failure))) - (define (matcher->matcher/fail m) - (lambda (x) - (with-handlers ([exn:misc:match? - (lambda (x) - match-fail)]) - (m x)))) - - (define (try-extract m l) - (let loop ([prev l] [cur (mcdr l)]) - (if (empty? (mcdr cur)) - match-fail - (let ([v (m (mcar cur))]) - (if (eq? v match-fail) - (loop cur (mcdr cur)) - (begin - (set-mcdr! prev (mcdr cur)) - (release cur) - v)))))) - - (define (receive-help timeout timeout-thunk matcher) - ;(if (and timeout (negative? timeout)) - ;(timeout-thunk) - (let* ([start-time (current-inexact-milliseconds)] - [mb (hash-table-get mailboxes (tid-lid (self)))] - [val (try-extract matcher (mailbox-old-head mb))]) - (if (eq? val match-fail) - (let loop () - (let* ([elapsed (- (current-inexact-milliseconds) start-time)] - [wait-time (cond - [(not timeout) false] - [(> elapsed timeout) 0] - [else (/ (- timeout elapsed) 1000.0)])] - [val (sync/timeout wait-time (mailbox-sem-count mb))]) - (if val - (let* ([oldhead (mailbox-head mb)] - [msg (mcar oldhead)] - [val (begin - (set-mailbox-head! mb (mcdr oldhead)) - (release oldhead) - (semaphore-post (mailbox-sem-space mb)) - (matcher msg))]) - (if (eq? val match-fail) - (let ([new-last (Mcons empty empty)] - [old-last (mailbox-old-last mb)]) - (set-mcar! old-last msg) - (set-mcdr! old-last new-last) - (set-mailbox-old-last! mb new-last) - (loop)) - (val))) - (timeout-thunk)))) - (val))));) - - (define-syntax receive - (syntax-rules (after) - [(_ (after timeout to-expr ...) (pat expr ...) ...) - (let* ([matcher (match-lambda (pat (lambda () expr ...)) ...)] - [timeout-thunk (lambda () to-expr ...)]) - (receive-help timeout timeout-thunk (matcher->matcher/fail matcher)))] - [(_ clause ...) (receive (after false (void)) clause ...)])) - - ; must ensure name not already taken - (define (spawn/name-help thunk name) - (if (hash-table-get mailboxes name (lambda () #f)) - #f - (let ([new-tid (make-tid name)] - [parent-tid (self)]) - (thread - (lambda () - (hash-table-put! tids (current-thread) new-tid) - (hash-table-put! mailboxes name (new-mailbox)) - (! parent-tid new-tid) - (thunk))) - (receive [(? (lambda (m) (equal? m new-tid))) new-tid])))) - - (define last-thread 1) - - (define next-thread - (let ([lock (make-semaphore 1)]) - (lambda () - (with-semaphore - lock +(define-struct tid (lid) #:prefab) +(define (create-tid thr) (make-tid thr)) + +; We need a mapping from MzScheme's tids to our tids (just for `self') +; and a mapping from symbols to mailboxes (for local threads). + +(define tids (make-weak-hash)) +(define mailboxes (make-hash)) + +(define (do-receive timeout timeout-thunk matcher) + (define mb (hash-ref mailboxes (tid-lid (self)))) + (define timeout-evt + (if timeout + (alarm-evt (+ (current-inexact-milliseconds) timeout)) + never-evt)) + (define val-thunk (mailbox-receive mb timeout-evt timeout-thunk matcher)) + (val-thunk)) + +(define-syntax receive + (syntax-rules (after) + [(_ (after timeout to-expr ...) (pat expr ...) ...) + (do-receive + timeout + (lambda () to-expr ...) + (match-lambda (pat (lambda () expr ...)) ...))] + [(_ clause ...) (receive (after false (void)) clause ...)])) + +; must ensure name not already taken +(define (spawn/name-help thunk name) + (if (hash-ref mailboxes name (lambda () #f)) + #f + (let ([new-tid (create-tid name)] + [parent-tid (self)]) + (thread (lambda () - (begin0 - last-thread - (set! last-thread (add1 last-thread)))))))) - - (define-syntax spawn - (syntax-rules () - [(_ expr ...) (spawn/name-help (lambda () expr ...) - (string->symbol - (string-append "thread" (number->string (next-thread)))))])) - - (define-syntax spawn/name - (syntax-rules () - [(_ name expr ...) (spawn/name-help (lambda () expr ...) name)])) - - (define (new-mailbox) - (let* ([sentinel (Mcons empty empty)] - [old-sentinel (Mcons empty empty)] - [old-head (Mcons empty old-sentinel)]) - (make-mailbox old-head - old-sentinel - sentinel - sentinel - (make-semaphore) - (make-semaphore 1000) - (make-semaphore 1)))) - - (define main (make-tid 'main)) - (hash-table-put! tids (current-thread) main) - (hash-table-put! mailboxes (tid-lid main) (new-mailbox)) - - (define forward-mailbox (new-mailbox)) - - (define (split-string-at str c) - (let loop ([i 0]) - (if (char=? (string-ref str i) c) - (values (substring str 0 i) (substring str (add1 i))) - (loop (add1 i))))) + (hash-set! tids (current-thread) new-tid) + (hash-set! mailboxes name (new-mailbox)) + (! parent-tid new-tid) + (thunk))) + (receive [(? (lambda (m) (equal? m new-tid))) new-tid])))) - (define (report-exn exn) - (fprintf (current-error-port) "erl.ss: ~a (~a)~n" exn (exn-message exn))) - - ; forwarder for remote communication - (thread +(define next-thread + (let ([last-thread 1] + [lock (make-semaphore 1)]) + (lambda () + (call-with-semaphore + lock + (lambda () + (begin0 + last-thread + (set! last-thread (add1 last-thread)))))))) +(define (next-thread-name) + (string->symbol + (string-append "thread" (number->string (next-thread))))) + +(define-syntax spawn/name + (syntax-rules () + [(_ name expr ...) + (spawn/name-help + (lambda () expr ...) + name)])) + +(define (! tid msg) + (define mb (hash-ref mailboxes (tid-lid tid) (lambda () false))) + (when mb + (send-msg mb msg))) + +(define (send-msg mbox msg) + (mailbox-send! mbox msg)) + +(define (self) + (hash-ref! + tids (current-thread) + ; allows thread not created by spawn to receive messages (lambda () - (let* ([in-ports (make-hash-table)] ; set of input ports - [out-ports (make-hash-table)] ; symbol(ip:port) -> output port - [mk-wait-set (lambda () (apply choice-evt - (hash-table-map in-ports (lambda (key val) key))))] - [try-connect (lambda (ip:port) - (with-handlers ([exn? (lambda (exn) (report-exn exn) false)]) - (let*-values ([(ip-str port-str) (split-string-at - (symbol->string ip:port) - #\:)] - [(in-p out-p) - (tcp-connect ip-str (string->number port-str))]) - (hash-table-put! in-ports in-p ip:port) - (hash-table-put! out-ports ip:port out-p) - (write (list my-ip:port) out-p) - out-p)))]) - (let loop ([wait-set (mk-wait-set)]) - ;(printf "have connections to ~a~n" (hash-table-map in-ports (lambda (k v) k))) - (let ([val (sync (mailbox-sem-count forward-mailbox) - listener wait-set)]) - (cond - [(tcp-listener? val) - (with-handlers ([exn? (lambda (exn) (loop wait-set))]) - (let*-values ([(in-p out-p) (tcp-accept listener)] - [(remote-ip:port) (first (read in-p))]) - (hash-table-put! out-ports remote-ip:port out-p) - (hash-table-put! in-ports in-p remote-ip:port)) - (loop (mk-wait-set)))] - [(input-port? val) - (match (with-handlers ([exn? (lambda (exn) (report-exn exn) eof)]) - (read val)) - [(list lid msg) - ; forward to local mailbox - (let ([mb (hash-table-get mailboxes lid (lambda () false))]) - (when mb (send-msg mb msg))) - (loop wait-set)] - [(? eof-object?) - ; close input port, remove from hash table - (close-input-port val) - (hash-table-remove! in-ports val) - (loop (mk-wait-set))])] - [else ; val was the mailbox semaphore - (match (mcar (mailbox-head forward-mailbox)) - ;['quit (void)] - [(list (vector 'tid ip:port lid) msg) - (let inner ([out-p (hash-table-get - out-ports ip:port - (lambda () - (begin0 - (try-connect ip:port) - (set! wait-set (mk-wait-set)))))]) - (when out-p - ; need to deal with closed ports here too - (with-handlers ([exn:fail? - (lambda (_) - (hash-table-remove! out-ports ip:port) - (let ([res (try-connect ip:port)]) - (set! wait-set (mk-wait-set)) - (inner res)))]) - (write (list lid msg) out-p)))) - (set-mailbox-head! forward-mailbox (mcdr (mailbox-head forward-mailbox))) - (semaphore-post (mailbox-sem-space forward-mailbox)) - (loop wait-set)])])))))) - - #| - (define (stop-network) - (when network-up? - (send-msg forward-mailbox 'quit) - (set! network-up? #f))) - |# - - (define (local? tid) - (symbol=? (tid-ip tid) my-ip:port)) - - (define (! tid msg) - (if (local? tid) - (let ([mb (hash-table-get mailboxes (tid-lid tid) (lambda () false))]) - (when mb - (send-msg mb msg))) - (send-msg forward-mailbox (list tid msg)))) ; forward via special thread - - (define (send-msg mbox msg) - (with-semaphore - (mailbox-lock-enqueue mbox) - (lambda () - (let ([newtail (Mcons empty empty)] - [oldtail (mailbox-tail mbox)]) - (set-mcar! oldtail msg) - (set-mcdr! oldtail newtail) - (set-mailbox-tail! mbox newtail) - (semaphore-wait (mailbox-sem-space mbox)) - (semaphore-post (mailbox-sem-count mbox)))))) - - (define (self) - (hash-table-get tids (current-thread) - ; allows thread not created by spawn to receive messages - (lambda () - (let* ([name (string->symbol - (string-append "thread" (number->string (next-thread))))] - [new-tid (make-tid name)]) - (hash-table-put! tids (current-thread) new-tid) - (hash-table-put! mailboxes name (new-mailbox)) - new-tid)))) - - (define (!! msg) - (let ([mb (hash-table-get mailboxes (tid-lid (self)) (lambda () false))]) - (if mb - (let ([new-last (Mcons empty empty)] - [old-last (mailbox-old-last mb)]) - (set-mcar! old-last msg) - (set-mcdr! old-last new-last) - (set-mailbox-old-last! mb new-last))))) - - (define (mybox) - (hash-table-get mailboxes (self))) - - (provide - ; mailboxes - ; mybox - ; (struct mailbox (old-head old-last channel)) - ; allocations - ; free-cons-cells - ; my-ip:port - make-tid - tid? - spawn - spawn/name - ! - !! - receive - self)) + (define name (next-thread-name)) + (define new-tid (create-tid name)) + (hash-set! mailboxes name (new-mailbox)) + new-tid))) -#| -(require erl) - -(define (send-loop n) - (let ([me (self)]) - (let loop ([i 0]) - (if (>= i n) - void - (begin - (! me true) - (loop (+ i 1))))))) - -(define (send-loop2 n) - (let loop ([i 0]) - (if (>= i n) - void - (begin - (!! true) - (loop (+ i 1)))))) - -(define (flush-queue) - (let recur () - (receive [after 0 void] - [_ (recur)]))) - -(define (mybox) (hash-table-get mailboxes (self))) -|# +(provide + spawn/name + receive) +(provide/contract + [! (tid? any/c . -> . void)] + [self (-> tid?)]) \ No newline at end of file diff --git a/collects/frtime/mailbox.ss b/collects/frtime/mailbox.ss new file mode 100644 index 0000000000..2f129cbe7d --- /dev/null +++ b/collects/frtime/mailbox.ss @@ -0,0 +1,77 @@ +#lang scheme +(require scheme/async-channel) + +; XXX More efficient structure +(define (snoc x l) (append l (list x))) + +; Define mailboxes +(define-struct mailbox (manager control msgs)) +(define (new-mailbox) + (define control-ch (make-channel)) + (define msgs-ch (make-async-channel)) + ; Try to match one message + (define (try-to-match req msg) + (match req + [(struct receive (reply-ch _ _ matcher)) + (with-handlers ([exn:misc:match? (lambda (x) #f)]) + (define the-match-thunk (matcher msg)) + ; XXX Handle partner's death + (channel-put reply-ch the-match-thunk) + #t)])) + ; Try to match a list of messages + (define (try-to-match* req msgs) + (match msgs + [(list) (error 'try-to-match* "No matches")] + [(list-rest msg msgs) + (if (try-to-match req msg) + msgs + (list* msg (try-to-match* req msgs)))])) + ; Accept new messages until we need to match one + (define (not-on-receive msgs) + (sync (handle-evt msgs-ch + (lambda (new-msg) + (not-on-receive (snoc new-msg msgs)))) + (handle-evt control-ch + (lambda (req) + (with-handlers ([exn? (lambda (x) (waiting-for-matching req msgs))]) + (define new-msgs (try-to-match* req msgs)) + ; One worked + (not-on-receive new-msgs)))))) + ; Waiting for a message that will match + (define (waiting-for-matching req msgs) + (match req + [(struct receive (reply-ch timeout-evt timeout-thunk _)) + (sync (handle-evt timeout-evt + (lambda (_) + (channel-put reply-ch timeout-thunk) + (not-on-receive msgs))) + (handle-evt msgs-ch + (lambda (new-msg) + (if (try-to-match req new-msg) + (not-on-receive msgs) + (waiting-for-matching req (snoc new-msg msgs))))))])) + (define manager + (thread + (lambda () + (not-on-receive empty)))) + (make-mailbox manager control-ch msgs-ch)) + +(define-struct receive (reply-ch timeout timeout-thunk matcher)) +(define (mailbox-send! mb msg) + (match mb + [(struct mailbox (thd _ msgs)) + (thread-resume thd) + (async-channel-put msgs msg)])) +(define (mailbox-receive mb timeout-evt timeout-thunk matcher) + (match mb + [(struct mailbox (thd control _)) + (define reply-ch (make-channel)) + (thread-resume thd) + (channel-put control (make-receive reply-ch timeout-evt timeout-thunk matcher)) + (channel-get reply-ch)])) + +(provide/contract + [mailbox? (any/c . -> . boolean?)] + [new-mailbox (-> mailbox?)] + [mailbox-send! (mailbox? any/c . -> . void)] + [mailbox-receive (mailbox? evt? (-> any) (any/c . -> . (-> any)) . -> . (-> any))]) \ No newline at end of file