diff --git a/pkgs/racket-doc/scribblings/guide/concurrency.scrbl b/pkgs/racket-doc/scribblings/guide/concurrency.scrbl index ba2e8fe7e4..021797e0c1 100644 --- a/pkgs/racket-doc/scribblings/guide/concurrency.scrbl +++ b/pkgs/racket-doc/scribblings/guide/concurrency.scrbl @@ -1,10 +1,11 @@ #lang scribble/doc @(require scribble/manual - scribble/eval + scribble/examples "guide-utils.rkt" (for-label racket)) @(define concurrency-eval (make-base-eval)) +@(concurrency-eval '(require racket/contract racket/math)) @(define reference-doc '(lib "scribblings/reference/reference.scrbl")) @@ -268,7 +269,10 @@ There are other ways to synchronize threads. The @racket[sync] function allows threads to coordinate via @tech[#:doc reference-doc]{synchronizable events}. Many values double as events, allowing a uniform way to synchronize threads using different types. Examples of events include channels, ports, threads, -and alarms. +and alarms. This section builds up a number of examples that show how +the combination of events, threads, and @racket[sync] (along with recursive functions) +allow you to implement arbitrarily sophisticated communication protocols +to coordinate concurrent parts of a program. In the next example, a channel and an alarm are used as synchronizable events. The workers @racket[sync] on both so that they can process channel items until the @@ -445,3 +449,153 @@ that its handler is not called in tail position with respect to @racket[sync]. At the same time, @racket[wrap-evt] disables break exceptions during its handler's invocation. +@section{Building Your Own Synchronization Patterns} + +Events also allow you to encode many different communication +patterns between multiple concurrent parts of a program. One +common such pattern is producer-consumer. Here is a way to +implement on variation on it using the above ideas. Generally +speaking, these communication patterns are implemented via +a server loops that uses @racket[sync] to wait for any of +a number of different possibilities to occur and then +reacts them, updating some local state. + +@examples[ + #:eval concurrency-eval + #:label #f + (eval:no-prompt + (define/contract (produce x) + (-> any/c void?) + (channel-put producer-chan x))) + + (eval:no-prompt + (define/contract (consume) + (-> any/c) + (channel-get consumer-chan))) + + (code:comment "private state and server loop") +(eval:no-prompt + (define producer-chan (make-channel)) + (define consumer-chan (make-channel)) + (void + (thread + (λ () + (code:comment "the items variable holds the items that") + (code:comment "have been produced but not yet consumed") + (let loop ([items '()]) + (sync + + (code:comment "wait for production") + (handle-evt + producer-chan + (λ (i) + (code:comment "if that event was chosen,") + (code:comment "we add an item to our list") + (code:comment "and go back around the loop") + (loop (cons i items)))) + + (code:comment "wait for consumption, but only") + (code:comment "if we have something to produce") + (handle-evt + (if (null? items) + never-evt + (channel-put-evt consumer-chan (car items))) + (λ (_) + (code:comment "if that event was chosen,") + (code:comment "we know that the first item item") + (code:comment "has been consumed; drop it and") + (code:comment "and go back around the loop") + (loop (cdr items)))))))))) + + (code:comment "an example (non-deterministic) interaction") + (void + (thread (λ () (sleep (/ (random 10) 100)) (produce 1))) + (thread (λ () (sleep (/ (random 10) 100)) (produce 2)))) + (list (consume) (consume)) + ] + +It is possible to build up more complex synchronization patterns. Here is +a silly example where we extend the producer consumer with an operation +to wait until at least a certain number of items have been produced. + +@examples[ + #:eval concurrency-eval + #:label #f + + (eval:no-prompt + (define/contract (produce x) + (-> any/c void?) + (channel-put producer-chan x)) + + (define/contract (consume) + (-> any/c) + (channel-get consumer-chan)) + + (define/contract (wait-at-least n) + (-> natural? void?) + (define c (make-channel)) + (code:comment "we send a new channel over to the") + (code:comment "main loop so that we can wait here") + (channel-put wait-at-least-chan (cons n c)) + (channel-get c))) + + (eval:no-prompt + (define producer-chan (make-channel)) + (define consumer-chan (make-channel)) + (define wait-at-least-chan (make-channel)) + (void + (thread + (λ () + (let loop ([items '()] + [total-items-seen 0] + [waiters '()]) + (code:comment "instead of waiting on just production/") + (code:comment "consumption now we wait to learn about") + (code:comment "threads that want to wait for a certain") + (code:comment "number of elements to be reached") + (apply + sync + (handle-evt + producer-chan + (λ (i) (loop (cons i items) + (+ total-items-seen 1) + waiters))) + (handle-evt + (if (null? items) + never-evt + (channel-put-evt consumer-chan (car items))) + (λ (_) (loop (cdr items) total-items-seen waiters))) + + (code:comment "wait for threads that are interested") + (code:comment "the number of items produced") + (handle-evt + wait-at-least-chan + (λ (waiter) (loop items total-items-seen (cons waiter waiters)))) + + (code:comment "for each thread that wants to wait,") + (for/list ([waiter (in-list waiters)]) + (code:comment "we check to see if there has been enough") + (code:comment "production") + (cond + [(>= (car waiter) total-items-seen) + (code:comment "if so, we send a mesage back on the channel") + (code:comment "and continue the loop without that item") + (handle-evt + (channel-put-evt + (cdr waiter) + (void)) + (λ (_) (loop items total-items-seen (remove waiter waiters))))] + [else + (code:comment "otherwise, we just ignore that one") + never-evt])))))))) + + (code:comment "an example (non-deterministic) interaction") + (define thds + (for/list ([i (in-range 10)]) + (thread (λ () + (produce i) + (wait-at-least 10) + (display (format "~a -> ~a\n" i (consume))))))) + (for ([thd (in-list thds)]) + (thread-wait thd)) + ] diff --git a/pkgs/racket-doc/scribblings/reference/evts.scrbl b/pkgs/racket-doc/scribblings/reference/evts.scrbl index 8948c3db0d..0923f7531b 100644 --- a/pkgs/racket-doc/scribblings/reference/evts.scrbl +++ b/pkgs/racket-doc/scribblings/reference/evts.scrbl @@ -31,10 +31,18 @@ events, however (such as a port), synchronizing does not modify the event's state. Racket values that act as @tech{synchronizable events} include -@tech{semaphores}, @tech{channels}, @tech{asynchronous channels}, -@tech{ports}, @tech{TCP listeners}, @tech{log receiver}s, @tech{threads}, -@tech{subprocess}es, @tech{will executors}, and @tech{custodian -box}es. Libraries can define new synchronizable events, especially +@tech{asynchronous channels}, +@tech{channels}, +@tech{custodian box}es, +@tech{log receivers}, +@tech{place channels}, +@tech{ports}, +@tech{semaphores}, +@tech{subprocess}es, +@tech{TCP listeners}, +@tech{threads}, and +@tech{will executors}. +Libraries can define new synchronizable events, especially though @racket[prop:evt]. @;------------------------------------------------------------------------