GCP: Chaining from Pub/Sub to Cloud Functions and from Cloud Functions back to Pub/Sub

Below is the architecture diagram for this setup.

(Architecture diagram: Cloud Scheduler → topic_1 → function_1 → topic_2 → function_2)

Things to prepare

・Two Pub/Sub topics
  ┗ Name them "topic_1" and "topic_2"

・A Cloud Scheduler job
  ┗ Set its topic to "topic_1" and its payload to "hello"

・Two Cloud Functions functions
  ┗ Name them "function_1" and "function_2"
  ┗ Set the trigger of "function_1" to the Pub/Sub topic "topic_1"
  ┗ Set the trigger of "function_2" to the Pub/Sub topic "topic_2"
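
The two topics can be created in the console, but as a side note, the sketch below creates them with the Python client instead; it assumes google-cloud-pubsub 2.x and takes the project ID from the environment.

import os

from google.cloud import pubsub_v1

PROJECT_ID = os.getenv('GCP_PROJECT')  # or hard-code your project ID
publisher = pubsub_v1.PublisherClient()

for topic_id in ["topic_1", "topic_2"]:
    topic_path = publisher.topic_path(PROJECT_ID, topic_id)
    publisher.create_topic(name=topic_path)  # google-cloud-pubsub 2.x keyword form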

Contents of function_1

In the "event_message" below, the string that Cloud Scheduler set as the payload ("hello" in this case) arrives via topic_1.

If you create N Cloud Scheduler jobs that are identical except for their payloads, function_1 can branch on event_message and run different follow-up processing for each job.

import base64

def main(event, context):
    event_message = base64.b64decode(event['data']).decode('utf-8')
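
A minimal sketch of that branching (the payload strings "hello" and "goodbye" are just example values, not part of the setup above):

import base64

def main(event, context):
    # The Scheduler payload arrives base64-encoded in event['data']
    event_message = base64.b64decode(event['data']).decode('utf-8')

    if event_message == "hello":
        pass  # processing for the "hello" job
    elif event_message == "goodbye":
        pass  # processing for a hypothetical "goodbye" job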

Next, suppose you want to pass a list to function_2. Pub/Sub can only carry text, so the list has to be converted to a string and encoded first.

import os

from google.cloud import pubsub_v1

PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()

topic_id = "topic_2"  # the topic to publish to next
topic_path = client.topic_path(PROJECT_ID, topic_id)

pub_text = ["apple", "gorilla", "trumpet"]

data = str(pub_text).encode()  # convert the list to a string, then encode it to bytes
client.publish(topic_path, data=data)  # publish pub_text to topic_2

With the last line above, the string '["apple", "gorilla", "trumpet"]' is passed to topic_2 and function_2 is triggered.

Contents of function_2

The event_message below contains the string '["apple", "gorilla", "trumpet"]', so eval() it to turn it back into a Python list, and then hand it to whatever processing comes next.

import base64

def main(event, context):
    event_message = base64.b64decode(event['data']).decode('utf-8')
    # eval() turns the string '["apple", "gorilla", "trumpet"]' back into a list
    fruit_lst = eval(event_message)
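
As an aside, when the payload is always a plain literal like this list, ast.literal_eval from the standard library can restore it the same way without evaluating arbitrary code; a minimal drop-in sketch:

import ast
import base64

def main(event, context):
    event_message = base64.b64decode(event['data']).decode('utf-8')
    # literal_eval only parses Python literals, so it rebuilds the list without running code
    fruit_lst = ast.literal_eval(event_message)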

Starting function_2 in parallel

Cloud Functions instances can run in parallel, as described in the official documentation: https://cloud.google.com/functions/quotas?hl=ja#scalability

So if function_1 publishes to Pub/Sub in a loop as shown below, function_2 fires once per message, and each invocation receives one of the three items.

function_1.py


import os

from google.cloud import pubsub_v1

PROJECT_ID = os.getenv('GCP_PROJECT')
client = pubsub_v1.PublisherClient()

topic_id = "topic_2"
topic_path = client.topic_path(PROJECT_ID, topic_id)

fruit_lst = ["apple", "gorilla", "trumpet"]

# Publish one message per item; each message triggers a separate function_2 invocation
for fruit in fruit_lst:
    data = fruit.encode()
    client.publish(topic_path, data=data)
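
With this pattern each message carries a single item, so function_2 only needs to decode one string per invocation; a minimal sketch (the print stands in for the real processing):

import base64

def main(event, context):
    # Each published message triggers a separate invocation of function_2
    fruit = base64.b64decode(event['data']).decode('utf-8')
    print(fruit)  # e.g. "apple"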

References

https://cloud.google.com/solutions/streaming-data-from-cloud-storage-into-bigquery-using-cloud-functions?hl=ja
https://cloud.google.com/functions/quotas?hl=ja#scalability

Digression

If you convert a list or dict to a string, encode() it, pass it through Pub/Sub, and eval() it on the receiving side, it is restored as-is, which was convenient when I wanted to run a number of lightweight jobs on a schedule.
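
A minimal sketch of that round trip for a dict, assuming the same publish/receive setup as above (the payload contents are just examples):

# Sending side (inside function_1): convert the dict to a string, then to bytes
params = {"fruit": "apple", "count": 3}
data = str(params).encode()
# client.publish(topic_path, data=data)

# Receiving side (inside function_2): decode the message and eval() it back into a dict
restored = eval(data.decode('utf-8'))
print(restored["count"])  # 3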
