This is the last post, covering the Apache Beam code for batch processing.

It is not much different from parts 1 and 2, so please take it as an example-level walkthrough.

2020/08/24 - [Cloud] - Writing Dataflow code

2020/08/24 - [Cloud/Cloud.Dataflow] - Writing Dataflow code 2

 

Using the data above, I will write code that computes the commit count and language_bytes per language for each category.

 

#apache_beam
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Options for submitting the pipeline to Dataflow.
# 'project-id' and 'bucket-location' are placeholders; temp_location
# should point to a gs:// path in your own bucket.
pipeline_options = PipelineOptions(
    project='project-id',
    runner='DataflowRunner',
    temp_location='bucket-location'
)

def pcollection_dofn_methods_basic(test=None):
    # Imported inside the function so the workers can resolve it as well.
    import apache_beam as beam

    # Turn the flat CSV string produced by toString back into one
    # dict (one table row) per group of 4 fields.
    def make_tablerow(element):
        if element is not None:
            fields = element.split(',')
            array_index = 0
            for _ in range(int(len(fields) / 4)):
                tablerow = {
                    'category': fields[array_index],
                    'language': fields[array_index + 1],
                    'commit': fields[array_index + 2],
                    'language_bytes': fields[array_index + 3],
                }
                yield tablerow
                array_index += 4

    # Flatten the nested {category: {language: {...}}} dict into one
    # CSV string per category; language_bytes is scaled by 1e-7 and
    # rounded to 3 decimals (see the unit note below).
    def toString(text):
        for category, nested in text.items():
            csvFormat = ''
            for language, counts in nested.items():
                n = round(counts['language_bytes'] * 0.0000001, 3)
                csvFormat += (category + "," + language + ","
                              + str(counts['commit']) + "," + str(n) + ",")
            yield csvFormat

    # Buffer every line in the bundle, then aggregate once in finish_bundle():
    # {category: {language: {'commit': total, 'language_bytes': total}}}
    class generate_key_value(beam.DoFn):
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.window = beam.window.GlobalWindow()

        def start_bundle(self):
            # Reset the buffer at the start of every bundle.
            self.pre_processing = []

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            aggregated = {}
            for line in self.pre_processing:
                category, language, commit, language_bytes = [
                    field.strip() for field in line.split(self.delimiter)
                ]
                counts = aggregated.setdefault(category, {}).setdefault(
                    language, {'commit': 0, 'language_bytes': 0})
                counts['commit'] += int(commit)
                counts['language_bytes'] += int(language_bytes)

            # finish_bundle() outputs have no element to inherit a window or
            # timestamp from, so wrap the dict in a WindowedValue explicitly.
            yield beam.utils.windowed_value.WindowedValue(
                value=aggregated,
                timestamp=0,
                windows=[self.window],
            )
    with beam.Pipeline() as pipeline:
        p1 = (
            pipeline
            | beam.Create([
                'web, vrml, 751, 31181212',
                'web, jsp, 919, 2231212',
                'web, vrml, 500, 30012',
                'web, php, 300, 12334'
            ])
            | 'Split category' >> beam.ParDo(generate_key_value(","))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.Map(print)
        )


if __name__ == '__main__':
    pcollection_dofn_methods_basic()

When the code is run in a local environment, you can check the following result.
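For the four sample lines passed to beam.Create above, the rows printed by beam.Map(print) should look roughly like this (a sketch of the expected output, not an actual capture):

{'category': 'web', 'language': 'vrml', 'commit': '1251', 'language_bytes': '3.121'}
{'category': 'web', 'language': 'jsp', 'commit': '919', 'language_bytes': '0.223'}
{'category': 'web', 'language': 'php', 'commit': '300', 'language_bytes': '0.001'}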

 

language_bytes unit: assumed to be KB

Now let's run it on GCP Dataflow. Before that, since the results will be loaded into a BigQuery table, I created the table in advance.

The table name is commit_basic.
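For reference, creating that table from Python could look roughly like the sketch below. It uses the google-cloud-bigquery client; the schema simply mirrors the one passed to WriteToBigQuery further down, and the project/dataset/table names are the ones used in this post.

from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')

# Same columns as the WriteToBigQuery schema below (all STRING).
schema = [
    bigquery.SchemaField('category', 'STRING'),
    bigquery.SchemaField('language', 'STRING'),
    bigquery.SchemaField('commit', 'STRING'),
    bigquery.SchemaField('language_bytes', 'STRING'),
]
table = bigquery.Table('fluid-crane-284202.prototyping_dataset.commit_basic', schema=schema)
client.create_table(table)

With the table in place, the pipeline above only needs its input and output swapped: read the CSV from GCS and write the rows to this table.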

    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/commit_output.csv",
                                                skip_header_lines=1)
            | 'Split category' >> beam.ParDo(generate_key_value(","))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.commit_basic',
                schema='category:STRING, language:STRING, commit:STRING, language_bytes:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )

Run it like this, and...!
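Once the job finishes, a quick way to spot-check what landed in commit_basic is to query the table. A minimal sketch with the google-cloud-bigquery client (the LIMIT is arbitrary):

from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
rows = client.query(
    'SELECT * FROM `fluid-crane-284202.prototyping_dataset.commit_basic` LIMIT 10'
).result()
for row in rows:
    print(dict(row))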
