[JAVA] Spring AMQP & WebSocket & Stomp (SockJS) - Mise en file d'attente des messages à l'aide de RabbitMQ

Préface

・ Le printemps vient de commencer récemment, et il peut y avoir des descriptions étranges (j'ai vérifié l'opération pour le moment) ・ Puisqu'il s'agit d'une collection de résultats googlés, je pense qu'il est cité de divers endroits sans le savoir (principalement le site officiel, Qiita et Stack Overflow) ・ But de mémo personnel de base ・ Le code n'est qu'un acte d'exposition ・ Vous pouvez le faire dans la classe framework ici.

Objectif

Je voulais discuter avec Rabbit MQ via Spring. Cette fois, nous n'avons implémenté que les fonctions de base et n'avons pas tenu compte de la sécurité.

Bibliothèque et version

Spring Boot 1.5.3 Spring AMQP Spring MVC Spring WebSocket RabbitMQ 3.6.9 stomp-websocket 2.3.3 sockjs-client 1.1.2

Quoi préparer

  1. Classe AmqpConfig
  2. Une classe qui hérite de AbstractWebSocketMessageBrokerConfigurer et implémente WebSocketConfigurer
  3. RestController
  4. HTML par vous-même

Diagramme conceptuel

amqp.png

Lors de la connexion, utilisez RabbiAdmin pour créer une file d'attente sur RabbitMQ et la lier à Exchange. Spring Web Socket relaie la transmission et la réception entre Stomp et Rabbit MQ.

la mise en oeuvre

Classe de configuration AMQP

AmqpConfig.java


@Configuration
public class AmqpConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfig.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        
        CachingConnectionFactory connectionFactory =  new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    //Nécessaire lors de l'envoi d'un message, j'ai oublié d'écrire
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    //Requis lors de l'envoi de commandes pour créer une file d'attente ou un échange vers Rabbit MQ
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    //Ce type vous le fera suivre.
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("direct");
    }
}

Classe de configuration WebSocket

WebSocketConfig.java


@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

    //Point de terminaison WebSocket
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat").withSockJS();
    }

    //Communication relais entre Stomp et Rabbit MQ
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/exchange/", "/queue/").setRelayHost("localhost");
        //Recevoir la demande sur le contrôleur de repos
        registry.setApplicationDestinationPrefixes("/app");
        registry.setPathMatcher(new AntPathMatcher("."));
    }

    //Cela ne voulait pas dire
    /*
    //Appliquer la vérification de session HTTP à WebSocket
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new TextWebSocketHandler(), "/chat")
            .addInterceptors(new HttpSessionHandshakeInterceptor())
                .withSockJS();
    }
    */
    
    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        messageConverters.add(new StringMessageConverter());
        return false;
    }
}

Classe RestController

ChatRestController.java


@RestController
public class ChatRestController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChatRestController.class);
    
    @Autowired
    private AmqpAdmin amqpAdmin;
    
    //J'ai oublié ça aussi
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private DirectExchange direct;
    
    //Créer une file d'attente lors de la connexion
    @RequestMapping(value="/api/login", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String login(@RequestBody String json, Locale locale) {
        ...
        addQueue(new Queue(username));
        ...
    }
    
    public void addQueue(Queue queue) {
        amqpAdmin.declareQueue(queue);
    }

    //Entrez dans la salle (liez votre file d'attente à Exchange)
    @RequestMapping(value="/api/enterroom", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String enterRoom(@RequestBody String json, Principal principal, Locale locale) {
        String responseJson = "";
        ObjectMapper mapper = new ObjectMapper();
        try {
            RoomEntry room = mapper.readValue(json, RoomEntry.class);
            amqpAdmin.declareBinding(BindingBuilder.bind(new Queue(principal.getName())).to(direct).with(room.getRoom()));
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        return responseJson;
    }
    
    @MessageMapping("/message")
    public void sendMessage(String jsonText, Principal principal) {
        //LOGGER.debug(jsonText);
        ObjectMapper mapper = new ObjectMapper();
        String result = "";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
        try {
            ChatMessage json = mapper.readValue(jsonText, ChatMessage.class);
            json.setFrom(principal.getName());
            json.setTime(sdf.format(new Date()));
            result = mapper.writeValueAsString(json);
            rabbitTemplate.convertAndSend(direct.getName(), json.getTo(), result);
        }
        catch(JsonParseException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
    }
}

Client JS

chat.js


var mq_username = "guest",
    mq_password = "guest",
    mq_vhost    = "/",
    mq_url      = 'chat',
    mq_exchange = '/app/message'
    mq_queue    = '/queue/';
var client;
//Rappel de connexion réussi
function on_connect() {
    //Démarrer l'écoute de la file d'attente
    client.subscribe(mq_queue + $("#user").val(), on_message);
    //Événement de clic sur le bouton Connexion (entrée)
    $("#btn-room").on("click", function(e) {
        enterRoom();
    });
    //Soumettre l'événement de clic sur le bouton
    $("#btn-chat").on("click", function(e) {
        sendMessage(); 
    });
}

function on_connect_error() {
    console.log('Connection failed');
}

//Recevoir un message
function on_message(msg) {
    console.log("messge reveived");
    console.log(msg.body);
    var obj = JSON.parse(msg.body);
    $(".chat").append('<li class="left clearfix"><div class="chat-body clearfix"><div class="header"><strong class="primary-font">' + obj.from + '</strong><small class="pull-right text-muted">' + obj.time + '</small><p>' + obj.message + '</p></div></div></li>');
}

//Envoyer un message
function sendMessage() {
    console.log("send message");
    var msg = new Object();
    msg.message = $("#message-input").val();
    msg.to = $("#send-room").val();
    client.send(mq_exchange, {"content-type":"text/plain"}, JSON.stringify(msg));
}

//Entrez dans la pièce
function enterRoom() {
    $.ajax({
        url: "api/enterroom",
        type:'POST',
        dataType: 'json',
        contentType: "application/json; charset=UTF-8",
        data: JSON.stringify({room : $("#enter-room").val()}),
        timeout:10000,
        beforeSend: function(xhr) {
            xhr.setRequestHeader($("meta[name='_csrf_header']").attr("content"), $("meta[name='_csrf']").attr("content"));
        },
        success: function(data) {
        },
        error: function(XMLHttpRequest, textStatus, errorThrown) {
        }
    });
}

$(function() {
    Stomp.WebSocketClass = SockJS;
    client = Stomp.client(mq_url);
    //Connecter au serveur
    client.connect(
        mq_username,
        mq_password,
        on_connect,
        on_connect_error,
        mq_vhost
    );
});

Exemple de projet

https://github.com/yossypandamaster/sample-spring-amqp

Résultat d'exécution

user1(Chrome) chatchrome.png user2(Firefox) chatfirefox.png

Recommended Posts

Spring AMQP & WebSocket & Stomp (SockJS) - Mise en file d'attente des messages à l'aide de RabbitMQ
Communication de processus utilisant AMQP avec RabbitMQ
La coopération des messages a commencé avec Spring Boot