쿠쿠쿠쿠...

Dataflow for Java

  • Dataflow는 Serverless 로 제공되며 통합 스트리밍 및 일괄 데이터 처리를 지원하고 있습니다. Apache-Beam 모델을 활용해 코드 레벨에서 데이터 처리를 수행할 수 있습니다.
  • 해당 세미나에서는 Dataflow를 Java로 코드 구성하고 실행해보는 과정을 살펴보겠습니다.

Prerequisites

  • JDK
  • Apache Maven
  • IntelliJ IDEA Community Edition
  • Spring Initializer

Local Machine

Step1

아래 명령어를 통해 java, mvn 버전 확인을 진행합니다.

  • $ java --version
  • $ mvn --version
  • java version 확인 시
  • mvn version 확인 시

Model
Apache Beam Model 활용한 ETL 작업을 수행합니다. 금번 세미나 에서는 Java를 다루고 다음번에는 Go를 활용한 ETL 작업도 진행해보겠습니다.
지원 Language : Java, Python, Go

PipeLine
아래에 보면 가장 기본적인 PipeLine을 보실 수 있는데 원천 데이터 소스에서 데이터를 읽어서 transform 작업을 수행하는데 이것을 일반적으로 Pcollection이라고 부릅니다. Transform한 작업의 결과물 즉, Pcollection을 Output Bucket에 작성하게 됩니다.

 


파이프라인 디자인을 어떻게 하면 더 좋을까? 관심 있으신분은 하기 링크를 클릭하여 읽어보세요!
https://beam.apache.org/documentation/pipelines/design-your-pipeline/

각 Feature에 대한 사용법 및 특징 소개

  1. pipeline 을 생성합니다.
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline.
Pipeline p = Pipeline.create(options);
  1. 외부 데이터를 읽어오는데 Beam's Source API 를 활용할 수 있습니다. 데이터 읽어와서
public static void main(String[] args) {
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Create the PCollection 'lines' by applying a 'Read' transform.
PCollection<String> lines = p.apply(
}
  1. 읽어온 데이터를 기반으로 Transform 을 수행하게 됩니다.
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
  1. Transform 한 데이터를 Pipeline I/O 를 활용해 외부 storage로 보내게 됩니다.
//Read
PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
//Output
output.apply(TextIO.write().to("gs://some/outputData"));

Step2

  • Apache Beam 및 Java SDK를 사용하여 워크로드를 아래와 같이 구성할 수 있습니다. 가장 기본이 되는 word count 예제입니다.
  • 예를들어 test:test:test:testing in progress:testing in progress:testing completed:done 와 같은 텍스트가 존재한다고 할 때 단어의 수를 카운트하여 처리할 수 있습니다.
  • : 를 기준으로 텍스트 파일을 분할한 다음 단어를 추출 하고 계산하여 결과를 지정한 후에 output.txt 파일로 출력하게 됩니다.

Spring Boot 시작하기

Maven pom.xml

  • 실습 버전에서는 2.23.0 을 사용하였습니다.
<properties>
<java.version>11</java.version>
<beam.version>2.23.0</beam.version>
</properties>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>

Main

package com.bachinalabs.beamdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BeamdemoApplication {
public static void main(String[] args) {
SpringApplication.run(BeamdemoApplication.class, args);
}
}

SplitWords

  • input과 output collections 모두 PCollection<String> 형태가 됩니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
public class SplitWords extends PTransform<PCollection<String>, PCollection<String>> {
@Override
public PCollection<String> expand(PCollection<String> line) {
// Convert line of text into individual lines
PCollection<String> lines = line.apply(
ParDo.of(new SplitWordsFn()));
return lines;
}
}

SplitWordsFn

package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.DoFn;
public class SplitWordsFn extends DoFn<String, String> {
public static final String SPLIT_PATTERN = ":";
@ProcessElement
public void processElement(ProcessContext c) {
for(String word: c.element().split(SPLIT_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}

CountWords

  • KV<K,V> 형태로도 값을 담을 수 있는데 K=key, V=value 를 의미합니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert text into individual words
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the words
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.perElement());
return wordCounts;
}
}

ExtractWordsFn

  • p{L}, 유니코드 프로퍼티 즉, 언어를 찾는 정규식 패턴입니다. ^=시작지점, +=1개이상, \=특수문자매칭
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.DoFn;
public class ExtractWordsFn extends DoFn<String, String> {
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@ProcessElement
public void processElement(ProcessContext c) {
for(String word: c.element().split(TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}

WordCountOptions

  • input, output 위치를 지정하게 됩니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
public interface WordCountOptions extends PipelineOptions {
@Description("Path to the input file")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);
@Description("Path to the output file")
@Default.String("./src/main/resources/output.txt")
String getOutputFile();
void setOutputFile(String value);
}

WordCountRunner

  • 해당 class에서 ETL 작업을 수행하는 파이프라인을 수행하게 됩니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class WordCountRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(WordCountOptions.class);
runWordCount(options);
}
static void runWordCount(WordCountOptions options) throws InterruptedException {
Pipeline p = Pipeline.create(options);
p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
.apply(new SplitWords())
.apply(new CountWords())
.apply("FormatResults", MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
.apply("WriteCounts", TextIO.write().to(options.getOutputFile()));
p.run().waitUntilFinish();
}
}

 

'GCP' 카테고리의 다른 글

AWS redshift 와 GCP bigQuery 차이  (0) 2021.05.27
GCP41 :: 구글 클라우드 플랫폼 입문  (0) 2020.10.07
GCP40 :: 베스천호스트 연결  (0) 2020.10.02
GCP39 :: Media(번역) - BigQuery  (0) 2020.10.02
GCP38 :: OS로그인  (0) 2020.09.28