기본적으로 

Input -> PCollection -> PCollection -> Output (workflow) 를 가집니다. 

 

Google Cloud Platform Console에 들어가보면 Stream, Batch 각각에 대해 다양한 템플릿을 제공하고 있습니다. 프로젝트 방향에 맞게 데이터파이프라인을 구축하는데 많은 도움이 됩니다. 

 

임시 위치는 구글 문서를 참고하면 스테이징된 파이프라인 작업 라고 나와있다. 실제 파이프라인 작업과 유사한 상태를 의미한다고 생각합니다. 

 

ㅡ. Custom Template 

구글 문서를 참고해보면 런타임 매개변수를 사용해서 파이프라인 내에서 실행되는 함수(e.g, DoFn) 전달해서 템플릿 기반의 파이프라인을 실행 및 맞춤 설정할 수 있다고 소개 되어 있습니다. 

즉, 런타임 매개변수를 지원하도록 파이프라인 코드를 수정해야 합니다. 

방법은 3가지가 소개되어 있는데

1. 파이프라인 옵션에 ValueProvider를 사용합니다. 

2. 파이프라인 매개변수화 경우에는 런타임 매개변수를 허용하는 I/O 메서드를 호출합니다.

3. 런타임 매개변수를 허용하는 DoFn 객체를 사용합니다. 

 

인터페이스로 제공되고 있는건 3가지 인데

RuntimeValueProvider, StaticValueProvider, NestedValueProvider 가 있습니다. 

 

Template을 만들건 아니기 때문에 넘어가겠습니다.  

*ref : https://cloud.google.com/dataflow/docs/guides/templates/creating-templates?hl=ko#python_1 

 

ㅡ. Dataflow(Apache Beam) 

Transform 해주는 함수들에 대해서 살펴보겠습니다. 

 

Pardo

데이터 변환 및 하나의 이상의 PCollection을 반환합니다. 

필터링, 다른 자료형으로 변환, 데이터세트에서 각 요소 추출, 데이터세트의 개별 연산을 수행합니다. 

 

GroupByKey

Key-Value 형태로 되어있어서 Collection을 처리하기 위한 Transform 입니다. Key를 기준으로 Value들을 하나로 묶습니다. 

e.g) ('cat', 1), ... ('and',6) -> [('cat'), [1,6,7] ... )]

 

 

Combine

 

GoGroupByKey

GoGroupByKey 마찬가지로 Key를 기준으로 값을 합쳐주고 동일한 Key의 데이터셋이 여러개 존재할때 사용하는것이 좋습니다. 

 

Flatten 

여러 종류의 PCollection 객체를 하나의 PCollection으로 합칩니다. 

 

Partition

큰 PCollection을 고정된 숫자의 PCollection으로 분할할때 사용합니다. 

Pipeline I/O, Windowing, Trigger 

함수들이 있습니다.

 

ㅡ. Apache Beam

 

1
2
3
4
5
6
pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='dataflow',
    temp_location='gs://dataflow-sample-ig/temp'
)
 
cs

asd

Modifying a pipeline to use stream processing

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 lines = p | 'read' >> ReadFromText(known_args.input)
  ...
 
  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(six.text_type))
            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
            | 'group' >> beam.GroupByKey()
            | 'count' >> beam.Map(count_ones))
  ...
 
  output = counts | 'format' >> beam.Map(format_result)
 
  # Write the output using a "Write" transform that has side effects.
  output | 'write' >> WriteToText(known_args.output)
cs

 

GCP에서 Cloud Dataflow는 병렬로 실행시키는 일종의 흐름을 제어하는 서비스이다. 어떤 형태로 병렬 처리할건지에 대한 구조를 PipeLine으로 잡아 놓는것이 Apache Beam 형태이다.

 

Apache Beam의 구조 : 조대협님 블로그에서 발췌

Beam 상태의 PCollection 라는 개체를 인자로 받아서 PTransform 이라는 모듈이 변형하고 Output으로 PCollection을 내보내는 구조가 됩니다. 

 

wordcount를 동작시켜보면 Apache Beam이 사용하는 함수의 순서는 ReadFromText -> FlatMap -> combiners.Count.PerElement -> Map -> WriteToText

 

코드를 작성해보겠습니다. python 

 

1
2
3
4
5
from __future__ import print_function
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import logging
cs

가장 먼저 import 문을 추가하였습니다.

 

1
2
3
4
5
pipeline_options = PipelineOptions(
   project='[project-id]',
    runner='dataflow',
   temp_location='gs://[bucket-name]/[folder-name]'
)
cs

환경설정 셋팅을 코드로 작성합니다. 

1
with beam.Pipeline(options=pipeline_options) as p:
cs