kenju's blog

About Programming and Mathematics

『Go言語による並行処理』を読んで並行ストリーム処理のためのライブラリ "kenju/go-pipeline" を作り始めた

以下の記事で書いたように、『Go言語による並行処理』を読んだ。 第四章でストリーム処理のための言語パターンがいくつか紹介されていた。

itiskj.hatenablog.com

そこで、紹介されていたコードをベースに、いくつか自分なりの解釈を加えてライブラリ化した。

github.com

What's this?

書籍で紹介されていた機能に加えて、以下の機能を追加実装した。

  • interface だけではなく、string / int / float32 型それぞれのメソッドを用意
  • channel 間のキャンセルには独自の channel ではなく標準パッケージの context.Context 型を利用
  • Map, Reduce, Generator などのロジックも追加実装
    • Functional Programming な概念はストリーム処理と相性が良く、Map/Reduce があればかなり多くのロジックは組み立てることができるはず
  • CI/CD パイプラインのセットアップ
    • Circle CI を利用。go fmtgo lint も組み込んで開発を続けやすいように整えた

Documentation

最小限のドキュメントは godoc に書いてあるので、そちらを参照されたい。

https://godoc.org/github.com/kenju/go-pipeline

How to use this?

pipeline.Take

例えば、pipeline.Take の例。int 型のチャンネルを pipeline.GeneratorInt で生成し、任意の数を取得している。

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for v := range pipeline.TakeInt(ctx, pipeline.GeneratorInt(ctx, 1, 2, 3, 4, 5), 3) {
    fmt.Printf("%v ", v)
}

Output

1 2 3

pipeline.Map

string 型向けの pipeline.Map の使用例。channel から適宜渡されてくる string 型の値に対して、map function を適用していく。 これと pipeline.Reduce を組み合わせれば、簡単な MapReduce は書けると思う。

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mapFn := func(v string) string { return "**" + v + "**"}
for v := range pipeline.MapString(ctx, mapFn, pipeline.GeneratorString(ctx, "hello", "world")) {
    fmt.Println(v)
}

Output

**hello**
**world**

Example

以前、以下の記事で紹介したような、作業効率化のためのライブラリをメンテナンスしていたが、今回を機に concurrency 化した。その際に、go-pipeline を用いた。

itiskj.hatenablog.com

以下が使用箇所。go-groom では、pipeline.Takepipeline.FanIn を利用している。

https://github.com/kenju/go-groom/blob/c32ff7a37a8182497dd780f341b6de6da99b2c88/pipeline.go#L38

   pipelines := pipeline.Take(ctx, pipeline.FanIn(ctx, executionPipeline...), len(paths))
    for result := range pipelines {
        fmt.Printf(result.(execResult).Dir + "\n")
        if result.(execResult).Error != nil {
            fmt.Printf("\tError: %v\n", result.(execResult).Error)
        }
        fmt.Printf("\t" + result.(execResult).Out + "\n")
    }

Future's Work

基本的な機能については書ききったが、テストが書ききれていない。 また、latest version はまだ v0.1.2 であり、go-groom 以外の場所で適用の機会を探っていきながら、major を v1 にあげて production ready までもっていきたい。