dataflow
Java로 ETL 수행 (Sample Code)
Java로 ETL 수행 (Sample Code)
2021.11.15Dataflow for Java Dataflow는 Serverless 로 제공되며 통합 스트리밍 및 일괄 데이터 처리를 지원하고 있습니다. Apache-Beam 모델을 활용해 코드 레벨에서 데이터 처리를 수행할 수 있습니다. 해당 세미나에서는 Dataflow를 Java로 코드 구성하고 실행해보는 과정을 살펴보겠습니다. Prerequisites JDK Apache Maven IntelliJ IDEA Community Edition Spring Initializer Local Machine Step1 아래 명령어를 통해 java, mvn 버전 확인을 진행합니다. $ java --version $ mvn --version java version 확인 시 mvn version 확인 시 Model Apache B..
GCP25 :: dataflow 코드 작성하기 3
GCP25 :: dataflow 코드 작성하기 3
2020.08.24batch 처리에 대한 apache-beam 코드 수행으로 마지막입니다. 1,2 와 크게 다르지 않은 형태이지만 example 수준에서 봐주시면 감사하겠습니다. 2020/08/24 - [Cloud] - dataflow 코드작성하기 2020/08/24 - [Cloud/Cloud.Dataflow] - dataflow 코드 작성하기 2 #apache_beam from apache_beam.options.pipeline_options import PipelineOptions import apache_beam as beam pipeline_options = PipelineOptions( project='project-id', runner='dataflow', temp_location='bucket-location..
bigQuery에서 pub/sub 으로 불러오고, compute engine 에 올려서 event stream 수행하기
bigQuery에서 pub/sub 으로 불러오고, compute engine 에 올려서 event stream 수행하기
2020.08.21bigQuery table 에서 event stream 을 수행하면서 publisher 의 topic에 message를 전송하고 이를 apache-beam 상에서 코드로 동작시켜 ETL을 수행하는것까지 진행!! event stream은 local에서 compute engine 동작시켜서 stream이 끝날 때까지 수행하는것을 역할로 잡았습니다. 미리 public-data-set 의 stackoverflow dataset을 bigQuery table로 Import 했습니다. 제가 이 테스팅을 진행하는건 tags 와 view_count를 가지고 중복되는 tags 이름에 총 몇번의 view_count를 가지고 있는지 확인하려고 합니다. (물론 | 기준으로 split 해야합니다.) pub/sub에서 stream..
GCP13 :: dataflow Map 활용, Lambda
GCP13 :: dataflow Map 활용, Lambda
2020.08.18Lambda 를 사용하기 전 import apache_beam as beam def strip_header_and_newline(text): return text.strip('\n') def strip_header_and_newline2(text): return text.strip('#') with beam.Pipeline() as pipeline: plants = ( pipeline | 'Gardening plants' >> beam.Create([ '# 🍓Strawberry\n', '# 🥕Carrot\n', '# 🍆Eggplant\n', '# 🍅Tomato\n', '# 🥔Potato\n', ]) | 'Strip header' >> beam.Map(strip_header_and_newline) #| '..
GCP09 :: Apache Beam Functions
GCP09 :: Apache Beam Functions
2020.08.14Filter Filter를 위한 함수를 생성하고 datapipeline을 생성할때 해당 함수를 걸어주면 Filter 가 진행됩니다. e.g) | 'Filter' >> beam.Filter([function-name]) 굳이, 함수를 생성하지 않고도 lambda 를 생성하여 Filter를 거는 방법도 있습니다. e.g) beam.Filter(lambda plant : plant['duration'] == 'perennial' ) => key가 duration 이고 value가 perennial 인것을 Filter Map str.strip function을 사용하는데 strip가 의미하는것은 화이트스페이스, \n, tabs등을 제거해서 보여준다. 이외에도 replace 기능들을 제공해줍니다. Pardo 한줄..
GCP07 :: Dataflow(작성중)
GCP07 :: Dataflow(작성중)
2020.08.12기본적으로 Input -> PCollection -> PCollection -> Output (workflow) 를 가집니다. Google Cloud Platform Console에 들어가보면 Stream, Batch 각각에 대해 다양한 템플릿을 제공하고 있습니다. 프로젝트 방향에 맞게 데이터파이프라인을 구축하는데 많은 도움이 됩니다. 임시 위치는 구글 문서를 참고하면 스테이징된 파이프라인 작업 라고 나와있다. 실제 파이프라인 작업과 유사한 상태를 의미한다고 생각합니다. ㅡ. Custom Template 구글 문서를 참고해보면 런타임 매개변수를 사용해서 파이프라인 내에서 실행되는 함수(e.g, DoFn) 전달해서 템플릿 기반의 파이프라인을 실행 및 맞춤 설정할 수 있다고 소개 되어 있습니다. 즉, 런타임..
GCP04 :: Google Cloud Pub/Sub Dataflow를 사용한 Stream 처리방법[작성중]
GCP04 :: Google Cloud Pub/Sub Dataflow를 사용한 Stream 처리방법[작성중]
2020.08.12Cloud Storage에 포함되어 있는 csv로 bigQuery-table을 생성합니다. Google-Cloud-Platform 에서 Pub/Sub 부분에 topic을 생성합니다. BigQuery로 내보내기를 통해서 pub/sub에서 publisher 가 받은것을 Dataflow 를 통해서 bigQuery로 전달해 줄 것입니다. bigQuery로 내보내기 했을때 위와 같이 web 페이지가 로딩됩니다. 여기서 BigQuery output table 과 임시 위치를 각각 [project-id]:[schema-name].[table-name], gs://[bucket-name]/[foder-name] 으로 지정해줍니다. output table이 저장되는 위치이고 임시 위치(bucket)에서 publisher..