[JAVA] Spring AMQP & WebSocket & Stomp (SockJS) - Nachrichtenwarteschlange mit RabbitMQ

Vorwort

・ Der Frühling hat gerade erst begonnen, und es kann seltsame Beschreibungen geben (ich habe den Vorgang vorerst überprüft). ・ Da es sich um eine Sammlung gegoogelter Ergebnisse handelt, wird sie meiner Meinung nach an verschiedenen Stellen zitiert, ohne es zu wissen (hauptsächlich auf der offiziellen Website, Qiita und Stack Overflow). ・ Grundlegender, persönlicher Memo-Zweck ・ Der Code ist nur ein Akt der Belichtung ・ Sie können dies hier in der Framework-Klasse tun.

Zweck

Ich wollte über Spring mit Rabbit MQ chatten. Dieses Mal haben wir nur die Grundfunktionen implementiert und die Sicherheit nicht berücksichtigt.

Bibliothek und Version

Spring Boot 1.5.3 Spring AMQP Spring MVC Spring WebSocket RabbitMQ 3.6.9 stomp-websocket 2.3.3 sockjs-client 1.1.2

Was vorzubereiten?

  1. AmqpConfig-Klasse
  2. Eine Klasse, die AbstractWebSocketMessageBrokerConfigurer erbt und WebSocketConfigurer implementiert
  3. RestController
  4. HTML selbst

Konzeptdiagramm

amqp.png

Verwenden Sie bei der Anmeldung RabbiAdmin, um eine Warteschlange in RabbitMQ zu erstellen und an Exchange zu binden. Spring Web Socket leitet das Senden und Empfangen zwischen Stomp und Rabbit MQ weiter.

Implementierung

AMQP-Konfigurationsklasse

AmqpConfig.java


@Configuration
public class AmqpConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfig.class);

    @Bean
    public ConnectionFactory connectionFactory() {
        
        CachingConnectionFactory connectionFactory =  new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    //Notwendig beim Senden einer Nachricht habe ich vergessen zu schreiben
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    //Erforderlich beim Senden von Befehlen zum Erstellen einer Warteschlange oder eines Austauschs an Rabbit MQ
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    //Dieser Typ wird es für Sie routen.
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("direct");
    }
}

WebSocket-Konfigurationsklasse

WebSocketConfig.java


@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

    //WebSocket-Endpunkt
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat").withSockJS();
    }

    //Relaiskommunikation zwischen Stomp und Rabbit MQ
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/exchange/", "/queue/").setRelayHost("localhost");
        //Empfangen Sie eine Anfrage auf dem Rest-Controller
        registry.setApplicationDestinationPrefixes("/app");
        registry.setPathMatcher(new AntPathMatcher("."));
    }

    //Das bedeutete nicht
    /*
    //Wenden Sie die HTTP-Sitzungsprüfung auf WebSocket an
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new TextWebSocketHandler(), "/chat")
            .addInterceptors(new HttpSessionHandshakeInterceptor())
                .withSockJS();
    }
    */
    
    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        messageConverters.add(new StringMessageConverter());
        return false;
    }
}

RestController-Klasse

ChatRestController.java


@RestController
public class ChatRestController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChatRestController.class);
    
    @Autowired
    private AmqpAdmin amqpAdmin;
    
    //Das habe ich auch vergessen
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private DirectExchange direct;
    
    //Erstellen Sie beim Anmelden eine Warteschlange
    @RequestMapping(value="/api/login", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String login(@RequestBody String json, Locale locale) {
        ...
        addQueue(new Queue(username));
        ...
    }
    
    public void addQueue(Queue queue) {
        amqpAdmin.declareQueue(queue);
    }

    //Betreten Sie den Raum (binden Sie Ihre Warteschlange an Exchange)
    @RequestMapping(value="/api/enterroom", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String enterRoom(@RequestBody String json, Principal principal, Locale locale) {
        String responseJson = "";
        ObjectMapper mapper = new ObjectMapper();
        try {
            RoomEntry room = mapper.readValue(json, RoomEntry.class);
            amqpAdmin.declareBinding(BindingBuilder.bind(new Queue(principal.getName())).to(direct).with(room.getRoom()));
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        return responseJson;
    }
    
    @MessageMapping("/message")
    public void sendMessage(String jsonText, Principal principal) {
        //LOGGER.debug(jsonText);
        ObjectMapper mapper = new ObjectMapper();
        String result = "";
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
        try {
            ChatMessage json = mapper.readValue(jsonText, ChatMessage.class);
            json.setFrom(principal.getName());
            json.setTime(sdf.format(new Date()));
            result = mapper.writeValueAsString(json);
            rabbitTemplate.convertAndSend(direct.getName(), json.getTo(), result);
        }
        catch(JsonParseException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(JsonProcessingException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex.getCause(), ex);
        }
    }
}

Client JS

chat.js


var mq_username = "guest",
    mq_password = "guest",
    mq_vhost    = "/",
    mq_url      = 'chat',
    mq_exchange = '/app/message'
    mq_queue    = '/queue/';
var client;
//Erfolgreicher Verbindungsrückruf
function on_connect() {
    //Starten Sie Listen for Queue
    client.subscribe(mq_queue + $("#user").val(), on_message);
    //Klickereignis zum Verbinden (Enter)
    $("#btn-room").on("click", function(e) {
        enterRoom();
    });
    //Schaltfläche zum Klicken auf die Schaltfläche "Senden"
    $("#btn-chat").on("click", function(e) {
        sendMessage(); 
    });
}

function on_connect_error() {
    console.log('Connection failed');
}

//Erhalte Nachricht
function on_message(msg) {
    console.log("messge reveived");
    console.log(msg.body);
    var obj = JSON.parse(msg.body);
    $(".chat").append('<li class="left clearfix"><div class="chat-body clearfix"><div class="header"><strong class="primary-font">' + obj.from + '</strong><small class="pull-right text-muted">' + obj.time + '</small><p>' + obj.message + '</p></div></div></li>');
}

//Messege senden
function sendMessage() {
    console.log("send message");
    var msg = new Object();
    msg.message = $("#message-input").val();
    msg.to = $("#send-room").val();
    client.send(mq_exchange, {"content-type":"text/plain"}, JSON.stringify(msg));
}

//Gehe in den Raum hinein
function enterRoom() {
    $.ajax({
        url: "api/enterroom",
        type:'POST',
        dataType: 'json',
        contentType: "application/json; charset=UTF-8",
        data: JSON.stringify({room : $("#enter-room").val()}),
        timeout:10000,
        beforeSend: function(xhr) {
            xhr.setRequestHeader($("meta[name='_csrf_header']").attr("content"), $("meta[name='_csrf']").attr("content"));
        },
        success: function(data) {
        },
        error: function(XMLHttpRequest, textStatus, errorThrown) {
        }
    });
}

$(function() {
    Stomp.WebSocketClass = SockJS;
    client = Stomp.client(mq_url);
    //Verbinden zum Server
    client.connect(
        mq_username,
        mq_password,
        on_connect,
        on_connect_error,
        mq_vhost
    );
});

Beispielprojekt

https://github.com/yossypandamaster/sample-spring-amqp

Ausführungsergebnis

user1(Chrome) chatchrome.png user2(Firefox) chatfirefox.png

Recommended Posts

Spring AMQP & WebSocket & Stomp (SockJS) - Nachrichtenwarteschlange mit RabbitMQ
Prozesskommunikation mit AMQP mit RabbitMQ
Die Nachrichtenkooperation begann mit Spring Boot