From c5f9cb4984a1a55f039bf2afbae02550108eebf4 Mon Sep 17 00:00:00 2001 From: "David T. Pierson" Date: Sun, 6 Oct 2013 15:39:37 -0400 Subject: [PATCH] Add concurrency chapter to the Guide --- .../scribblings/guide/concurrency.scrbl | 395 ++++++++++++++++++ .../racket-doc/scribblings/guide/guide.scrbl | 2 + 2 files changed, 397 insertions(+) create mode 100644 pkgs/racket-pkgs/racket-doc/scribblings/guide/concurrency.scrbl diff --git a/pkgs/racket-pkgs/racket-doc/scribblings/guide/concurrency.scrbl b/pkgs/racket-pkgs/racket-doc/scribblings/guide/concurrency.scrbl new file mode 100644 index 0000000000..145613e8d5 --- /dev/null +++ b/pkgs/racket-pkgs/racket-doc/scribblings/guide/concurrency.scrbl @@ -0,0 +1,395 @@ +#lang scribble/doc +@(require scribble/manual scribble/eval "guide-utils.rkt") + +@(define concurrency-eval (make-base-eval)) + +@(define reference-doc '(lib "scribblings/reference/reference.scrbl")) + +@title[#:tag "concurrency"]{Concurrency, Threads, and Synchronization} + +@section{Thread basics} + +To execute a procedure concurrently, use @racket[thread]. This example +creates 2 new threads from the main thread: + +@racketblock[ +(displayln "This is the original thread") +(thread (lambda () (displayln "This is a new thread."))) +(thread (lambda () (displayln "This is another new thread."))) +] + +The next example creates a new thread that would otherwise loop forever, but +the main thread uses @racket[sleep] to pause itself for 2.5 seconds, then +uses @racket[kill-thread] to terminate the worker thread: + +@racketblock[ +(define worker (thread (lambda () + (let loop () + (displayln "Working...") + (sleep 0.2) + (loop))))) +(sleep 2.5) +(kill-thread worker) +] + +@margin-note{In DrRacket, the main thread keeps going until the Stop button is +clicked, so in DrRacket the @racket[thread-wait] is not necessary.} + +If the main thread finishes or is killed, the application exits, even if +other threads were still running. 
A thread can use @racket[thread-wait] to +wait for another thread to finish. Here the main thread uses +@racket[thread-wait] to make sure the worker thread finishes before the main +thread exits: + +@racketblock[ +(define worker (thread + (lambda () + (for ([i 100]) + (printf "Working hard... ~a~n" i))))) +(thread-wait worker) +(displayln "Worker finished") +] + +@section{Thread Mailboxes} + +Each thread has a mailbox for receiving messages. @racket[thread-send] +asynchronously sends a message to another thread's mailbox, while +@racket[thread-receive] will return the oldest message from the current +thread's mailbox, blocking to wait for a message if necessary. In the +following example, the main thread sends data to the worker thread to be +processed, then sends a @racket['done] message when there is no more data and +waits for the worker thread to finish. + +@racketblock[ +(define worker-thread (thread + (lambda () + (let loop () + (match (thread-receive) + [(? number? num) + (printf "Processing ~a~n" num) + (loop)] + ['done + (printf "Done~n")]))))) +(for ([i 20]) + (thread-send worker-thread i)) +(thread-send worker-thread 'done) +(thread-wait worker-thread) +] + +In the next example the main thread delegates work to multiple arithmetic +threads, then waits to receive the results. The arithmetic threads process work +items then send the results to the main thread. 
+ +@racketblock[ +(define (make-arithmetic-thread operation) + (thread (lambda () + (let loop () + (match (thread-receive) + [(list oper1 oper2 result-thread) + (thread-send result-thread + (format "~a ~a ~a = ~a" oper1 (object-name operation) oper2 (operation oper1 oper2))) + (loop)]))))) + +(define addition-thread (make-arithmetic-thread +)) +(define subtraction-thread (make-arithmetic-thread -)) + +(define worklist '((+ 1 1) (+ 2 2) (- 3 2) (- 4 1))) +(for ([item worklist]) + (match item + [(list '+ o1 o2) + (thread-send addition-thread (list o1 o2 (current-thread)))] + [(list '- o1 o2) + (thread-send subtraction-thread (list o1 o2 (current-thread)))])) + +(for ([i (length worklist)]) + (displayln (thread-receive))) +] + +@section{Semaphores} + +Semaphores facilitate synchronized access to an arbitrary shared resource. +Use semaphores when multiple threads must perform non-atomic operations on a +single resource. + +In the following example, multiple threads print to standard output +concurrently. Without synchronization, a line printed by one thread might +appear in the middle of a line printed by another thread. By using a semaphore +initialized with a count of 1, only 1 thread will print at a time. +@racket[semaphore-wait] blocks until the semaphore's internal counter is +non-zero, then decrements the counter and returns. @racket[semaphore-post] +increments the counter so that another thread can unblock and then print. + +@racketblock[ +(define output-semaphore (make-semaphore 1)) +(define (make-thread name) + (thread (lambda () + (for [(i 10)] + (semaphore-wait output-semaphore) + (printf "thread ~a: ~a~n" name i) + (semaphore-post output-semaphore))))) +(define threads + (map make-thread '(A B C))) +(for-each thread-wait threads) +] + +Semaphores are a low-level technique. Often a better solution is to restrict +resource access to a single thread. For instance, synchronizing access to +standard output might be better accomplished by having a dedicated thread for +printing output. 
+ +@section{Channels} + +Channels synchronize two threads while a value is passed from one thread to the +other. Unlike a thread mailbox, multiple threads can get items from a single +channel, so channels should be used when multiple threads need to consume items +from a single work queue. + +In the following example, the main thread adds items to a channel using +@racket[channel-put] while multiple worker threads consume those items using +@racket[channel-get]. Each call to either procedure may block until another +thread calls the other procedure with the same channel. The workers process +the items and then pass their results to the result-thread via the result-channel. + +@racketblock[ +(define result-channel (make-channel)) +(define result-thread + (thread (lambda () + (let loop () + (displayln (channel-get result-channel)) + (loop))))) + +(define work-channel (make-channel)) +(define (make-worker thread-id) + (thread + (lambda () + (let loop () + (define item (channel-get work-channel)) + (case item + [(DONE) + (channel-put result-channel + (format "Thread ~a done" thread-id))] + [else + (channel-put result-channel + (format "Thread ~a processed ~a" + thread-id + item)) + (loop)]))))) +(define work-threads (map make-worker '(1 2))) +(for ([item '(A B C D E F G H DONE DONE)]) + (channel-put work-channel item)) +(for-each thread-wait work-threads) +] + +@section{Buffered Asynchronous Channels} + +Buffered asynchronous channels are similar to the channels described above, but +the put operation of asynchronous channels does not block unless the given +channel was created with a buffer limit and the limit has been reached. The +asynchronous put operation is therefore somewhat similar to +@racket[thread-send], but unlike thread mailboxes, asynchronous channels allow +multiple threads to consume items from a single channel. In the following +example, the main thread adds items to the work channel, which holds a maximum +of 3 items at a time. 
The worker threads process items from this channel and +then send results to the print thread. + +@racketblock[ +(require racket/async-channel) +(define print-thread + (thread (lambda () + (let loop () + (displayln (thread-receive)) + (loop))))) +(define (safer-printf . items) + (thread-send print-thread + (apply format items))) +(define work-channel (make-async-channel 3)) +(define (make-worker-thread thread-id) + (thread + (lambda () + (let loop () + (define item (async-channel-get work-channel)) + (safer-printf "Thread ~a processing item: ~a" thread-id item) + (loop))))) +(for-each make-worker-thread '(1 2 3)) +(for ([item '(a b c d e f g h i j k l m)]) + (async-channel-put work-channel item)) +] + +Note the above example lacks any synchronization to verify that all items were +processed. If the main thread were to exit without such synchronization, it is +possible that the worker threads will not finish processing some items or the +print thread will not print all items. + +@section{Synchronizable events and sync} + +There are other ways to synchronize threads. The @racket[sync] function allows +threads to coordinate via @tech[#:doc reference-doc]{synchronizable events}. +Many types double as events, allowing a uniform way to synchronize threads +using different types. Examples include channels, ports, and alarms. + +In the next example, a channel and an alarm are used as synchronizable events. +The workers sync on both in order to process the channel items up until the +alarm is activated. The channel items are processed and then results sent back +to the main thread. + +@racketblock[ +(define main-thread (current-thread)) +(define alarm (alarm-evt (+ 3000 (current-inexact-milliseconds)))) +(define channel (make-channel)) +(define (make-worker-thread thread-id) + (thread + (lambda () + (define evt (sync channel alarm)) + (cond + [(equal? 
evt alarm) + (thread-send main-thread 'alarm)] + [else + (thread-send main-thread + (format "Thread ~a received ~a" + thread-id + evt))])))) +(make-worker-thread 1) +(make-worker-thread 2) +(make-worker-thread 3) +(channel-put channel 'A) +(channel-put channel 'B) +(let loop () + (match (thread-receive) + ['alarm + (displayln "Done")] + [result + (displayln result) + (loop)])) +] + +The next example shows a function for use in a simple TCP echo server. The +function uses @racket[sync/timeout] to synchronize on input from the given port +or a message in the thread's mailbox. @racket[sync/timeout]'s first argument +specifies the maximum number of seconds it should wait on the given events. +@racket[read-line-evt] returns an event that is ready when a line of input is +available in the given input port. @racket[thread-receive-evt] is ready when +@racket[thread-receive] would not block. In a real application, the messages +received in the thread mailbox could be used for control messages, etc. + +@racketblock[ +(define (serve in-port out-port) + (let loop [] + (define evt (sync/timeout 2 (read-line-evt in-port 'any) (thread-receive-evt))) + (cond + [(not evt) + (displayln "Timed out, exiting") + (tcp-abandon-port in-port) + (tcp-abandon-port out-port)] + [(string? evt) + (fprintf out-port "~a~n" evt) + (flush-output out-port) + (loop)] + [else + (printf "Received a message in mailbox: ~a~n" (thread-receive)) + (loop)]))) +] + +The previous example could be used in a program like the following. This code +starts a server thread and a client thread which communicate over TCP. The +client prints 3 lines to the server, which echoes them back. The client's +copy-port call will block until EOF is received. The server will time out after +2 seconds, closing the ports, which then allows copy-port to finish and the +client to exit. The main thread uses @racket[thread-wait] to wait for the +client-thread to exit. 
Without @racket[thread-wait], the main thread might +exit before the other threads are finished. + +@racketblock[ +(define port-num 4321) +(define (start-server) + (define listener (tcp-listen port-num)) + (thread + (lambda () + (define-values [in-port out-port] (tcp-accept listener)) + (serve in-port out-port)))) + +(start-server) + +(define client-thread + (thread + (lambda () + (define-values [in-port out-port] (tcp-connect "localhost" port-num)) + (display "first\nsecond\nthird\n" out-port) + (flush-output out-port) + (code:comment "copy-port will block until EOF is read from in-port") + (copy-port in-port (current-output-port))))) + +(thread-wait client-thread) +] + +Sometimes you want to attach result behavior directly to the event passed to +@racket[sync]. In the following example, the worker thread synchronizes on 3 +channels, but each channel must be handled differently. Using +@racket[handle-evt] associates a callback with the given event. When +@racket[sync] selects the given event, it calls the callback to generate the +synchronization result, rather than using the event's normal synchronization +result. Because the event is handled in the callback, there is no need to +dispatch on the return value of @racket[sync]. 
+ +@racketblock[ +(define add-channel (make-channel)) +(define multiply-channel (make-channel)) +(define append-channel (make-channel)) + +(define (work) + (let loop () + (sync (handle-evt add-channel + (lambda (list-of-numbers) + (printf "Sum of ~a is ~a~n" + list-of-numbers + (apply + list-of-numbers)))) + (handle-evt multiply-channel + (lambda (list-of-numbers) + (printf "Product of ~a is ~a~n" + list-of-numbers + (apply * list-of-numbers)))) + (handle-evt append-channel + (lambda (list-of-strings) + (printf "Concatenation of ~s is ~s~n" + list-of-strings + (apply string-append list-of-strings))))) + (loop))) + +(define worker (thread work)) +(channel-put add-channel '(1 2)) +(channel-put multiply-channel '(3 4)) +(channel-put multiply-channel '(5 6)) +(channel-put add-channel '(7 8)) +(channel-put append-channel '("a" "b")) +] + +@racket[handle-evt] invokes the callback in tail position, so it is safe to +use recursion, as in the following example. + +@racketblock[ +(define control-channel (make-channel)) +(define add-channel (make-channel)) +(define subtract-channel (make-channel)) +(define (work state) + (printf "Current state: ~a~n" state) + (sync (handle-evt add-channel + (lambda (number) + (printf "Adding: ~a~n" number) + (work (+ state number)))) + (handle-evt subtract-channel + (lambda (number) + (printf "Subtracting: ~a~n" number) + (work (- state number)))) + (handle-evt control-channel + (lambda (kill-message) + (printf "Done~n"))))) + +(define worker (thread (lambda () (work 0)))) +(channel-put add-channel 2) +(channel-put subtract-channel 3) +(channel-put add-channel 4) +(channel-put add-channel 5) +(channel-put subtract-channel 1) +(channel-put control-channel 'done) +(thread-wait worker) +] + diff --git a/pkgs/racket-pkgs/racket-doc/scribblings/guide/guide.scrbl b/pkgs/racket-pkgs/racket-doc/scribblings/guide/guide.scrbl index c3f34026be..52ba3b7185 100644 --- a/pkgs/racket-pkgs/racket-doc/scribblings/guide/guide.scrbl +++ 
b/pkgs/racket-pkgs/racket-doc/scribblings/guide/guide.scrbl @@ -51,6 +51,8 @@ precise details to @|Racket| and other reference manuals. @include-section["languages.scrbl"] +@include-section["concurrency.scrbl"] + @include-section["performance.scrbl"] @include-section["running.scrbl"]