GCP25 :: Writing Dataflow Code 3
This is the last post on running Apache Beam code for batch processing.
The structure is not much different from parts 1 and 2, so please take it as a simple example.
2020/08/24 - [Cloud] - Writing Dataflow Code
2020/08/24 - [Cloud/Cloud.Dataflow] - Writing Dataflow Code 2
# apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='project-id',
    runner='dataflow',
    temp_location='bucket-location'  # staging location for Dataflow, e.g. a gs:// path
)
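The same options can also be passed as flag-style arguments; a minimal sketch (the gs:// temp path and the region below are assumed placeholders, not values from this post, and Dataflow jobs usually want a region set):

pipeline_options = PipelineOptions([
    '--project=project-id',
    '--runner=DataflowRunner',
    '--temp_location=gs://bucket-location/temp',  # assumed GCS path
    '--region=us-central1',                       # assumed region
])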
def pcollection_dofn_methods_basic(test=None):
    import apache_beam as beam

    # Rebuild row dicts from the flattened CSV string,
    # four fields (category, language, commit, language_bytes) at a time.
    def make_tablerow(element):
        if element is not None:
            splited = element.split(',')
            array_index = 0
            for splited_index in range(int(len(splited) / 4)):
                tablerow = {
                    'category': splited[array_index],
                    'language': splited[array_index + 1],
                    'commit': splited[array_index + 2],
                    'language_bytes': splited[array_index + 3],
                }
                yield tablerow
                array_index = array_index + 4
    # Flatten the aggregated nested dict into a single comma-separated string.
    def toString(text):
        csvFormat = ''
        for category, nested in text.items():
            for language, two_nested in nested.items():
                n = two_nested['language_bytes'] * 0.0000001  # scale bytes down before rounding
                n = round(n, 3)
                csvFormat += (category + "," + language + "," + str(two_nested['commit']) + "," + str(n) + ",")
        yield csvFormat
    # Buffer every element of the bundle in process(), then sum commits and bytes
    # per (category, language) and emit one nested dict from finish_bundle().
    # Note: the totals are per bundle, so they are only a global sum when the
    # runner delivers the whole input in a single bundle.
    class generate_key_value(beam.DoFn):
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = beam.window.GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            aggregated = {}
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                category, language = split[0], split[1]
                commit, language_bytes = int(split[2]), int(split[3])
                if category not in aggregated:
                    aggregated[category] = {}
                if language not in aggregated[category]:
                    aggregated[category][language] = {'commit': 0, 'language_bytes': 0}
                aggregated[category][language]['commit'] += commit
                aggregated[category][language]['language_bytes'] += language_bytes
            print(self.pre_processing)
            # Values emitted from finish_bundle must carry an explicit window and timestamp.
            yield beam.utils.windowed_value.WindowedValue(
                value=aggregated,
                timestamp=0,
                windows=[self.window],
            )
    with beam.Pipeline() as pipeline:
        p1 = (
            pipeline
            | beam.Create([
                'web, vrml, 751, 31181212',
                'web, jsp, 919, 2231212',
                'web, vrml, 500, 30012',
                'web, php, 300, 12334'
            ])
            | 'Split category' >> beam.ParDo(generate_key_value(","))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.Map(print)
        )

if __name__ == '__main__':
    pcollection_dofn_methods_basic()
When you run the code locally, you can see the following result.
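Based on my reading of the code, the local output should look roughly like this (the debugging print of the buffered bundle first, then one row per language; exact ordering may vary):

['web, vrml, 751, 31181212', 'web, jsp, 919, 2231212', 'web, vrml, 500, 30012', 'web, php, 300, 12334']
{'category': 'web', 'language': ' vrml', 'commit': '1251', 'language_bytes': '3.121'}
{'category': 'web', 'language': ' jsp', 'commit': '919', 'language_bytes': '0.223'}
{'category': 'web', 'language': ' php', 'commit': '300', 'language_bytes': '0.001'}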
Now let's run it on GCP Dataflow. Before that, since the output will be loaded into a BigQuery table this time, I created the table in advance.
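For reference, a table with the matching schema can be created with the BigQuery Python client along these lines (a sketch; the dataset and table names are the ones used by the sink below):

from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
schema = [
    bigquery.SchemaField('category', 'STRING'),
    bigquery.SchemaField('language', 'STRING'),
    bigquery.SchemaField('commit', 'STRING'),
    bigquery.SchemaField('language_bytes', 'STRING'),
]
table = bigquery.Table('fluid-crane-284202.prototyping_dataset.commit_basic', schema=schema)
client.create_table(table)  # CREATE_IF_NEEDED in the pipeline would also create it on demand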
with beam.Pipeline(options=pipeline_options) as pipeline:
    p1 = (
        pipeline
        | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/commit_output.csv",
                                            skip_header_lines=1)
        | 'Split category' >> beam.ParDo(generate_key_value(","))
        | beam.FlatMap(toString)
    )
    p2 = (
        p1
        | 'Loading' >> beam.ParDo(make_tablerow)
        | beam.io.WriteToBigQuery(
            'fluid-crane-284202:prototyping_dataset.commit_basic',
            schema='category:STRING, language:STRING, commit:STRING, language_bytes:STRING',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
    )
Run it like this, and...!
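Once the Dataflow job finishes, the loaded rows can be checked with a quick query, for example (a sketch using the BigQuery Python client; the table is the sink from the pipeline above):

from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
query = 'SELECT * FROM `fluid-crane-284202.prototyping_dataset.commit_basic` LIMIT 10'
for row in client.query(query).result():
    print(dict(row))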