CubicLouve

Spring_MTの技術ブログ

Java SDKでApache Beam(Dataflow)でGradleを使うサンプル

Java SDKを使ったApache Beamのパイプライン構築については公式ドキュメントなどを見るとMavenを使うことが多いのですが、Gradleを使った例が見当たらなかったので作ってみました。

github.com

個人的な感想なのですが、Mavenだと依存関係をpomで定義書くのが辛かったのですが、Gradleのほうが素直に書けてそこだけでもGradleを使うメリットはあるかなあと思っています。

Gradle経由でDataflowにdeployする定義も作っています。

apache-beam-gradle-sample/build.gradle at main · SpringMT/apache-beam-gradle-sample · GitHub

サンプルプロジェクト構成について

pipeline

apache-beam-gradle-sample/src/main/java/sample/pipeline at main · SpringMT/apache-beam-gradle-sample · GitHub

パイプラインの構築だけに責務があります。

pipelineの中には具体的な処理内容は書かず、エントリーポイントと処理を担当するクラス(ステージ)を定義するだけにしています。

transform

apache-beam-gradle-sample/src/main/java/sample/transform at main · SpringMT/apache-beam-gradle-sample · GitHub

PTransform を使い、具体的な処理内容を書きます。

ステージに相当する部分となります。

単一責任の原則に則り1クラス1処理とし、できる限り単体テストを書くようにします。

ステージを跨ぐ場合は、 PCollection を使ってデータを受け渡しを行います。

application

apache-beam-gradle-sample/src/main/java/sample/application at main · SpringMT/apache-beam-gradle-sample · GitHub

アプリケーションロジックなどをここにまとめます。

ステージを跨ぐデータのクラスを置いたりしています。

参考

qiita.com

Apache BeamでPTransformを使って分割したステップについてテストをする(Java)

簡単なまとめ

  • Apache Beamでは PTransform PCollectionを使ってパイプラインを小さいステップに分割できる
  • Apache Beamは分割してステップを簡単にテストできるテスティングフレームワークが整備されている
  • パイプラインを細かく分割して、それらのテストを書くことでパイプラインの開発がしやすくなる

Apache Beamでは Pipeline クラスのオブジェクトがデータ処理のタスク全体を管理しています。

この Pipeline の中におけるデータ処理は PTransform を使って処理を複数のステップに分割することが可能です。

PTransform によって分割されたステップ間における入出力のデータは PCollection という分散データセットとして受け渡すことが可能です。

beam.apache.org

Pipeline は一つの PTransform で記述することも可能ですが、単一責任の原則に則って一つのステップは一つの役割に分割することでコードの管理がしやすくなります。

ステップの分割を PTransformPCollection で簡単に表現できるのがApache Beamが便利なところかなと思います。(他の分散処理基盤はあんまり触ったことないですが。。。)

やっていることが小さくなると、テスト設計もしやすくなるため、テストを書きたくなるかと思います。

Apache Beamはパイプラインを簡単にテストできるテスティングフレームワークを提供しています。

ここでは、 PTransform を使った単体テストの書き方の例を示せればと思います。

以降ではJavaでの例のみとなります。

パイプラインの分割

まずはパイプラインの最初の入力と最後の出力をステップとして分けます。

最初の入力はマネージドサービスからの入力( Cloud Pub/Subなど )が多く、ここを分離しておくことで後続のステップを単体でテストしやすくなります。

最初のステップでは特にデータの内容に手を加えず、後続のステップにわたすための PCollection への簡単な変換にとどめます。

最後の出力もマネージドサービスへの出力( BigQueryなど )が多いため、ここも分離しておきます。

最後の出力も、前段でデータを加工しておき、そのデータを出力するだけのシンプルなステップにします。

なにかしらの入力を PCollection に変換するステップ
↓
処理したい内容のステップ <- ここのテストを充実させる
↓
出力ステップ

このようにステップを分け、処理の内容の実体を単体でテストできるようにします。

PTransform を使ったステップのテストの書き方

Apcahe Beamには TestPipeline というクラスが用意されています。

TestPipeline (Apache Beam 2.27.0-SNAPSHOT)

このクラスを使って、PTransform のステップのテストを書きます。

ステップ

まずはサンプルとして Foo というステップを用意します

@AllArgsConstructor
public class Foo extends PTransform<PCollection<String>, PCollection<FooDto>> {

  @Override
  public PCollection<FooDto> expand(PCollection<String> input) {                                                                                                              
    return input.apply(ParDo.of(new ParseJson()));
  }

  public static class ParseJson extends DoFn<String, FooDto> {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<FooDto> receiver)
        throws IOException {
      ObjectMapper objectMapper = new ObjectMapper();
      FooDto[] parsedData = objectMapper.readValue(element, FooDto[].class);
      for (FooDto d : parsedData) {
        receiver.output(d);
      }
    }
  }
}

PTransform で受け渡すデータはSerializableである必要があります。

したがってontputのデータ FooDto は Serializableなクラスとして定義します。

public class FooDto implements Serializable {                                                                                                                                 
  @JsonProperty("user_id")
  @NonNull
  private String userId;

  @JsonProperty("account_id")
  @NonNull
  private Integer accountId;
}

テストコード

上記の Foo のテストは下記のようになります。

class FooTest {
  @Test
  public void testFoo() throws Exception {
    TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);                                                                                                        

    PCollection<String> input =
        p.apply(
            Create.of(
                    "["
                        + "{\"user_id\": \"aaa\", \"account_id\": 123}"
                        + "]")
                .withCoder(StringUtf8Coder.of()));
    PCollection<FooDto> output = input.apply(new Foo());

    PAssert.that(output)
        .containsInAnyOrder(
            new FooDto("aaa", 123);

    p.run().waitUntilFinish();
  }
}

TestPipeline はテスト用のパイプラインを生成します。

ここで生成したパイプラインにテスト対象のステップをapplyすることで、このステップについてのみテストが可能になります。

PAssertPCollection の内容に関するアサーションを提供しています。

PAssert (Apache Beam 2.27.0-SNAPSHOT)

これらを使うことで、処理結果のoutputを簡単に確認することができます。

テストを作ることで、手元で動作確認をしながら開発を進めることが可能です。

まとめ

Apache Beamのパイプラインはステップを適切に分割しそれぞれのテストを書くことで格段に開発、メンテがしやすくなります。

ぜひテストも書きながらApache Beamのパイプラインを構築してみてください。

サービスを作る

雑多にまとめておく

MVPとかPocとか

InVisionやFigmaで作る。

それで、プロダクトのコアコンセプトのテスト、仮説検証できるところまでやる。

最初の文化

なぜこのサービスをやるのかを最初からチームで話す

www.ryuzee.com

blog.agile.esm.co.jp

すべてのタイミングで見直すようにする。

最初のチーム

メンバー

少人数でチームを作る。5〜6人くらいがMax。

人を増やして対応するのではなく、作業効率を上げること、やることの取捨選択によってチーム規模を維持する。

リリース後に余裕が出てきたら、改めてメンバー構成を考える。

QAメンバー

QAをメインで担当する人は最初からチームに入れる。

品質をどう作り上げていくかを最初からチームで考える。

テスト観点から仕様の漏れを突っ込む人

CI

最初からやる。 CIのパイプラインを作り、その後に最初のQAをする。

最初のリリース

なんでもいいからQAチームへ受け渡し確認してもらう。

Hello, World! だけ表示される何かだけでもよい

セキュリティ

最初からやる。 専門家がいるなら早めに相談。

評価

目標設定などはチーム全員でやる。 管理方法はなんでもいいけど、チーム全員でやるのが重要。

ユビキタス言語

scrapboxで管理が相性が良さそうと思いつつやってない。

ドメイン

ライブラリとか使わずにPOROとかPOJOみたいなもので作り上げる。

www2.slideshare.net

www2.slideshare.net

モノレポ

最初はまとめておく。

分割はあとで

DB

難しいが、DB分割は最初のほうでしっかりやっておく。

ツール群

お金使って便利なものを積極的に使う

IDEとか。

文化と暗黙知と属人性

属人性は排除していく。

ドキュメントで解決する部分としない部分がある。

解決しない部分として、根底に流れるコンセプトは一回書いただけでは浸透しないとかそういう部分。

これはちゃんと資料に起こしつつ、繰り返し事ある毎に言及し、文化として醸成していく。

暗黙知として知識ベースのものは、ちゃんとドキュメント化する

最初の一歩

仮説段階

開発者として考えることはドメインモデルの構築にどこまで時間を割けるか

  • プロトはあくまで PO デザイナーで InVisionやFigma でのmockだけでできる限りやる
  • その間に開発者は、プロトのではなく、サービス自体のドメインモデルを構築する

プロトを作る工数に意外と時間が取られる その時間をドメインモデル構築に当てたほうが軸が定まってサービス開発がうまくいくのでは?という感触がある

読み切れていない資料

ドメイン関連が多い

blog.masuqat.net

www.graat.co.jp

little-hands.hatenablog.com

speakerdeck.com

creators-note.chatwork.com

qiita.com

masuda220.hatenablog.com

blog.shibayu36.org

docker build時のキャッシュの判定

メモ書き

スタートはdispatchersから https://github.com/moby/moby/blob/cf0ce96eb129ebcc7d07f0f47a8683c16f228c7d/builder/dockerfile/dispatchers.go

ADD や COPY

https://github.com/moby/moby/blob/cf0ce96eb129ebcc7d07f0f47a8683c16f228c7d/builder/dockerfile/dispatchers.go#L94-L133

createCopyInstructionでキャッシュなどの判定も含めて行う

performCopyがファイルの変更を行う本体 https://github.com/moby/moby/blob/46cdcd206c56172b95ba5c77b827a722dab426c5/builder/dockerfile/internals.go#L160

createCopyInstruction

https://github.com/moby/moby/blob/46cdcd206c56172b95ba5c77b827a722dab426c5/builder/dockerfile/copy.go#L110-L132

getCopyInfosForSourcePathsからgetCopyInfoForSourcePathがよばれて、copyInfoを作成する。 https://github.com/moby/moby/blob/46cdcd206c56172b95ba5c77b827a722dab426c5/builder/dockerfile/copy.go#L134-L151

getCopyInfoForSourcePathでもろもろ生成される。 https://github.com/moby/moby/blob/46cdcd206c56172b95ba5c77b827a722dab426c5/builder/dockerfile/copy.go#L40-L45

キャッシュの判定で使われるハッシュ値はここで生成される

https://github.com/moby/moby/blob/46cdcd206c56172b95ba5c77b827a722dab426c5/builder/remotecontext/lazycontext.go#L37-L85

https://github.com/moby/moby/blob/46cdcd206c56172b95ba5c77b827a722dab426c5/builder/remotecontext/filehash.go

moby/versioning.go at 46cdcd206c56172b95ba5c77b827a722dab426c5 · moby/moby · GitHub

performCopy

キャッシュの判定には上記のcreateCopyInstructionで作られたcopyInstructionを元に行う

probeCacheが担う https://github.com/moby/moby/blob/46cdcd206c56172b95ba5c77b827a722dab426c5/builder/dockerfile/internals.go#L428-L437

でキャッシュがあればそれを使い、そうでなければ新しくlayerを作る。

"プログラミングの基礎" を読んだ

2ヶ月かけてプログラミングの基礎という本を読みました。

この本を読んだきっかけ

この本は2013年くらいに @kis におすすめされた本でした。

購入日は2013/05/29

f:id:Spring_MT:20201208095045p:plain

買ってすぐ読んだ記憶があるのですが、2章くらいまでで読むのをやめた気がします。

その後、数回本を開いた記憶があるのですが、どの場合も途中で読むのを止めていました。

で、2020年になってLeetCodeなどをやり始めたのですが、解法やアルゴリズムは理解できるがプログラムに落とし込めない(特に再帰が想像できない)という課題がでてきました。

問題に一対一対応させて覚えていくという力技よりは、もっと根本となるコードを書く能力を高めないと先がないなあと思って、色々本を読んでみました。

最初はSICPとかを眺めて見ましたが、理解が進まず挫折。。。

その後、SICPより読みやすくてなにかないかと探してみたところ、再度この本を取ることにしました。

きっかけもやっぱり @kis

nowokay.hatenablog.com

(タイトルは若干あおり気味ですが、、、)

このブログを読んで改めてプログラミングの基礎を手にとりました。

読み進め方

今までなぜ読みきれなかったかを振り返ってみると、単に読んでいるだけだと何が問題なのか実感が得にくく、だんだんモチベーションが下がるという感じだった気がしました。

そこで、今回は下記を実践しました。

  • 演習は全部やる
  • 時間を決めてその時間を集中して手を動かしながら本を読む

今は夜に時間が取れないので、業務開始前 9:00〜10:00 を確保し、その間に手を動かしながらやっていきました。

それで、大体2ヶ月で読み切りました。

この本から学んだこと

課題に思っていたプログラムに落とし込めない問題(特に再帰)に関しては、自分が何がわかっていなかったかがわかるようになりました。

  • 再帰的なデータ構造(型)を知り理解できた(気がする)
  • 再帰的なデータ構造(型)とプログラムの構造が対応できることを理解できた(気がする)
  • 構造に従わない再帰であっても、部分問題に分割すること、停止性を考えることでプログラムに落とし込みができるようになった

データ構造が重要なのはなんとなく思っていたのですが、この本を読むまではデータ構造をプログラムの構造に対応させる方法がわからなかったからだと気が付きました。

この本ではOCamlを使っていくのですが、パターンマッチの力をやっと理解できました。

この本の良かった点

  • 日本語がわかりやすい
  • 章の構成がよく、読み進めやすい
  • 説明が丁寧で理解しやすい
  • 単語の説明がすごく平易な表現になっていてわかりやすい(多相性 -> どのような型でもよい性質 などなど)
  • 演習の答えが全て揃っている

次に

同じく @kis に勧められていたプログラミング言語の基礎概念を読みつつ、再度プログラムを書く練習をしていきます。

LeetCodeはOCamlサポートしてないけど、他の言語でトライしてみる予定です。

雑感

もっと早く読んでおけばよかったと思う反面、課題がないまま読んでもまた読むのを止めると思うのでこのタイミングでよかったんだろうと思っています。

自分はCSの勉強を大学でしてこなかったで、こういう本があると自力でCSの勉強をする取っ掛かりとしてはとてもいいのではないかなと思いました。

2020/07から2020/10までのGKEのリリースノートからGKEの機能で気になる部分をまとめる

セキュリティ関連は書いてないです。

kubernetesのアップデート内容には触れていません。

July 2, 2020

https://cloud.google.com/kubernetes-engine/docs/release-notes#july_2_2020

NodeLocal DNSCacheがGA

Setting up NodeLocal DNSCache  |  Kubernetes Engine Documentation

既存のクラスタでもonにできる

名前解決が速くなるのonにしておくのがよいかと

GKE Node System ConfigurationがGA

https://cloud.google.com/sdk/gcloud/reference/beta/container/node-pools/create#--system-config-from-file

Nodeの設定を指定できる機能

Linux kernel parameters (sysctls) や kubelet configsもある程度設定できる

net.core.somaxconn とかも設定できる

Nodeを作成するときに、gcloud コマンド経由でファイル(JSON or YAML)を指定する

July 17, 2020

https://cloud.google.com/kubernetes-engine/docs/release-notes#july_17_2020

SSL Policiesがexternal Ingress and multi-cluster Ingressでβ 、Custom health checks が external, internal, and multi-cluster Ingressでβ(1.17.6-gke.11以上かな)

下記のリンクに全部まとまっている

https://cloud.google.com/kubernetes-engine/docs/how-to/ingress-features

SSLPolicyを予めGCP上で作成しておき、それをFrontendConfigで設定して、それをIngressの設定で指定する

https://cloud.google.com/load-balancing/docs/ssl-policies-concepts

apiVersion: networking.gke.io/v1beta1
kind: FrontendConfig
metadata:
  name: my-frontend-config
spec:
  sslPolicy: gke-ingress-ssl-policy # 

apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  annotations:
    networking.gke.io/v1beta1.FrontendConfig: "my-frontend-config"

health checkのほうはBackendConfigで設定する

BackendConfig CRDがGKE 1.16-gke.3以上でGA

IAP, timeouts, affinity, user-defined request headerなどの機能が追加されている

IAPもか

GKE バージョン 1.16-gke.3 以降を使用している場合は、cloud.google.com/backend-config アノテーションを使うようにすること。

GKE 1.17.6-gke.7以上のクラスタで、新規 Serviceをデプロイすると、NEGを使うContainer-native Ingress が デフォルトに

cloud.google.com/neg: '{"ingress": true}' のannotaionがデフォルトでつくので、自分で書く必要がなくなる

あくまで 新規 Serviceのみなので要注意

https://cloud.google.com/kubernetes-engine/docs/concepts/ingress#container-native_load_balancing

CMEKがGKEでGA

July 28, 2020 (R25)

https://cloud.google.com/kubernetes-engine/docs/release-notes#july_28_2020_r25

デフォルトのマシンタイプがE2に

https://cloud.google.com/compute/docs/machine-types?hl=ja#machine_types

Google Cloudに安価な汎用仮想マシン「E2ファミリー」を追加 | TechCrunch Japan

E2は継続利用割引がないので注意してください。

https://cloud.google.com/compute/docs/machine-types?hl=ja#machine_type_comparison

August 21, 2020

https://cloud.google.com/kubernetes-engine/docs/release-notes#august_21_2020

Internal load balancer Serviceが GKE 1.17.9-gke.600以上でGA

https://cloud.google.com/kubernetes-engine/docs/how-to/internal-load-balancing

type: LoadBalancer のServiceのannotationで cloud.google.com/load-balancer-type: "Internal" で使えるようになる

この internal load balancer ServiceではGlobal access と configurable subnets が使える

https://cloud.google.com/kubernetes-engine/docs/how-to/internal-load-balancing#global_access

https://cloud.google.com/kubernetes-engine/docs/how-to/internal-load-balancing#lb_subnet

リージョンまたいでinternal load balancer Serviceが使える

GKE versions 1.17.9-gke.600以上で Dataplane V2がβ

https://cloud.google.com/blog/products/containers-kubernetes/bringing-ebpf-and-cilium-to-google-kubernetes-engine

Dataplane V2があれば、Network policy loggingも使える

https://cloud.google.com/kubernetes-engine/docs/how-to/network-policy-logging

RFC 1918以外のプライベートIPアドレスが利用可能に

https://cloud.google.com/kubernetes-engine/docs/how-to/alias-ips#enable_reserved_ip_ranges

使える範囲は下記を参照

https://cloud.google.com/vpc/docs/vpc#valid-ranges

August 28, 2020

https://cloud.google.com/kubernetes-engine/docs/release-notes#august_28_2020

どのリージョンからもprivate clusterのマスター( Contol plane )へアクセス可能にする

デフォルトは同じリージョンからしかアクセスできない

https://cloud.google.com/kubernetes-engine/docs/how-to/private-clusters#cp-global-access

September 8, 2020

https://cloud.google.com/kubernetes-engine/docs/release-notes#september_8_2020

Error状態のclusterを自動的に削除する

Errorの定義は下記を参照

https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1/projects.locations.clusters#status

TaintBasedEvictionsがGKEの 1.18 でGA

あれ、このタイミングなのか

Kubernetes 1.13: SIG Scheduling の変更内容 - チェシャ猫の消滅定理

September 25, 2020 (R31)

https://cloud.google.com/kubernetes-engine/docs/release-notes#september_25_2020_r31

Node Auto-Provisioningで CMEK、Secure Boot and Integrity Monitoring、Boot disk type and sizeをデフォルトでセットしてくれるようになった

今まではセットしてくれてなかったってことか。。

October 02, 2020 (R32)

https://cloud.google.com/kubernetes-engine/docs/release-notes#october_02_2020_r32

GKE 1.18以上でオートスケールのプロファイルのロジックが optimize-utilization になる

クラスタをより積極的にスケールダウンしてくれるようになる。

リソースの使用率を最大化するぽい。

https://cloud.google.com/kubernetes-engine/docs/concepts/cluster-autoscaler#autoscaling_profiles

macOSでKarabiner-Elementsが認識されなくなったら試すこと

  1. リカバリモードで立ち上げる
    • Mac の電源を入れた直後に、「command (⌘)」と「R」の 2 つのキーを押し続ける
  2. ユーティリティは選択せずにメーニューバーからユーティリティ -> ターミナルを選択
    • ターミナルが立ち上がるはず
  3. spctl kext-consent add G43BCU2T37 を実行する