GCP24 :: dataflow 코드 작성하기 2
이전과 비슷한 로직으로 코드를 작성하겠습니다. 다만 조금 더 다양한 Apache-beam 함수를 써서 파이프라인을 구축해보겠습니다.
기본 셋팅은 이전 포스팅을 참고해주세요!
2020/08/24 - [Cloud] - dataflow 코드작성하기
Local환경에서 테스팅 하기 위해서 기본 Data Set을 생성하였습니다.
with beam.Pipeline() as pipeline:
results = (
pipeline
| 'Gardening plants' >> beam.Create([
'Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017',
])
이 데이터 셋을 가지고 트랜스폼 할것은 어떤 카테고리, 몇년도, 랭기지가 몇번 사용되었는지 확인해볼 것입니다.
ㅡ. Local 에서 데이터 파이프라인
with beam.Pipeline() as pipeline:
results = (
pipeline
| 'Gardening plants' >> beam.Create([
'Iot,c c++ python,2015',
'Web,java spring,2016',
'Iot,c c++ spring,2017',
'Iot,c c++ spring,2017',
])
| 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
| 'toString' >> beam.FlatMap(toString)
| 'setTable' >> beam.ParDo(make_tablerow)
| beam.Map(print) \
)
ㅡ. split_category_advanced
class split_category_advanced(beam.DoFn):
def __init__(self, delimiter=','):
self.delimiter = delimiter
self.k = 1
self.pre_processing = []
self.window = beam.window.GlobalWindow()
self.year_dict = {}
self.category_index = 0
self.language_index = 1
self.year_index = 2;
self.result = []
def setup(self):
print('setup')
def start_bundle(self):
print('start_bundle')
def finish_bundle(self):
# category 를 우선 배정
print('finish_bundle')
for ppc_index in range(len(self.pre_processing)) :
if self.category_index == 0 or self.category_index%3 == 0 :
if self.pre_processing[self.category_index] not in self.year_dict :
self.year_dict[self.pre_processing[self.category_index]] = {}
# year 별로 어떤 language 가 나오는지 체크
if ppc_index + 2 == 2 or ppc_index + 2 == self.year_index :
# { category : { year : {} } }
if self.pre_processing[self.year_index] not in self.year_dict[self.pre_processing[self.category_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] = {}
# { category : { year : c : { }, c++ : { }, java : { }}}
language = self.pre_processing[self.year_index-1].split(' ')
for lang_index in range(len(language)) :
if language[lang_index] not in self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][language[lang_index]] = 1
else :
self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][
language[lang_index]] += 1
self.year_index = self.year_index + 3
self.category_index = self.category_index + 1
yield beam.utils.windowed_value.WindowedValue(
value=self.year_dict,
#value = self.pre_processing,
timestamp=0,
windows=[self.window],
)
beam.DoFn을 실행시키게 되면 다음 순서로 def 가 동작하게 됩니다.
setup -> start_bundle -> process -> finish_bundle 입니다.
사진과 같이 라이프사이클을 가지고 있습니다.
따라서, process에서 한 row 처리하는것에 대한 병렬처리를 수행하는 로직을 작성하고 이것들을 모아서 finish_bundle 에서 처리하게 될것입니다.
ㅡ. toString
def toString(text) :
csvFormat = ''
for category, nested in text.items():
for year in nested:
for language in nested[year]:
csvFormat += (category + "," + str(year) + "," + language + "," + str(nested[year][language]) + ",")
yield csvFormat
ㅡ. make_tablerow
def make_tablerow(element) :
if element is not None :
splited = element.split(',')
array_index = 0
for splited_index in range(int(len(splited)/4)) :
tablerow = {
'category' : splited[array_index],
'year' : splited[array_index+1],
'language' : splited[array_index+2],
'count' : splited[array_index+3],
}
yield tablerow
array_index = array_index + 4
Local 에서 테스팅을 수행했으니, Cloud Dataflow 로 실행시켜서 bigQuery 테이블에 적재해봅시다. 코드를 수정하고...
with beam.Pipeline(options=pipeline_options) as pipeline:
results = (
pipeline
| 'Read' >> beam.io.ReadFromText("[bucket-location]", skip_header_lines=1)
| 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
| 'toString' >> beam.FlatMap(toString)
| 'setTable' >> beam.ParDo(make_tablerow)
| beam.io.WriteToBigQuery(
'[project-id]:[bigQuery-dataset].[bigQuery-table]',
schema='category:STRING, year:STRING, language:STRING, count:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
)
)
'Cloud > Cloud.Dataflow' 카테고리의 다른 글
GCP25 :: dataflow 코드 작성하기 3 (0) | 2020.08.24 |
---|---|
GCP23 :: dataflow 코드작성하기 (0) | 2020.08.24 |
GCP15 :: Apache-Beam ParDo (0) | 2020.08.20 |
댓글
이 글 공유하기
다른 글
-
GCP25 :: dataflow 코드 작성하기 3
GCP25 :: dataflow 코드 작성하기 3
2020.08.24 -
GCP23 :: dataflow 코드작성하기
GCP23 :: dataflow 코드작성하기
2020.08.24 -
GCP15 :: Apache-Beam ParDo
GCP15 :: Apache-Beam ParDo
2020.08.20