IT08 :: Wrapping Up the Project
Over the remaining two days, I'm posting the process of wrapping up the project (cross-checking... thrown in as a bonus).
The data cleaning hadn't been done perfectly, so it had to be fixed.
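A rough illustration of the kind of cleanup this involved (an assumption on my part, based on the commented-out `split[1] = split[1][1:]` lines in the first pipeline below): the exported CSV seems to carry a stray leading space after each comma, which breaks key matching when grouping.

#data_cleaning (hypothetical sample row; the real feature_output.csv is only assumed to look like this)
row = 'web, vrml, 751'
fields = [field.strip() for field in row.split(',')]
print(fields)  # ['web', 'vrml', '751'] -- keys now match once the space is gone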
ㅡ. Category_feature (shows the features for each category) - Not Yet
ㅡ. BigQuery(category_feature)
ㅡ. apache-beam(category_feature)
#apache_beam
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='dataflow',
    temp_location='gs://dataflow-sample-ig/temp2'
)

def pcollection_dofn_methods_basic(test=None):
    class generate_key_value(beam.DoFn):
        """Buffers every CSV line, then sums score and occurrence count
        per (category, feature) pair in finish_bundle."""
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            grouped = {}  # renamed from 'dict' to avoid shadowing the built-in
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                # split[1] = split[1][1:]
                # split[2] = split[2][1:]
                category, feature, score = split[0], split[1], split[2]
                nested = grouped.setdefault(category, {}).setdefault(feature, {})
                if not nested:
                    nested['score'] = int(score)
                    nested['count'] = 1
                else:
                    nested['score'] += int(score)
                    nested['count'] += 1
            # finish_bundle has no element context, so the result must be
            # wrapped in an explicit WindowedValue.
            yield WindowedValue(
                value=grouped,
                timestamp=0,
                windows=[self.window],
            )

    def toString(text):
        # Flatten the nested dict into one comma-separated string:
        # category,feature,score,count, repeated per pair.
        csvFormat = ''
        for category, nested in text.items():
            for feature, two_nested in nested.items():
                csvFormat += (category + "," + feature + ","
                              + str(two_nested['score']) + ","
                              + str(two_nested['count']) + ",")
        yield csvFormat

    def make_tablerow(element):
        # Walk the flattened string four fields at a time and emit one
        # BigQuery row dict per (category, feature) pair.
        if element is not None:
            splited = element.split(',')
            array_index = 0
            for _ in range(len(splited) // 4):
                tablerow = {
                    'category': splited[array_index],
                    'feature': splited[array_index + 1],
                    'score': splited[array_index + 2],
                    'count': splited[array_index + 3],
                }
                print(tablerow)
                yield tablerow
                array_index += 4

    # with beam.Pipeline() as pipeline:  # DirectRunner, for local runs
    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText(
                "gs://external_source_bucket/feature_output.csv",
                skip_header_lines=1)
            | beam.ParDo(generate_key_value(','))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.feature_basic',
                schema='category:STRING, feature:STRING, score:STRING, count:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
        )

if __name__ == '__main__':
    pcollection_dofn_methods_basic()
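To sanity-check the flattening logic without a Dataflow run, the two helpers can be exercised in plain Python. A minimal sketch, assuming `toString` and `make_tablerow` are lifted out of `pcollection_dofn_methods_basic` to module level; the sample dict imitates what `generate_key_value` would emit:

#local_check
grouped = {'web': {'vrml': {'score': 1251, 'count': 2},
                   'jsp': {'score': 919, 'count': 1}}}

flat = next(toString(grouped))  # toString is a generator with a single value
print(flat)                     # web,vrml,1251,2,web,jsp,919,1,

rows = list(make_tablerow(flat))  # make_tablerow also prints each row itself
# {'category': 'web', 'feature': 'vrml', 'score': '1251', 'count': '2'}
# {'category': 'web', 'feature': 'jsp', 'score': '919', 'count': '1'}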
ㅡ. Dataflow(category_feature)
ㅡ. Category_commit (shows commits and language_bytes per language) - Not Yet
ㅡ. apache-beam(category_commit)
#apache_beam
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.windowed_value import WindowedValue

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='dataflow',
    temp_location='gs://dataflow-sample-ig/temp2'
)

def pcollection_dofn_methods_basic(test=None):
    def make_tablerow(element):
        # Walk the flattened string four fields at a time and emit one
        # BigQuery row dict per (category, language) pair.
        if element is not None:
            splited = element.split(',')
            array_index = 0
            for _ in range(len(splited) // 4):
                tablerow = {
                    'category': splited[array_index],
                    'language': splited[array_index + 1],
                    'commit': splited[array_index + 2],
                    'language_bytes': splited[array_index + 3],
                }
                yield tablerow
                array_index += 4

    def toString(text):
        # Flatten the nested dict into one comma-separated string:
        # category,language,commit,language_bytes, repeated per pair.
        csvFormat = ''
        for category, nested in text.items():
            for language, two_nested in nested.items():
                csvFormat += (category + "," + language + ","
                              + str(two_nested['commit']) + ","
                              + str(two_nested['language_bytes']) + ",")
        yield csvFormat

    class generate_key_value(beam.DoFn):
        """Buffers every CSV line, then sums commit and language_bytes
        per (category, language) pair in finish_bundle."""
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = GlobalWindow()

        def process(self, text):
            self.pre_processing.append(text)

        def finish_bundle(self):
            grouped = {}  # renamed from 'dict' to avoid shadowing the built-in
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                category, language = split[0], split[1]
                nested = grouped.setdefault(category, {}).setdefault(language, {})
                if not nested:
                    nested['commit'] = int(split[2])
                    nested['language_bytes'] = float(split[3])
                else:
                    nested['commit'] += int(split[2])
                    nested['language_bytes'] += float(split[3])
            print(self.pre_processing)
            yield WindowedValue(
                value=grouped,
                timestamp=0,
                windows=[self.window],
            )

    # with beam.Pipeline() as pipeline:  # DirectRunner, for local runs
    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText(
                "gs://external_source_bucket/commit_output.csv",
                skip_header_lines=1)
            # | beam.Create([
            #     '1,web,vrml,751,31181212',
            #     '2,web,jsp,919,2231212',
            #     '3,web,vrml,500,30012',
            #     '4,web,php,300,12334'
            # ])
            # | beam.Map(print)
            | 'Split category' >> beam.ParDo(generate_key_value(","))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.commit_basic',
                schema='category:STRING, language:STRING, commit:STRING, language_bytes:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
        )

if __name__ == '__main__':
    pcollection_dofn_methods_basic()
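One caveat that applies to both pipelines: finish_bundle fires once per bundle, and the runner decides how the input is split into bundles, so on a large input Dataflow may emit several partially aggregated dicts rather than one. A bundle-independent sketch of the same roll-up using CombinePerKey (assuming the CSV columns are category, language, commit, language_bytes):

#combine_per_key
import apache_beam as beam

def parse_line(line):
    # Assumed column order: category, language, commit, language_bytes.
    category, language, commit, language_bytes = line.split(',')[:4]
    return ((category, language), (int(commit), float(language_bytes)))

def sum_pairs(values):
    # Sum commits and byte counts independently for one (category, language) key.
    commits, byte_counts = zip(*values)
    return (sum(commits), sum(byte_counts))

with beam.Pipeline() as pipeline:  # DirectRunner, for a quick local run
    (pipeline
     | beam.Create(['web,vrml,751,31181212', 'web,vrml,500,30012'])
     | beam.Map(parse_line)
     | beam.CombinePerKey(sum_pairs)
     | beam.Map(print))  # (('web', 'vrml'), (1251, 31211224.0))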
ㅡ. Dataflow(category_commit)
ㅡ. BigQuery
Once the data was loaded into BigQuery, we called its REST API and used the results in the web project.
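For reference, a minimal sketch of reading the loaded table back with the google-cloud-bigquery client (assuming the library is installed and credentials are configured; the web project would fetch the same data through the REST endpoint):

#bigquery_read
from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
query = """
    SELECT category, feature, score, count
    FROM `fluid-crane-284202.prototyping_dataset.feature_basic`
    ORDER BY category, feature
"""
for row in client.query(query):  # the returned QueryJob iterates over result rows
    print(row['category'], row['feature'], row['score'], row['count'])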