GCP23 :: dataflow 코드작성하기
위와 같은 데이터가 있습니다. react 안에서 feature 별 횟수를 묶을 생각입니다.
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
파이프 라인 환경설정을 구성하기 위해서 PipelineOptions 를 사용합니다.
pipeline_options = PipelineOptions(
project=[project-id],
runner='dataflow',
temp_location=[bucket-location]
)
기본 설정들을 셋팅 합니다.
python 으로 코드를 작성하였습니다. main function 을 pcollection_dofn_methods_basic 으로 만들었습니다.
def pcollection_dofn_methods_basic(test=None):
import apache_beam as beam #Cloud 상에서 apache_beam을 실행시키려면 다음 코드를 입력
파이프라인 작성을 위해서
with beam.Pipeline(options=pipeline_options) as pipeline:
그리고 bucket으로 부터 extract 하는 코드를 작성하였습니다.
with beam.Pipeline(options=pipeline_options) as pipeline:
p1 = (
pipeline
| 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
skip_header_lines=1) #skip_header_lines 는 첫번째 row는 무시해라
)
Local 환경에서 테스트 하기 위해서 임시로 데이터를 생성해서 Count 하는 코드를 작성했습니다
f-format을 사용해서 원하는 format을 만들어주고, CombinerPerKey로 key값을 기준으로 value들의 sum을 구해주는 코드입니다.
with beam.Pipeline() as pipeline:
p1 = (
pipeline
# | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
# skip_header_lines=1)
| beam.Create([
'react, Material, 2',
'react, LearningCurve, 9',
'react, LearningCurve, 4',
'react, Material, 2',
'react, Awareness, 2',
'react, Awareness, 6',
])
| beam.ParDo(generate_key)
| beam.CombinePerKey(sum)
| beam.FlatMap(make_list)
)
p2 = (
p1
| 'Loading' >> beam.ParDo(make_tablerow)
| beam.io.WriteToBigQuery(
'[project-id]:[bigQuery-dataset].[bigQuery-table]',
schema='category:STRING, feature:STRING, score:STRING', #넣을 테이블의 컬럼에 맞춰야합니다.
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
)
ㅡ. generate_key
def generate_key(element, delimiter=',') :
splited = element.split(delimiter)
return [(f"{splited[0]},{splited[1]}", int(splited[2]))]
ㅡ. CombinePerKey
key값들을 기준으로 value들의 sum을 구해줍니다.
ㅡ. make_list
def make_list(element):
element_list = list(element)
yield element_list
tuple 형식의 데이터들을 list 로 변경했습니다. tuple 자체가 immutable 하기 때문에 데이터 수정을 위해서 list 변형하는 함수를 추가하였습니다.
ㅡ. make_tablerow
before) 이전에 Apache-beam에서 제공해주는 함수를 사용했을 때는 단순히 아래와 같이 하면 됬지만
def make_tablerow(element) :
splited = element[0].split(',')
splited[1] = splited[1][1:]
tablerow = {
'category' : splited[0],
'feature' : splited[1],
'score' : element[1],
}
print(tablerow)
return tablerow
after) 새롭게 score, count 컬럼을 만들어주고 table row 로 추가하려고 몇가지 수정하였습니다.
def make_tablerow(element) :
print(element)
if element is not None :
splited = element.split(',')
array_index = 0
for splited_index in range(int(len(splited)/4)) :
tablerow = {
'category' : splited[array_index],
'feature' : splited[array_index+1],
'score' : splited[array_index+2],
'count' : splited[array_index+3],
}
yield tablerow
array_index = array_index +4
ㅡ. toString
def toString(text) :
csvFormat = ''
for category, nested in text.items():
for feature, two_nested in nested.items():
csvFormat += (category + "," + feature + "," + str(two_nested['score']) + "," +str(two_nested['count'])+",")
yield csvFormat
dict 형식인것을 table row에 쉽게 넣기 위해서 수정하였습니다.
ㅡ. generate_key_value
class generate_key_value(beam.DoFn) :
def __init__(self, delimiter=','):
self.delimiter = delimiter
self.pre_processing = []
self.window = beam.window.GlobalWindow()
def process(self, text):
self.pre_processing.append(text)
def finish_bundle(self):
dict = {}
for i in range(len(self.pre_processing)):
split = self.pre_processing[i].split(',')
#split[1] = split[1][1:]
#split[2] = split[2][1:]
if split[0] not in dict :
dict[split[0]] = {}
for i in range(len(self.pre_processing)):
split = self.pre_processing[i].split(',')
if split[1] not in dict[split[0]] :
dict[split[0]][split[1]] = {}
for i in range(len(self.pre_processing)) :
split = self.pre_processing[i].split(',')
tempi = int(split[2])
if not dict[split[0]][split[1]]:
dict[split[0]][split[1]] = {}
for i in range(len(self.pre_processing)) :
split = self.pre_processing[i].split(',')
tempi = int(split[2])
if not dict[split[0]][split[1]] :
dict[split[0]][split[1]]['score'] = tempi
dict[split[0]][split[1]]['count'] = 1
else :
dict[split[0]][split[1]]['score'] = dict[split[0]][split[1]]['score'] + tempi
dict[split[0]][split[1]]['count'] += 1
print(dict)
yield beam.utils.windowed_value.WindowedValue(
value=self.pre_processing,
timestamp=0,
windows=[self.window],
)
카테고리 별 특징에 대한 점수와 분포횟수를 알고 싶어 위 코드를 추가했습니다.
ㅡ. Test(Local)
테이블 형식으로 만들어 줍니다. category, feature, score column을 가진 테이블을 생성하게 됩니다.
feature_basic이란 테이블을 bigQuery에서 생성하였습니다.
#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam
pipeline_options = PipelineOptions(
project='project-id',
runner='dataflow',
temp_location='bucket-location'
)
def pcollection_dofn_methods_basic(test=None):
import apache_beam as beam
class generate_key_value(beam.DoFn) :
def __init__(self, delimiter=','):
self.delimiter = delimiter
self.pre_processing = []
self.window = beam.window.GlobalWindow()
def process(self, text):
self.pre_processing.append(text)
def finish_bundle(self):
dict = {}
for i in range(len(self.pre_processing)):
split = self.pre_processing[i].split(',')
#split[1] = split[1][1:]
#split[2] = split[2][1:]
if split[0] not in dict :
dict[split[0]] = {}
for i in range(len(self.pre_processing)):
split = self.pre_processing[i].split(',')
if split[1] not in dict[split[0]] :
dict[split[0]][split[1]] = {}
for i in range(len(self.pre_processing)) :
split = self.pre_processing[i].split(',')
tempi = int(split[2])
if not dict[split[0]][split[1]]:
dict[split[0]][split[1]] = {}
for i in range(len(self.pre_processing)) :
split = self.pre_processing[i].split(',')
tempi = int(split[2])
if not dict[split[0]][split[1]] :
dict[split[0]][split[1]]['score'] = tempi
dict[split[0]][split[1]]['count'] = 1
else :
dict[split[0]][split[1]]['score'] = dict[split[0]][split[1]]['score'] + tempi
dict[split[0]][split[1]]['count'] += 1
yield beam.utils.windowed_value.WindowedValue(
value=dict,
timestamp=0,
windows=[self.window],
)
def generate_key(element, delimiter=',') :
splited = element.split(delimiter)
splited[1] = splited[1][1:]
return [(f"{splited[0]},{splited[1]}", int(splited[2]))]
def toString(text) :
csvFormat = ''
for category, nested in text.items():
for feature, two_nested in nested.items():
csvFormat += (category + "," + feature + "," + str(two_nested['score']) + "," +str(two_nested['count'])+",")
yield csvFormat
def make_tablerow(element):
yield element
def generate_value(element):
element_list = list(element)
splited = element[0].split(',')
splited[1] = splited[1][1:]
return [(f"{splited[0]}, {splited[1]}, {element_list[1]}",1)]
def make_tablerow(element) :
if element is not None :
splited = element.split(',')
array_index = 0
for splited_index in range(int(len(splited)/4)) :
tablerow = {
'category' : splited[array_index],
'feature' : splited[array_index+1],
'score' : splited[array_index+2],
'count' : splited[array_index+3],
}
print(tablerow)
yield tablerow
array_index = array_index +4
with beam.Pipeline(options=pipeline_options) as pipeline:
p1 = (
pipeline
| 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
skip_header_lines=1)
| beam.ParDo(generate_key_value(','))
| beam.FlatMap(toString)
)
p2 = (
p1
| 'Loading' >> beam.ParDo(make_tablerow)
| beam.io.WriteToBigQuery(
'project-id:bigQuery-dataset.bigQuery-table',
schema='category:STRING, feature:STRING, score:STRING, count:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
)
if __name__ == '__main__' :
pcollection_dofn_methods_basic()
전체코드를 실행시켜봅시다.
실제 dataflow로 처리된 데이터들이 bigQuery table에 잘적재되는것을 확인할 수 있습니다.
'Cloud > Cloud.Dataflow' 카테고리의 다른 글
GCP25 :: dataflow 코드 작성하기 3 (0) | 2020.08.24 |
---|---|
GCP24 :: dataflow 코드 작성하기 2 (0) | 2020.08.24 |
GCP15 :: Apache-Beam ParDo (0) | 2020.08.20 |
댓글
이 글 공유하기
다른 글
-
GCP25 :: dataflow 코드 작성하기 3
GCP25 :: dataflow 코드 작성하기 3
2020.08.24 -
GCP24 :: dataflow 코드 작성하기 2
GCP24 :: dataflow 코드 작성하기 2
2020.08.24 -
GCP15 :: Apache-Beam ParDo
GCP15 :: Apache-Beam ParDo
2020.08.20