マイペースなRailsおじさん

Ruby、Ruby on Rails、オブジェクト指向設計を主なテーマとして扱います。だんだん大きくなっていくRuby on Rails製プロダクトのメンテナンス性を損なわない方法を考えたり考えなかったりしている人のブログです。

非同期処理のことを考えていたら頭がバグった

この記事

Rubyで非同期処理しよ~、って思ってちょっと調べ始めた。調べれば調べるほど訳わかんなくなって来てしまい、頭が飽和してきた。もうなんかほんとによくわかんない。今の頭の中のダンプをとっておき、ちょっと頭をリフレッシュしたくなってきた。 という感じでただ吐き出すだけなので、この記事の信憑性は低いです。

Rubyの非同期処理API

Process

プロセスを作れる。つまりfork(2)できる。他の2つと比べるとメモリ使用量、生成コストともに大きい。

コピーオンライトがあるのでメモリをそのままコピーするわけではない。賢い。

Rubyで並列処理(ここでは時間的に厳密に同時に処理を行うことを指す)をするための唯一の方法。

Thread

スレッドを作れる。しかし、タイムアウト、I/O待ちを除いてマルチコアで並列処理ができない。な、なんだってー!というやりとりは、Rubyだとよくある話。Global VM Lock(GVL)という制約があるので、同じコアでしか処理できない。

別スレッドとして動いてはくれるので、同じコアだけど同時に動いてるように見える。並行処理だけど並列処理じゃない。

Fiber

軽量スレッドを作れる。作った軽量スレッドの実行、停止、再開をプログラマが指示する。動かしている間は呼び出し元をブロッキングする。

つまり別スレッドなんだけど、並列処理してくれるわけではないし、自動的に並行処理してくれるわけでもない。

使い分け

Rubyでの計算を高速化したい

Process。

IO待ちの時間を削減したい

Thread, Fiber, あるいはまた別の方法。

Threadは並列処理してくれない。でもIO待ちのときは並列に動ける。てことはIO待ちが発生するようなタスクだったらThreadが有効活用できそうだっていう話。Processでももちろんできるけど、Threadのほうがコストが小さい。Pumaはこれ。

でもよく考えると、Unixにはselectってやつがある。こいつを使うと、IO待ちになっている複数の場所を監視して、使えるようになったやつを選択できる。ということは、IO待ちの時間を有効に使おうと思ったとき、そもそもThreadを使わなくてもいいんじゃないかって話になってくる。

Fiberは、処理を実行途中で停止できるので、あ!IO待ちになりそうだ、Fiber.yieldでぬけよう。って感じのことができる。selectでIO待ちを回避できていることがわかれば、resumeする。これでブロッキングせずに処理を続けられる。selectをしたときに使える口が見つからなかったときは、新しいタスクを開始してあげれば、計算リソースを無駄にせずに済む。

コールバックを使う、というやり方をしてもよい。計算リソースの使われ方はだいたい同じ。IO待ちじゃなくなったときに実行する処理を登録しておく。ただ、コールバックはよくコールバック地獄っていう言葉で揶揄されるようにどうも人類には早すぎるらしい。

あれ、コールバック地獄への有効打って他にもないっけ

なんかこのへんでほんとに訳わかんなくなってきてしまった。よく理解していない知識を頭の中にとどめておくのって良くないですね。

EventMachine

というgemがある。このひと全く知らないのだけど、jsのコールバック付きの関数群みたいな関数群みたいなやつを提供してくれるやつっぽい。

Observerパターン

このタスクが終わったらこれやっといてね、っていうのを登録してくやつ。 これの実行タスクとして非同期処理を登録してやると、あとの後続処理をオブザーバーにかけるので、コールバック使わなくて済むのではなかったか。

EventMachineとObserverパターン組み合わせれば最強じゃね?みたいなことを思った。それがすべての元凶。

Future

ScalaのFuture。JSでいうPromise。非同期で処理して、後続処理を登録できる。 あれ、Observerパターンと似てる。Scalaでは登録された動作に対してスレッドを作る。作った瞬間勝手にどこかで処理されている、みたいな感じ。

2つのWebAPIからのデータを取得してまとめる、みたいなことをしようとしたとするじゃないですか。そうすると、外部との通信のコストを考えると非同期でやりたいなーと思うわけですね。

thread_a = Thread.new { api_a_call }
thread_b = Thread.new { api_b_call }
pp "I got #{thread_a.value} #{thread_b.value}"

こんな。なんかこれがダサいと思ったんですね。 APIは実際はこんな関数を入れます。時間が止まればいいです。

def api_a_call
  sleep 10
  'answer_from_api_a'
end

def api_b_call
  sleep 10
  'answer_from_api_b'
end

2つのAPIを並列に取得してあげると、10秒で帰ってきてくれるはずです。しかし、これってthread_a.valueがthread_bの定義よりも先にきてはいけないんですね。いけないというか、意図しない待ち方をして、20秒かかってしまうんです。

ここで、Futureを使ったらこんな書き方ができるんですね。

Future.new { api_a_call }
      .zip { Future.new { api_b_call } }
      .onComplete { |a, b| pp "I got #{a} and #{b}" }

わあかっこいい!素敵! いやでもこれFutureってどうやって実装するんだろう難しいなー。みたいなことを考えていました。

ReactiveExtentions

これってScalaのFutureの強い版ではなかったっけ。なんかAPI仕様がてんこ盛りで過ごそう。Futureとどう違うの君は。

だめだ、あたまがバグってきた

Fiber使わなくても、JSみたいなコールバッグを扱うAPIがあって、それを地獄に堕ちずに使える方法があるならそっちのほうがいいのでは。というようなことを思ったのです。

そうしたら、心当たりがあったのだけど、中途半端な知識しかなくて調べれば調べるほどわけわからなくなってきた。 ちょっと整理したいけど、溜め込みすぎたので一旦脳を休めたい。