スポンサーリンク

2016年7月15日

[Elixir]GenStageのExampleを実行してみる

Goal

  • GenStageのExamplesを試してみる

Dev-Environment

OS: Windows8.1
Erlang: Eshell V8.0, OTP-Version 18.0
Elixir: v1.3.0

Context

Caution:

2016/07/15に本記事を作成しました。
このときのGenStageのバージョンはv0.3です。
また、英語力不足や誤解している部分があると思います。
その点、ご了承の上で本記事をお読みください。

What is GenStage?

Quote:

GenStage is a specification for exchanging events between producers and consumers.
GenStageはプロデューサとコンシューマの間でイベントを交換する仕様とのこと…よく分からんorz
現在、提供されている機能について
  • Experimental.GenStage
プロデューサとコンシューマのステージを実現するためのビヘイビア。
(普通にGenStageのビヘイビアです)
  • Experimental.DynamicSupervisor
このモジュールは、(普通の)スーパバイザのsimple_one_for_one戦略の代替品として使える。
それに加えて、ステージパイプラインでイベント毎に新しいプロセスを起動するステージコンシューマを簡単に作って、使うことができる。
さらっと説明を見るだけでは分かりませんでした。
ちなみに、GenStageは、将来Elixirのリリースに含まれる予定みたいです。
だから、Experimental(実験)が付いているみたいです。
(現状はエイリアスしてね的なことが書いてありました)
ようは、下記のような記述を追加しておけばいいみたいです。

Example:

alias Experimental.{DynamicSupervisor, GenStage}
何はともあれ、とりあえず使ってみましょう。
(弱小プログラマだから説明だけじゃ分からないだけ・・・)

Install

前提として、GenStageを使いたいのであれば、Elixir v1.3以上が必要です。
mix.exsファイルへ下記のように:gen_stage追加してください。

File: mix.exs

defmodule GenStageExample.Mixfile do
  ...

  def application do
    [applications: [:logger,
                 :gen_stage]] # Add it
  end

  defp deps do
    [{:gen_stage, "~> 0.1"}] # Add it
  end
end
パッケージとして取得します。

Example:

> mix deps.get
Running dependency resolution
Dependency resolution completed
  gen_stage: 0.3.0
* Getting gen_stage (Hex package)
  Checking package (https://repo.hex.pm/tarballs/gen_stage-0.3.0.tar)
  Fetched package
リリースに入ればこの作業はいらなくなるでしょう。
とりあえず、使う準備は完了です。

Using

Exampleがあるみたいです。elixir-lang/gen_stageに全部で4つのファイルが置いてありますね。
  • dynamic_supervisor.exs
  • gen_event.exs
  • producer_consumer.exs
  • stream_stage.exs
これを参考に試してみましょう。
  • stream_stage
まずは一番簡単そうな、stream_stage.exsのような内容をiexから実行してみます。

Example:

> iex -S mix

## エイリアスの定義
iex> alias Experimental.GenStage

## 実行例1
iex> {:ok, stage} = File.read!("example.txt") |> String.split("\r\n") |> Stream.map(&{String.length(&1), &1}) |> GenStage.from_enumerable(consumers: :permanent)
{:ok, #PID<0.295.0>}

iex> GenStage.stream([stage]) |> Enum.sort |> Enum.take(2) |> IO.inspect
[{3, "aaa"}, {3, "bbb"}]

## 実行例2
iex> {:ok, stage} = File.read!("example.txt") |> String.split("\r\n") |> Stream.map(&{String.length(&1), &1}) |> GenStage.from_enumerable(consumers: :permanent)

iex> GenStage.stream([stage]) |> Enum.sort(&(&1 > &2)) |> Enum.take(2) |> IO.inspect
[{8, "hugehuge"}, {8, "hogehoge"}]

## 二度目は実行できない
iex> GenStage.stream([stage]) |> Enum.sort(&(&1 > &2)) |> Enum.take(2) |> IO.inspect
** (exit) exited in: Experimental.GenStage.close_stream(%{})
    ** (EXIT) no process
    (gen_stage) lib/gen_stage.ex:1077: Experimental.GenStage.close_stream/1
       (elixir) lib/stream.ex:1129: Stream.do_resource/5
       (elixir) lib/enum.ex:1635: Enum.reduce/3
       (elixir) lib/enum.ex:1968: Enum.sort/2
ファイル読込みで使っているファイルの内容は下記になります。
(特筆すべきことは書いていません)

File: example.txt

aaa
bbb
ccc
ddd
hogehoge
hugehuge
forbar
  • producer_consumer
続いて、[A]->[B]->[C]とステージで流れを持つイベントとしてパイプラインに設定してみます。
アプリケーションとしての開始地点。

lib/producer_consumer/app.ex

defmodule ProducerConsumer.App do
  def start do
    alias Experimental.{GenStage}
    alias ProducerConsumer.{StageA, StageB, StageC}

    {:ok, a} = GenStage.start_link(StageA, 0)
    {:ok, b} = GenStage.start_link(StageB, 2)
    {:ok, c} = GenStage.start_link(StageC, :ok)

    GenStage.sync_subscribe(b, to: a)
    GenStage.sync_subscribe(c, to: b)
    Process.sleep(:infinity)
  end
end
プロデューサとしてのStageAモジュール。

lib/producer_consumer/stage_a.ex

alias Experimental.{GenStage}

defmodule ProducerConsumer.StageA do
  use GenStage

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    IO.inspect {__MODULE__, demand, counter}

    events = Enum.to_list(counter..counter+demand-1)
    {:noreply, events, counter + demand}
  end
end
プロデューサとコンシューマとしてのStageBモジュール。

lib/producer_consumer/stage_b.ex

alias Experimental.{GenStage}

defmodule ProducerConsumer.StageB do
  use GenStage

  def init(number) do
    {:producer_consumer, number}
  end

  def handle_events(events, _from, number) do
    IO.inspect {__MODULE__, events, number}

    events = Enum.map(events, fn(event) -> event * number end)
    {:noreply, events, number}
  end
end
コンシューマとしてのStageCモジュール。

lib/producer_consumer/stage_c.ex

alias Experimental.{GenStage}

defmodule ProducerConsumer.StageC do
  use GenStage

  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  def handle_events(events, _from, state) do
    :timer.sleep(1000)

    IO.inspect {__MODULE__, events, state}

    {:noreply, [], state}
  end
end
実行してみると出力結果は下記のようになりました。

Example:

iex> ProducerConsumer.App.start
{ProducerConsumer.StageA, 1000, 0}
{ProducerConsumer.StageB,
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
  22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
  41, 42, 43, 44, 45, 46, 47, ...], 2}
{ProducerConsumer.StageA, 500, 1000}
{ProducerConsumer.StageB,
 [500, 501, 502, 503, 504, 505, 506, 507, 508, 509, 510, 511, 512, 513, 514,
  515, 516, 517, 518, 519, 520, 521, 522, 523, 524, 525, 526, 527, 528, 529,
  530, 531, 532, 533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544,
  545, 546, 547, ...], 2}
{ProducerConsumer.StageA, 500, 1500}
{ProducerConsumer.StageC,
 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40
  42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78,
  80, 82, 84, 86, 88, 90, 92, 94, ...], :the_state_does_not_matter}
{ProducerConsumer.StageB,
 [1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012,
  1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020, 1021, 1022, 1023, 1024, 1025,
  1026, 1027, 1028, 1029, 1030, 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038,
  1039, 1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047, ...], 2}
{ProducerConsumer.StageA, 500, 2000}
{ProducerConsumer.StageC,
 [1000, 1002, 1004, 1006, 1008, 1010, 1012, 1014, 1016, 1018, 1020, 1022, 1024,
  1026, 1028, 1030, 1032, 1034, 1036, 1038, 1040, 1042, 1044, 1046, 1048, 1050,
  1052, 1054, 1056, 1058, 1060, 1062, 1064, 1066, 1068, 1070, 1072, 1074, 1076,
  1078, 1080, 1082, 1084, 1086, 1088, 1090, 1092, 1094, ...],
 :the_state_does_not_matter}
{ProducerConsumer.StageB,
 [1500, 1501, 1502, 1503, 1504, 1505, 1506, 1507, 1508, 1509, 1510, 1511, 1512,
  1513, 1514, 1515, 1516, 1517, 1518, 1519, 1520, 1521, 1522, 1523, 1524, 1525,
  1526, 1527, 1528, 1529, 1530, 1531, 1532, 1533, 1534, 1535, 1536, 1537, 1538,
  1539, 1540, 1541, 1542, 1543, 1544, 1545, 1546, 1547, ...], 2}
{ProducerConsumer.StageA, 500, 2500}
{ProducerConsumer.StageC,
 [2000, 2002, 2004, 2006, 2008, 2010, 2012, 2014, 2016, 2018, 2020, 2022, 2024,
  2026, 2028, 2030, 2032, 2034, 2036, 2038, 2040, 2042, 2044, 2046, 2048, 2050,
  2052, 2054, 2056, 2058, 2060, 2062, 2064, 2066, 2068, 2070, 2072, 2074, 2076,
  2078, 2080, 2082, 2084, 2086, 2088, 2090, 2092, 2094, ...],
 :the_state_does_not_matter}
{ProducerConsumer.StageB,
 [2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012,
  2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025,
  2026, 2027, 2028, 2029, 2030, 2031, 2032, 2033, 2034, 2035, 2036, 2037, 2038,
  2039, 2040, 2041, 2042, 2043, 2044, 2045, 2046, 2047, ...], 2}
{ProducerConsumer.StageA, 500, 3000}

...長いので省略

Note:

StageAモジュールに仕込んでいるIO.inspectの値を見てみます。
start_linkで与えているcounterの値は0です。これはそのまま出ています。
最初の出力されているdemandの値は1000です。これどこからきたのでしょう・・・

def handle_demand(demand, counter) when demand > 0 do
  IO.inspect {__MODULE__, demand, counter}

  events = Enum.to_list(counter..counter+demand-1)
  {:noreply, events, counter + demand}
end

Note:

:producer、:consumer、:producer_consumerのどれでもinit/1の実装は必須
:producerタイプを指定した場合、handle_demand/2を実装する必要あり
:consumer、:producer_consumerタイプを指定した場合、handle_events/3を実装する必要あり
流れるように連続でイベントを発生させることができるってことか???
  • dynamic_supervisor
DynamicSupervisorを使った例。
(カウンタとして動作するプロデューサにコンシューマとして一つ以上のDynamicSupervisorを使用する例)

lib/my_dynamic_supervisor/app.ex

defmodule MyDynamicSupervisor.App do
  def start do
    import Supervisor.Spec
    alias MyDynamicSupervisor.{Counter, Consumer}

    children = [
      worker(Counter, [0]),
      worker(Consumer, [], id: 1)
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  def exec do
    start
    Process.sleep(:infinity)
  end
end

lib/my_dynamic_supervisor/counter.ex

alias Experimental.{GenStage}

defmodule MyDynamicSupervisor.Counter do
  use GenStage

  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(initial) do
    {:producer, initial}
  end

  def handle_demand(demand, counter) when demand > 0 do
    IO.inspect {__MODULE__, demand, counter}

    events = Enum.to_list(counter..counter+demand-1)
    {:noreply, events, counter + demand}
  end
end

lib/my_dynamic_supervisor/consumer.ex

alias Experimental.{GenStage, DynamicSupervisor}

defmodule MyDynamicSupervisor.Consumer do
  use DynamicSupervisor

  def start_link do
    DynamicSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    alias MyDynamicSupervisor.{Printer, Counter}

    children = [
      worker(Printer, [], restart: :temporary)
    ]

    {:ok, children, strategy: :one_for_one, subscribe_to: [Counter]}
  end
end

lib/my_dynamic_supervisor/printer.ex

defmodule MyDynamicSupervisor.Printer do
  def start_link(event) do
    :timer.sleep(1000)

    Task.start_link(fn ->
      IO.inspect {self(), event}
    end)
  end
end

Example:

iex> MyDynamicSupervisor.App.exec
{#PID<0.154.0>, 0}
{#PID<0.155.0>, 1}
{#PID<0.156.0>, 2}
{#PID<0.157.0>, 3}
{#PID<0.158.0>, 4}
{#PID<0.159.0>, 5}
{#PID<0.160.0>, 6}

...
  • gen_event
GenEventを置き換えた例。
(GenStageを使用する方法では、並行性を活用し、バッファサイズとバックプレッシャーに対してより多くの柔軟性が提供されている)

File: lib/replacement_gen_event/app.ex

defmodule ReplacementGenEvent.App do
  alias ReplacementGenEvent.{Broadcaster, Consumer}

  def start do
    import Supervisor.Spec

    children = [
      worker(Broadcaster, []),
      worker(Consumer, [], id: 1),
      worker(Consumer, [], id: 2),
      worker(Consumer, [], id: 3),
      worker(Consumer, [], id: 4),
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  def exec do
    start

    Broadcaster.sync_notify(1)
    Broadcaster.sync_notify(2)
    Broadcaster.sync_notify(3)
    Broadcaster.sync_notify(4)
    Broadcaster.sync_notify(5)

    Process.sleep(2000)
  end
end

File: lib/replacement_gen_event/broadcaster.ex

alias Experimental.{GenStage}

defmodule ReplacementGenEvent.Broadcaster do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def sync_notify(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  def init(:ok) do
    {:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, from, {queue, demand}) do
    dispatch_events(:queue.in({from, event}, queue), demand, [])
  end

  def handle_demand(incoming_demand, {queue, demand}) do
    dispatch_events(queue, incoming_demand+demand, [])
  end

  defp dispatch_events(queue, demand, events) do
    with d when d > 0 <- demand,
      {{:value, {from, event}}, queue} <- :queue.out(queue) do
        GenStage.reply(from, :ok)
        dispatch_events(queue, demand-1, [event | events])
      else
        _ -> {:noreply, Enum.reverse(events), {queue, demand}}
      end
  end
end

File: lib/replacement_gen_event/consumer.ex

alias Experimental.{GenStage}

defmodule ReplacementGenEvent.Consumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    alias ReplacementGenEvent.Broadcaster
    {:consumer, :ok, subscribe_to: [Broadcaster]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect {self(), event}
    end
    {:noreply, [], state}
  end
end

Example:

iex> ReplacementGenEvent.App.exec
{#PID<0.154.0>, 1}
{#PID<0.155.0>, 1}
{#PID<0.153.0>, 1}
{#PID<0.156.0>, 1}
{#PID<0.154.0>, 2}
{#PID<0.155.0>, 2}
{#PID<0.153.0>, 2}
{#PID<0.156.0>, 2}
{#PID<0.154.0>, 3}
{#PID<0.155.0>, 3}
{#PID<0.153.0>, 3}
{#PID<0.156.0>, 3}
{#PID<0.154.0>, 4}
{#PID<0.155.0>, 4}
{#PID<0.153.0>, 4}
{#PID<0.156.0>, 4}
{#PID<0.154.0>, 5}
{#PID<0.155.0>, 5}
{#PID<0.153.0>, 5}
{#PID<0.156.0>, 5}
:ok
とりあえず一旦ここまでです。
一通り試してみて分かった。
全然、理解できてないと言うことが・・・
ドキュメントの方をもう少し詳しく読むことと、ソースコードの方も読むしかないかな。

Bibliography

人気の投稿