簡単なまとめ
- Apache Beamでは
PTransform
PCollection
を使ってパイプラインを小さいステップに分割できる
- Apache Beamは分割してステップを簡単にテストできるテスティングフレームワークが整備されている
- パイプラインを細かく分割して、それらのテストを書くことでパイプラインの開発がしやすくなる
Apache Beamでは Pipeline
クラスのオブジェクトがデータ処理のタスク全体を管理しています。
この Pipeline
の中におけるデータ処理は PTransform
を使って処理を複数のステップに分割することが可能です。
PTransform
によって分割されたステップ間における入出力のデータは PCollection
という分散データセットとして受け渡すことが可能です。
beam.apache.org
Pipeline
は一つの PTransform
で記述することも可能ですが、単一責任の原則に則って一つのステップは一つの役割に分割することでコードの管理がしやすくなります。
ステップの分割を PTransform
と PCollection
で簡単に表現できるのがApache Beamが便利なところかなと思います。(他の分散処理基盤はあんまり触ったことないですが。。。)
やっていることが小さくなると、テスト設計もしやすくなるため、テストを書きたくなるかと思います。
Apache Beamはパイプラインを簡単にテストできるテスティングフレームワークを提供しています。
ここでは、 PTransform
を使った単体テストの書き方の例を示せればと思います。
以降ではJavaでの例のみとなります。
パイプラインの分割
まずはパイプラインの最初の入力と最後の出力をステップとして分けます。
最初の入力はマネージドサービスからの入力( Cloud Pub/Subなど )が多く、ここを分離しておくことで後続のステップを単体でテストしやすくなります。
最初のステップでは特にデータの内容に手を加えず、後続のステップにわたすための PCollection
への簡単な変換にとどめます。
最後の出力もマネージドサービスへの出力( BigQueryなど )が多いため、ここも分離しておきます。
最後の出力も、前段でデータを加工しておき、そのデータを出力するだけのシンプルなステップにします。
なにかしらの入力を PCollection に変換するステップ
↓
処理したい内容のステップ <- ここのテストを充実させる
↓
出力ステップ
このようにステップを分け、処理の内容の実体を単体でテストできるようにします。
PTransform
を使ったステップのテストの書き方
Apcahe Beamには TestPipeline
というクラスが用意されています。
TestPipeline (Apache Beam 2.27.0-SNAPSHOT)
このクラスを使って、PTransform
のステップのテストを書きます。
ステップ
まずはサンプルとして Foo
というステップを用意します
@AllArgsConstructor
public class Foo extends PTransform<PCollection<String>, PCollection<FooDto>> {
@Override
public PCollection<FooDto> expand(PCollection<String> input) {
return input.apply(ParDo.of(new ParseJson()));
}
public static class ParseJson extends DoFn<String, FooDto> {
@ProcessElement
public void processElement(@Element String element, OutputReceiver<FooDto> receiver)
throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
FooDto[] parsedData = objectMapper.readValue(element, FooDto[].class);
for (FooDto d : parsedData) {
receiver.output(d);
}
}
}
}
PTransform
で受け渡すデータはSerializableである必要があります。
したがってontputのデータ FooDto
は Serializableなクラスとして定義します。
public class FooDto implements Serializable {
@JsonProperty("user_id")
@NonNull
private String userId;
@JsonProperty("account_id")
@NonNull
private Integer accountId;
}
テストコード
上記の Foo
のテストは下記のようになります。
class FooTest {
@Test
public void testFoo() throws Exception {
TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
PCollection<String> input =
p.apply(
Create.of(
"["
+ "{\"user_id\": \"aaa\", \"account_id\": 123}"
+ "]")
.withCoder(StringUtf8Coder.of()));
PCollection<FooDto> output = input.apply(new Foo());
PAssert.that(output)
.containsInAnyOrder(
new FooDto("aaa", 123);
p.run().waitUntilFinish();
}
}
TestPipeline
はテスト用のパイプラインを生成します。
ここで生成したパイプラインにテスト対象のステップをapplyすることで、このステップについてのみテストが可能になります。
PAssert
は PCollection
の内容に関するアサーションを提供しています。
PAssert (Apache Beam 2.27.0-SNAPSHOT)
これらを使うことで、処理結果のoutputを簡単に確認することができます。
テストを作ることで、手元で動作確認をしながら開発を進めることが可能です。
まとめ
Apache Beamのパイプラインはステップを適切に分割しそれぞれのテストを書くことで格段に開発、メンテがしやすくなります。
ぜひテストも書きながらApache Beamのパイプラインを構築してみてください。