# Code Beam async_stream

# nested_async_stream

defmodule ConcurrencyTest do
  @moduledoc """
  Benchmark helper that exercises two nested levels of `Task.async_stream/3`.
  """

  # Upper bound on the number of elements handed to one outer task; it is also
  # the upper bound on the inner stream's `:max_concurrency`.
  @max_chunk_size 100

  @doc """
  Runs a simulated 10 ms job for every element of `range` using nested
  `Task.async_stream/3` calls.

  The input is split into chunks. An outer stream processes the chunks
  concurrently, and each chunk is itself processed by an inner stream that runs
  one task per element. The per-element results are discarded.

  ## Arguments

  A single `{range, concurrency}` tuple:

    * `range` - any enumerable; only the number of elements matters.
    * `concurrency` - the requested total number of concurrent tasks, a
      positive integer. For values of #{@max_chunk_size} or more the effective
      concurrency is rounded down to a multiple of #{@max_chunk_size}
      (outer workers x chunk size). For smaller values a single outer worker
      runs chunks of `concurrency` elements.

  Returns `:ok`. Raises `ArgumentError` when `concurrency` is not a positive
  integer.
  """
  @spec nested_async_stream({Enumerable.t(), pos_integer()}) :: :ok
  def nested_async_stream({range, concurrency})
      when is_integer(concurrency) and concurrency > 0 do
    # Cap the chunk size at the requested concurrency so that the outer level
    # always gets at least one worker; `div(concurrency, 100)` alone is 0 for
    # concurrency < 100, which Task.async_stream/3 rejects.
    chunk_size = min(@max_chunk_size, concurrency)
    outer_concurrency = div(concurrency, chunk_size)

    range
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(
      fn chunk ->
        chunk
        |> Task.async_stream(
          fn _ ->
            # Stand-in for real work.
            Process.sleep(10)
            1
          end,
          max_concurrency: chunk_size,
          ordered: false,
          timeout: :infinity
        )
        |> Stream.run()
      end,
      max_concurrency: outer_concurrency,
      ordered: false,
      timeout: :infinity
    )
    |> Stream.run()
  end

  def nested_async_stream({_range, concurrency}) do
    raise ArgumentError,
          "concurrency must be a positive integer, got: #{inspect(concurrency)}"
  end
end
# Benchmark driver: runs ConcurrencyTest.nested_async_stream/1 over one million
# elements for each concurrency level from 100 to 2000 (step 100) and collects
# the elapsed time of every run, in milliseconds.
tests =
  for concurrency <- 100..2000//100 do
    # Monotonic time is used for the interval so that wall-clock adjustments
    # cannot distort (or make negative) the measured duration.
    started = System.monotonic_time(:millisecond)
    ConcurrencyTest.nested_async_stream({1..1_000_000, concurrency})
    stopped = System.monotonic_time(:millisecond)

    %{concurrency: concurrency, duration: stopped - started}
  end