쿠쿠쿠쿠...

Dataflow for Java

  • Dataflow는 Serverless 로 제공되며 통합 스트리밍 및 일괄 데이터 처리를 지원하고 있습니다. Apache-Beam 모델을 활용해 코드 레벨에서 데이터 처리를 수행할 수 있습니다.
  • 해당 세미나에서는 Dataflow를 Java로 코드 구성하고 실행해보는 과정을 살펴보겠습니다.

Prerequisites

  • JDK
  • Apache Maven
  • IntelliJ IDEA Community Edition
  • Spring Initializer

Local Machine

Step1

아래 명령어를 통해 java, mvn 버전 확인을 진행합니다.

  • $ java --version
  • $ mvn --version
  • java version 확인 시

  • mvn version 확인 시

Model
Apache Beam Model 활용한 ETL 작업을 수행합니다. 금번 세미나 에서는 Java를 다루고 다음번에는 Go를 활용한 ETL 작업도 진행해보겠습니다.
지원 Language : Java, Python, Go

PipeLine
아래에 보면 가장 기본적인 PipeLine을 보실 수 있는데 원천 데이터 소스에서 데이터를 읽어서 transform 작업을 수행하는데 이것을 일반적으로 Pcollection이라고 부릅니다. Transform한 작업의 결과물 즉, Pcollection을 Output Bucket에 작성하게 됩니다.

 


파이프라인 디자인을 어떻게 하면 더 좋을까? 관심 있으신분은 하기 링크를 클릭하여 읽어보세요!
https://beam.apache.org/documentation/pipelines/design-your-pipeline/

각 Feature에 대한 사용법 및 특징 소개

  1. pipeline 을 생성합니다.
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);
  1. 외부 데이터를 읽어오는데 Beam's Source API 를 활용할 수 있습니다. 데이터 읽어와서
public static void main(String[] args) {
  // Create the pipeline.
  PipelineOptions options =
      PipelineOptionsFactory.fromArgs(args).create();
  Pipeline p = Pipeline.create(options);

  // Create the PCollection 'lines' by applying a 'Read' transform.
  PCollection<String> lines = p.apply(
}
  1. 읽어온 데이터를 기반으로 Transform 을 수행하게 됩니다.
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
  1. Transform 한 데이터를 Pipeline I/O 를 활용해 외부 storage로 보내게 됩니다.
//Read
PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));

//Output
output.apply(TextIO.write().to("gs://some/outputData"));

Step2

  • Apache Beam 및 Java SDK를 사용하여 워크로드를 아래와 같이 구성할 수 있습니다. 가장 기본이 되는 word count 예제입니다.
  • 예를들어 test:test:test:testing in progress:testing in progress:testing completed:done 와 같은 텍스트가 존재한다고 할 때 단어의 수를 카운트하여 처리할 수 있습니다.

  • : 를 기준으로 텍스트 파일을 분할한 다음 단어를 추출 하고 계산하여 결과를 지정한 후에 output.txt 파일로 출력하게 됩니다.

Spring Boot 시작하기

Maven pom.xml

  • 실습 버전에서는 2.23.0 을 사용하였습니다.
<properties>
  <java.version>11</java.version>
  <beam.version>2.23.0</beam.version>
</properties>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>${beam.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-direct-java</artifactId>
  <version>${beam.version}</version>
  <scope>runtime</scope>
</dependency>

Main

package com.bachinalabs.beamdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BeamdemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(BeamdemoApplication.class, args);
    }

}

SplitWords

  • input과 output collections 모두 PCollection<String> 형태가 됩니다.
package com.bachinalabs.beamdemo;

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class SplitWords extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> expand(PCollection<String> line) {

        // Convert line of text into individual lines
        PCollection<String> lines = line.apply(
                ParDo.of(new SplitWordsFn()));

        return lines;
    }
}

SplitWordsFn

package com.bachinalabs.beamdemo;

import org.apache.beam.sdk.transforms.DoFn;

public class SplitWordsFn extends DoFn<String, String> {

    public static final String SPLIT_PATTERN = ":";

    @ProcessElement
    public void processElement(ProcessContext c) {
        for(String word: c.element().split(SPLIT_PATTERN)) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}

CountWords

  • KV<K,V> 형태로도 값을 담을 수 있는데 K=key, V=value 를 의미합니다.
package com.bachinalabs.beamdemo;

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {


    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

        // Convert text into individual words
        PCollection<String> words = lines.apply(
                ParDo.of(new ExtractWordsFn()));

        // Count the words
        PCollection<KV<String, Long>> wordCounts =
                words.apply(Count.perElement());

        return wordCounts;
    }
}

ExtractWordsFn

  • p{L}, 유니코드 프로퍼티 즉, 언어를 찾는 정규식 패턴입니다. ^=시작지점, +=1개이상, \=특수문자매칭
package com.bachinalabs.beamdemo;

import org.apache.beam.sdk.transforms.DoFn;

public class ExtractWordsFn extends DoFn<String, String> {

    public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    @ProcessElement
    public void processElement(ProcessContext c) {
        for(String word: c.element().split(TOKENIZER_PATTERN)) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}

WordCountOptions

  • input, output 위치를 지정하게 됩니다.
package com.bachinalabs.beamdemo;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

public interface WordCountOptions extends PipelineOptions {

    @Description("Path to the input file")
    @Default.String("./src/main/resources/input.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Path to the output file")
    @Default.String("./src/main/resources/output.txt")
    String getOutputFile();
    void setOutputFile(String value);
}

WordCountRunner

  • 해당 class에서 ETL 작업을 수행하는 파이프라인을 수행하게 됩니다.
package com.bachinalabs.beamdemo;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class WordCountRunner implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(WordCountOptions.class);
        runWordCount(options);
    }

    static void runWordCount(WordCountOptions options) throws InterruptedException {

        Pipeline p = Pipeline.create(options);

        p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
                .apply(new SplitWords())
                .apply(new CountWords())
                .apply("FormatResults", MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
                .apply("WriteCounts", TextIO.write().to(options.getOutputFile()));

        p.run().waitUntilFinish();
    }
}

 

'GCP' 카테고리의 다른 글

AWS redshift 와 GCP bigQuery 차이  (0) 2021.05.27
GCP41 :: 구글 클라우드 플랫폼 입문  (0) 2020.10.07
GCP40 :: 베스천호스트 연결  (0) 2020.10.02
GCP39 :: Media(번역) - BigQuery  (0) 2020.10.02
GCP38 :: OS로그인  (0) 2020.09.28