|
|
|
@ -1,16 +1,29 @@
|
|
|
|
|
#lang scribble/doc
|
|
|
|
|
@(require scribble/manual scribble/eval "guide-utils.rkt")
|
|
|
|
|
@(require scribble/manual
|
|
|
|
|
scribble/eval
|
|
|
|
|
"guide-utils.rkt"
|
|
|
|
|
(for-label racket))
|
|
|
|
|
|
|
|
|
|
@(define concurrency-eval (make-base-eval))
|
|
|
|
|
|
|
|
|
|
@(define reference-doc '(lib "scribblings/reference/reference.scrbl"))
|
|
|
|
|
|
|
|
|
|
@title[#:tag "concurrency"]{Concurrency, Threads, and Synchronization}
|
|
|
|
|
@title[#:tag "concurrency"]{Concurrency and Synchronization}
|
|
|
|
|
|
|
|
|
|
@section{Thread basics}
|
|
|
|
|
Racket provides @deftech{concurrency} in the form of
|
|
|
|
|
@deftech{threads}, and it provides a general @racket[sync] function
|
|
|
|
|
that can be used to synchronize both threads and other implicit forms of
|
|
|
|
|
concurrency, such as @tech{ports}.
|
|
|
|
|
|
|
|
|
|
To execute a procedure concurrrently, use @racket[thread]. This example
|
|
|
|
|
creates 2 new threads from the main thread:
|
|
|
|
|
Threads run concurrently in the sense that one thread can preempt
|
|
|
|
|
another without its cooperation, but threads do not run in parallel in
|
|
|
|
|
the sense of using multiple hardware processors. See
|
|
|
|
|
@secref["parallelism"] for information on parallelism in Racket.
|
|
|
|
|
|
|
|
|
|
@section{Threads}
|
|
|
|
|
|
|
|
|
|
To execute a procedure concurrrently, use @racket[thread]. The
|
|
|
|
|
following example creates two new threads from the main thread:
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(displayln "This is the original thread")
|
|
|
|
@ -36,8 +49,8 @@ uses @racket[kill-thread] to terminate the worker thread:
|
|
|
|
|
clicked, so in DrRacket the @racket[thread-wait] is not necessary.}
|
|
|
|
|
|
|
|
|
|
If the main thread finishes or is killed, the application exits, even if
|
|
|
|
|
other threads were still running. A thread can use @racket[thread-wait] to
|
|
|
|
|
wait for another thread to finish. Here the main thread uses
|
|
|
|
|
other threads are still running. A thread can use @racket[thread-wait] to
|
|
|
|
|
wait for another thread to finish. Here, the main thread uses
|
|
|
|
|
@racket[thread-wait] to make sure the worker thread finishes before the main
|
|
|
|
|
thread exits:
|
|
|
|
|
|
|
|
|
@ -52,9 +65,9 @@ thread exits:
|
|
|
|
|
|
|
|
|
|
@section{Thread Mailboxes}
|
|
|
|
|
|
|
|
|
|
Each thread has a mailbox for receiving messages. @racket[thread-send]
|
|
|
|
|
Each thread has a mailbox for receiving messages. The @racket[thread-send] function
|
|
|
|
|
asynchronously sends a message to another thread's mailbox, while
|
|
|
|
|
@racket[thread-receive] will return the oldest message from the current
|
|
|
|
|
@racket[thread-receive] returns the oldest message from the current
|
|
|
|
|
thread's mailbox, blocking to wait for a message if necessary. In the
|
|
|
|
|
following example, the main thread sends data to the worker thread to be
|
|
|
|
|
processed, then sends a @racket['done] message when there is no more data and
|
|
|
|
@ -76,7 +89,7 @@ waits for the worker thread to finish.
|
|
|
|
|
(thread-wait worker-thread)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
In the next example the main thread delegates work to multiple arithmetic
|
|
|
|
|
In the next example, the main thread delegates work to multiple arithmetic
|
|
|
|
|
threads, then waits to receive the results. The arithmetic threads process work
|
|
|
|
|
items then send the results to the main thread.
|
|
|
|
|
|
|
|
|
@ -87,7 +100,10 @@ items then send the results to the main thread.
|
|
|
|
|
(match (thread-receive)
|
|
|
|
|
[(list oper1 oper2 result-thread)
|
|
|
|
|
(thread-send result-thread
|
|
|
|
|
(format "~a + ~a = ~a" oper1 oper2 (operation oper1 oper2)))
|
|
|
|
|
(format "~a + ~a = ~a"
|
|
|
|
|
oper1
|
|
|
|
|
oper2
|
|
|
|
|
(operation oper1 oper2)))
|
|
|
|
|
(loop)])))))
|
|
|
|
|
|
|
|
|
|
(define addition-thread (make-arithmetic-thread +))
|
|
|
|
@ -97,9 +113,11 @@ items then send the results to the main thread.
|
|
|
|
|
(for ([item worklist])
|
|
|
|
|
(match item
|
|
|
|
|
[(list '+ o1 o2)
|
|
|
|
|
(thread-send addition-thread (list o1 o2 (current-thread)))]
|
|
|
|
|
(thread-send addition-thread
|
|
|
|
|
(list o1 o2 (current-thread)))]
|
|
|
|
|
[(list '- o1 o2)
|
|
|
|
|
(thread-send subtraction-thread (list o1 o2 (current-thread)))]))
|
|
|
|
|
(thread-send subtraction-thread
|
|
|
|
|
(list o1 o2 (current-thread)))]))
|
|
|
|
|
|
|
|
|
|
(for ([i (length worklist)])
|
|
|
|
|
(displayln (thread-receive)))
|
|
|
|
@ -114,10 +132,10 @@ single resource.
|
|
|
|
|
In the following example, multiple threads print to standard output
|
|
|
|
|
concurrently. Without synchronization, a line printed by one thread might
|
|
|
|
|
appear in the middle of a line printed by another thread. By using a semaphore
|
|
|
|
|
initialized with a count of 1, only 1 thread will print at a time.
|
|
|
|
|
@racket[semaphore-wait] blocks until the semaphore's internal counter is
|
|
|
|
|
non-zero, then decrements the counter and returns. @racket[semaphore-post]
|
|
|
|
|
increments the counter so that another thread can unblock then print.
|
|
|
|
|
initialized with a count of @racket[1], only one thread will print at a time.
|
|
|
|
|
The @racket[semaphore-wait] function blocks until the semaphore's internal counter is
|
|
|
|
|
non-zero, then decrements the counter and returns. The @racket[semaphore-post] function
|
|
|
|
|
increments the counter so that another thread can unblock and then print.
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(define output-semaphore (make-semaphore 1))
|
|
|
|
@ -132,8 +150,27 @@ increments the counter so that another thread can unblock then print.
|
|
|
|
|
(for-each thread-wait threads)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
Semaphores are a low-level technique. Often a better solution is to restrict
|
|
|
|
|
resource access to a single thread. For instance, synchronizing access to
|
|
|
|
|
The pattern of waiting on a semaphore, working, and posting to the
|
|
|
|
|
semaphore can also be expressed using
|
|
|
|
|
@racket[call-with-semaphore],which has the advantage of posting to the
|
|
|
|
|
semaphore if control escapes (e.g., due to an exception):
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(define output-semaphore (make-semaphore 1))
|
|
|
|
|
(define (make-thread name)
|
|
|
|
|
(thread (lambda ()
|
|
|
|
|
(for [(i 10)]
|
|
|
|
|
(call-with-semaphore
|
|
|
|
|
output-semaphore
|
|
|
|
|
(lambda ()
|
|
|
|
|
(printf "thread ~a: ~a~n" name i)))))))
|
|
|
|
|
(define threads
|
|
|
|
|
(map make-thread '(A B C)))
|
|
|
|
|
(for-each thread-wait threads)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
Semaphores are a low-level technique. Often, a better solution is to restrict
|
|
|
|
|
resource access to a single thread. For example, synchronizing access to
|
|
|
|
|
standard output might be better accomplished by having a dedicated thread for
|
|
|
|
|
printing output.
|
|
|
|
|
|
|
|
|
@ -145,10 +182,10 @@ channel, so channels should be used when multiple threads need to consume items
|
|
|
|
|
from a single work queue.
|
|
|
|
|
|
|
|
|
|
In the following example, the main thread adds items to a channel using
|
|
|
|
|
@racket[channel-put] while multiple worker threads consume those items using
|
|
|
|
|
@racket[channel-get]. Each call to either procedure may block until another
|
|
|
|
|
@racket[channel-put], while multiple worker threads consume those items using
|
|
|
|
|
@racket[channel-get]. Each call to either procedure blocks until another
|
|
|
|
|
thread calls the other procedure with the same channel. The workers process
|
|
|
|
|
the items and then pass their results to the result-thread via the result-channel.
|
|
|
|
|
the items and then pass their results to the result thread via the @racket[result-channel].
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(define result-channel (make-channel))
|
|
|
|
@ -183,17 +220,20 @@ the items and then pass their results to the result-thread via the result-channe
|
|
|
|
|
@section{Buffered Asynchronous Channels}
|
|
|
|
|
|
|
|
|
|
Buffered asynchronous channels are similar to the channels described above, but
|
|
|
|
|
the put operation of asynchronous channels does not block unless the given
|
|
|
|
|
the ``put'' operation of asynchronous channels does not block---unless the given
|
|
|
|
|
channel was created with a buffer limit and the limit has been reached. The
|
|
|
|
|
asynchronous put operation is therefore somewhat similar to
|
|
|
|
|
asynchronous-put operation is therefore somewhat similar to
|
|
|
|
|
@racket[thread-send], but unlike thread mailboxes, asynchronous channels allow
|
|
|
|
|
multiple threads to consume items from a single channel. In the following
|
|
|
|
|
multiple threads to consume items from a single channel.
|
|
|
|
|
|
|
|
|
|
In the following
|
|
|
|
|
example, the main thread adds items to the work channel, which holds a maximum
|
|
|
|
|
of 3 items at a time. The worker threads process items from this channel and
|
|
|
|
|
of three items at a time. The worker threads process items from this channel and
|
|
|
|
|
then send results to the print thread.
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(require racket/async-channel)
|
|
|
|
|
|
|
|
|
|
(define print-thread
|
|
|
|
|
(thread (lambda ()
|
|
|
|
|
(let loop ()
|
|
|
|
@ -202,6 +242,7 @@ then send results to the print thread.
|
|
|
|
|
(define (safer-printf . items)
|
|
|
|
|
(thread-send print-thread
|
|
|
|
|
(apply format items)))
|
|
|
|
|
|
|
|
|
|
(define work-channel (make-async-channel 3))
|
|
|
|
|
(define (make-worker-thread thread-id)
|
|
|
|
|
(thread
|
|
|
|
@ -210,6 +251,7 @@ then send results to the print thread.
|
|
|
|
|
(define item (async-channel-get work-channel))
|
|
|
|
|
(safer-printf "Thread ~a processing item: ~a" thread-id item)
|
|
|
|
|
(loop)))))
|
|
|
|
|
|
|
|
|
|
(for-each make-worker-thread '(1 2 3))
|
|
|
|
|
(for ([item '(a b c d e f g h i j k l m)])
|
|
|
|
|
(async-channel-put work-channel item))
|
|
|
|
@ -220,16 +262,17 @@ processed. If the main thread were to exit without such synchronization, it is
|
|
|
|
|
possible that the worker threads will not finish processing some items or the
|
|
|
|
|
print thread will not print all items.
|
|
|
|
|
|
|
|
|
|
@section{Synchronizable events and sync}
|
|
|
|
|
@section{Synchronizable Events and @racket[sync]}
|
|
|
|
|
|
|
|
|
|
There are other ways to synchronize threads. The @racket[sync] function allows
|
|
|
|
|
threads to coordinate via @tech[#:doc reference-doc]{synchronizable events}s.
|
|
|
|
|
Many types double as events, allowing a uniform way to synchronize threads
|
|
|
|
|
using different types. Examples include channels, ports, and alarms.
|
|
|
|
|
threads to coordinate via @tech[#:doc reference-doc]{synchronizable events}.
|
|
|
|
|
Many values double as events, allowing a uniform way to synchronize threads
|
|
|
|
|
using different types. Examples of events include channels, ports, threads,
|
|
|
|
|
and alarms.
|
|
|
|
|
|
|
|
|
|
In the next example, a channel and an alarm are used as synchronizable events.
|
|
|
|
|
The workers sync on both in order to process the channel items up until the
|
|
|
|
|
alarm is activated. The channel items are processed and then results sent back
|
|
|
|
|
The workers @racket[sync] on both so that they can process channel items until the
|
|
|
|
|
alarm is activated. The channel items are processed, and then results are sent back
|
|
|
|
|
to the main thread.
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
@ -264,17 +307,19 @@ to the main thread.
|
|
|
|
|
|
|
|
|
|
The next example shows a function for use in a simple TCP echo server. The
|
|
|
|
|
function uses @racket[sync/timeout] to synchronize on input from the given port
|
|
|
|
|
or a message in the thread's mailbox. @racket[sync/timeout]'s first argument
|
|
|
|
|
specifies the maximum number of seconds it should wait on the given events.
|
|
|
|
|
@racket[read-line-evt] returns an event that is ready when a line of input is
|
|
|
|
|
available in the given input port. @racket[thread-receive-evt] is ready when
|
|
|
|
|
or a message in the thread's mailbox. The first argument to @racket[sync/timeout]
|
|
|
|
|
specifies the maximum number of seconds it should wait on the given events. The
|
|
|
|
|
@racket[read-line-evt] function returns an event that is ready when a line of input is
|
|
|
|
|
available in the given input port. The result of @racket[thread-receive-evt] is ready when
|
|
|
|
|
@racket[thread-receive] would not block. In a real application, the messages
|
|
|
|
|
received in the thread mailbox could be used for control messages, etc.
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(define (serve in-port out-port)
|
|
|
|
|
(let loop []
|
|
|
|
|
(define evt (sync/timeout 2 (read-line-evt in-port 'any) (thread-receive-evt)))
|
|
|
|
|
(define evt (sync/timeout 2
|
|
|
|
|
(read-line-evt in-port 'any)
|
|
|
|
|
(thread-receive-evt)))
|
|
|
|
|
(cond
|
|
|
|
|
[(not evt)
|
|
|
|
|
(displayln "Timed out, exiting")
|
|
|
|
@ -285,18 +330,19 @@ received in the thread mailbox could be used for control messages, etc.
|
|
|
|
|
(flush-output out-port)
|
|
|
|
|
(loop)]
|
|
|
|
|
[else
|
|
|
|
|
(printf "Received a message in mailbox: ~a~n" (thread-receive))
|
|
|
|
|
(printf "Received a message in mailbox: ~a~n"
|
|
|
|
|
(thread-receive))
|
|
|
|
|
(loop)])))
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
The previous example could be used in a program like the following. This code
|
|
|
|
|
starts a server thread and a client thread which communicate over TCP. The
|
|
|
|
|
client prints 3 lines to the server, which echoes them back. The client's
|
|
|
|
|
copy-port call will block until EOF is received. The server will timeout after
|
|
|
|
|
2 seconds, closing the ports which then allows copy-port to finish and the
|
|
|
|
|
client will exit. The main thread uses @racket[thread-wait] to wait for the
|
|
|
|
|
client-thread to exit. Without @racket[thread-wait], the main thread might
|
|
|
|
|
exit before the other threads are finished.
|
|
|
|
|
The @racket[serve] function is used in the following example, which
|
|
|
|
|
starts a server thread and a client thread that communicate over TCP. The
|
|
|
|
|
client prints three lines to the server, which echoes them back. The client's
|
|
|
|
|
@racket[copy-port] call blocks until EOF is received. The server times out after
|
|
|
|
|
two seconds, closing the ports, which allows @racket[copy-port] to finish and the
|
|
|
|
|
client to exit. The main thread uses @racket[thread-wait] to wait for the
|
|
|
|
|
client thread to exit (since, without @racket[thread-wait], the main thread might
|
|
|
|
|
exit before the other threads are finished).
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(define port-num 4321)
|
|
|
|
@ -321,13 +367,13 @@ exit before the other threads are finished.
|
|
|
|
|
(thread-wait client-thread)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
Sometimes you want to attach result behavior directly to the event passed to
|
|
|
|
|
@racket[sync]. In the following example, the worker thread synchronizes on 3
|
|
|
|
|
Sometimes, you want to attach result behavior directly to the event passed to
|
|
|
|
|
@racket[sync]. In the following example, the worker thread synchronizes on three
|
|
|
|
|
channels, but each channel must be handled differently. Using
|
|
|
|
|
@racket[handle-evt] associates a callback with the given event. When
|
|
|
|
|
@racket[sync] selects the given event, it calls the callback to generate the
|
|
|
|
|
synchronization result, rather than using the event's normal synchronization
|
|
|
|
|
result. Because the event is handled in the callback, there is no need to
|
|
|
|
|
result. Since the event is handled in the callback, there is no need to
|
|
|
|
|
dispatch on the return value of @racket[sync].
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
@ -362,8 +408,9 @@ dispatch on the return value of @racket[sync].
|
|
|
|
|
(channel-put append-channel '("a" "b"))
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
@racket[handle-evt] invokes the callback in tail position, so it is safer to
|
|
|
|
|
use recursion, as in the following example.
|
|
|
|
|
The result of @racket[handle-evt] invokes its callback in tail position
|
|
|
|
|
with respect to @racket[sync], so it is safe to
|
|
|
|
|
use recursion as in the following example.
|
|
|
|
|
|
|
|
|
|
@racketblock[
|
|
|
|
|
(define control-channel (make-channel))
|
|
|
|
@ -393,3 +440,8 @@ use recursion, as in the following example.
|
|
|
|
|
(thread-wait worker)
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
The @racket[wrap-evt] function is like @racket[handle-evt], except
|
|
|
|
|
that its handler is not called in tail position with respect to
|
|
|
|
|
@racket[sync]. At the same time, @racket[wrap-evt] disables break
|
|
|
|
|
exceptions during its handler's invocation.
|
|
|
|
|
|
|
|
|
|