CubicLouve

Spring_MTの技術ブログ

Apache BeamでPTransformを使って分割したステップについてテストをする(Java)

簡単なまとめ

  • Apache Beamでは PTransform PCollectionを使ってパイプラインを小さいステップに分割できる
  • Apache Beamは分割してステップを簡単にテストできるテスティングフレームワークが整備されている
  • パイプラインを細かく分割して、それらのテストを書くことでパイプラインの開発がしやすくなる

Apache Beamでは Pipeline クラスのオブジェクトがデータ処理のタスク全体を管理しています。

この Pipeline の中におけるデータ処理は PTransform を使って処理を複数のステップに分割することが可能です。

PTransform によって分割されたステップ間における入出力のデータは PCollection という分散データセットとして受け渡すことが可能です。

beam.apache.org

Pipeline は一つの PTransform で記述することも可能ですが、単一責任の原則に則って一つのステップは一つの役割に分割することでコードの管理がしやすくなります。

ステップの分割を PTransformPCollection で簡単に表現できるのがApache Beamが便利なところかなと思います。(他の分散処理基盤はあんまり触ったことないですが。。。)

やっていることが小さくなると、テスト設計もしやすくなるため、テストを書きたくなるかと思います。

Apache Beamはパイプラインを簡単にテストできるテスティングフレームワークを提供しています。

ここでは、 PTransform を使った単体テストの書き方の例を示せればと思います。

以降ではJavaでの例のみとなります。

パイプラインの分割

まずはパイプラインの最初の入力と最後の出力をステップとして分けます。

最初の入力はマネージドサービスからの入力( Cloud Pub/Subなど )が多く、ここを分離しておくことで後続のステップを単体でテストしやすくなります。

最初のステップでは特にデータの内容に手を加えず、後続のステップにわたすための PCollection への簡単な変換にとどめます。

最後の出力もマネージドサービスへの出力( BigQueryなど )が多いため、ここも分離しておきます。

最後の出力も、前段でデータを加工しておき、そのデータを出力するだけのシンプルなステップにします。

なにかしらの入力を PCollection に変換するステップ
↓
処理したい内容のステップ <- ここのテストを充実させる
↓
出力ステップ

このようにステップを分け、処理の内容の実体を単体でテストできるようにします。

PTransform を使ったステップのテストの書き方

Apcahe Beamには TestPipeline というクラスが用意されています。

TestPipeline (Apache Beam 2.27.0-SNAPSHOT)

このクラスを使って、PTransform のステップのテストを書きます。

ステップ

まずはサンプルとして Foo というステップを用意します

@AllArgsConstructor
public class Foo extends PTransform<PCollection<String>, PCollection<FooDto>> {

  @Override
  public PCollection<FooDto> expand(PCollection<String> input) {                                                                                                              
    return input.apply(ParDo.of(new ParseJson()));
  }

  public static class ParseJson extends DoFn<String, FooDto> {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<FooDto> receiver)
        throws IOException {
      ObjectMapper objectMapper = new ObjectMapper();
      FooDto[] parsedData = objectMapper.readValue(element, FooDto[].class);
      for (FooDto d : parsedData) {
        receiver.output(d);
      }
    }
  }
}

PTransform で受け渡すデータはSerializableである必要があります。

したがってontputのデータ FooDto は Serializableなクラスとして定義します。

public class FooDto implements Serializable {                                                                                                                                 
  @JsonProperty("user_id")
  @NonNull
  private String userId;

  @JsonProperty("account_id")
  @NonNull
  private Integer accountId;
}

テストコード

上記の Foo のテストは下記のようになります。

class FooTest {
  @Test
  public void testFoo() throws Exception {
    TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);                                                                                                        

    PCollection<String> input =
        p.apply(
            Create.of(
                    "["
                        + "{\"user_id\": \"aaa\", \"account_id\": 123}"
                        + "]")
                .withCoder(StringUtf8Coder.of()));
    PCollection<FooDto> output = input.apply(new Foo());

    PAssert.that(output)
        .containsInAnyOrder(
            new FooDto("aaa", 123);

    p.run().waitUntilFinish();
  }
}

TestPipeline はテスト用のパイプラインを生成します。

ここで生成したパイプラインにテスト対象のステップをapplyすることで、このステップについてのみテストが可能になります。

PAssertPCollection の内容に関するアサーションを提供しています。

PAssert (Apache Beam 2.27.0-SNAPSHOT)

これらを使うことで、処理結果のoutputを簡単に確認することができます。

テストを作ることで、手元で動作確認をしながら開発を進めることが可能です。

まとめ

Apache Beamのパイプラインはステップを適切に分割しそれぞれのテストを書くことで格段に開発、メンテがしやすくなります。

ぜひテストも書きながらApache Beamのパイプラインを構築してみてください。