A post on wrapping up the project over the remaining two days (plus some cross-checking along the way).

The data-cleaning step hadn't been done completely, so it had to be fixed.

ㅡ. Category_feature (shows the features for each category) - Not Yet

ㅡ. bigQuery(category_feature)

The data processed by Dataflow will be loaded into a BigQuery table.
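
Since WriteToBigQuery below uses CREATE_IF_NEEDED, pre-creating the table is optional, but it makes the schema explicit. Here is a minimal sketch with the google-cloud-bigquery client, assuming the library is installed and application-default credentials are configured; the table name and all-STRING schema mirror what the pipeline writes:

# Hedged sketch: pre-creating the target table with google-cloud-bigquery.
# Assumes `pip install google-cloud-bigquery` and default credentials.
from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
schema = [
    bigquery.SchemaField('category', 'STRING'),
    bigquery.SchemaField('feature', 'STRING'),
    bigquery.SchemaField('score', 'STRING'),
    bigquery.SchemaField('count', 'STRING'),
]
table = bigquery.Table('fluid-crane-284202.prototyping_dataset.feature_basic', schema=schema)
table = client.create_table(table, exists_ok=True)  # no-op if the table already exists
print('table ready:', table.full_table_id)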

ㅡ. apache-beam(category_feature)

#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='DataflowRunner',
    temp_location='gs://dataflow-sample-ig/temp2'
)

def pcollection_dofn_methods_basic(test=None):
    import apache_beam as beam

    class generate_key_value(beam.DoFn):
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = beam.window.GlobalWindow()

        def process(self, text):
            # Buffer all input lines; the aggregation happens in finish_bundle.
            self.pre_processing.append(text)

        def finish_bundle(self):
            aggregated = {}
            # First pass: one entry per category.
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                if split[0] not in aggregated:
                    aggregated[split[0]] = {}
            # Second pass: one entry per (category, feature) pair.
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                if split[1] not in aggregated[split[0]]:
                    aggregated[split[0]][split[1]] = {}
            # Third pass: sum the scores and count the rows per pair.
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                score = int(split[2])
                if not aggregated[split[0]][split[1]]:
                    aggregated[split[0]][split[1]]['score'] = score
                    aggregated[split[0]][split[1]]['count'] = 1
                else:
                    aggregated[split[0]][split[1]]['score'] += score
                    aggregated[split[0]][split[1]]['count'] += 1
            yield beam.utils.windowed_value.WindowedValue(
                value=aggregated,
                timestamp=0,
                windows=[self.window],
            )

    def toString(text):
        # Flatten the nested dict into one long CSV string:
        # category,feature,score,count, repeated per pair.
        csvFormat = ''
        for category, nested in text.items():
            for feature, two_nested in nested.items():
                csvFormat += (category + "," + feature + ","
                              + str(two_nested['score']) + "," + str(two_nested['count']) + ",")
        yield csvFormat

    def make_tablerow(element):
        if element is not None:
            splited = element.split(',')
            array_index = 0
            # Every four fields form one BigQuery row.
            for _ in range(int(len(splited) / 4)):
                tablerow = {
                    'category': splited[array_index],
                    'feature': splited[array_index + 1],
                    'score': splited[array_index + 2],
                    'count': splited[array_index + 3],
                }
                print(tablerow)
                yield tablerow
                array_index = array_index + 4

    # with beam.Pipeline() as pipeline:  # DirectRunner alternative for local runs
    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/feature_output.csv",
                                                skip_header_lines=1)
            | beam.ParDo(generate_key_value(','))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.feature_basic',
                schema='category:STRING, feature:STRING, score:STRING, count:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )

if __name__ == '__main__':
    pcollection_dofn_methods_basic()
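
To sanity-check the aggregation before launching a Dataflow job, the same transforms can be exercised locally on the DirectRunner with a few in-memory lines. A sketch under the assumption that generate_key_value, toString, and make_tablerow are lifted to module scope; the sample rows are made up:

# Hedged local test: made-up rows, no options means local DirectRunner.
import apache_beam as beam

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create([
            'web,login,3',
            'web,login,5',
            'mobile,push,2',
        ])
        | beam.ParDo(generate_key_value(','))
        | beam.FlatMap(toString)
        | beam.ParDo(make_tablerow)
    )
# make_tablerow prints each row as it is emitted, e.g.
# {'category': 'web', 'feature': 'login', 'score': '8', 'count': '2'}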

ㅡ. dataflow(category_feature)

The Dataflow job (category-feature) ran successfully.

ㅡ. Category_commit (shows commits and language_bytes per language) - Not Yet

ㅡ. apache-beam(category_commit)

#apache_beam
from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

pipeline_options = PipelineOptions(
    project='fluid-crane-284202',
    runner='DataflowRunner',
    temp_location='gs://dataflow-sample-ig/temp2'
)

def pcollection_dofn_methods_basic(test=None):
    import apache_beam as beam

    def make_tablerow(element):
        if element is not None:
            splited = element.split(',')
            array_index = 0
            # Every four fields form one BigQuery row.
            for _ in range(int(len(splited) / 4)):
                tablerow = {
                    'category': splited[array_index],
                    'language': splited[array_index + 1],
                    'commit': splited[array_index + 2],
                    'language_bytes': splited[array_index + 3],
                }
                yield tablerow
                array_index = array_index + 4

    def toString(text):
        # Flatten the nested dict into one long CSV string:
        # category,language,commit,language_bytes, repeated per pair.
        csvFormat = ''
        for category, nested in text.items():
            for language, two_nested in nested.items():
                csvFormat += (category + "," + language + ","
                              + str(two_nested['commit']) + "," + str(two_nested['language_bytes']) + ",")
        yield csvFormat

    class generate_key_value(beam.DoFn):
        def __init__(self, delimiter=','):
            self.delimiter = delimiter
            self.pre_processing = []
            self.window = beam.window.GlobalWindow()

        def process(self, text):
            # Buffer all input lines; the aggregation happens in finish_bundle.
            self.pre_processing.append(text)

        def finish_bundle(self):
            aggregated = {}
            # First pass: one entry per category.
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                if split[0] not in aggregated:
                    aggregated[split[0]] = {}
            # Second pass: one entry per (category, language) pair.
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                if split[1] not in aggregated[split[0]]:
                    aggregated[split[0]][split[1]] = {}
            # Third pass: sum commits and language_bytes per pair.
            for line in self.pre_processing:
                split = line.split(self.delimiter)
                commit = int(split[2])
                language_bytes = float(split[3])
                if not aggregated[split[0]][split[1]]:
                    aggregated[split[0]][split[1]]['commit'] = commit
                    aggregated[split[0]][split[1]]['language_bytes'] = language_bytes
                else:
                    aggregated[split[0]][split[1]]['commit'] += commit
                    aggregated[split[0]][split[1]]['language_bytes'] += language_bytes
            yield beam.utils.windowed_value.WindowedValue(
                value=aggregated,
                timestamp=0,
                windows=[self.window],
            )

    # with beam.Pipeline() as pipeline:  # DirectRunner alternative for local runs
    with beam.Pipeline(options=pipeline_options) as pipeline:
        p1 = (
            pipeline
            | 'extract' >> beam.io.ReadFromText("gs://external_source_bucket/commit_output.csv",
                                                skip_header_lines=1)
            # | beam.Create([
            #     '1,web,vrml,751,31181212',
            #     '2,web,jsp,919,2231212',
            #     '3,web,vrml,500,30012',
            #     '4,web,php,300,12334'
            # ])
            # | beam.Map(print)
            | 'Split category' >> beam.ParDo(generate_key_value(","))
            | beam.FlatMap(toString)
        )
        p2 = (
            p1
            | 'Loading' >> beam.ParDo(make_tablerow)
            | beam.io.WriteToBigQuery(
                'fluid-crane-284202:prototyping_dataset.commit_basic',
                schema='category:STRING, language:STRING, commit:STRING, language_bytes:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
        )

if __name__ == '__main__':
    pcollection_dofn_methods_basic()

ㅡ. dataflow(category_commit)

The commit Dataflow job ran successfully.

ㅡ. bigQuery

Once the BigQuery table has been created, running the Dataflow job loads the data into it.
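
A quick way to confirm that rows actually landed is to query the table with the same client library; the query below is just for illustration:

# Hedged check that rows landed in the feature table.
from google.cloud import bigquery

client = bigquery.Client(project='fluid-crane-284202')
query = """
    SELECT category, feature, score, `count`
    FROM `fluid-crane-284202.prototyping_dataset.feature_basic`
    LIMIT 10
"""
for row in client.query(query).result():
    print(row['category'], row['feature'], row['score'], row['count'])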

Once the data was loaded into BigQuery, it was fetched over the REST API and used in the web project.
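
As a rough illustration of the web side, a small endpoint can run the query and return JSON; Flask and the route below are hypothetical stand-ins, not the project's actual code:

# Hypothetical sketch of the web project's REST layer (Flask assumed).
from flask import Flask, jsonify
from google.cloud import bigquery

app = Flask(__name__)
client = bigquery.Client(project='fluid-crane-284202')

@app.route('/api/category-feature')
def category_feature():
    query = """
        SELECT category, feature, score, `count`
        FROM `fluid-crane-284202.prototyping_dataset.feature_basic`
    """
    rows = client.query(query).result()
    return jsonify([dict(row.items()) for row in rows])

if __name__ == '__main__':
    app.run(port=8080)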