defmodule ConcurrencyTest do
  @moduledoc """
  Benchmark helper that exercises nested `Task.async_stream/3` calls.
  """

  # Number of elements handed to each inner stream; also the inner
  # `:max_concurrency`, so every element of a chunk runs in its own task.
  @chunk_size 100

  @doc """
  Runs a two-level `Task.async_stream/3` over `range`.

  The input is split into chunks of #{@chunk_size} elements. Chunks are
  processed by an outer stream, and each chunk is processed by an inner stream
  whose tasks sleep for 10 ms. Task results are discarded.

  `concurrency` is the requested total concurrency. The outer stream runs
  `div(concurrency, #{@chunk_size})` chunks at a time, clamped to at least one
  so that values below #{@chunk_size} still run instead of raising.

  Returns `:ok`.
  """
  @spec nested_async_stream({Enumerable.t(), pos_integer()}) :: :ok
  def nested_async_stream({range, concurrency}) do
    # `Task.async_stream/3` requires a positive `:max_concurrency`; integer
    # division yields 0 whenever `concurrency < @chunk_size`.
    outer_concurrency = max(1, div(concurrency, @chunk_size))

    range
    |> Stream.chunk_every(@chunk_size)
    |> Task.async_stream(&run_chunk/1,
      max_concurrency: outer_concurrency,
      ordered: false,
      timeout: :infinity
    )
    |> Stream.run()
  end

  # Processes one chunk, running every element in its own task.
  defp run_chunk(chunk) do
    chunk
    |> Task.async_stream(
      fn _ ->
        Process.sleep(10)
        1
      end,
      max_concurrency: @chunk_size,
      ordered: false,
      timeout: :infinity
    )
    |> Stream.run()
  end
end
# Benchmark: time ConcurrencyTest.nested_async_stream/1 over one million
# elements for each requested concurrency from 100 to 2000 in steps of 100.
# `tests` ends up as a list of %{concurrency: _, duration: _} maps, with the
# duration in milliseconds.
tests =
  100..2000//100
  |> Enum.map(fn concurrency -> {1..1_000_000, concurrency} end)
  |> Enum.map(fn {_, concur} = args ->
    # Monotonic time is used for elapsed-time measurement because the
    # wall clock (System.system_time/1) may be adjusted while a run is active.
    start = System.monotonic_time(:millisecond)
    ConcurrencyTest.nested_async_stream(args)
    stop = System.monotonic_time(:millisecond)
    %{concurrency: concur, duration: stop - start}
  end)