이전과 비슷한 로직으로 코드를 작성하겠습니다. 다만 조금 더 다양한 Apache-beam 함수를 써서 파이프라인을 구축해보겠습니다.

기본 셋팅은 이전 포스팅을 참고해주세요!

Local환경에서 테스팅 하기 위해서 기본 Data Set을 생성하였습니다.

    with beam.Pipeline() as pipeline:
      results = (
          | 'Gardening plants' >> beam.Create([
              'Iot,c c++ python,2015',
              'Web,java spring,2016',
              'Iot,c c++ spring,2017',
              'Iot,c c++ spring,2017',

이 데이터 셋을 가지고 트랜스폼 할것은 어떤 카테고리, 몇년도, 랭기지가 몇번 사용되었는지 확인해볼 것입니다. 

ㅡ. Local 에서 데이터 파이프라인

    with beam.Pipeline() as pipeline:
      results = (
          | 'Gardening plants' >> beam.Create([
              'Iot,c c++ python,2015',
              'Web,java spring,2016',
              'Iot,c c++ spring,2017',
              'Iot,c c++ spring,2017',
          | 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
          | 'toString' >> beam.FlatMap(toString)
          | 'setTable' >> beam.ParDo(make_tablerow)
          | beam.Map(print) \

ㅡ. split_category_advanced

    class split_category_advanced(beam.DoFn):
      def __init__(self, delimiter=','):
        self.delimiter = delimiter
        self.k = 1
        self.pre_processing = []
        self.window = beam.window.GlobalWindow()
        self.year_dict = {}
        self.category_index = 0
        self.language_index = 1
        self.year_index = 2;
        self.result = []

      def setup(self):

      def start_bundle(self):

      def finish_bundle(self):
          # category 를 우선 배정
          for ppc_index in range(len(self.pre_processing)) :
              if self.category_index == 0 or self.category_index%3 == 0 :
                  if self.pre_processing[self.category_index] not in self.year_dict :
                      self.year_dict[self.pre_processing[self.category_index]] = {}

              # year 별로 어떤 language 가 나오는지 체크
              if ppc_index + 2 == 2 or ppc_index + 2 == self.year_index :
                  # { category : { year : {} } }
                  if self.pre_processing[self.year_index] not in self.year_dict[self.pre_processing[self.category_index]] :
                         self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] = {}
                  # { category : { year : c : { }, c++ : { }, java : { }}}
                  language = self.pre_processing[self.year_index-1].split(' ')

                  for lang_index in range(len(language)) :
                        if language[lang_index] not in self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]] :
                          self.year_dict[self.pre_processing[self.category_index]][self.pre_processing[self.year_index]][language[lang_index]] = 1
                        else :
                                language[lang_index]] += 1
                  self.year_index = self.year_index + 3
              self.category_index = self.category_index + 1

          yield beam.utils.windowed_value.WindowedValue(
              #value = self.pre_processing,

beam.DoFn을 실행시키게 되면 다음 순서로 def 가 동작하게 됩니다. 

setup -> start_bundle -> process -> finish_bundle 입니다. 



사진과 같이 라이프사이클을 가지고 있습니다. 

따라서, process에서 한 row 처리하는것에 대한 병렬처리를 수행하는 로직을 작성하고 이것들을 모아서 finish_bundle 에서 처리하게 될것입니다. 


ㅡ. toString

 def toString(text) :
        csvFormat = ''
        for category, nested in text.items():
            for year in nested:
                for language in nested[year]:
                    csvFormat += (category + "," + str(year) + "," + language + "," + str(nested[year][language]) + ",")
        yield csvFormat


ㅡ. make_tablerow

    def make_tablerow(element) :
        if element is not None :
            splited = element.split(',')
            array_index = 0
            for splited_index in range(int(len(splited)/4)) :
                tablerow = {
                    'category' : splited[array_index],
                    'year' : splited[array_index+1],
                    'language' : splited[array_index+2],
                    'count' : splited[array_index+3],
                yield tablerow
                array_index = array_index + 4

원하는 결과를 확인할 수 있습니다.

Local 에서 테스팅을 수행했으니, Cloud Dataflow 로 실행시켜서 bigQuery 테이블에 적재해봅시다. 코드를 수정하고...

    with beam.Pipeline(options=pipeline_options) as pipeline:
      results = (
          | 'Read' >> beam.io.ReadFromText("[bucket-location]", skip_header_lines=1)
          | 'Split category advanced' >> beam.ParDo(split_category_advanced(','))
          | 'toString' >> beam.FlatMap(toString)
          | 'setTable' >> beam.ParDo(make_tablerow)
          | beam.io.WriteToBigQuery(
                  schema='category:STRING, year:STRING, language:STRING, count:STRING',

side input이나 복잡한 연산을 필요로 하는 과정이 아니기 때문에 굉장히 단순한 dataflow 형태가 나옵니다.
마찬가지로 bigQuery table에 잘 들어갑니다.

