・ Spring has just started recently, and there may be strange descriptions (I have verified the operation for the time being) ・ Since it is a collection of googled results, I think that it is quoted from various places without knowing it (mainly the official website, Qiita and Stack Overflow) ・ Basic, personal memo purpose ・ The code is just an act of exposure ・ You can do it in the framework class here.
I wanted to chat with RabbitMQ via Spring. This time, we only implemented the basic functions and did not consider security.
Spring Boot 1.5.3 Spring AMQP Spring MVC Spring WebSocket RabbitMQ 3.6.9 stomp-websocket 2.3.3 sockjs-client 1.1.2
At login, use RabbiAdmin to create a Queue on RabbitMQ and bind it to Exchange. Spring WebSocket relays transmission and reception between Stomp and RabbitMQ.
AmqpConfig.java
@Configuration
public class AmqpConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfig.class);
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
//Necessary when sending a message, I forgot to write
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
//Required when sending commands to create a Queue or Exchange to RabbitMQ
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
//This guy will route it for you.
@Bean
public DirectExchange direct() {
return new DirectExchange("direct");
}
}
WebSocketConfig.java
@Configuration
@EnableWebSocket
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer implements WebSocketConfigurer {
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
//WebSocket endpoint
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat").withSockJS();
}
//Relay communication between Stomp and RabbitMQ
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/exchange/", "/queue/").setRelayHost("localhost");
//Receive a request on the Rest controller
registry.setApplicationDestinationPrefixes("/app");
registry.setPathMatcher(new AntPathMatcher("."));
}
//This didn't mean
/*
//Apply HTTP session check to WebSocket
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new TextWebSocketHandler(), "/chat")
.addInterceptors(new HttpSessionHandshakeInterceptor())
.withSockJS();
}
*/
@Override
public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
messageConverters.add(new StringMessageConverter());
return false;
}
}
ChatRestController.java
@RestController
public class ChatRestController {
private static final Logger LOGGER = LoggerFactory.getLogger(ChatRestController.class);
@Autowired
private AmqpAdmin amqpAdmin;
//I forgot this too
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private DirectExchange direct;
//Create a queue at login
@RequestMapping(value="/api/login", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
public String login(@RequestBody String json, Locale locale) {
...
addQueue(new Queue(username));
...
}
public void addQueue(Queue queue) {
amqpAdmin.declareQueue(queue);
}
//Enter the room (bind your Queue to Exchange)
@RequestMapping(value="/api/enterroom", method=RequestMethod.POST, produces="application/json; charset=UTF-8")
public String enterRoom(@RequestBody String json, Principal principal, Locale locale) {
String responseJson = "";
ObjectMapper mapper = new ObjectMapper();
try {
RoomEntry room = mapper.readValue(json, RoomEntry.class);
amqpAdmin.declareBinding(BindingBuilder.bind(new Queue(principal.getName())).to(direct).with(room.getRoom()));
}
catch(JsonProcessingException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
catch(IOException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
return responseJson;
}
@MessageMapping("/message")
public void sendMessage(String jsonText, Principal principal) {
//LOGGER.debug(jsonText);
ObjectMapper mapper = new ObjectMapper();
String result = "";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
try {
ChatMessage json = mapper.readValue(jsonText, ChatMessage.class);
json.setFrom(principal.getName());
json.setTime(sdf.format(new Date()));
result = mapper.writeValueAsString(json);
rabbitTemplate.convertAndSend(direct.getName(), json.getTo(), result);
}
catch(JsonParseException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
catch(JsonProcessingException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
catch(IOException ex) {
LOGGER.error(ex.getMessage(), ex.getCause(), ex);
}
}
}
chat.js
var mq_username = "guest",
mq_password = "guest",
mq_vhost = "/",
mq_url = 'chat',
mq_exchange = '/app/message'
mq_queue = '/queue/';
var client;
//Successful connection callback
function on_connect() {
//Start Queue Listen
client.subscribe(mq_queue + $("#user").val(), on_message);
//Connect (enter) button click event
$("#btn-room").on("click", function(e) {
enterRoom();
});
//Submit button click event
$("#btn-chat").on("click", function(e) {
sendMessage();
});
}
function on_connect_error() {
console.log('Connection failed');
}
//Receive message
function on_message(msg) {
console.log("messge reveived");
console.log(msg.body);
var obj = JSON.parse(msg.body);
$(".chat").append('<li class="left clearfix"><div class="chat-body clearfix"><div class="header"><strong class="primary-font">' + obj.from + '</strong><small class="pull-right text-muted">' + obj.time + '</small><p>' + obj.message + '</p></div></div></li>');
}
//Send Messege
function sendMessage() {
console.log("send message");
var msg = new Object();
msg.message = $("#message-input").val();
msg.to = $("#send-room").val();
client.send(mq_exchange, {"content-type":"text/plain"}, JSON.stringify(msg));
}
//Enter the room
function enterRoom() {
$.ajax({
url: "api/enterroom",
type:'POST',
dataType: 'json',
contentType: "application/json; charset=UTF-8",
data: JSON.stringify({room : $("#enter-room").val()}),
timeout:10000,
beforeSend: function(xhr) {
xhr.setRequestHeader($("meta[name='_csrf_header']").attr("content"), $("meta[name='_csrf']").attr("content"));
},
success: function(data) {
},
error: function(XMLHttpRequest, textStatus, errorThrown) {
}
});
}
$(function() {
Stomp.WebSocketClass = SockJS;
client = Stomp.client(mq_url);
//Connect to server
client.connect(
mq_username,
mq_password,
on_connect,
on_connect_error,
mq_vhost
);
});
https://github.com/yossypandamaster/sample-spring-amqp
user1(Chrome) user2(Firefox)