Java로 ETL 수행 (Sample Code)
Dataflow for Java
- Dataflow는 Serverless 로 제공되며 통합 스트리밍 및 일괄 데이터 처리를 지원하고 있습니다. Apache-Beam 모델을 활용해 코드 레벨에서 데이터 처리를 수행할 수 있습니다.
- 해당 세미나에서는 Dataflow를 Java로 코드 구성하고 실행해보는 과정을 살펴보겠습니다.
Prerequisites
- JDK
- Apache Maven
- IntelliJ IDEA Community Edition
- Spring Initializer
Local Machine
Step1
아래 명령어를 통해 java, mvn 버전 확인을 진행합니다.
- $ java --version
- $ mvn --version
- java version 확인 시
- mvn version 확인 시
Model
Apache Beam Model 활용한 ETL 작업을 수행합니다. 금번 세미나 에서는 Java를 다루고 다음번에는 Go를 활용한 ETL 작업도 진행해보겠습니다.
지원 Language : Java, Python, Go
PipeLine
아래에 보면 가장 기본적인 PipeLine을 보실 수 있는데 원천 데이터 소스에서 데이터를 읽어서 transform 작업을 수행하는데 이것을 일반적으로 Pcollection이라고 부릅니다. Transform한 작업의 결과물 즉, Pcollection을 Output Bucket에 작성하게 됩니다.
파이프라인 디자인을 어떻게 하면 더 좋을까? 관심 있으신분은 하기 링크를 클릭하여 읽어보세요!
https://beam.apache.org/documentation/pipelines/design-your-pipeline/
각 Feature에 대한 사용법 및 특징 소개
- pipeline 을 생성합니다.
// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
// Then create the pipeline.
Pipeline p = Pipeline.create(options);
- 외부 데이터를 읽어오는데 Beam's Source API 를 활용할 수 있습니다. 데이터 읽어와서
public static void main(String[] args) {
// Create the pipeline.
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).create();
Pipeline p = Pipeline.create(options);
// Create the PCollection 'lines' by applying a 'Read' transform.
PCollection<String> lines = p.apply(
}
- 읽어온 데이터를 기반으로 Transform 을 수행하게 됩니다.
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform])
.apply([Second Transform])
.apply([Third Transform])
- Transform 한 데이터를 Pipeline I/O 를 활용해 외부 storage로 보내게 됩니다.
//Read
PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt"));
//Output
output.apply(TextIO.write().to("gs://some/outputData"));
Step2
- Apache Beam 및 Java SDK를 사용하여 워크로드를 아래와 같이 구성할 수 있습니다. 가장 기본이 되는 word count 예제입니다.
- 예를들어
test:test:test:testing in progress:testing in progress:testing completed:done
와 같은 텍스트가 존재한다고 할 때 단어의 수를 카운트하여 처리할 수 있습니다.
:
를 기준으로 텍스트 파일을 분할한 다음 단어를 추출 하고 계산하여 결과를 지정한 후에 output.txt 파일로 출력하게 됩니다.
Spring Boot 시작하기
Maven pom.xml
- 실습 버전에서는 2.23.0 을 사용하였습니다.
<properties>
<java.version>11</java.version>
<beam.version>2.23.0</beam.version>
</properties>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
Main
package com.bachinalabs.beamdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BeamdemoApplication {
public static void main(String[] args) {
SpringApplication.run(BeamdemoApplication.class, args);
}
}
SplitWords
- input과 output collections 모두
PCollection<String>
형태가 됩니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
public class SplitWords extends PTransform<PCollection<String>, PCollection<String>> {
@Override
public PCollection<String> expand(PCollection<String> line) {
// Convert line of text into individual lines
PCollection<String> lines = line.apply(
ParDo.of(new SplitWordsFn()));
return lines;
}
}
SplitWordsFn
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.DoFn;
public class SplitWordsFn extends DoFn<String, String> {
public static final String SPLIT_PATTERN = ":";
@ProcessElement
public void processElement(ProcessContext c) {
for(String word: c.element().split(SPLIT_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
CountWords
- KV<K,V> 형태로도 값을 담을 수 있는데 K=key, V=value 를 의미합니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert text into individual words
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the words
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.perElement());
return wordCounts;
}
}
ExtractWordsFn
- p{L}, 유니코드 프로퍼티 즉, 언어를 찾는 정규식 패턴입니다. ^=시작지점, +=1개이상, \=특수문자매칭
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.transforms.DoFn;
public class ExtractWordsFn extends DoFn<String, String> {
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@ProcessElement
public void processElement(ProcessContext c) {
for(String word: c.element().split(TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}
}
}
}
WordCountOptions
- input, output 위치를 지정하게 됩니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
public interface WordCountOptions extends PipelineOptions {
@Description("Path to the input file")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);
@Description("Path to the output file")
@Default.String("./src/main/resources/output.txt")
String getOutputFile();
void setOutputFile(String value);
}
WordCountRunner
- 해당 class에서 ETL 작업을 수행하는 파이프라인을 수행하게 됩니다.
package com.bachinalabs.beamdemo;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class WordCountRunner implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(WordCountOptions.class);
runWordCount(options);
}
static void runWordCount(WordCountOptions options) throws InterruptedException {
Pipeline p = Pipeline.create(options);
p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
.apply(new SplitWords())
.apply(new CountWords())
.apply("FormatResults", MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
.apply("WriteCounts", TextIO.write().to(options.getOutputFile()));
p.run().waitUntilFinish();
}
}
'GCP' 카테고리의 다른 글
AWS redshift 와 GCP bigQuery 차이 (0) | 2021.05.27 |
---|---|
GCP41 :: 구글 클라우드 플랫폼 입문 (0) | 2020.10.07 |
GCP40 :: 베스천호스트 연결 (0) | 2020.10.02 |
GCP39 :: Media(번역) - BigQuery (0) | 2020.10.02 |
GCP38 :: OS로그인 (0) | 2020.09.28 |
댓글
이 글 공유하기
다른 글
-
AWS redshift 와 GCP bigQuery 차이
AWS redshift 와 GCP bigQuery 차이
2021.05.27 -
GCP41 :: 구글 클라우드 플랫폼 입문
GCP41 :: 구글 클라우드 플랫폼 입문
2020.10.07 -
GCP40 :: 베스천호스트 연결
GCP40 :: 베스천호스트 연결
2020.10.02 -
GCP39 :: Media(번역) - BigQuery
GCP39 :: Media(번역) - BigQuery
2020.10.02