Lorsque j'ai contribué à Keda, j'avais besoin de connaître le comportement de Service Bus, ce que je n'avais pas beaucoup touché, alors j'ai fait une rapide enquête. La bibliothèque utilise les éléments suivants.
Queue Client
Authentifiez-vous pour que la file d'attente puisse être utilisée. ServiceBus et Queue sont créés à l'avance. Puisque ConnectionString
de l'espace de noms ServiceBus est requis pour la connexion, il est acquis et transmis à partir de la variable d'environnement. Une instance de NameSpace
est créée en passant ConnectionString
, et NameSpace
crée un client Queue avec la méthode NewQueue
. C'est très simple et agréable.
func main() {
fmt.Println("Azure ServiceBus Queue Sender")
connectionString := os.Getenv("ConnectionString")
queueName := os.Getenv("queueName")
if len(os.Args) != 2 {
log.Fatalf("Specify the counter parameter. e.g. send 100 Parameter length: %d\n", len(os.Args))
}
count, err := strconv.Atoi(os.Args[1])
if err != nil {
log.Fatalf("count should be integer : %s", os.Args[1])
}
// Create a client to communicate with a Service Bus Namespace
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
if err != nil {
log.Fatal("Cannot create a client for the Service Bus Namespace", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Create a client to communicate with the queue
q, err := ns.NewQueue(queueName)
if err != nil {
log.Fatal("Cannot create a client for the queue", err)
}
Send Message
Pour envoyer un message, utilisez la méthode Send
sur l'objet Queue. Si vous voulez envoyer en masse, une erreur `func (q * Queue) SendBatch (ctx context.Context, iterator BatchIterator) 'existe, vous pouvez donc l'utiliser.
for i := 0; i < count; i++ {
err = q.Send(ctx, servicebus.NewMessageFromString("Hello!"))
fmt.Printf("Send Hello %d \n", i)
if err != nil {
log.Fatal(err)
}
}
Receive Message
Recevoir un message est également très simple, si vous passez un gestionnaire pour le rappel, le gestionnaire sera exécuté lorsque le message arrivera. Bien qu'il semble asynchrone pendant un moment, ReceiveOne
se comporte comme un blocage jusqu'à ce que le contrôle revienne au gestionnaire. Le point est la dernière méthode message.Complete ()
. Par défaut, la file d'attente Service Bus est PeekLock (https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock). Lorsque vous recevez un message avec «Recevoir», le message est verrouillé et invisible pour les autres clients. Et si le processus réussit, le message sera supprimé lorsque «Complete» sera émis. Si cela ne fonctionne pas, émettre ʻAbandon` supposera que l'exécution du message a échoué. S'il échoue un certain nombre de fois, le message est automatiquement transmis à DeadQueue.
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(string(message.Data))
return message.Complete(ctx)
}))
DeadLetter
L'envoi d'un message à DeadLetter est automatiquement transmis à DeadLetter dans les cas suivants: Déplacement des messages vers le DLQ ..
Cependant, il peut y avoir d'autres cas où vous voudrez peut-être l'envoyer à DeadLetter pour la commodité de votre application. Dans ce cas, utilisez le DeadLetter ()
du Message pour transférer vers DeadLetter. Ce comportement ne fonctionne qu'en mode PeekLock
et ne peut pas être utilisé en mode ReceiveAndDelete
, ce qui sera expliqué plus loin.
Personnellement, DeadLetterWithInfo ()
est meilleur que DeadLetter ()
. La raison est que vous pouvez transmettre une propriété personnalisée et l'avoir comme métadonnées, par exemple, pourquoi vous avez entré DeadLetter. J'ai également passé des propriétés telles que ʻerror et
servicebus.ErrorInternalError`, mais je ne suis toujours pas sûr de savoir où regarder ces métadonnées. Au moins, ce serait mieux si vous pouviez ajouter des informations de chargement.
err = q.ReceiveOne(
ctx,
servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
fmt.Println(message.LockToken.String()) // It will be null when the ReceiveAndDelete mode
fmt.Println(string(message.Data))
fmt.Println("Waiting...")
time.Sleep(time.Second * 10)
fmt.Println("Done.")
m := make(map[string]string)
m["rootCause"] = "hagechabin"
return message.DeadLetterWithInfo(ctx, errors.New("go to deadletter"), servicebus.ErrorInternalError, m) // Doesn't work for ReceiveAndDelete
}))
Si vous le regardez dans Service Bus Explorer, vous pouvez lire les métadonnées fermement.
DeadLetterOnMessageExpiration
Vous pouvez le configurer pour qu'il accède automatiquement à la lettre morte lorsque le message expire, mais ce n'est pas la valeur par défaut. Il est bon de le définir pour l'instance appelée QueueManager
. Cependant, si vous faites cela deux fois, vous obtiendrez une erreur «Conflit». En fait, il semble bon de créer une file d'attente avec un modèle ARM lors de la création d'un ServiceBus. Cette API semble être une API ARM, et si vous regardez Create
, elle a BODY. Passez probablement le modèle ARM à Body. Au fait, Put n'a pas de corps, mais il le créera s'il n'existe pas, donc cela semble être mieux lors de l'écriture d'un test. Si vous le supprimez et le faites avec Put, je ne pense pas que vous obtiendrez l'erreur Conflict
.
qm := ns.NewQueueManager()
qe, err := qm.Put(ctx, "abc", servicebus.QueueEntityWithDeadLetteringOnMessageExpiration()) // Doesn't work
if err != nil {
fmt.Println("error :", err.Error())
} else {
fmt.Println("endity anme: ", qe.Name)
}
Une fois définis, vous pouvez voir que les paramètres ont été modifiés comme suit.
ReceiveAndDelete Mode
PeekLock
met d'abord le verrou sur le message, puis le supprime avec Complete
, mais ReceiveAndDelete
supprime le message de la file d'attente lorsqu'il est reçu. Vous ne pouvez donc pas le transférer vous-même vers DeadLetter. L'API Queue de DeadLette est également disponible, mais comme elle est en lecture seule, je ne veux pas que vous écriviez vous-même le processus d'envoi à DeadLetter avec ReceiveAndDelete
. Si vous voulez vraiment le faire, vous devez créer une file d'attente personnalisée et l'exécuter là-bas. La modification des paramètres est facile, spécifiez simplement les options suivantes lors de la création d'un client de file d'attente.
q, err := ns.NewQueue(queueName, servicebus.QueueWithReceiveAndDelete())
Active Message Count
En fait, toute cette enquête visait à enquêter sur le nombre de messages. Utiliser * entity.CountDetails.ActiveMessageCount
obtiendra le nombre de messages dans la file d'attente, mais le problème est qu'il compte également le nombre de messages actuellement verrouillés.
J'écris un KEDA Scaler, j'ai donc besoin d'un certain nombre de messages qui ne sont pas actuellement traités. Que devrais-je faire? Malheureusement, aucune méthode n'a été trouvée jusqu'à présent. J'ai utilisé Peek
et il y avait LockToken
, alors j'ai essayé de limiter cela au nombre qui n'apparaît pas dans nil
, mais il semble que le côté récepteur était verrouillé. Les chiffres sont différents, donc cela semble un peu différent.
S'il s'agit de votre propre application personnalisée, lorsque vous recevez le message, écrivez l'ID sur la carte, supprimez-la avec Complete ou Abandan, et reportez-vous à celle-ci lorsque vous l'obtenez avec Peek. Cependant, la structure de l'application que j'écris est abstraite, et la logique d'échelle et le scaler pour les ressources individuelles sont abstraits, et s'il s'agit de StorageQueue, etc., il peut être compté correctement. Donc ça n'a pas d'importance.
En guise de contre-mesure, si vous définissez ReceiveAndDelete
du côté qui reçoit le message, le nombre de décomptes sera raisonnable, mais généralement vous voulez utiliser PeekLock
, donc c'est délicat. J'aimerais continuer à enquêter et mettre à jour ce blog lorsque je le découvrirai.
m := ns.NewQueueManager()
ctx2 := context.Background()
for {
entity, _ := m.Get(ctx2, queueName)
fmt.Println("ActiveMessageCount: ", *entity.CountDetails.ActiveMessageCount)
ctx := context.Background()
iterator, _ := q.Peek(ctx)
var c int
for {
item, _ := iterator.Next(ctx)
if item != nil && item.LockToken != nil {
fmt.Printf("lockToken: %s \n", item.LockToken.String())
} else {
if item != nil {
fmt.Println("lockToken: nil")
c++
} else {
fmt.Println("item is nil")
}
}
if item != nil {
body, _ := json.Marshal(item)
fmt.Println(string(body))
}
if item == nil {
iterator.Done()
break
}
}
fmt.Println("Count: ", c)
time.Sleep(time.Second * 2)
}
Je n'ai pas pu vérifier comment obtenir le décompte autre que le message verrouillé, ce qui était le but de l'enquête, mais je pouvais confirmer la connaissance périphérique de ServiceBus et le comportement de Go SDK, alors je l'ai résumé dans le blog. J'ai mis le code source dans la ressource, vous pouvez donc l'essayer si vous le souhaitez.
Resource
Recommended Posts