IT08 :: 프로젝트 마감
남은 이틀 동안 프로젝트를 마감하는 과정을 포스팅 (크로스 체크는 덤)
데이터 정제 처리가 완벽하게 안 돼서 수정해야 했다.
ㅡ. Category_feature(카테고리별 특징들을 보여줍니다.) - Not Yet
ㅡ. bigQuery(category_feature)

ㅡ. apache-beam(category_feature)
# apache_beam: aggregate per-(category, feature) score totals and row counts
# from a CSV in GCS, then load the result into BigQuery via Dataflow.
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='dataflow',
    temp_location='gs://dataflow-sample-ig/temp2'
)


def pcollection_dofn_methods_basic(test=None):
    """Build and run the category/feature aggregation pipeline.

    Reads feature_output.csv, sums the score column and counts rows per
    (category, feature) pair, and writes one BigQuery row per pair.
    """
    import apache_beam as beam

    class generate_key_value(beam.DoFn):
        """Buffer every line of the bundle, then emit one nested dict:
        {category: {feature: {'score': total, 'count': n}}}.

        NOTE(review): buffering the whole bundle assumes the input fits in
        memory; a CombinePerKey would scale better — confirm input size.
        """

        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            # finish_bundle must emit WindowedValues explicitly; tag them
            # with the global window.
            self.window = beam.window.GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            # Single pass (the original made four passes, two of them dead),
            # and no shadowing of the builtin `dict`. Uses self.delimiter
            # instead of a hardcoded ','.
            totals = {}
            for line in self.pre_processing:
                parts = line.split(self.delimiter)
                entry = totals.setdefault(parts[0], {}).setdefault(parts[1], {})
                score = int(parts[2])
                if not entry:
                    entry['score'] = score
                    entry['count'] = 1
                else:
                    entry['score'] += score
                    entry['count'] += 1
            yield beam.utils.windowed_value.WindowedValue(
                value=totals,
                timestamp=0,
                windows=[self.window],
            )

    def toString(text):
        """Flatten the nested dict into one comma-separated string."""
        csvFormat = ''
        for category, nested in text.items():
            for feature, two_nested in nested.items():
                csvFormat += (category + "," + feature + "," +
                              str(two_nested['score']) + "," +
                              str(two_nested['count']) + ",")
        yield csvFormat

    def make_tablerow(element):
        """Split the flat CSV string into BigQuery rows of four fields."""
        if element is not None:
            splited = element.split(',')
            array_index = 0
            for _ in range(int(len(splited) / 4)):
                tablerow = {
                    'category': splited[array_index],
                    'feature': splited[array_index + 1],
                    'score': splited[array_index + 2],
                    'count': splited[array_index + 3],
                }
                yield tablerow
                array_index = array_index + 4

    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText(
                "gs://external_source_bucket/feature_output.csv",
                skip_header_lines=1)
            | beam.ParDo(generate_key_value(','))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.feature_basic',
                schema='category:STRING, feature:STRING, score:STRING, count:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )


if __name__ == '__main__':
    pcollection_dofn_methods_basic()
ㅡ. dataflow(category_feature)

ㅡ. Category_commit (언어별 commit, language_byte 보여줍니다.) - Not Yet
ㅡ. apache-beam(category_commit)
# apache_beam: aggregate per-(category, language) commit totals and
# language-byte totals from a CSV in GCS, then load into BigQuery.
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='dataflow',
    temp_location='gs://dataflow-sample-ig/temp2'
)


def pcollection_dofn_methods_basic(test=None):
    """Build and run the category/commit aggregation pipeline.

    Reads commit_output.csv, sums commits (int) and language_bytes (float)
    per (category, language) pair, and writes one BigQuery row per pair.
    """
    import apache_beam as beam

    def make_tablerow(element):
        """Split the flat CSV string into BigQuery rows of four fields."""
        if element is not None:
            splited = element.split(',')
            array_index = 0
            for _ in range(int(len(splited) / 4)):
                tablerow = {
                    'category': splited[array_index],
                    'language': splited[array_index + 1],
                    'commit': splited[array_index + 2],
                    'language_bytes': splited[array_index + 3],
                }
                yield tablerow
                array_index = array_index + 4

    def toString(text):
        """Flatten the nested dict into one comma-separated string."""
        csvFormat = ''
        for category, nested in text.items():
            for commit, two_nested in nested.items():
                csvFormat += (category + "," + commit + "," +
                              str(two_nested['commit']) + "," +
                              str(two_nested['language_bytes']) + ",")
        yield csvFormat

    class generate_key_value(beam.DoFn):
        """Buffer every line of the bundle, then emit one nested dict:
        {category: {language: {'commit': total, 'language_bytes': total}}}.

        NOTE(review): buffering the whole bundle assumes the input fits in
        memory; a CombinePerKey would scale better — confirm input size.
        """

        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            # finish_bundle must emit WindowedValues explicitly; tag them
            # with the global window.
            self.window = beam.window.GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            # Single pass (the original made four passes, two of them dead),
            # no shadowing of the builtin `dict`, and no debug print.
            # Uses self.delimiter instead of a hardcoded ','.
            totals = {}
            for line in self.pre_processing:
                parts = line.split(self.delimiter)
                entry = totals.setdefault(parts[0], {}).setdefault(parts[1], {})
                commits = int(parts[2])
                lang_bytes = float(parts[3])
                if not entry:
                    entry['commit'] = commits
                    entry['language_bytes'] = lang_bytes
                else:
                    entry['commit'] += commits
                    entry['language_bytes'] += lang_bytes
            yield beam.utils.windowed_value.WindowedValue(
                value=totals,
                timestamp=0,
                windows=[self.window],
            )

    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText(
                "gs://external_source_bucket/commit_output.csv",
                skip_header_lines=1)
            | 'Split category' >> beam.ParDo(generate_key_value(","))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.commit_basic',
                schema='category:STRING, language:STRING, commit:STRING, language_bytes:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )


if __name__ == '__main__':
    pcollection_dofn_methods_basic()
ㅡ. dataflow(category_commit)

ㅡ. bigQuery

데이터가 bigQuery에 적재되면, rest API를 불러와서 Web 프로젝트에서 활용하였습니다.
'IT' 카테고리의 다른 글
IT10 :: 프로젝트 유지보수 (0) | 2020.10.02 |
---|---|
IT09 :: 2020 : 정보처리기사 2회 후기 (0) | 2020.08.28 |
IT07 :: 프록시 서버란? (0) | 2020.08.26 |
IT06 :: 정보처리기사 (패킷 문제) (0) | 2020.08.15 |
IT05 :: 정보처리기사 문제 정리 - 9급 전산직 컴퓨터일반 (0) | 2020.08.11 |
댓글
이 글 공유하기
다른 글
-
IT10 :: 프로젝트 유지보수
IT10 :: 프로젝트 유지보수
2020.10.02 -
IT09 :: 2020 : 정보처리기사 2회 후기
IT09 :: 2020 : 정보처리기사 2회 후기
2020.08.28 -
IT07 :: 프록시 서버란?
IT07 :: 프록시 서버란?
2020.08.26 -
IT06 :: 정보처리기사 (패킷 문제)
IT06 :: 정보처리기사 (패킷 문제)
2020.08.15
댓글을 사용할 수 없습니다.