A post on the process of wrapping up the project over the remaining two days (with cross-checking thrown in as a bonus).

The data cleaning had not been handled completely, so it needed fixing.

ㅡ. Category_feature (shows the features for each category) - Not Yet

ㅡ. bigQuery(category_feature)

The data processed by Dataflow will be loaded into a BigQuery table.
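
For reference, here is a minimal sketch of pre-creating that destination table with the google-cloud-bigquery client. The dataset and table names mirror the ones used in the pipeline below, and the all-STRING columns match the schema string passed to WriteToBigQuery; with CREATE_IF_NEEDED this step is optional.

# Sketch: pre-create the destination table (google-cloud-bigquery client).
from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
schema = [
    bigquery.SchemaField('category', 'STRING'),
    bigquery.SchemaField('feature', 'STRING'),
    bigquery.SchemaField('score', 'STRING'),
    bigquery.SchemaField('count', 'STRING'),
]
table = bigquery.Table('fluid-crane-284202.prototyping_dataset.feature_basic',
                       schema=schema)
client.create_table(table, exists_ok=True)  # no-op if the table already exists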

ㅡ. apache-beam(category_feature)

# apache_beam: aggregate (category, feature) scores and load them into BigQuery
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='DataflowRunner',
    temp_location='gs://dataflow-sample-ig/temp2'
)

def pcollection_dofn_methods_basic(test=None):
    class generate_key_value(beam.DoFn):
        """Buffers every line of the bundle, then aggregates in finish_bundle into
        {category: {feature: {'score': total, 'count': n}}}."""
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = GlobalWindow()

        def process(self, text):
            # Just buffer the raw CSV line; aggregation happens once per bundle.
            self.pre_processing.append(text)

        def finish_bundle(self):
            aggregated = {}
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                category, feature, score = split[0], split[1], int(split[2])
                entry = aggregated.setdefault(category, {}).setdefault(
                    feature, {'score': 0, 'count': 0})
                entry['score'] += score
                entry['count'] += 1

            # Values emitted from finish_bundle need an explicit window and timestamp.
            yield WindowedValue(
                value=aggregated,
                timestamp=0,
                windows=[self.window],
            )

    def toString(text):
        # Flatten the nested dict into one long comma-separated string:
        # category,feature,score,count,category,feature,score,count,...
        csvFormat = ''
        for category, nested in text.items():
            for feature, two_nested in nested.items():
                csvFormat += f"{category},{feature},{two_nested['score']},{two_nested['count']},"
        yield csvFormat

    def make_tablerow(element):
        if element is not None:
            splited = element.split(',')
            array_index = 0
            # Every four fields form one BigQuery row; the trailing empty
            # field left by the final comma is dropped by the floor division.
            for _ in range(len(splited) // 4):
                yield {
                    'category': splited[array_index],
                    'feature': splited[array_index + 1],
                    'score': splited[array_index + 2],
                    'count': splited[array_index + 3],
                }
                array_index += 4

    # For a local run, swap in: with beam.Pipeline() as pipeline:
    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
                                                skip_header_lines=1)
            | beam.ParDo(generate_key_value(','))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.feature_basic',
                schema='category:STRING, feature:STRING, score:STRING, count:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )


if __name__ == '__main__':
    pcollection_dofn_methods_basic()
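
The aggregation logic can also be sanity-checked locally on the DirectRunner before launching a paid Dataflow job. This is a minimal sketch, assuming generate_key_value and toString have been lifted to module scope first; the input rows are made-up samples in the same category,feature,score format.

# Local sanity check on the DirectRunner (no GCS or BigQuery needed).
# Assumes generate_key_value and toString are importable at module scope.
import apache_beam as beam

with beam.Pipeline() as pipeline:  # DirectRunner by default
    (
        pipeline
        | beam.Create([
            'web,php,3',    # made-up sample rows
            'web,php,5',
            'web,vrml,2',
        ])
        | beam.ParDo(generate_key_value(','))
        | beam.FlatMap(toString)
        | beam.Map(print)  # expected: web,php,8,2,web,vrml,2,1,
    )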

ㅡ. dataflow(category_feature)

The dataflow(category-feature) job succeeded.

ㅡ. Category_commit (shows commit counts and language_bytes per language) - Not Yet

ㅡ. apache-beam(category_commit)

# apache_beam: aggregate per-language commit counts and language_bytes, then load into BigQuery
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='DataflowRunner',
    temp_location='gs://dataflow-sample-ig/temp2'
)

def pcollection_dofn_methods_basic(test=None):
    def make_tablerow(element):
        if element is not None:
            splited = element.split(',')
            array_index = 0
            # Every four fields form one BigQuery row.
            for _ in range(len(splited) // 4):
                yield {
                    'category': splited[array_index],
                    'language': splited[array_index + 1],
                    'commit': splited[array_index + 2],
                    'language_bytes': splited[array_index + 3],
                }
                array_index += 4

    def toString(text):
        # Flatten the nested dict; yield only once, after every category has been
        # appended (yielding inside the loop emitted duplicated rows).
        csvFormat = ''
        for category, nested in text.items():
            for language, two_nested in nested.items():
                csvFormat += f"{category},{language},{two_nested['commit']},{two_nested['language_bytes']},"
        yield csvFormat

    class generate_key_value(beam.DoFn):
        """Buffers every line of the bundle, then aggregates in finish_bundle into
        {category: {language: {'commit': total, 'language_bytes': total}}}."""
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            aggregated = {}
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                category, language = split[0], split[1]
                commit, language_bytes = int(split[2]), float(split[3])
                entry = aggregated.setdefault(category, {}).setdefault(
                    language, {'commit': 0, 'language_bytes': 0.0})
                entry['commit'] += commit
                entry['language_bytes'] += language_bytes

            yield WindowedValue(
                value=aggregated,
                timestamp=0,
                windows=[self.window],
            )

    # For a local run, swap in: with beam.Pipeline() as pipeline:
    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/commit_output.csv",
                                                skip_header_lines=1)
            # Local test data (swap in for ReadFromText to run without GCS):
            # | beam.Create([
            #     '1,web,vrml,751,31181212',
            #     '2,web,jsp,919,2231212',
            #     '3,web,vrml,500,30012',
            #     '4,web,php,300,12334'
            # ])
            # | beam.Map(print)
            | 'Split category' >> beam.ParDo(generate_key_value(","))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.commit_basic',
                schema='category:STRING, language:STRING, commit:STRING, language_bytes:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )


if __name__ == '__main__':
    pcollection_dofn_methods_basic()

ㅡ. dataflow(category_commit)

The commit dataflow job succeeded.

ㅡ. bigQuery

Once the BigQuery table has been created and the Dataflow job is run, the data flows into it.
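
A quick way to confirm the load, sketched with the google-cloud-bigquery client (the query itself is illustrative):

# Sketch: spot-check the rows Dataflow wrote into BigQuery.
from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
query = """
    SELECT category, language, commit, language_bytes
    FROM `fluid-crane-284202.prototyping_dataset.commit_basic`
    LIMIT 10
"""
for row in client.query(query).result():
    print(row.category, row.language, row.commit, row.language_bytes)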

Once the data was loaded into BigQuery, we called the REST API and used the results in the web project.
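
For the web side, this is a sketch of the REST call itself, using the BigQuery jobs.query endpoint with google-auth supplying the OAuth token; the SQL is illustrative.

# Sketch: query BigQuery over its REST API (jobs.query), as a web backend might.
import google.auth
from google.auth.transport.requests import AuthorizedSession

credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/bigquery.readonly'])
session = AuthorizedSession(credentials)

project_id = 'fluid-crane-284202'
resp = session.post(
    f'https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/queries',
    json={
        'query': 'SELECT category, feature, score, count '
                 'FROM `fluid-crane-284202.prototyping_dataset.feature_basic` '
                 'LIMIT 10',
        'useLegacySql': False,
    },
)
resp.raise_for_status()
for row in resp.json().get('rows', []):
    print([cell['v'] for cell in row['f']])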