-
Notifications
You must be signed in to change notification settings - Fork 57
Executors
Generally, the abstractions in Lamina don’t care about threads. If we create a topology using channels:
> (def ch (channel))
#'ch
> (->> ch (map* inc) (filter* even?))
<== [...]
then we can call (enqueue ch ...)
simultaneously on as many threads as we like without any blocking or contention. Channels and pipelines describe the flow of data, and are designed such that the specifics of how the data is produced doesn’t affect the result.
However, concurrency in the JVM is often built atop threads, and so Lamina provides tools for dealing with threads in the lamina.executor
namespace. The simplest tool is task
, which is like Clojure’s future
but returns an async-promise. This allows us to take a function call that is either computational expensive or uses blocking I/O, and use it with the existing tools in Lamina:
(run-pipeline
(->> urls
(map #(task (http/get %)))
(apply merge-results))
(fn [results]
....))
Unlike future
, by default task
does not propagate thread-local bindings into the body of the task. This is because this can often cause unintended resource leaks, or other issues. To replicate this behavior, use bound-task
instead.
Under the covers, task
uses an executor, which is a thread pool which plays nicely with Lamina’s instrumentation capabilities. Creating your own executor is easy, via (executor options)
, where options
is a map that must contain a :name
key, and optionally can contains the following:
:min-thread-count |
the minimum number of live threads in the thread pool, defaults to 1 |
:max-thread-count |
the maximum number of live threads in the thread pool, defaults to unbounded |
:idle-timeout |
the time in milliseconds that a thread must remain idle to be reclaimed, defaults to one minute |
:interrupt? |
whether tasks that time out should be interrupted, causing them to throw an InterruptedException in any blocking methods |
We may also create an executor using defexecutor
, which implicitly uses the fully qualified name as the executor’s name:
(defexecutor my-executor
{:max-thread-count 64})
Executors can be used in a variety of circumstances, but a common usage is in the options map of instrumented functions.
(defn-instrumented foo
{:executor my-executor
:timeout (fn [a b] 100)
:with-bindings? false}
[a b]
(+ a b))
This defines a function which will be executed on my-executor
, and return an async-promise representing the outcome. We have also defined a timeout, which is a function that takes the arguments, and returns the maximum time in milliseconds that can elapse before the async-promise is realized as a timeout exception. If the executor has :interrupt?
set to true, when the timeout elapses the thread running the task will be interrupted.
If :with-bindings?
is true, then the thread-local bindings at invocation will be propagated to the executor thread. This defaults to false.
By adding an :executor
field to a [[pipeline’s|pipelines] option map, we can control where the pipeline stages execute. This can be useful when pipelines are working with events created by an event loop or some other thread-constrained environment.
An executor-channel
is a channel which receives messages on one thread, and emits them on a thread provided by the defined :executor
.
> (def x-ch (executor-channel {:name "my-executor-channel", :executor my-executor}))
#'x-ch
> (.getId (Thread/currentThread))
39
> (receive-all x-ch #(prn "msg!" % (.getId (Thread/currentThread))))
true
> (enqueue ch 1)
"msg!" 1 157
<< ... >>
Assuming the executor has a :max-thread-count
options, this means that we can now control the level of parallelism downstream of the executor-channel, no matter how many threads are enqueueing messages. This can be useful both for reducing the number of threads when we want to bound the computational overhead, and for increasing the number of threads when we want to distribute message processing across a greater number of threads.
Since we often want to perform this threading control on a pre-existing thread, it’s often useful to use defer
instead of executor-channel
:
> (def ch (channel))
#'ch
> (defer my-executor ch)
<== [...]
This simply takes a channel, and returns a channel that emits messages via the provided executor.
Since messages coming in on the same thread may be emitted on different threads, this means that the order of messages may be changed. defer
and executor-channel
should only be used when this is not a problem.
If we do want to preserve message ordering, then we need to use pmap*
:
> (def ch (channel))
#'ch
> (pmap* {:executor my-executor} inc ch)
<== [...]
pmap*
behaves much like Clojure’s pmap
, with the additional parameter of what executor should be used when applying the transform function.