GCP23 :: dataflow 코드작성하기

위와 같은 데이터가 있습니다. react 안에서 feature 별 횟수를 묶을 생각입니다.
from apache_beam.options.pipeline_options import PipelineOptions import apache_beam as beam
파이프 라인 환경설정을 구성하기 위해서 PipelineOptions 를 사용합니다.
pipeline_options = PipelineOptions( project=[project-id], runner='dataflow', temp_location=[bucket-location] )
기본 설정들을 셋팅 합니다.
python 으로 코드를 작성하였습니다. main function 을 pcollection_dofn_methods_basic 으로 만들었습니다.
def pcollection_dofn_methods_basic(test=None): import apache_beam as beam #Cloud 상에서 apache_beam을 실행시키려면 다음 코드를 입력
파이프라인 작성을 위해서
with beam.Pipeline(options=pipeline_options) as pipeline:
그리고 bucket으로 부터 extract 하는 코드를 작성하였습니다.
with beam.Pipeline(options=pipeline_options) as pipeline: p1 = ( pipeline | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv", skip_header_lines=1) #skip_header_lines 는 첫번째 row는 무시해라 )
Local 환경에서 테스트 하기 위해서 임시로 데이터를 생성해서 Count 하는 코드를 작성했습니다
f-format을 사용해서 원하는 format을 만들어주고, CombinerPerKey로 key값을 기준으로 value들의 sum을 구해주는 코드입니다.
with beam.Pipeline() as pipeline: p1 = ( pipeline # | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv", # skip_header_lines=1) | beam.Create([ 'react, Material, 2', 'react, LearningCurve, 9', 'react, LearningCurve, 4', 'react, Material, 2', 'react, Awareness, 2', 'react, Awareness, 6', ]) | beam.ParDo(generate_key) | beam.CombinePerKey(sum) | beam.FlatMap(make_list) ) p2 = ( p1 | 'Loading' >> beam.ParDo(make_tablerow) | beam.io.WriteToBigQuery( '[project-id]:[bigQuery-dataset].[bigQuery-table]', schema='category:STRING, feature:STRING, score:STRING', #넣을 테이블의 컬럼에 맞춰야합니다. create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) )
ㅡ. generate_key
def generate_key(element, delimiter=',') : splited = element.split(delimiter) return [(f"{splited[0]},{splited[1]}", int(splited[2]))]
ㅡ. CombinePerKey
key값들을 기준으로 value들의 sum을 구해줍니다.
ㅡ. make_list
def make_list(element): element_list = list(element) yield element_list
tuple 형식의 데이터들을 list 로 변경했습니다. tuple 자체가 immutable 하기 때문에 데이터 수정을 위해서 list 변형하는 함수를 추가하였습니다.
ㅡ. make_tablerow
before) 이전에 Apache-beam에서 제공해주는 함수를 사용했을 때는 단순히 아래와 같이 하면 됬지만
def make_tablerow(element) : splited = element[0].split(',') splited[1] = splited[1][1:] tablerow = { 'category' : splited[0], 'feature' : splited[1], 'score' : element[1], } print(tablerow) return tablerow
after) 새롭게 score, count 컬럼을 만들어주고 table row 로 추가하려고 몇가지 수정하였습니다.
def make_tablerow(element) : print(element) if element is not None : splited = element.split(',') array_index = 0 for splited_index in range(int(len(splited)/4)) : tablerow = { 'category' : splited[array_index], 'feature' : splited[array_index+1], 'score' : splited[array_index+2], 'count' : splited[array_index+3], } yield tablerow array_index = array_index +4
ㅡ. toString
def toString(text) : csvFormat = '' for category, nested in text.items(): for feature, two_nested in nested.items(): csvFormat += (category + "," + feature + "," + str(two_nested['score']) + "," +str(two_nested['count'])+",") yield csvFormat
dict 형식인것을 table row에 쉽게 넣기 위해서 수정하였습니다.
ㅡ. generate_key_value
class generate_key_value(beam.DoFn) : def __init__(self, delimiter=','): self.delimiter = delimiter self.pre_processing = [] self.window = beam.window.GlobalWindow() def process(self, text): self.pre_processing.append(text) def finish_bundle(self): dict = {} for i in range(len(self.pre_processing)): split = self.pre_processing[i].split(',') #split[1] = split[1][1:] #split[2] = split[2][1:] if split[0] not in dict : dict[split[0]] = {} for i in range(len(self.pre_processing)): split = self.pre_processing[i].split(',') if split[1] not in dict[split[0]] : dict[split[0]][split[1]] = {} for i in range(len(self.pre_processing)) : split = self.pre_processing[i].split(',') tempi = int(split[2]) if not dict[split[0]][split[1]]: dict[split[0]][split[1]] = {} for i in range(len(self.pre_processing)) : split = self.pre_processing[i].split(',') tempi = int(split[2]) if not dict[split[0]][split[1]] : dict[split[0]][split[1]]['score'] = tempi dict[split[0]][split[1]]['count'] = 1 else : dict[split[0]][split[1]]['score'] = dict[split[0]][split[1]]['score'] + tempi dict[split[0]][split[1]]['count'] += 1 print(dict) yield beam.utils.windowed_value.WindowedValue( value=self.pre_processing, timestamp=0, windows=[self.window], )
카테고리 별 특징에 대한 점수와 분포횟수를 알고 싶어 위 코드를 추가했습니다.
ㅡ. Test(Local)

테이블 형식으로 만들어 줍니다. category, feature, score column을 가진 테이블을 생성하게 됩니다.

feature_basic이란 테이블을 bigQuery에서 생성하였습니다.
#apache_beam from apache_beam.options.pipeline_options import PipelineOptions import apache_beam as beam pipeline_options = PipelineOptions( project='project-id', runner='dataflow', temp_location='bucket-location' ) def pcollection_dofn_methods_basic(test=None): import apache_beam as beam class generate_key_value(beam.DoFn) : def __init__(self, delimiter=','): self.delimiter = delimiter self.pre_processing = [] self.window = beam.window.GlobalWindow() def process(self, text): self.pre_processing.append(text) def finish_bundle(self): dict = {} for i in range(len(self.pre_processing)): split = self.pre_processing[i].split(',') #split[1] = split[1][1:] #split[2] = split[2][1:] if split[0] not in dict : dict[split[0]] = {} for i in range(len(self.pre_processing)): split = self.pre_processing[i].split(',') if split[1] not in dict[split[0]] : dict[split[0]][split[1]] = {} for i in range(len(self.pre_processing)) : split = self.pre_processing[i].split(',') tempi = int(split[2]) if not dict[split[0]][split[1]]: dict[split[0]][split[1]] = {} for i in range(len(self.pre_processing)) : split = self.pre_processing[i].split(',') tempi = int(split[2]) if not dict[split[0]][split[1]] : dict[split[0]][split[1]]['score'] = tempi dict[split[0]][split[1]]['count'] = 1 else : dict[split[0]][split[1]]['score'] = dict[split[0]][split[1]]['score'] + tempi dict[split[0]][split[1]]['count'] += 1 yield beam.utils.windowed_value.WindowedValue( value=dict, timestamp=0, windows=[self.window], ) def generate_key(element, delimiter=',') : splited = element.split(delimiter) splited[1] = splited[1][1:] return [(f"{splited[0]},{splited[1]}", int(splited[2]))] def toString(text) : csvFormat = '' for category, nested in text.items(): for feature, two_nested in nested.items(): csvFormat += (category + "," + feature + "," + str(two_nested['score']) + "," +str(two_nested['count'])+",") yield csvFormat def make_tablerow(element): yield element def generate_value(element): element_list = list(element) splited = element[0].split(',') splited[1] = splited[1][1:] return [(f"{splited[0]}, {splited[1]}, {element_list[1]}",1)] def make_tablerow(element) : if element is not None : splited = element.split(',') array_index = 0 for splited_index in range(int(len(splited)/4)) : tablerow = { 'category' : splited[array_index], 'feature' : splited[array_index+1], 'score' : splited[array_index+2], 'count' : splited[array_index+3], } print(tablerow) yield tablerow array_index = array_index +4 with beam.Pipeline(options=pipeline_options) as pipeline: p1 = ( pipeline | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv", skip_header_lines=1) | beam.ParDo(generate_key_value(',')) | beam.FlatMap(toString) ) p2 = ( p1 | 'Loading' >> beam.ParDo(make_tablerow) | beam.io.WriteToBigQuery( 'project-id:bigQuery-dataset.bigQuery-table', schema='category:STRING, feature:STRING, score:STRING, count:STRING', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) ) if __name__ == '__main__' : pcollection_dofn_methods_basic()
전체코드를 실행시켜봅시다.



실제 dataflow로 처리된 데이터들이 bigQuery table에 잘적재되는것을 확인할 수 있습니다.
'Cloud > Cloud.Dataflow' 카테고리의 다른 글
GCP25 :: dataflow 코드 작성하기 3 (0) | 2020.08.24 |
---|---|
GCP24 :: dataflow 코드 작성하기 2 (0) | 2020.08.24 |
GCP15 :: Apache-Beam ParDo (0) | 2020.08.20 |
댓글
이 글 공유하기
다른 글
-
GCP25 :: dataflow 코드 작성하기 3
GCP25 :: dataflow 코드 작성하기 3
2020.08.24 -
GCP24 :: dataflow 코드 작성하기 2
GCP24 :: dataflow 코드 작성하기 2
2020.08.24 -
GCP15 :: Apache-Beam ParDo
GCP15 :: Apache-Beam ParDo
2020.08.20
댓글을 사용할 수 없습니다.