위와 같은 데이터가 있습니다. react 안에서 feature 별 횟수를 묶을 생각입니다. 

 

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

파이프 라인 환경설정을 구성하기 위해서 PipelineOptions 를 사용합니다. 

pipeline_options = PipelineOptions(
    project=[project-id],
    runner='dataflow',
    temp_location=[bucket-location]
)

기본 설정들을 셋팅 합니다.

 

python 으로 코드를 작성하였습니다. main function 을 pcollection_dofn_methods_basic 으로 만들었습니다. 

def pcollection_dofn_methods_basic(test=None):
	import apache_beam as beam #Cloud 상에서 apache_beam을 실행시키려면 다음 코드를 입력

 

파이프라인 작성을 위해서 

with beam.Pipeline(options=pipeline_options) as pipeline:

그리고 bucket으로 부터 extract 하는 코드를 작성하였습니다.

    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
                                                            skip_header_lines=1) #skip_header_lines 는 첫번째 row는 무시해라
        )

Local 환경에서 테스트 하기 위해서 임시로 데이터를 생성해서 Count 하는 코드를 작성했습니다 

f-format을 사용해서 원하는 format을 만들어주고, CombinerPerKey로 key값을 기준으로 value들의 sum을 구해주는 코드입니다. 

    with beam.Pipeline() as pipeline:
        p1 = (
            pipeline
            # | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
            #                                                 skip_header_lines=1)
            | beam.Create([
                'react, Material, 2',
                'react, LearningCurve, 9',
                'react, LearningCurve, 4',
                'react, Material, 2',
                'react, Awareness, 2',
                'react, Awareness, 6',
            ])
            | beam.ParDo(generate_key)
            | beam.CombinePerKey(sum)
            | beam.FlatMap(make_list)
        )
	   p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                  '[project-id]:[bigQuery-dataset].[bigQuery-table]',
                  schema='category:STRING, feature:STRING, score:STRING', #넣을 테이블의 컬럼에 맞춰야합니다.
                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                  write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
        )

 

ㅡ. generate_key

    def generate_key(element, delimiter=',') :
        splited = element.split(delimiter)
        return [(f"{splited[0]},{splited[1]}", int(splited[2]))]

ㅡ. CombinePerKey

key값들을 기준으로 value들의 sum을 구해줍니다. 

 

ㅡ. make_list

    def make_list(element):
        element_list = list(element)
        yield element_list

tuple 형식의 데이터들을 list 로 변경했습니다. tuple 자체가 immutable 하기 때문에 데이터 수정을 위해서 list 변형하는 함수를 추가하였습니다. 

 

ㅡ. make_tablerow

before) 이전에 Apache-beam에서 제공해주는 함수를 사용했을 때는 단순히 아래와 같이 하면 됬지만

    def make_tablerow(element) :

        splited = element[0].split(',')
        splited[1] = splited[1][1:]

        tablerow = {
            'category' : splited[0],
            'feature' : splited[1],
            'score' : element[1],
        }
        print(tablerow)
        return tablerow

after) 새롭게 score, count 컬럼을 만들어주고 table row 로 추가하려고 몇가지 수정하였습니다.

    def make_tablerow(element) :
        print(element)
        if element is not None :
            splited = element.split(',')
            array_index = 0
            for splited_index in range(int(len(splited)/4)) :
                tablerow = {
                    'category' : splited[array_index],
                    'feature' : splited[array_index+1],
                    'score' : splited[array_index+2],
                    'count' : splited[array_index+3],
                }
                yield tablerow
                array_index = array_index +4

ㅡ. toString

    def toString(text) :
        csvFormat = ''
        for category, nested in text.items():
            for feature, two_nested in nested.items():
                csvFormat += (category + "," + feature + "," + str(two_nested['score']) + "," +str(two_nested['count'])+",")

        yield  csvFormat

dict 형식인것을 table row에 쉽게 넣기 위해서 수정하였습니다.

 

ㅡ. generate_key_value

    class generate_key_value(beam.DoFn) :
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = beam.window.GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            dict = {}

            for i in range(len(self.pre_processing)):
                split = self.pre_processing[i].split(',')
                #split[1] = split[1][1:]
                #split[2] = split[2][1:]
                if split[0] not in dict :
                    dict[split[0]] = {}

            for i in range(len(self.pre_processing)):
                split = self.pre_processing[i].split(',')
                if split[1] not in dict[split[0]] :
                    dict[split[0]][split[1]] = {}

            for i in range(len(self.pre_processing)) :
                split = self.pre_processing[i].split(',')
                tempi = int(split[2])

                if not dict[split[0]][split[1]]:
                    dict[split[0]][split[1]] = {}

            for i in range(len(self.pre_processing)) :
                split = self.pre_processing[i].split(',')
                tempi = int(split[2])
                if not dict[split[0]][split[1]] :
                    dict[split[0]][split[1]]['score'] = tempi
                    dict[split[0]][split[1]]['count'] = 1
                else :
                    dict[split[0]][split[1]]['score'] = dict[split[0]][split[1]]['score'] + tempi
                    dict[split[0]][split[1]]['count'] += 1

            print(dict)
            yield beam.utils.windowed_value.WindowedValue(
                value=self.pre_processing,
                timestamp=0,
                windows=[self.window],
            )

카테고리 별 특징에 대한 점수와 분포횟수를 알고 싶어 위 코드를 추가했습니다.

 

ㅡ. Test(Local)

 

원하는 결과물을 확인할 수 있습니다.

테이블 형식으로 만들어 줍니다. category, feature, score column을 가진 테이블을 생성하게 됩니다.

feature_basic이란 테이블을 bigQuery에서 생성하였습니다. 

 

#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='project-id',
    runner='dataflow',
    temp_location='bucket-location'
)

def pcollection_dofn_methods_basic(test=None):
    import apache_beam as beam
    class generate_key_value(beam.DoFn) :
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = beam.window.GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            dict = {}

            for i in range(len(self.pre_processing)):
                split = self.pre_processing[i].split(',')
                #split[1] = split[1][1:]
                #split[2] = split[2][1:]
                if split[0] not in dict :
                    dict[split[0]] = {}

            for i in range(len(self.pre_processing)):
                split = self.pre_processing[i].split(',')
                if split[1] not in dict[split[0]] :
                    dict[split[0]][split[1]] = {}

            for i in range(len(self.pre_processing)) :
                split = self.pre_processing[i].split(',')
                tempi = int(split[2])

                if not dict[split[0]][split[1]]:
                    dict[split[0]][split[1]] = {}

            for i in range(len(self.pre_processing)) :
                split = self.pre_processing[i].split(',')
                tempi = int(split[2])
                if not dict[split[0]][split[1]] :
                    dict[split[0]][split[1]]['score'] = tempi
                    dict[split[0]][split[1]]['count'] = 1
                else :
                    dict[split[0]][split[1]]['score'] = dict[split[0]][split[1]]['score'] + tempi
                    dict[split[0]][split[1]]['count'] += 1


            yield beam.utils.windowed_value.WindowedValue(
                value=dict,
                timestamp=0,
                windows=[self.window],
            )
    def generate_key(element, delimiter=',') :
        splited = element.split(delimiter)
        splited[1] = splited[1][1:]


        return [(f"{splited[0]},{splited[1]}", int(splited[2]))]

    def toString(text) :
        csvFormat = ''
        for category, nested in text.items():
            for feature, two_nested in nested.items():
                csvFormat += (category + "," + feature + "," + str(two_nested['score']) + "," +str(two_nested['count'])+",")

        yield  csvFormat

    def make_tablerow(element):
        yield element
    def generate_value(element):
        element_list = list(element)
        splited = element[0].split(',')
        splited[1] = splited[1][1:]
        return [(f"{splited[0]}, {splited[1]}, {element_list[1]}",1)]

    def make_tablerow(element) :

        if element is not None :
            splited = element.split(',')
            array_index = 0
            for splited_index in range(int(len(splited)/4)) :
                tablerow = {
                    'category' : splited[array_index],
                    'feature' : splited[array_index+1],
                    'score' : splited[array_index+2],
                    'count' : splited[array_index+3],
                }
                print(tablerow)
                yield tablerow
                array_index = array_index +4


    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
                                                            skip_header_lines=1)
            | beam.ParDo(generate_key_value(','))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                  'project-id:bigQuery-dataset.bigQuery-table',
                  schema='category:STRING, feature:STRING, score:STRING, count:STRING',
                  create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                  write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
        )
        )


if __name__ == '__main__' :
    pcollection_dofn_methods_basic()

전체코드를 실행시켜봅시다.

성공

실제 dataflow로 처리된 데이터들이 bigQuery table에 잘적재되는것을 확인할 수 있습니다. 

'Cloud > Cloud.Dataflow' 카테고리의 다른 글

GCP25 :: dataflow 코드 작성하기 3  (0) 2020.08.24
GCP24 :: dataflow 코드 작성하기 2  (0) 2020.08.24
GCP15 :: Apache-Beam ParDo  (0) 2020.08.20