Java로 ETL 수행 (Sample Code)

Dataflow for Java
- Dataflow는 Serverless 로 제공되며 통합 스트리밍 및 일괄 데이터 처리를 지원하고 있습니다. Apache-Beam 모델을 활용해 코드 레벨에서 데이터 처리를 수행할 수 있습니다.
- 해당 세미나에서는 Dataflow를 Java로 코드 구성하고 실행해보는 과정을 살펴보겠습니다.
Prerequisites
- JDK
- Apache Maven
- IntelliJ IDEA Community Edition
- Spring Initializer
Local Machine
Step1
아래 명령어를 통해 java, mvn 버전 확인을 진행합니다.
- $ java --version
- $ mvn --version
- java version 확인 시

- mvn version 확인 시

Model
Apache Beam Model 활용한 ETL 작업을 수행합니다. 금번 세미나 에서는 Java를 다루고 다음번에는 Go를 활용한 ETL 작업도 진행해보겠습니다.
지원 Language : Java, Python, Go

PipeLine
아래에 보면 가장 기본적인 PipeLine을 보실 수 있는데 원천 데이터 소스에서 데이터를 읽어서 transform 작업을 수행하는데 이것을 일반적으로 Pcollection이라고 부릅니다. Transform한 작업의 결과물 즉, Pcollection을 Output Bucket에 작성하게 됩니다.

파이프라인 디자인을 어떻게 하면 더 좋을까? 관심 있으신분은 하기 링크를 클릭하여 읽어보세요!
https://beam.apache.org/documentation/pipelines/design-your-pipeline/
각 Feature에 대한 사용법 및 특징 소개
- pipeline 을 생성합니다.
// Start by defining the options for the pipeline. PipelineOptions options = PipelineOptionsFactory.create(); // Then create the pipeline. Pipeline p = Pipeline.create(options);
- 외부 데이터를 읽어오는데 Beam's Source API 를 활용할 수 있습니다. 데이터 읽어와서
public static void main(String[] args) { // Create the pipeline. PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); // Create the PCollection 'lines' by applying a 'Read' transform. PCollection<String> lines = p.apply( }
- 읽어온 데이터를 기반으로 Transform 을 수행하게 됩니다.
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform])
- Transform 한 데이터를 Pipeline I/O 를 활용해 외부 storage로 보내게 됩니다.
//Read PCollection<String> lines = p.apply(TextIO.read().from("gs://some/inputData.txt")); //Output output.apply(TextIO.write().to("gs://some/outputData"));
Step2
- Apache Beam 및 Java SDK를 사용하여 워크로드를 아래와 같이 구성할 수 있습니다. 가장 기본이 되는 word count 예제입니다.
- 예를들어
test:test:test:testing in progress:testing in progress:testing completed:done
와 같은 텍스트가 존재한다고 할 때 단어의 수를 카운트하여 처리할 수 있습니다.

:
를 기준으로 텍스트 파일을 분할한 다음 단어를 추출 하고 계산하여 결과를 지정한 후에 output.txt 파일로 출력하게 됩니다.
Spring Boot 시작하기

Maven pom.xml
- 실습 버전에서는 2.23.0 을 사용하였습니다.
<properties> <java.version>11</java.version> <beam.version>2.23.0</beam.version> </properties> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> <scope>runtime</scope> </dependency>
Main
package com.bachinalabs.beamdemo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class BeamdemoApplication { public static void main(String[] args) { SpringApplication.run(BeamdemoApplication.class, args); } }
SplitWords
- input과 output collections 모두
PCollection<String>
형태가 됩니다.
package com.bachinalabs.beamdemo; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; public class SplitWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> expand(PCollection<String> line) { // Convert line of text into individual lines PCollection<String> lines = line.apply( ParDo.of(new SplitWordsFn())); return lines; } }
SplitWordsFn
package com.bachinalabs.beamdemo; import org.apache.beam.sdk.transforms.DoFn; public class SplitWordsFn extends DoFn<String, String> { public static final String SPLIT_PATTERN = ":"; @ProcessElement public void processElement(ProcessContext c) { for(String word: c.element().split(SPLIT_PATTERN)) { if (!word.isEmpty()) { c.output(word); } } } }
CountWords
- KV<K,V> 형태로도 값을 담을 수 있는데 K=key, V=value 를 의미합니다.
package com.bachinalabs.beamdemo; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; public class CountWords extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> { @Override public PCollection<KV<String, Long>> expand(PCollection<String> lines) { // Convert text into individual words PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the words PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement()); return wordCounts; } }
ExtractWordsFn
- p{L}, 유니코드 프로퍼티 즉, 언어를 찾는 정규식 패턴입니다. ^=시작지점, +=1개이상, \=특수문자매칭
package com.bachinalabs.beamdemo; import org.apache.beam.sdk.transforms.DoFn; public class ExtractWordsFn extends DoFn<String, String> { public static final String TOKENIZER_PATTERN = "[^\\p{L}]+"; @ProcessElement public void processElement(ProcessContext c) { for(String word: c.element().split(TOKENIZER_PATTERN)) { if (!word.isEmpty()) { c.output(word); } } } }
WordCountOptions
- input, output 위치를 지정하게 됩니다.
package com.bachinalabs.beamdemo; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; public interface WordCountOptions extends PipelineOptions { @Description("Path to the input file") @Default.String("./src/main/resources/input.txt") String getInputFile(); void setInputFile(String value); @Description("Path to the output file") @Default.String("./src/main/resources/output.txt") String getOutputFile(); void setOutputFile(String value); }
WordCountRunner
- 해당 class에서 ETL 작업을 수행하는 파이프라인을 수행하게 됩니다.
package com.bachinalabs.beamdemo; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptors; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class WordCountRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(WordCountOptions.class); runWordCount(options); } static void runWordCount(WordCountOptions options) throws InterruptedException { Pipeline p = Pipeline.create(options); p.apply("Reading Text", TextIO.read().from(options.getInputFile())) .apply(new SplitWords()) .apply(new CountWords()) .apply("FormatResults", MapElements .into(TypeDescriptors.strings()) .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) .apply("WriteCounts", TextIO.write().to(options.getOutputFile())); p.run().waitUntilFinish(); } }


'GCP' 카테고리의 다른 글
AWS redshift 와 GCP bigQuery 차이 (0) | 2021.05.27 |
---|---|
GCP41 :: 구글 클라우드 플랫폼 입문 (0) | 2020.10.07 |
GCP40 :: 베스천호스트 연결 (0) | 2020.10.02 |
GCP39 :: Media(번역) - BigQuery (0) | 2020.10.02 |
GCP38 :: OS로그인 (0) | 2020.09.28 |
댓글
이 글 공유하기
다른 글
-
AWS redshift 와 GCP bigQuery 차이
AWS redshift 와 GCP bigQuery 차이
2021.05.27 -
GCP41 :: 구글 클라우드 플랫폼 입문
GCP41 :: 구글 클라우드 플랫폼 입문
2020.10.07 -
GCP40 :: 베스천호스트 연결
GCP40 :: 베스천호스트 연결
2020.10.02 -
GCP39 :: Media(번역) - BigQuery
GCP39 :: Media(번역) - BigQuery
2020.10.02ㅡ. 스탠다드SQL 빅쿼리는 2011 표준화된 안시 SQL 에 따르는 Standard SQL을 더 선호합니다. 빅쿼리를 사용할때 자동적으로 생성하고 스케쥴해주고 쿼리잡을 실행해줍니다. 빅쿼리는 두가지 모드로 실행됩니다. :: interactive, batch Interactive(on-demand) : 가능한한 많이 실행됩니다. 제한이 따릅니다. Batch queries : 빅쿼리 공유 리소스 풀에서 유후 자원을 사용할 수 있도록 즉시 배치 쿼리가 대기열에 저장되고 시작됩니다. 단 몇분만에 실행됩니다. interactive 모드와 다르게 동시 요금 제한에 따른 제약이 없습니다. ㅡ. 빅쿼리 테이블 타입 모든 비구커리 테이블은 컬럼 네임을 가지고 있는 스키마 형태로 정의도비니다. 컬럼 네임에는 데이터 타입…
댓글을 사용할 수 없습니다.