[JAVA] Spring AMQP & WebSocket & Stomp (SockJS) --Message queuing with RabbitMQ

Preface

・ I have only recently started using Spring, so some descriptions may be odd (I have at least verified that the code runs).
・ This is a collection of search results, so parts may be quoted from various places without my realizing it (mainly the official site, Qiita and Stack Overflow).
・ Basically, this is a personal memo.
・ The code is posted as-is, rough edges included.
・ Comments such as "the framework already has a class that does this" are welcome.

Purpose

I wanted to build a chat that goes through RabbitMQ using Spring. This time, only the basic functions are implemented and security is not considered.

Library and version

・ Spring Boot 1.5.3
・ Spring AMQP
・ Spring MVC
・ Spring WebSocket
・ RabbitMQ 3.6.9
・ stomp-websocket 2.3.3
・ sockjs-client 1.1.2

What to prepare

  1. AmqpConfig class
  2. A class that extends AbstractWebSocketMessageBrokerConfigurer and implements WebSocketConfigurer
  3. RestController
  4. Your own HTML

Conceptual diagram

amqp.png

At login, RabbitAdmin creates a Queue on RabbitMQ; when the user enters a room, that Queue is bound to the Exchange. Spring WebSocket relays sending and receiving between STOMP and RabbitMQ.
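The flow above can be sketched as a toy in-memory model. This is only an illustration under simplifying assumptions: the class `DirectExchangeSketch` and its methods are hypothetical names, not part of Spring AMQP or the RabbitMQ client, and a real broker does far more (persistence, acknowledgements and so on).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of a direct exchange (hypothetical, not the RabbitMQ API). */
class DirectExchangeSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** Login step: create the user's queue if it does not exist yet. */
    void declareQueue(String queueName) {
        queues.computeIfAbsent(queueName, k -> new ArrayList<>());
    }

    /** Enter-room step: bind the user's queue using the room name as routing key. */
    void bind(String queueName, String routingKey) {
        declareQueue(queueName);
        bindings.computeIfAbsent(routingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    /** Send step: copy the message to every queue whose binding key equals the routing key. */
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.emptySet());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    /** Returns the messages currently held in the given queue. */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.declareQueue("user1");   // user1 logs in
        exchange.declareQueue("user2");   // user2 logs in
        exchange.bind("user1", "room1");  // user1 enters room1
        exchange.bind("user2", "room1");  // user2 enters room1
        int delivered = exchange.publish("room1", "hello");
        System.out.println(delivered + " queue(s) received the message");
    }
}
```

The point of the model is that the room name is nothing more than a routing key: every Queue bound with that key gets its own copy of the message, and a message sent to a key with no bindings reaches nobody.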

Implementation

AMQP configuration class

AmqpConfig.java


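import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfig {

    // Connection to the local RabbitMQ with the default guest account
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    // Needed to send messages (I forgot to write this at first)
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    // Needed to send RabbitMQ the commands that create Queues and Exchanges
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    // The Exchange that routes each message to the bound Queues
    @Bean
    public DirectExchange direct() {
        return new DirectExchange("direct");
    }
}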

WebSocket setting class

WebSocketConfig.java


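import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;

@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
    // Buffer sizes for WebSocket text and binary messages
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

    // WebSocket (SockJS) endpoint that clients connect to
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat").withSockJS();
    }

    // Relay STOMP traffic for /exchange/ and /queue/ destinations to RabbitMQ
    // (this needs RabbitMQ's STOMP plugin; the relay port defaults to 61613)
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/exchange/", "/queue/").setRelayHost("localhost");
        // Destinations starting with /app are handled by @MessageMapping methods in the controller
        registry.setApplicationDestinationPrefixes("/app");
        registry.setPathMatcher(new AntPathMatcher("."));
    }

    // Required by WebSocketConfigurer. Applying the HTTP session check to the
    // WebSocket this way had no effect, so the body is left commented out.
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        /*
        registry.addHandler(new TextWebSocketHandler(), "/chat")
            .addInterceptors(new HttpSessionHandshakeInterceptor())
            .withSockJS();
        */
    }

    // Handle payloads as plain strings; returning false skips the default converters
    @Override
    public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
        messageConverters.add(new StringMessageConverter());
        return false;
    }
}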
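To make the effect of the prefixes in configureMessageBroker easier to see, here is a rough sketch of how a destination ends up either at the application or at the broker relay. It is a simplification, not Spring's actual matching code; the class `DestinationRoutingSketch` and the `Target` enum are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Toy classifier for STOMP destinations (hypothetical, not Spring code). */
class DestinationRoutingSketch {
    enum Target { APPLICATION_HANDLER, BROKER_RELAY, UNHANDLED }

    // Same values as passed to setApplicationDestinationPrefixes(...)
    static final List<String> APP_PREFIXES = Arrays.asList("/app");
    // Same values as passed to enableStompBrokerRelay(...)
    static final List<String> RELAY_PREFIXES = Arrays.asList("/exchange/", "/queue/");

    /** Decides which side would handle a message sent to the given destination. */
    static Target classify(String destination) {
        for (String prefix : APP_PREFIXES) {
            if (destination.startsWith(prefix)) {
                return Target.APPLICATION_HANDLER; // e.g. a @MessageMapping method
            }
        }
        for (String prefix : RELAY_PREFIXES) {
            if (destination.startsWith(prefix)) {
                return Target.BROKER_RELAY; // forwarded to RabbitMQ over STOMP
            }
        }
        return Target.UNHANDLED; // no configured prefix matches
    }

    public static void main(String[] args) {
        for (String d : Arrays.asList("/app/message", "/queue/user1", "/topic/news")) {
            System.out.println(d + " -> " + classify(d));
        }
    }
}
```

In this article the client sends to /app/message (handled by the controller) and subscribes to /queue/ plus the user name (relayed to RabbitMQ), so both kinds of destination are in use.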

RestController class

ChatRestController.java


import java.io.IOException;
import java.security.Principal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.ObjectMapper;

@RestController
public class ChatRestController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChatRestController.class);

    @Autowired
    private AmqpAdmin amqpAdmin;

    // I forgot this one at first, too
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private DirectExchange direct;

    // Create a Queue named after the user at login
    @RequestMapping(value="/api/login", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String login(@RequestBody String json, Locale locale) {
        ...
        addQueue(new Queue(username));
        ...
    }

    public void addQueue(Queue queue) {
        amqpAdmin.declareQueue(queue);
    }

    // Enter a room (bind the user's Queue to the Exchange, using the room name as routing key)
    @RequestMapping(value="/api/enterroom", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
    public String enterRoom(@RequestBody String json, Principal principal, Locale locale) {
        String responseJson = "";
        ObjectMapper mapper = new ObjectMapper();
        try {
            RoomEntry room = mapper.readValue(json, RoomEntry.class);
            amqpAdmin.declareBinding(BindingBuilder.bind(new Queue(principal.getName())).to(direct).with(room.getRoom()));
        }
        catch(IOException ex) {
            // Jackson's JsonProcessingException is a subclass of IOException
            LOGGER.error(ex.getMessage(), ex);
        }
        return responseJson;
    }

    // Receives messages sent to /app/message and publishes them to the Exchange
    @MessageMapping("/message")
    public void sendMessage(String jsonText, Principal principal) {
        ObjectMapper mapper = new ObjectMapper();
        String result = "";
        // HH is the 24-hour clock (hh would print 3 PM as 03)
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        try {
            ChatMessage json = mapper.readValue(jsonText, ChatMessage.class);
            // Set sender and time on the server side rather than trusting the client
            json.setFrom(principal.getName());
            json.setTime(sdf.format(new Date()));
            result = mapper.writeValueAsString(json);
            // The destination room is used as the routing key
            rabbitTemplate.convertAndSend(direct.getName(), json.getTo(), result);
        }
        catch(IOException ex) {
            LOGGER.error(ex.getMessage(), ex);
        }
    }
}
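The controller stamps each message using SimpleDateFormat. As a side note on its pattern letters, `hh` is the hour on the 12-hour clock and `HH` the hour on the 24-hour clock, so a pattern with `hh` and no AM/PM marker prints morning and afternoon times identically. The sketch below shows the difference; the helper class `TimestampPatternSketch` is a hypothetical name, and the fixed UTC time zone and `Locale.ROOT` are assumptions made only to keep the output reproducible.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Compares the hh (12-hour) and HH (24-hour) pattern letters. */
class TimestampPatternSketch {
    /** Formats the given instant in UTC with the given pattern. */
    static String format(String pattern, long epochMillis) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern, Locale.ROOT);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        return sdf.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long threePmUtc = 15L * 60 * 60 * 1000; // 1970-01-01 15:00:00 UTC
        System.out.println(format("yyyy/MM/dd hh:mm:ss", threePmUtc)); // 1970/01/01 03:00:00
        System.out.println(format("yyyy/MM/dd HH:mm:ss", threePmUtc)); // 1970/01/01 15:00:00
    }
}
```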

Client JS

chat.js


var mq_username = "guest",
    mq_password = "guest",
    mq_vhost    = "/",
    mq_url      = 'chat',
    mq_exchange = '/app/message',
    mq_queue    = '/queue/';
var client;
//Callback for a successful connection
function on_connect() {
    //Subscribe to this user's Queue
    client.subscribe(mq_queue + $("#user").val(), on_message);
    //Click event for the Connect (enter room) button
    $("#btn-room").on("click", function(e) {
        enterRoom();
    });
    //Submit button click event
    $("#btn-chat").on("click", function(e) {
        sendMessage(); 
    });
}

function on_connect_error() {
    console.log('Connection failed');
}

//Receive message
function on_message(msg) {
    console.log("message received");
    console.log(msg.body);
    var obj = JSON.parse(msg.body);
    $(".chat").append('<li class="left clearfix"><div class="chat-body clearfix"><div class="header"><strong class="primary-font">' + obj.from + '</strong><small class="pull-right text-muted">' + obj.time + '</small><p>' + obj.message + '</p></div></div></li>');
}

//Send message
function sendMessage() {
    console.log("send message");
    var msg = new Object();
    msg.message = $("#message-input").val();
    msg.to = $("#send-room").val();
    client.send(mq_exchange, {"content-type":"text/plain"}, JSON.stringify(msg));
}

//Enter the room
function enterRoom() {
    $.ajax({
        url: "api/enterroom",
        type:'POST',
        dataType: 'json',
        contentType: "application/json; charset=UTF-8",
        data: JSON.stringify({room : $("#enter-room").val()}),
        timeout:10000,
        beforeSend: function(xhr) {
            xhr.setRequestHeader($("meta[name='_csrf_header']").attr("content"), $("meta[name='_csrf']").attr("content"));
        },
        success: function(data) {
        },
        error: function(XMLHttpRequest, textStatus, errorThrown) {
        }
    });
}

$(function() {
    Stomp.WebSocketClass = SockJS;
    client = Stomp.client(mq_url);
    //Connect to server
    client.connect(
        mq_username,
        mq_password,
        on_connect,
        on_connect_error,
        mq_vhost
    );
});
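For reference, what client.send puts on the wire is a STOMP frame: a command line, header lines, a blank line, the body, and a terminating NUL byte. The following is a minimal sketch of that layout, written in Java to match the server-side code. The class `StompFrameSketch` is a hypothetical name, header escaping is not handled, and real clients may add further headers such as content-length.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Builds the text of a STOMP frame (simplified, no header escaping). */
class StompFrameSketch {
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        // A blank line separates headers from the body; a NUL character ends the frame
        sb.append('\n').append(body).append('\u0000');
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/app/message");
        headers.put("content-type", "text/plain");
        String frame = frame("SEND", headers, "{\"message\":\"hello\",\"to\":\"room1\"}");
        // Show the NUL terminator as ^@ so that it is visible in the console
        System.out.println(frame.replace("\u0000", "^@"));
    }
}
```

The destination header is what the server matches against the /app prefix, while the to field inside the JSON body is what the controller later uses as the routing key.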

Sample project

https://github.com/yossypandamaster/sample-spring-amqp

Execution result

user1 (Chrome): chatchrome.png
user2 (Firefox): chatfirefox.png
