bigQuery table 에서 event stream 을 수행하면서 publisher 의 topic에 message를 전송하고 이를 apache-beam 상에서 코드로 동작시켜 ETL을 수행하는것까지 진행!!

 

event stream은 local에서 compute engine 동작시켜서 stream이 끝날 때까지 수행하는것을 역할로 잡았습니다. 

미리 public-data-set 의 stackoverflow dataset을 bigQuery table로 Import 했습니다. 

 

제가 이 테스팅을 진행하는건 tags 와 view_count를 가지고 중복되는 tags 이름에 총 몇번의 view_count를 가지고 있는지 확인하려고 합니다. (물론 | 기준으로 split 해야합니다.)

pub/sub에서 stream 으로 들어오는것을 ParDo 에서 한 row 씩 처리할 것입니다. 

 

SELECT *
FROM `fluid-crane-284202.prototyping_dataset_stackoverflow.stackoverflow_posts
`
LIMIT 10000

10000줄로 제한하였다. 

 

from google.cloud import pubsub_v1
from google.cloud import bigquery
import time
# TODO(developer)
project_id = "fluid-crane-284202"
topic_id = "test"

# Construct a BigQuery client object.
client = bigquery.Client()

# Configure the batch to publish as soon as there is ten messages,
# one kilobyte of data, or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=10,  # default 100
    max_bytes=1024,  # default 1 MB
    max_latency=1,  # default 10 ms'

)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)

query = """
    SELECT *
FROM `fluid-crane-284202.prototyping_dataset_stackoverflow.stackoverflow_posts`
LIMIT 10000
"""
query_job = client.query(query)

# Resolve the publish future in a separate thread.
def callback(topic_message):
    message_id = topic_message.result()
    print(message_id, end=" finish")

print("The query data:")
for row in query_job:
    data = u"tags={}, view_count={}".format(row[18], row[19])
    print(data)
    data = data.encode("utf-8")
    time.sleep(1)

    topic_message = publisher.publish(topic_path, data=data)
    topic_message.add_done_callback(callback)


print("Published messages with batch settings.")

#result

The query data:
tags=android|url|android-intent|intentfilter|launch, view_count=91060
tags=java|eclipse, view_count=4272
tags=winforms|deployment|clickonce, view_count=2534 ... 

 

이런 결과를 볼 수 있다. streaming 데이터를 pub/sub에 넣고 완료시에 print() 를 확인할 수 있다. 

이제 이 코드를 compute-engine에 올려서 streaming 데이터를 처리해보자!

 

vm 생성

인스턴스 옵션은 다음과 같다.

name : pubsub-ins1

리전 : us-central1

영역 : us-central1-a

머신유형 : n1-standard-1(vCPU 1개, 3.75GB 메모리)

부딩디스크 : Ubuntu 16.04 LTS

ID 및 API 액세스 : 모든 Cloud API에 대한 전체 액세스 허용

방화벽 : HTTP 트래픽 허용, HTTPS 트래픽 허용

 

으로 만들었다.

 

오른쪽에 SSH 버튼을 누르면 터미널 환경에 접속할 수 있다. 

일반 터미널과 동일한 명령어를 내릴 수 있다. 

이제 SSH로 VM에 접속해보자.

 

ssh에 접속하기 위해선 RSA키를 만들어야 한다. (default. home/.ssh/)

ssh-keygen -t rsa -C "구글 계정" 을 치면 

Enter file in which ~~

Enter passphrase ~~~ 둘다 enter 내리 연속 치면 된다. 그 밑도 마찬가지...

 

이제, 키가 저장된 위치로 이동해서 키의 공개키를 cat 으로 읽어오자.

cd ~/.ssh
ls # .pub 으로 달린 파일에 대해서 cat 을 시도할것이다.
cat [rsa-file].pub

key 값

Compute Engine에서 메타데이터로 들어가보자. 그리고 메타데이터에서 SSH키로 들어가 [ 수정 ] 을 클릭한다. 

그리고 key값을 넣어주고 새로 생긴 SSH 키를 확인해보자! 

 

이제 Compute-Engine-VM 엔진에서 외부IP주소를 확인한다. 임시 아이피로 제공된다. 

ssh -i [개인키 위치] [등록한 구글 아이디]@[외부IP] 로 접속하면 된다.

e.g) ssh -i ~/.ssh/id_rsa googleid@123.123.123

 

접속 성공

기본적으로 git과 Python이 깔려있다.

python은 3 version을 사용하고 있어서 업데이트 한다.

이미 우분투에서 가지고 있는 python 버전을 확인해보자.

which python
cd [python-path]
ls -al | grep python 
sudo update-alternatives --install [path] python [path]
e.g) sudo update-alternatives --install /usr/bin/python python /usr/bin/python3.5.2
python --version

python을 3버전으로 올리고

사전에 script를 git에 올린 상태에서 ssh로 접속한 vm에 git clone 

그리고 python3 [python-name] 으로 시작해줬다. 

(필요한 라이브러리는 pypl에서 다운받자, 처음엔 아무것도 없을것이니 pip3 부터...)

 

배치 스트림이 잘동작하는것을 확인할 수 있다. 

'GCP' 카테고리의 다른 글

GCP19 :: CPU와 메모리 관계(2)  (0) 2020.08.21
GCP18 :: LB(Load-Balancer)  (0) 2020.08.21
데이터 스튜디오 이용해 bigQuery 시각화  (0) 2020.08.20
GCP14 :: streaming 데이터 처리  (0) 2020.08.18
GCP13 :: dataflow Map 활용, Lambda  (0) 2020.08.18