Utilisez-vous Cloud Pub / Sub? Il est utilisé lorsque vous souhaitez gérer un grand nombre de messages. S'agit-il de l'ordre dans lequel les messages sont envoyés, lorsque vous souhaitez le garantir ou non? Dans le cas d'AWS, je pense que vous devriez utiliser la [FIFO Queue] de SQS (https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html). (J'avais essayé de l'utiliser dans le passé, mais je me souviens que je ne l'ai pas utilisé après tout parce que je n'avais pas encore la région de Tokyo et j'étais pressé par la limite de 3000 messages par seconde.) Je pense que c'est Cloud Pub / Sub dans GCP si cela correspond à SQS, mais malheureusement il n'y a pas de file d'attente FIFO. Si vous pensez (Version bêta) Il semble que la fonction "Spécifier l'ordre des messages" a été créée. Je me suis demandé si je pouvais m'abonner aux messages dans l'ordre de leur publication, alors j'ai essayé.
Cependant, il s'agit plutôt d'une méthode d'essai, donc pour le moment, si vous essayez ce type d'essai, vous pouvez comprendre que c'est le résultat.
--Je connais GCP.
GOOGLE_APPLICATION_CREDENTIALS
.$ cat /etc/os-release
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"
$ go version
go version go1.15.2 linux/amd64
IDE - Goland
GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020
https://github.com/sky0621/go-publisher/tree/v0.1.0 https://github.com/sky0621/go-subscriber/tree/v0.1.0
go-publisher
Préparez 5 points de terminaison.
Les parties de topic.EnableMessageOrdering = true
et ʻOrderingKey: operationSequence,` sont en fait les codes nécessaires pour "commander des messages".
Cependant, le sujet («mon-sujet-normal») qui publie des messages avec cette source correspond à un abonnement qui n'est pas pour «l'ordre des messages», donc «l'ordre des messages» ne fonctionne pas.
main.go
package main
import (
"fmt"
"log"
"net/http"
"os"
"time"
"cloud.google.com/go/pubsub"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func main() {
project := os.Getenv("PUB_PROJECT")
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.GET("/order01", handler(project, "order01"))
e.GET("/order02", handler(project, "order02"))
e.GET("/order03", handler(project, "order03"))
e.GET("/order04", handler(project, "order04"))
e.GET("/order05", handler(project, "order05"))
e.Logger.Fatal(e.Start(":8080"))
}
func handler(project, path string) func(c echo.Context) error {
return func(c echo.Context) error {
ctx := c.Request().Context()
operationSequence := createOperationSequence()
client, err := pubsub.NewClient(ctx, project)
if err != nil {
log.Fatal(err)
}
topic := client.Topic("my-normal-topic")
defer topic.Stop()
topic.EnableMessageOrdering = true
message := &pubsub.Message{
OrderingKey: operationSequence,
Data: []byte(path + ":" + operationSequence),
}
r := topic.Publish(ctx, message)
if r == nil {
log.Fatal("failed to topic.Publish!")
}
log.Printf("%+v", r)
return c.String(http.StatusOK, path+":"+operationSequence)
}
}
func createOperationSequence() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}
go-subscriber
main.go
package main
import (
"encoding/json"
"io"
"io/ioutil"
"log"
"net/http"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func main() {
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.POST("/", func(c echo.Context) error {
m, err := unmarshal(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}
data := string(m.Message.Data)
log.Printf("fs_Received__Data:%s", data)
return c.String(http.StatusOK, "OK")
})
e.Logger.Fatal(e.Start(":8080"))
}
type PubSubMessage struct {
Message struct {
Data []byte `json:"data,omitempty"`
ID string `json:"id"`
} `json:"message"`
Subscription string `json:"subscription"`
}
func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
var m PubSubMessage
body, err := ioutil.ReadAll(r)
if err != nil {
log.Printf("ioutil.ReadAll: %v", err)
return nil, err
}
if err := json.Unmarshal(body, &m); err != nil {
log.Printf("json.Unmarshal: %v", err)
return nil, err
}
return &m, nil
}
C'est assez dur.
#!/usr/bin/env bash
set -euox pipefail
for i in {0..15}
do
for j in {1..5}
do
curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
sleepenh 0.005
done
sleepenh 0.005
done
Un service avec une bouche REST est placé sur Cloud Run, et lorsque l'accès arrive, il est publié sur Cloud Pub / Sub, et l'abonnement préparé en tant que type push est également placé sur Cloud Run. Passer la demande au service avec.
Les messages atteignent les points de terminaison à plusieurs reprises dans l'ordre suivant, et bien qu'ils soient dans cet ordre dans le journal de l'éditeur, ils ne sont pas dans un ordre particulier dans le journal des abonnés. /order01 /order02 /order03 /order04 /order05
Bien sûr, c'est dans l'ordre sur le journal émis par le shell qui atteint le point final.
Puisqu'il y a une condition qui «existe dans la même région», spécifiez la région du sujet.
Bien entendu, "Spécifier l'ordre des messages" doit être "Activé".
Après l'avoir essayé plusieurs fois, il y a eu des cas où l'ordre d'abonnement a été modifié au début, mais après cela, l'ordre était correct. L'ordre est plus stable que la combinaison Sujet / Abonnement normale, qui était plutôt dans le désordre.
Même dans le mode "spécification de l'ordre des messages", l'ordre a été changé, donc je ne pouvais pas dire "cela peut être réalisé" pour le sujet. .. .. Que ce soit une question de savoir comment l'essayer, ou peut-être la version bêta, si vous en avez envie, découvrons-le. .. ..