Task.async_stream
async_stream(enumerable, fun, options \\ [])
Specs
async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()
Returns a stream that runs the given function fun concurrently on each element in enumerable.
Works the same as async_stream/5 but with an anonymous function instead of a module-function-arguments tuple. fun must be a one-arity anonymous function.
Each enumerable element is passed as argument to the given function fun and processed by its own task. The tasks will be linked to the current process, similarly to async/1.
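As a quick illustration of the "one task per element" behaviour described above, the sketch below has each task return its own PID via self() and then checks that every element ran in a separate process that is not the caller. The variable names are illustrative only.

```elixir
caller = self()

# Each task returns the PID of the process it runs in.
pids =
  1..4
  |> Task.async_stream(fn _element -> self() end)
  |> Enum.map(fn {:ok, pid} -> pid end)

# None of the work ran in the calling process...
IO.inspect(Enum.member?(pids, caller))  # false

# ...and each element got its own task process.
IO.inspect(length(Enum.uniq(pids)))     # 4
```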
Example
Count the code points in each string asynchronously, then add the counts together using reduce.
iex> strings = ["long string", "longer string", "there are many of these"]
iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints() |> Enum.count() end)
iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
47
See async_stream/5 for discussion, options, and more examples.
async_stream(enumerable, module, function_name, args, options \\ [])
(since 1.4.0)
Specs
async_stream(Enumerable.t(), module(), atom(), [term()], keyword()) :: Enumerable.t()
Returns a stream where the given function (module and function_name) is mapped concurrently on each element in enumerable.
Each element of enumerable will be prepended to the given args and processed by its own task. The tasks will be linked to an intermediate process that is then linked to the current process. This means a failure in a task terminates the current process and a failure in the current process terminates all tasks.
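To make the argument-prepending rule concrete, the sketch below uses String.duplicate/2 as a stand-in for any module function: each element becomes the first argument, followed by the entries in args, so every task calls String.duplicate(element, 2).

```elixir
# Each element is prepended to args, so the tasks call
# String.duplicate("ab", 2) and String.duplicate("cd", 2).
result =
  ["ab", "cd"]
  |> Task.async_stream(String, :duplicate, [2])
  |> Enum.to_list()

IO.inspect(result)
# Prints: [ok: "abab", ok: "cdcd"]
```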
When streamed, each task will emit {:ok, value} upon successful completion or {:exit, reason} if the caller is trapping exits.
The order of results depends on the value of the :ordered option.
The level of concurrency and the time tasks are allowed to run can be controlled via options (see the "Options" section below).
Consider using Task.Supervisor.async_stream/6 to start tasks under a supervisor. If you find yourself trapping exits to handle exits inside the async stream, consider using Task.Supervisor.async_stream_nolink/6 to start tasks that are not linked to the calling process.
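A minimal sketch of the supervised, unlinked variant, assuming an ad-hoc supervisor started with Task.Supervisor.start_link/1. One task deliberately divides by zero; because the tasks are not linked to the caller, that failure shows up as an {:exit, reason} entry in the stream (plus an error log) rather than crashing the calling process.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# div(8, 0) raises ArithmeticError inside its task only.
results =
  sup
  |> Task.Supervisor.async_stream_nolink([1, 0, 4], fn x -> div(8, x) end)
  |> Enum.to_list()

# Results stay in input order (the default ordered: true).
Enum.each(results, fn
  {:ok, value} -> IO.puts("ok: #{value}")
  {:exit, reason} -> IO.puts("task exited: #{inspect(reason)}")
end)
```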
Options
:max_concurrency - sets the maximum number of tasks to run at the same time. Defaults to System.schedulers_online/0.
:ordered - whether the results should be returned in the same order as the input stream. When the output is ordered, Elixir may need to buffer results to emit them in the original order. Setting this option to false disables the need to buffer at the cost of removing ordering. This is also useful when you're using the tasks only for the side effects. Note that regardless of what :ordered is set to, the tasks will process asynchronously. If you need to process elements in order, consider using Enum.map/2 or Enum.each/2 instead. Defaults to true.
:timeout - the maximum amount of time (in milliseconds or :infinity) each task is allowed to execute for. Defaults to 5000.
:on_timeout - what to do when a task times out. The possible values are:
  :exit (default) - the process that spawned the tasks exits.
  :kill_task - the task that timed out is killed. The value emitted for that task is {:exit, :timeout}.
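The sketch below exercises :ordered and :on_timeout together. The sleep durations are arbitrary. max_concurrency: 3 is set explicitly so that all three tasks start at once even on a machine with few schedulers, which is what allows completion order to differ from input order.

```elixir
# Hypothetical workload: sleep for the given milliseconds, then return them.
sleep_then_return = fn ms ->
  Process.sleep(ms)
  ms
end

# ordered: false emits results as tasks finish, not in input order.
unordered =
  [300, 10, 100]
  |> Task.async_stream(sleep_then_return, ordered: false, max_concurrency: 3)
  |> Enum.map(fn {:ok, ms} -> ms end)

IO.inspect(unordered)  # typically [10, 100, 300]

# on_timeout: :kill_task turns a slow task into {:exit, :timeout}
# instead of exiting the caller; the other result is still delivered.
with_timeout =
  [300, 10]
  |> Task.async_stream(sleep_then_return, timeout: 100, on_timeout: :kill_task)
  |> Enum.to_list()

IO.inspect(with_timeout)  # [exit: :timeout, ok: 10]
```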
Example
Let's build a stream and then enumerate it:
stream = Task.async_stream(collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
The concurrency can be increased or decreased using the :max_concurrency option. For example, if the tasks are IO-heavy, the value can be increased:
max_concurrency = System.schedulers_online() * 2
stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
Enum.to_list(stream)
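To observe the effect of :max_concurrency directly, the sketch below records in an Agent how many tasks are running at the same time. The tracker and the 20 ms sleep are illustrative assumptions, not part of the Task API. Whatever the machine, the recorded peak should never exceed the configured limit.

```elixir
# Agent state: {tasks_running_now, highest_value_seen}
{:ok, tracker} = Agent.start_link(fn -> {0, 0} end)

work = fn i ->
  # Mark this task as running and update the peak if needed.
  Agent.update(tracker, fn {running, peak} -> {running + 1, max(peak, running + 1)} end)
  Process.sleep(20)
  # Mark this task as finished before returning its result.
  Agent.update(tracker, fn {running, peak} -> {running - 1, peak} end)
  i * 10
end

results =
  1..10
  |> Task.async_stream(work, max_concurrency: 3)
  |> Enum.to_list()

peak = Agent.get(tracker, fn {_running, peak} -> peak end)
IO.inspect(peak)  # never more than 3
```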
If you do not care about the results of the computation, you can run the stream with Stream.run/1. Also set ordered: false, as you don't care about the order of the results either:
stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
Stream.run(stream)
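A small sketch of the side-effect-only pattern: each task sends a message to the caller, Stream.run/1 drives the stream to completion and returns :ok, and the messages can then be collected. The {:done, i} message shape is made up for the example.

```elixir
parent = self()

# Run the tasks purely for their side effect; the results are discarded.
:ok =
  1..3
  |> Task.async_stream(fn i -> send(parent, {:done, i}) end, ordered: false)
  |> Stream.run()

# Stream.run/1 has returned, so every task has already sent its message.
received =
  for _ <- 1..3 do
    receive do
      {:done, i} -> i
    after
      1_000 -> :missing
    end
  end

IO.inspect(Enum.sort(received))  # [1, 2, 3]
```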
Attention: async + take
Because items in an async stream are processed concurrently, calling async_stream followed by Enum.take/2 may cause more items than requested to be processed. Let's see an example:
1..100
|> Task.async_stream(fn i ->
  Process.sleep(100)
  IO.puts(to_string(i))
end)
|> Enum.take(10)
For a machine with 8 cores, the above will process 16 items instead of 10. The reason is that async_stream keeps 8 elements processing at once (the default :max_concurrency, System.schedulers_online/0, is typically 8 on such a machine). So by the time Enum says it got all the elements it needed, there are still 6 elements left to be processed.
The solution here is to use Stream.take/2 instead of Enum.take/2 to limit the elements beforehand:
1..100
|> Stream.take(10)
|> Task.async_stream(fn i ->
  Process.sleep(100)
  IO.puts(to_string(i))
end)
|> Enum.to_list()
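The sketch below counts how many elements actually start processing under each pipeline, using an Erlang :counters reference shared across the task processes. The count_started helper, max_concurrency: 4, and the 50 ms sleep are hypothetical choices for illustration. The exact over-processing count for the Enum.take/2 version depends on timing, so only the Stream.take/2 count is stated exactly.

```elixir
# Hypothetical helper: runs `pipeline` with a `work` function that bumps
# a shared counter when a task starts, then returns how many tasks started.
count_started = fn pipeline ->
  counter = :counters.new(1, [:atomics])

  work = fn i ->
    :counters.add(counter, 1, 1)
    Process.sleep(50)
    i
  end

  pipeline.(work)
  :counters.get(counter, 1)
end

# Enum.take/2 after async_stream: tasks beyond the 10th may already be running.
eager =
  count_started.(fn work ->
    1..100
    |> Task.async_stream(work, max_concurrency: 4)
    |> Enum.take(10)
  end)

# Stream.take/2 before async_stream: only 10 elements ever reach a task.
lazy =
  count_started.(fn work ->
    1..100
    |> Stream.take(10)
    |> Task.async_stream(work, max_concurrency: 4)
    |> Enum.to_list()
  end)

IO.inspect({eager, lazy})  # lazy is 10; eager is usually larger
```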
If for some reason you cannot take the elements beforehand, you can use :max_concurrency to limit how many elements may be over-processed, at the cost of reducing concurrency.