Add concurrency chapter to the Guide
This commit is contained in:
parent
cdbe416b09
commit
c5f9cb4984
395
pkgs/racket-pkgs/racket-doc/scribblings/guide/concurrency.scrbl
Normal file
395
pkgs/racket-pkgs/racket-doc/scribblings/guide/concurrency.scrbl
Normal file
|
@ -0,0 +1,395 @@
|
||||||
|
#lang scribble/doc
|
||||||
|
@(require scribble/manual scribble/eval "guide-utils.rkt")
|
||||||
|
|
||||||
|
@(define concurrency-eval (make-base-eval))
|
||||||
|
|
||||||
|
@(define reference-doc '(lib "scribblings/reference/reference.scrbl"))
|
||||||
|
|
||||||
|
@title[#:tag "concurrency"]{Concurrency, Threads, and Synchronization}
|
||||||
|
|
||||||
|
@section{Thread basics}
|
||||||
|
|
||||||
|
To execute a procedure concurrrently, use @racket[thread]. This example
|
||||||
|
creates 2 new threads from the main thread:
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(displayln "This is the original thread")
|
||||||
|
(thread (lambda () (displayln "This is a new thread.")))
|
||||||
|
(thread (lambda () (displayln "This is another new thread.")))
|
||||||
|
]
|
||||||
|
|
||||||
|
The next example creates a new thread that would otherwise loop forever, but
|
||||||
|
the main thread uses @racket[sleep] to pause itself for 2.5 seconds, then
|
||||||
|
uses @racket[kill-thread] to terminate the worker thread:
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define worker (thread (lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(displayln "Working...")
|
||||||
|
(sleep 0.2)
|
||||||
|
(loop)))))
|
||||||
|
(sleep 2.5)
|
||||||
|
(kill-thread worker)
|
||||||
|
]
|
||||||
|
|
||||||
|
@margin-note{In DrRacket, the main thread keeps going until the Stop button is
|
||||||
|
clicked, so in DrRacket the @racket[thread-wait] is not necessary.}
|
||||||
|
|
||||||
|
If the main thread finishes or is killed, the application exits, even if
|
||||||
|
other threads were still running. A thread can use @racket[thread-wait] to
|
||||||
|
wait for another thread to finish. Here the main thread uses
|
||||||
|
@racket[thread-wait] to make sure the worker thread finishes before the main
|
||||||
|
thread exits:
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define worker (thread
|
||||||
|
(lambda ()
|
||||||
|
(for ([i 100])
|
||||||
|
(printf "Working hard... ~a~n" i)))))
|
||||||
|
(thread-wait worker)
|
||||||
|
(displayln "Worker finished")
|
||||||
|
]
|
||||||
|
|
||||||
|
@section{Thread Mailboxes}
|
||||||
|
|
||||||
|
Each thread has a mailbox for receiving messages. @racket[thread-send]
|
||||||
|
asynchronously sends a message to another thread's mailbox, while
|
||||||
|
@racket[thread-receive] will return the oldest message from the current
|
||||||
|
thread's mailbox, blocking to wait for a message if necessary. In the
|
||||||
|
following example, the main thread sends data to the worker thread to be
|
||||||
|
processed, then sends a @racket['done] message when there is no more data and
|
||||||
|
waits for the worker thread to finish.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define worker-thread (thread
|
||||||
|
(lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(match (thread-receive)
|
||||||
|
[(? number? num)
|
||||||
|
(printf "Processing ~a~n" num)
|
||||||
|
(loop)]
|
||||||
|
['done
|
||||||
|
(printf "Done~n")])))))
|
||||||
|
(for ([i 20])
|
||||||
|
(thread-send worker-thread i))
|
||||||
|
(thread-send worker-thread 'done)
|
||||||
|
(thread-wait worker-thread)
|
||||||
|
]
|
||||||
|
|
||||||
|
In the next example the main thread delegates work to multiple arithmetic
|
||||||
|
threads, then waits to receive the results. The arithmetic threads process work
|
||||||
|
items then send the results to the main thread.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define (make-arithmetic-thread operation)
|
||||||
|
(thread (lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(match (thread-receive)
|
||||||
|
[(list oper1 oper2 result-thread)
|
||||||
|
(thread-send result-thread
|
||||||
|
(format "~a + ~a = ~a" oper1 oper2 (operation oper1 oper2)))
|
||||||
|
(loop)])))))
|
||||||
|
|
||||||
|
(define addition-thread (make-arithmetic-thread +))
|
||||||
|
(define subtraction-thread (make-arithmetic-thread -))
|
||||||
|
|
||||||
|
(define worklist '((+ 1 1) (+ 2 2) (- 3 2) (- 4 1)))
|
||||||
|
(for ([item worklist])
|
||||||
|
(match item
|
||||||
|
[(list '+ o1 o2)
|
||||||
|
(thread-send addition-thread (list o1 o2 (current-thread)))]
|
||||||
|
[(list '- o1 o2)
|
||||||
|
(thread-send subtraction-thread (list o1 o2 (current-thread)))]))
|
||||||
|
|
||||||
|
(for ([i (length worklist)])
|
||||||
|
(displayln (thread-receive)))
|
||||||
|
]
|
||||||
|
|
||||||
|
@section{Semaphores}
|
||||||
|
|
||||||
|
Semaphores facilitate synchronized access to an arbitrary shared resource.
|
||||||
|
Use semaphores when multiple threads must perform non-atomic operations on a
|
||||||
|
single resource.
|
||||||
|
|
||||||
|
In the following example, multiple threads print to standard output
|
||||||
|
concurrently. Without synchronization, a line printed by one thread might
|
||||||
|
appear in the middle of a line printed by another thread. By using a semaphore
|
||||||
|
initialized with a count of 1, only 1 thread will print at a time.
|
||||||
|
@racket[semaphore-wait] blocks until the semaphore's internal counter is
|
||||||
|
non-zero, then decrements the counter and returns. @racket[semaphore-post]
|
||||||
|
increments the counter so that another thread can unblock then print.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define output-semaphore (make-semaphore 1))
|
||||||
|
(define (make-thread name)
|
||||||
|
(thread (lambda ()
|
||||||
|
(for [(i 10)]
|
||||||
|
(semaphore-wait output-semaphore)
|
||||||
|
(printf "thread ~a: ~a~n" name i)
|
||||||
|
(semaphore-post output-semaphore)))))
|
||||||
|
(define threads
|
||||||
|
(map make-thread '(A B C)))
|
||||||
|
(for-each thread-wait threads)
|
||||||
|
]
|
||||||
|
|
||||||
|
Semaphores are a low-level technique. Often a better solution is to restrict
|
||||||
|
resource access to a single thread. For instance, synchronizing access to
|
||||||
|
standard output might be better accomplished by having a dedicated thread for
|
||||||
|
printing output.
|
||||||
|
|
||||||
|
@section{Channels}
|
||||||
|
|
||||||
|
Channels synchronize two threads while a value is passed from one thread to the
|
||||||
|
other. Unlike a thread mailbox, multiple threads can get items from a single
|
||||||
|
channel, so channels should be used when multiple threads need to consume items
|
||||||
|
from a single work queue.
|
||||||
|
|
||||||
|
In the following example, the main thread adds items to a channel using
|
||||||
|
@racket[channel-put] while multiple worker threads consume those items using
|
||||||
|
@racket[channel-get]. Each call to either procedure may block until another
|
||||||
|
thread calls the other procedure with the same channel. The workers process
|
||||||
|
the items and then pass their results to the result-thread via the result-channel.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define result-channel (make-channel))
|
||||||
|
(define result-thread
|
||||||
|
(thread (lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(displayln (channel-get result-channel))
|
||||||
|
(loop)))))
|
||||||
|
|
||||||
|
(define work-channel (make-channel))
|
||||||
|
(define (make-worker thread-id)
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(define item (channel-get work-channel))
|
||||||
|
(case item
|
||||||
|
[(DONE)
|
||||||
|
(channel-put result-channel
|
||||||
|
(format "Thread ~a done" thread-id))]
|
||||||
|
[else
|
||||||
|
(channel-put result-channel
|
||||||
|
(format "Thread ~a processed ~a"
|
||||||
|
thread-id
|
||||||
|
item))
|
||||||
|
(loop)])))))
|
||||||
|
(define work-threads (map make-worker '(1 2)))
|
||||||
|
(for ([item '(A B C D E F G H DONE DONE)])
|
||||||
|
(channel-put work-channel item))
|
||||||
|
(for-each thread-wait work-threads)
|
||||||
|
]
|
||||||
|
|
||||||
|
@section{Buffered Asynchronous Channels}
|
||||||
|
|
||||||
|
Buffered asynchronous channels are similar to the channels described above, but
|
||||||
|
the put operation of asynchronous channels does not block unless the given
|
||||||
|
channel was created with a buffer limit and the limit has been reached. The
|
||||||
|
asynchronous put operation is therefore somewhat similar to
|
||||||
|
@racket[thread-send], but unlike thread mailboxes, asynchronous channels allow
|
||||||
|
multiple threads to consume items from a single channel. In the following
|
||||||
|
example, the main thread adds items to the work channel, which holds a maximum
|
||||||
|
of 3 items at a time. The worker threads process items from this channel and
|
||||||
|
then send results to the print thread.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(require racket/async-channel)
|
||||||
|
(define print-thread
|
||||||
|
(thread (lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(displayln (thread-receive))
|
||||||
|
(loop)))))
|
||||||
|
(define (safer-printf . items)
|
||||||
|
(thread-send print-thread
|
||||||
|
(apply format items)))
|
||||||
|
(define work-channel (make-async-channel 3))
|
||||||
|
(define (make-worker-thread thread-id)
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(let loop ()
|
||||||
|
(define item (async-channel-get work-channel))
|
||||||
|
(safer-printf "Thread ~a processing item: ~a" thread-id item)
|
||||||
|
(loop)))))
|
||||||
|
(for-each make-worker-thread '(1 2 3))
|
||||||
|
(for ([item '(a b c d e f g h i j k l m)])
|
||||||
|
(async-channel-put work-channel item))
|
||||||
|
]
|
||||||
|
|
||||||
|
Note the above example lacks any synchronization to verify that all items were
|
||||||
|
processed. If the main thread were to exit without such synchronization, it is
|
||||||
|
possible that the worker threads will not finish processing some items or the
|
||||||
|
print thread will not print all items.
|
||||||
|
|
||||||
|
@section{Synchronizable events and sync}
|
||||||
|
|
||||||
|
There are other ways to synchronize threads. The @racket[sync] function allows
|
||||||
|
threads to coordinate via @tech[#:doc reference-doc]{synchronizable events}s.
|
||||||
|
Many types double as events, allowing a uniform way to synchronize threads
|
||||||
|
using different types. Examples include channels, ports, and alarms.
|
||||||
|
|
||||||
|
In the next example, a channel and an alarm are used as synchronizable events.
|
||||||
|
The workers sync on both in order to process the channel items up until the
|
||||||
|
alarm is activated. The channel items are processed and then results sent back
|
||||||
|
to the main thread.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define main-thread (current-thread))
|
||||||
|
(define alarm (alarm-evt (+ 3000 (current-inexact-milliseconds))))
|
||||||
|
(define channel (make-channel))
|
||||||
|
(define (make-worker-thread thread-id)
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(define evt (sync channel alarm))
|
||||||
|
(cond
|
||||||
|
[(equal? evt alarm)
|
||||||
|
(thread-send main-thread 'alarm)]
|
||||||
|
[else
|
||||||
|
(thread-send main-thread
|
||||||
|
(format "Thread ~a received ~a"
|
||||||
|
thread-id
|
||||||
|
evt))]))))
|
||||||
|
(make-worker-thread 1)
|
||||||
|
(make-worker-thread 2)
|
||||||
|
(make-worker-thread 3)
|
||||||
|
(channel-put channel 'A)
|
||||||
|
(channel-put channel 'B)
|
||||||
|
(let loop ()
|
||||||
|
(match (thread-receive)
|
||||||
|
['alarm
|
||||||
|
(displayln "Done")]
|
||||||
|
[result
|
||||||
|
(displayln result)
|
||||||
|
(loop)]))
|
||||||
|
]
|
||||||
|
|
||||||
|
The next example shows a function for use in a simple TCP echo server. The
|
||||||
|
function uses @racket[sync/timeout] to synchronize on input from the given port
|
||||||
|
or a message in the thread's mailbox. @racket[sync/timeout]'s first argument
|
||||||
|
specifies the maximum number of seconds it should wait on the given events.
|
||||||
|
@racket[read-line-evt] returns an event that is ready when a line of input is
|
||||||
|
available in the given input port. @racket[thread-receive-evt] is ready when
|
||||||
|
@racket[thread-receive] would not block. In a real application, the messages
|
||||||
|
received in the thread mailbox could be used for control messages, etc.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define (serve in-port out-port)
|
||||||
|
(let loop []
|
||||||
|
(define evt (sync/timeout 2 (read-line-evt in-port 'any) (thread-receive-evt)))
|
||||||
|
(cond
|
||||||
|
[(not evt)
|
||||||
|
(displayln "Timed out, exiting")
|
||||||
|
(tcp-abandon-port in-port)
|
||||||
|
(tcp-abandon-port out-port)]
|
||||||
|
[(string? evt)
|
||||||
|
(fprintf out-port "~a~n" evt)
|
||||||
|
(flush-output out-port)
|
||||||
|
(loop)]
|
||||||
|
[else
|
||||||
|
(printf "Received a message in mailbox: ~a~n" (thread-receive))
|
||||||
|
(loop)])))
|
||||||
|
]
|
||||||
|
|
||||||
|
The previous example could be used in a program like the following. This code
|
||||||
|
starts a server thread and a client thread which communicate over TCP. The
|
||||||
|
client prints 3 lines to the server, which echoes them back. The client's
|
||||||
|
copy-port call will block until EOF is received. The server will timeout after
|
||||||
|
2 seconds, closing the ports which then allows copy-port to finish and the
|
||||||
|
client will exit. The main thread uses @racket[thread-wait] to wait for the
|
||||||
|
client-thread to exit. Without @racket[thread-wait], the main thread might
|
||||||
|
exit before the other threads are finished.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define port-num 4321)
|
||||||
|
(define (start-server)
|
||||||
|
(define listener (tcp-listen port-num))
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(define-values [in-port out-port] (tcp-accept listener))
|
||||||
|
(serve in-port out-port))))
|
||||||
|
|
||||||
|
(start-server)
|
||||||
|
|
||||||
|
(define client-thread
|
||||||
|
(thread
|
||||||
|
(lambda ()
|
||||||
|
(define-values [in-port out-port] (tcp-connect "localhost" port-num))
|
||||||
|
(display "first\nsecond\nthird\n" out-port)
|
||||||
|
(flush-output out-port)
|
||||||
|
(code:comment "copy-port will block until EOF is read from in-port")
|
||||||
|
(copy-port in-port (current-output-port)))))
|
||||||
|
|
||||||
|
(thread-wait client-thread)
|
||||||
|
]
|
||||||
|
|
||||||
|
Sometimes you want to attach result behavior directly to the event passed to
|
||||||
|
@racket[sync]. In the following example, the worker thread synchronizes on 3
|
||||||
|
channels, but each channel must be handled differently. Using
|
||||||
|
@racket[handle-evt] associates a callback with the given event. When
|
||||||
|
@racket[sync] selects the given event, it calls the callback to generate the
|
||||||
|
synchronization result, rather than using the event's normal synchronization
|
||||||
|
result. Because the event is handled in the callback, there is no need to
|
||||||
|
dispatch on the return value of @racket[sync].
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define add-channel (make-channel))
|
||||||
|
(define multiply-channel (make-channel))
|
||||||
|
(define append-channel (make-channel))
|
||||||
|
|
||||||
|
(define (work)
|
||||||
|
(let loop ()
|
||||||
|
(sync (handle-evt add-channel
|
||||||
|
(lambda (list-of-numbers)
|
||||||
|
(printf "Sum of ~a is ~a~n"
|
||||||
|
list-of-numbers
|
||||||
|
(apply + list-of-numbers))))
|
||||||
|
(handle-evt multiply-channel
|
||||||
|
(lambda (list-of-numbers)
|
||||||
|
(printf "Product of ~a is ~a~n"
|
||||||
|
list-of-numbers
|
||||||
|
(apply * list-of-numbers))))
|
||||||
|
(handle-evt append-channel
|
||||||
|
(lambda (list-of-strings)
|
||||||
|
(printf "Concatenation of ~s is ~s~n"
|
||||||
|
list-of-strings
|
||||||
|
(apply string-append list-of-strings)))))
|
||||||
|
(loop)))
|
||||||
|
|
||||||
|
(define worker (thread work))
|
||||||
|
(channel-put add-channel '(1 2))
|
||||||
|
(channel-put multiply-channel '(3 4))
|
||||||
|
(channel-put multiply-channel '(5 6))
|
||||||
|
(channel-put add-channel '(7 8))
|
||||||
|
(channel-put append-channel '("a" "b"))
|
||||||
|
]
|
||||||
|
|
||||||
|
@racket[handle-evt] invokes the callback in tail position, so it is safer to
|
||||||
|
use recursion, as in the following example.
|
||||||
|
|
||||||
|
@racketblock[
|
||||||
|
(define control-channel (make-channel))
|
||||||
|
(define add-channel (make-channel))
|
||||||
|
(define subtract-channel (make-channel))
|
||||||
|
(define (work state)
|
||||||
|
(printf "Current state: ~a~n" state)
|
||||||
|
(sync (handle-evt add-channel
|
||||||
|
(lambda (number)
|
||||||
|
(printf "Adding: ~a~n" number)
|
||||||
|
(work (+ state number))))
|
||||||
|
(handle-evt subtract-channel
|
||||||
|
(lambda (number)
|
||||||
|
(printf "Subtracting: ~a~n" number)
|
||||||
|
(work (- state number))))
|
||||||
|
(handle-evt control-channel
|
||||||
|
(lambda (kill-message)
|
||||||
|
(printf "Done~n")))))
|
||||||
|
|
||||||
|
(define worker (thread (lambda () (work 0))))
|
||||||
|
(channel-put add-channel 2)
|
||||||
|
(channel-put subtract-channel 3)
|
||||||
|
(channel-put add-channel 4)
|
||||||
|
(channel-put add-channel 5)
|
||||||
|
(channel-put subtract-channel 1)
|
||||||
|
(channel-put control-channel 'done)
|
||||||
|
(thread-wait worker)
|
||||||
|
]
|
||||||
|
|
|
@ -51,6 +51,8 @@ precise details to @|Racket| and other reference manuals.
|
||||||
|
|
||||||
@include-section["languages.scrbl"]
|
@include-section["languages.scrbl"]
|
||||||
|
|
||||||
|
@include-section["concurrency.scrbl"]
|
||||||
|
|
||||||
@include-section["performance.scrbl"]
|
@include-section["performance.scrbl"]
|
||||||
|
|
||||||
@include-section["running.scrbl"]
|
@include-section["running.scrbl"]
|
||||||
|
|
Loading…
Reference in New Issue
Block a user