アナログ金木犀

つれづれなるまままにつれづれする

Rxを使えば非同期か同期かを意識しなくてよくなるというのはどういうことか

この件、多少説明した方がいいのではと思ったので記事にしました。

一応Androidの話です。題材はなんでもよかったんですがリポジトリパターンを題材としてみました。

あらかじめ言っておくと、コードの細かいツッコミは置いておいてもらえると。伝えたいのは 非同期か同期かを意識しなくていいとはどういうことか です。

Rxのなかった世界のころ

非同期がゆえに...

こういうリポジトリのinterfaceを実装したいと考えます。

interface CourseRepository {
  fun resolveById(id: CourseId): Course

  fun resolveAll(): List<Course>
}

でも実際は CourseAPIで取得するので実装は下記のようにならざるを得ないわけです。 (blockingGetすればええやんって思う人もいるかもしれませんが、Androidではmain threadでhttp通信したらCrashします)

class CourseRepositoryImpl {
  
  val service = Executors.newSingleThreadExecutor()
  
  fun resolveById(id: CourseId): Future<Course> =
    service.submit(Callable {
          return HTTP通信してResponseをCourseに変換したもの
    })
  

  fun resoveAll(): Future<List<Course>> =
    service.submit(Callable {
          return HTTP通信してResponseをList<Course>に変換したもの
    })  
  
}

そのため、泣く泣く 外部に提供するinterfaceを

interface CourseRepository {
  fun resolveById(id: CourseId): Future<Course>

  fun resolveAll(): Future<List<Course>>
}

のように Future で包んであげなければなりません。

キャッシュしてみる

HTTP通信したものをキャッシュする実装を考えます。

class CourseRepositoryImpl : CourseRepository {
  
  val service = Executors.newSingleThreadExecutor()
  val cache = Cache() // なんらかのCacheクラス
  
  fun resolveById(id: CourseId): Future<Course> =
    service.submit(Callable {
          if (cache[id] == null) {
            val course = HTTP通信してResponseをCourseに変換したもの
            cache[id] = course
            return course
          } else {
            return cache[id]
          }
    })
   :
}

Future型で返さないといけないのもあって、キャッシュを使うときも非同期用のスレッドで実行されます。無駄ですがしょうがないです。

ほかのRepositoryを実装してみる

次に下記のRepositoryを実装しようと考えます。

interface LevelRepository {
  fun resolveById(id: LevelId): Level

  fun resolveAll(): List<Level>
}

Level関連のデータは同期で取得できるのでこのままのinterfaceで良しとします。

改めて先の CourseRepository と見比べて見ましょう。

interface CourseRepository {
  fun resolveById(id: CourseId): Future<Course>

  fun resolveAll(): Future<List<Course>>
}

実装を隠すために interface きってるのに内部実装が非同期なんだなというのが丸わかりです。

そしてわかるだけならともかく resolveById とか似たような名前なのに使い方は全く違います。LevelRepository.resolveById では呼び出せばそのまま値が返ってきます。それに対して CourseRepository.resolveById は Thread & Handler 使うなりして非同期で取ってきたCourseなどをMainThreadでつかうようにする処理が必要になってしまします。

返り値の型を見ればわかるのですが、おなじ resolveById でまったく使い方が異なるので命名でもしっかり分けるようにします。今回は非同期の時は request というprefixをつける運用にします。

interface CourseRepository {
  fun requestResolveById(id: CourseId): Future<Course>

  fun requestResolveAll(): Future<List<Course>>
}

元の理想形からは遠くなりました。ただそれでも、名前だけでmain threadに戻さないといけないんだなってのはわかるので最高ではないですが以前よりはbetterでしょう。

betterな選択肢を選んできたつもりですが、以上では解決できないしょうがない課題がいくつか残りました。

  1. interfaceを使う側が非同期か同期かで(interfaceの内部実装の都合によって)いちいち処理を変えないといけない
  2. 無駄に非同期処理にせざるを得ない箇所がでてくる

いっそLevelRepositoryの各メソッドも全てFutureで包めばいいやという解決策もありですが、その場合はより課題 ( 2 ) の方が深刻になってきます。

Rxを導入する

Rxを導入します。この場合は全て Single や Maybe などの Rxの型で包む運用とします。

interface CourseRepository {
  fun resolveById(id: CourseId): Maybe<Course>

  fun resolveAll(): Single<List<Course>>
}
interface LevelRepository {
  fun resolveById(id: LevelId): Maybe<Level>

  fun resolveAll(): Single<Level>
}

これによって先の課題2つをまるっと解決できます。

interfaceを使う側が非同期か同期かでいちいち処理を変えないといけない問題について。

まず、interfaceを使う側は XXXRepository.resoveById(id).observeOn(uiScheduler).subscribe() とすれば、interfaceの内部実装が非同期だろうが同期だろうが常に処理をすれば済みます。もちろんinterfaceの内部実装が非同期か同期かを知るすべはありませんし、意識する必要もありません。

次に無駄に非同期処理にせざるを得ない箇所がでてくる問題について。

これについても subscibeOn によって処理スレッドを実装時に指定できるのでオールおっけーです。

例えば先ほどのキャッシュありの実装は下記のようになります。

class CourseRepositoryImpl : CourseRepository {
  
  fun resolveById(id: CourseId): Maybe<Course> =
    if (cache[id] == null) {
      return Single.create(HTTP通信してResponseをList<Course>に変換する処理)
        .doOnSuccess(キャッシュに保存)
        .toMaybe
        .observeOn(ioScheduler)    // 非同期用のスケジューラを設定
    } else {
      return Maybe.just(cache[id]) // 非同期用のスケジューラは設定しなくて良い
    }
    :
}

処理によって選択することができるので無駄に非同期処理みたいなことはなくなります。

まとめ

  • Rx(など)を使わなかったら下記の課題がある
    • interfaceの各メソッドの内部処理によって使う側が非同期か同期かを考えて実装しなくていけなくて辛い
    • 必要のない非同期処理がが入ってしまうことがある
  • Rxを使うことで処理によって、使う側が非同期か同期を知らずに実装できる (一部運用は必要)

kotlinのcoroutineが出てきて、ExecutorやThread Handlerを使うのが面倒という課題があってRxを使ってた人と、この記事に書いたような課題も感じてRxを使ってた人でギャップがあるなぁと感じた昨今です。

coroutineは確かに素晴らしいですが、上記のような課題を解決(するためのもの)/(できるもの)ではないので、自分の場合は Rx を coroutine に置き換えるというのが表現として厳しいなぁと思った次第であります。

coroutineがダメだといういう気はなく、Rxのここが良いんだよというのが伝われば幸いです。

追記

申し訳ありません。kagurazakaさんの方法で確かにできそうです!

ただ、このツイートのメンションで話してますが、 coroutineでは非同期前提のものを同期でも扱えるのに対し、Rxは非同期同期関係なく扱えるみたいな違いはあるのかなと。 そして今回のケースは自分的にはやはり後者の方が用途としては妥当なのかなと思いました。まぁでもkurokawaさんのいうように気持ち良さとかのレベルの話であるのは違いないです。

ツイッターでうまく議論できない子なので、できれば今度対面でいろんな方と話したいなと思った次第です!ぜひお話しさせてくださいませ!