Do you use Cloud Pub / Sub? It is used when you want to handle a large number of messages. Is this the order in which messages are sent, when you want to guarantee it, or not? In the case of AWS, I think you should use SQS's FIFO Queue. (I had tried to use it in the past, but I remember that I didn't use it after all because I didn't have the Tokyo region yet and I was squeezed by the limit of 3000 messages per second. I think that it is Cloud Pub / Sub in GCP if it corresponds to SQS, but unfortunately there is no FIFO queue. If you think (Beta version) It seems that the function "Specify message ordering" was created. I wondered if I could subscribe to the messages in the order they were published, so I tried it.
However, it's a fairly about trial method, so for the time being, if you try this kind of trial, you can get a level of understanding that this is the result.
――I know about GCP. --I have used Cloud Pub / Sub. I don't know what it is. --Golang can be written as it is.
--Go development environment has been built locally.
--GCP contract completed.
--Cloud SDK has been set up locally.
--The key JSON file path (of the service account with all the required permissions) has been set in the local environment variable GOOGLE_APPLICATION_CREDENTIALS
.
$ cat /etc/os-release
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"
$ go version
go version go1.15.2 linux/amd64
IDE - Goland
GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020
https://github.com/sky0621/go-publisher/tree/v0.1.0 https://github.com/sky0621/go-subscriber/tree/v0.1.0
go-publisher
Prepare 5 endpoints.
The parts of topic.EnableMessageOrdering = true
and ʻOrderingKey: operationSequence, are actually the codes required for "ordering messages". However, Topic (
" my-normal-topic "`), which publishes messages from this source, corresponds to Subscriptions that are not for "message ordering", so "message ordering" does not work.
main.go
package main
import (
"fmt"
"log"
"net/http"
"os"
"time"
"cloud.google.com/go/pubsub"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func main() {
project := os.Getenv("PUB_PROJECT")
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.GET("/order01", handler(project, "order01"))
e.GET("/order02", handler(project, "order02"))
e.GET("/order03", handler(project, "order03"))
e.GET("/order04", handler(project, "order04"))
e.GET("/order05", handler(project, "order05"))
e.Logger.Fatal(e.Start(":8080"))
}
func handler(project, path string) func(c echo.Context) error {
return func(c echo.Context) error {
ctx := c.Request().Context()
operationSequence := createOperationSequence()
client, err := pubsub.NewClient(ctx, project)
if err != nil {
log.Fatal(err)
}
topic := client.Topic("my-normal-topic")
defer topic.Stop()
topic.EnableMessageOrdering = true
message := &pubsub.Message{
OrderingKey: operationSequence,
Data: []byte(path + ":" + operationSequence),
}
r := topic.Publish(ctx, message)
if r == nil {
log.Fatal("failed to topic.Publish!")
}
log.Printf("%+v", r)
return c.String(http.StatusOK, path+":"+operationSequence)
}
}
func createOperationSequence() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}
go-subscriber
main.go
package main
import (
"encoding/json"
"io"
"io/ioutil"
"log"
"net/http"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func main() {
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.POST("/", func(c echo.Context) error {
m, err := unmarshal(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}
data := string(m.Message.Data)
log.Printf("fs_Received__Data:%s", data)
return c.String(http.StatusOK, "OK")
})
e.Logger.Fatal(e.Start(":8080"))
}
type PubSubMessage struct {
Message struct {
Data []byte `json:"data,omitempty"`
ID string `json:"id"`
} `json:"message"`
Subscription string `json:"subscription"`
}
func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
var m PubSubMessage
body, err := ioutil.ReadAll(r)
if err != nil {
log.Printf("ioutil.ReadAll: %v", err)
return nil, err
}
if err := json.Unmarshal(body, &m); err != nil {
log.Printf("json.Unmarshal: %v", err)
return nil, err
}
return &m, nil
}
It's pretty rough.
#!/usr/bin/env bash
set -euox pipefail
for i in {0..15}
do
for j in {1..5}
do
curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
sleepenh 0.005
done
sleepenh 0.005
done
Put a service with a REST mouth on Cloud Run, publish it to Cloud Pub / Sub when access comes, and the Subscription prepared with Push type is also a REST mouth put on Cloud Run Skip the request to the service with.
The messages are repeatedly hitting the endpoints in the following order, and although they are in that order on the Publisher log, they are in no particular order on the Subscriber log. /order01 /order02 /order03 /order04 /order05
Of course, it is in order on the log issued by the shell that hits the endpoint.
Since there is a condition that exists in the same region
, specify the Topic region.
Of course, "Specify message order" should be "Enabled".
After trying it several times, there were some cases where the order of subscribing was changed at the start, but after that, the order was correct. Compared to the normal Topic / Subscription combination, which was rather out of order, the order is stable.
Even in the "message order specification" mode, the order was changed, so I couldn't say "it can be realized" for the subject. .. .. Whether this is a matter of how to try it, or maybe it's the Beta version, if you feel like it, let's dig into it. .. ..