- 비동기 메시징은 애플리케이션 간에 응답을 기다리지 않고 간접적으로 메시지를 전송하는 방법으로, 통신하는 애플리케이션 간의 결합도를 낮추고 확장성을 높일 수 있다.
- 스프링이 제공하는 비동기 메시징으로는 JMS, RabbitMQ, AMQP, 아프치 카프카가 있다.
JMS
- JMS는 두 개 이상의 클라이언트 간에 메시지 통신을 위한 공통 API를 정의하는 자바 표준이다.
- 자바로 비동기 메시징을 처리하는 가장 좋은 방법으로, 모든 구현 코드가 공통 인터페이스를 통해 함께 동작할 수 있게 해준다.
- 스프링은 JmsTemplate라는 템플릿 기반의 클래스를 통해 JMS를 지원하며, 메시지 기반의 POJO도 지원한다.
- JmsTemplate를 사용하면 프로듀서가 큐와 토픽에 메시지를 전송하고 컨슈머는 그 메시지들을 받을 수 있다.
- POJO는 큐나 토픽에 도착하는 메시지에 반응하여 비동기 방식으로 메시지를 수신하는 간단한 자바 객체를 말한다.
- 우선, 메시지를 전송하고 수신하려면 프로듀서와 컨슈머 간에 메시지를 전달해 주는 메시지 브로커가 필요하다.
메시지 브로커 설정
- 메시지 브로커로는 아파치 ActiveMQ와 아파치 ActiveMQ Artemis를 사용할 수 있다.
- Artemis는 ActiveMQ를 새롭게 구현한 차세대 브로커이다.
- 어떤 브로커를 선택하든 메시지를 송수신하는 코드 작성 방법에는 차이가 없으며, 브로커에 대한 연결을 생성하기 위해 스프링을 구성하는 방법만 달라진다.
Artemis
implementation 'org.springframework.boot:spring-boot-starter-artemis'
- 기본적으로 스프링은 Artemis 브로커가 localhost의 61616 포트를 리스닝하는 것으로 간주한다.
- 실무 환경에서는 해당 브로커에 대한 연결을 생성하기 위한 설정과, 브로커와 상호작용할 애플리케이션의 인증 정보 설정을 할 수 있다.
속성 | 설명 |
spring.artemis.host | 브로커의 호스트 |
spring.artemis.port | 브로커의 포트 |
spring.artemis.user | 브로커를 사용하기 위한 사용자 (optional) |
spring.artemis.password | 브로커를 사용하기 위한 사용자 암호 (optional) |
ActiveMQ
implementation 'org.springframework.boot:spring-boot-starter-activmq'
속성 | 설명 |
spring.activemq.broker-url | 브로커의 URL |
spring.activemq.user | 브로커를 사용하기 위한 사용자 (optional) |
spring.activemq.password | 브로커를 사용하기 위한 사용자 암호 (optional) |
spring.activemq.in-memory | 인메모리 브로커로 시작할 것인지의 여부 (기본값은 true) |
- 브로커의 호스트 이름과 포트를 별개의 속성으로 설정하는 대신, ActiveMQ의 브로커 주소를 broker-url 속성 하나로 지정한다.
- URL은 tcp://URL 형태로 지정해야 한다.
- 인메모리 브로커는 같은 애플리케이션에서 메시지를 쓰고 읽을 때만 유용하기 때문에, 실무 환경에서는 스프링이 인메모리 브로커로 시작하지 않도록 in-memory 속성을 false로 설정해야 한다.
메시지 전송
- JMS 스타터 의존성이 빌드에 지정되면, 메시지를 송수신하기 위해 사용할 수 있는 JmsTemplate을 스프링 부트가 자동-구성한다.
- JmsTemplate은 스프링 JMS 통합 지원의 핵심으로, JMS로 작업하는데 필요한 메시지 브로커와의 연결 및 세션 생성, 메시지 전송 도중 발생할 수 있는 예외처리와 같은 코드들을 줄여준다.
- JmsTemplate은 메시지 전송에 필요한 send()와 converAndSend() 메서드를 제공하며, 각 메서드는 서로 다른 매개변수를 지원하기 위해 오버라이딩 되어 있다.
// 원시 메시지 전송
@Override
public void send(MessageCreator messageCreator) throws JmsException {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
send(defaultDestination, messageCreator);
}
else {
send(getRequiredDefaultDestinationName(), messageCreator);
}
}
@Override
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
execute(session -> {
doSend(session, destination, messageCreator);
return null;
}, false);
}
@Override
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
execute(session -> {
Destination destination = resolveDestinationName(session, destinationName);
doSend(session, destination, messageCreator);
return null;
}, false);
}
// 객체로부터 변환된 메시지 전송
@Override
public void convertAndSend(Object message) throws JmsException {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
convertAndSend(defaultDestination, message);
}
else {
convertAndSend(getRequiredDefaultDestinationName(), message);
}
}
@Override
public void convertAndSend(Destination destination, final Object message) throws JmsException {
send(destination, session -> getRequiredMessageConverter().toMessage(message, session));
}
@Override
public void convertAndSend(String destinationName, final Object message) throws JmsException {
send(destinationName, session -> getRequiredMessageConverter().toMessage(message, session));
}
// 객체로부터 메시지로 변환하고 후처리 후 전송
@Override
public void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
convertAndSend(defaultDestination, message, postProcessor);
}
else {
convertAndSend(getRequiredDefaultDestinationName(), message, postProcessor);
}
}
@Override
public void convertAndSend(
Destination destination, final Object message, final MessagePostProcessor postProcessor)
throws JmsException {
send(destination, session -> {
Message msg = getRequiredMessageConverter().toMessage(message, session);
return postProcessor.postProcessMessage(msg);
});
}
@Override
public void convertAndSend(
String destinationName, final Object message, final MessagePostProcessor postProcessor)
throws JmsException {
send(destinationName, session -> {
Message msg = getRequiredMessageConverter().toMessage(message, session);
return postProcessor.postProcessMessage(msg);
});
}
send()
send() 메서드는 Message 객체를 생성하기 위해 MessageCreator를 필요로 하며, JMS 메시지를 쓰는 곳을 지정하는 방법에 따라 3가지 오버라이딩된 메서드를 제공한다.
1. 첫 번째 메서드는 도착지 매개변수가 없으며, 해당 메시지를 기본 도착지로 전송한다.
@Service
@RequiredArgsConstructor
public class JmsOrderMessagingService implements OrderMessagingService {
private final JmsTemplate jms;
@Override
public void sendOrder(Order order) {
jms.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(order);
}
});
}
}
- MessageCreator 인터페이스를 구현한 익명의 내부 클래스를 익명의 내부 클래스를 인자로 전달하여 jms.send()를 호출한다.
- 익명의 내부 클래스는 createMessage()를 오버라이딩하여 전달된 Order 객체로부터 새로운 메시지를 생성한다.
- 메시지의 도착지를 지정하지 않으므로 기본 도착지 이름을 spring.jms.template.default-destination 속성에 지정해주어야 한다.
2. 두 번째 메서드는 해당 메시지의 도착지를 나타내는 Destination 객체를 인자로 받는다.
@Bean
public Destination orderQueue() {
return new ActiveMQQueue("도착지");
}
@Service
@RequiredArgsConstructor
public class JmsOrderMessagingService implements OrderMessagingService {
private final JmsTemplate jms;
private final Destination orderQueue;
@Override
public void sendOrder(Order order) {
jms.send(orderQueue,
session -> session.createObjectMessage(order));
}
}
- Destination 빈을 선언하고 메시지 전송을 수행하는 빈에 주입하면, send()를 호출할 때 이 빈을 사용하여 메시지 도착지를 지정할 수 있다.
- 그러나 실제로는 도착지 이름 외에 다른 것을 지정하는 일이 거의 없으므로, 아래와 같이 send()의 첫 번째 인자로 Destination 객체 대신 도착지 이름만 지정하는 것이 더 쉽다.
3. 세 번째 메서드는 해당 메시지의 도착지를 나타내는 문자열을 인자로 받는다.
@Service
@RequiredArgsConstructor
public class JmsOrderMessagingService implements OrderMessagingService {
private final JmsTemplate jms;
@Override
public void sendOrder(Order order) {
jms.send("도착지",
session -> session.createObjectMessage(order));
}
}
- 하지만 Message 객체를 생성하는 MessageCreator를 두 번째 인자로 전달해야 하므로, 코드가 조금 복잡해진다는 단점
- 이러한 경우 전송할 메시지 객체만 지정할수 있는 convertAndSend()를 사용할 수 있다.
converAndSend()
- Object 타입 객체를 인자로 받아 내부적으로 Message 타입으로 변환한다.
- MessageCreator가 아니라 전송될 객체를 인자로 직접 전달하면 해당 객체가 Message 객체로 변환되어 전송되므로, 메시지 전송이 간단하다.
@Override
public void sendOrder(Order order) {
jms.convertAndSend("도착지", order);
}
- send()처럼 Destination 객체나 문자열 값으로 지정한 도착지를 인자로 받거나, 도착지를 생략하여 기본 도착지로 메시지를 전송할 수 있다.
- 인자로 전달되는 객체는 Message 객체로 변환된 후 전송되며, 이러한 작업은 MessageConverter를 구현하여 처리한다.
- MessageConverter는 스프링에 정의된 인터페이스로, 두 개의 메서드만 정의되어 있다.
- 하지만 스프링이 이미 메시지 변환기를 구현해 놓았기 때문에 우리가 구현할 필요는 없다.
메시지 변환기
MappingJackson2MessageConverter | Jackson 2 JSON 라이브러리를 사용해서 메시지를 JSON으로 상호 변환한다. |
MarshallingMessageConverter | JAXB를 사용해서 메시지를 XML로 상호 변환한다. |
MessagingMessageConverter | 수신된 메시지의 MessageConverter를 사용해서 해당 메시지를 Message 객체로 상호 변환하거나, JMS 헤더와 연관된 JmsHeaderMapper를 표준 메시지 헤더로 상호 변환한다. |
SimpleMessageConverter | 문자열을 TextMessage로, byte 배열을 BytesMessage로, Map을 MapMessage로, Serializable 객체를 Object Message로 상호 변환한다. |
- 기본적으로는 SimpleMessageConverter가 사용되며, 이 경우 전송될 객체가 Serializable 인터페이스를 구현하는 것이어야 한다.
- 다른 메시지 변환기를 적용할 때는 해당 변환기의 인스턴스를 빈으로 선언하면 된다.
@Configuration
public class MessagingConfig {
@Bean
public MappingJackson2MessageConverter messageConverter() {
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
messageConverter.setTypeIdPropertyName("_typeId");
HashMap<String, Class<?>> typeIdMappings = new HashMap<>();
typeIdMappings.put("order", Order.class);
messageConverter.setTypeIdMappings(typeIdMappings);
return messageConverter;
}
}
- 수신된 메시지의 변환 타입을 메시지 수신자가 알아야 하므로, setTypeIdPropertyName() 메서드를 호출한 후 이 메시지 변환기 인스턴스를 반환한다.
- 여기에는 변환되는 타입의 클래스 이름(패키지 전체 경로 포함)이 포함되는데, 메시지 수신자도 똑같은 클래스와 타입을 가져야 하기 때문에 유연성이 떨어진다.
- 따라서, 유연성을 높이기 위해 메시지 변환기의 setTypeIdMappings()를 호출하여 실제 타입에 임의의 타입 이름을 매핑시킬 수 있다.
- 위의 경우, 해당 메시지의 _typeId 속성에 전송되는 클래스 이름 대신 order 값이 전송되며, 해당 메시지를 수신하는 애플리케이션에는 이와 유사한 메시지 변환기가 구성되어 있으므로 order를 자신이 알고 있는 주문 데이터로 매핑하면 된다.
후처리 메시지
convertAndSent()의 마지막 인자로 MessagePostProcessor를 전달하면, Message 객체가 생성된 후 이 객체에 필요한 처리를 할 수 있다.
@Service
@RequiredArgsConstructor
public class JmsOrderMessagingService implements OrderMessagingService {
private final JmsTemplate jms;
@Override
public void sendOrder(Order order) {
jms.convertAndSend("도착지", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
}
MessagePostProcessor는 함수형 인터페이스로, 익명의 내부 클래스를 람다로 교체하여 아래와 같이 간단하게 표현할 수 있다.
@Override
public void sendOrder(Order order) {
jms.convertAndSend("도착지", order,
message -> {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
});
}
만약 여러 개의 다른 convertAndSend() 호출에서 동일한 MessagePostProcessor를 사용하고 싶은 경우에는 람다보다, 메서드 참조를 사용하면 불필요한 코드의 막을 수 있다.
@Override
public void sendOrder(Order order) {
jms.convertAndSend("도착지", order, this::addOrderSource);
}
private Message addOrderSource(Message message) throws JMSException {
message.setStringProperty("X_ORDER_SOURCE", "WEB");
return message;
}
메시지 수신
- 메시지를 수신하는 방법은 아래와 같이 두 가지가 있다.
- 코드에서 메시지를 요청하고 도착할 때까지 기다리는 풀 모델
- 메시지가 수신 가능하게 되면 코드로 자동 전달하는 푸시 모델
- JmsTemplate에서 제공하는 모든 메서드는 풀 모델을 사용하므로, 이 메서드 중 하나를 호출하여 메시지를 요청하면 스레드에서 미시지를 수신할 수 있을 때까지 기다린다.
- 만약 푸시 모델을 사용하고 싶으면 언제든 메시지가 수신 가능할 때 자동 호출되는 메시지 리스너를 정의해야 한다.
- 일반적으로 푸시 모델이 스레드의 실행을 막지 않으므로 좋지만, 많은 메시지가 너무 빨리 도착한다면 리스너에 과부하가 걸릴 수 있다.
pull model
- JmsTemplate은 브로커로부터 메시지를 가져오는 여러 메서드를 제공한다.
// 원시 메시지 수신
@Override
@Nullable
public Message receive() throws JmsException {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
return receive(defaultDestination);
}
else {
return receive(getRequiredDefaultDestinationName());
}
}
@Override
@Nullable
public Message receive(Destination destination) throws JmsException {
return receiveSelected(destination, null);
}
@Override
@Nullable
public Message receive(String destinationName) throws JmsException {
return receiveSelected(destinationName, null);
}
// 메시지를 도메인 타입으로 변환
@Override
@Nullable
public Object receiveAndConvert() throws JmsException {
return doConvertFromMessage(receive());
}
@Override
@Nullable
public Object receiveAndConvert(Destination destination) throws JmsException {
return doConvertFromMessage(receive(destination));
}
@Override
@Nullable
public Object receiveAndConvert(String destinationName) throws JmsException {
return doConvertFromMessage(receive(destinationName));
}
receive()
@Component
@RequiredArgsConstructor
public class JmsOrderReceiver implements OrderReceiver {
private final JmsTemplate jms;
private final MessageConverter converter;
@Override
public Order receiveOrder() throws JMSException {
Message message = jms.receive("도착지");
return (Order) converter.fromMessage(message);
}
}
- receive() 메서드는 변환되지 않은 메시지를 반환하며, 메시지 내부의 객체로 변환하기 위해 MessageConverter를 사용한다.
- 수신 메시지의 타입 ID 속성은 해당 메시지를 객체로 변환하라고 알려주며, 변환된 객체의 타입은 Object이므로 캐스팅한 후 반환해야 한다.
- 메시지의 속성과 헤더가 필요한 경우에는 이처럼 원시 Message 객체를 메시지로 수신하는 것이 유용하지만, 메시지의 메타데이터는 필요없고 페이로드만 필요할 때는 receiveAndConvert()를 사용한다.
receiveAndConvert()
@Component
@RequiredArgsConstructor
public class JmsOrderReceiver implements OrderReceiver {
private final JmsTemplate jms;
@Override
public Order receiveOrder() throws JMSException {
return (Order) jms.receiveAndConvert("도착지");
}
}
모든 메시지 변환은 내부적으로 receiverAndConvert()에서 수행되므로, 더 이상 MessageConverter를 주입할 필요가 없다.
push model
- 메시지 리스너는 메시지가 도착할 때까지 대기하는 수동적 컴포넌트이다.
- JMS에서 메시지에 반응하는 메시지 리스너를 생성하려면 컴포넌트의 메서드에 @JmsListener를 지정하면 된다.
@JmsListener(destination = "도착지")
public void receiveOrder(Order order) {
log.info("RECEIVE ORDER " + order);
}
- 이 메서드는 JmsTemplate을 사용하지 않으며 애플리케이션 코드에서도 호출되지 않는 대신, 스프링의 프레임워크 코드가 특정 도착지에 메시지가 도착하는 것을 기다리다가 도착하면 해당 메시지에 적재된 객체가 인자로 전달되면서 자동 호출된다.
- 즉, 스프링 MVC 요청 매핑 어노테이션과 유사하게 지정된 @JmsListener가 지정된 메서드들은 지정된 도착지에 들어오는 메시지에 반응한다.
정리
- JMS는 표준 자바 명세에 정의되어 있고 여러 브로커에서 지원되므로 자바 메시징에 많이 사용되지만, 자바 애플리케이션에서만 사용할 수 있다는 단점이 존재한다.
- RabbitMQ와 카프카와 같은 새로운 메시징 시스템은 이런 단점을 해결하여 다른 언어와 JVM 외의 다른 플랫폼에서 사용할 수 있다.
AMQP
- RabbitMQ는 AMQP의 구현으로, JMS보다 더 진보된 메시지 라우팅 전략을 제공한다.
- JMS 메시지가 수신자가 가져갈 메시지 도착지의 이름을 주소로 사용하는 반면, AMQP 메시지는 수신자가 리스닝하는 큐와 분리된 거래소 이름과 라우팅 키를 주소로 사용한다.
- 메시지가 RabbitMQ 브로커에 도착하면 주소로 지정된 거래소로 들어간다.
- 거래소는 하나 이상의 큐에 메시지를 전달하는데, 이때 거래소 타입, 거래소와 큐 사이의 바인딩, 메시지의 라우팅 키 값을 기반으로 처리한다.
거래소
Default | 브로커가 자동으로 생성하는 특별한 거래소로, 해당 메시지의 라우팅 키와 이름이 같은 큐로 메시지를 전달한다. 모든 큐는 자동으로 기본 거래소와 연결된다. |
Direct | 바인딩 키가 해당 메시지의 라우팅 키와 같은 큐에 메시지를 전달한다. |
Topic | 바인딩 키가 해당 메시지의 라우팅 키와 일치하는 하나 이상의 큐에 메시지를 전달한다. |
Fanout | 바인딩 키나 라우팅 키에 상관없이 모든 연결된 큐에 메시지를 전달한다. |
Header | 토픽 거래소와 유사하며, 라우팅 키 대신 메시지 헤더 값을 이용한다. |
Dead Letter | 정의된 어떤 거래소-큐 바인딩과도 일치하지 않는 모든 메시지를 보관하는 거래소 |
- 거래소의 가장 간단한 형태는 기본 거래소와 팬아웃 거래소이며, 이들은 JMS의 큐 및 토픽과 거의 일치한다.
- 메시지는 라우팅 키를 갖고 거래소로 전달되고 큐에서 읽혀져 소비되는데, 메시지는 바인딩 정의를 기반으로 거래소로부터 큐로 전달된다.
- 스프링 애플리케이션에서 메시지를 전송하고 수신하는 방법은 사용하는 거래소 타입과 무관하며, 거래소와 큐의 바인딩을 정의하는 방법과도 관계가 없다.
설정
- AMQP 스타터를 빌드에 추가하면 AMQP 연결 팩토리와 RabbitTemplate 빈을 생성하는 자동-구성이 수행된다.
compileOnly 'org.springframework.amqp:spring-rabbit:2.4.2'
- 개발 목적이라면 RabbitMQ 브로커는 로컬 컴퓨터에서 실행되고 5672 포트를 리스닝 하며, 인증 정보가 따로 없다.
- 실무 환경에서는 아래와 같은 속성들을 지정해줄 수 있다.
spring.rabbitmq.addresses | 쉼표로 구분된 리스트 형태의 RabbitMQ 브로커 주소 |
spring.rabbitmq.host | 브로커의 호스트 (기본값은 localhost) |
spring.rabbitmq.port | 브로커의 포트 (기본값은 5672) |
spring.rabbitmq.username | 브로커를 사용하기 위한 사용자 이름 (optional) |
spring.rabbitmq.password | 브로커를 사용하기 위한 사용자 암호 (optional) |
메시지 전송
- RabbitMQ 메시징을 위한 스프링 지원의 핵심은 RabbitTemplate로, RabbitTemplate을 사용한 메시지 전송의 경우 send()와 convertAndSend() 메서드는 같은 이름의 JmsTemplate 메서드와 유사하다.
- 그러나 지정된 큐나 토픽에만 메시지를 전송했던 JmsTemplate 메서드와 달리, RabbitTemplate 메서드는 거래소와 라우팅 키의 형태로 메시지를 전송한다.
- 즉 도착지 이름 대신, 거래소와 라우팅 키를 지정하는 문자열 값을 인자로 받으며, 거래소를 인자로 받지 않으면 기본 거래소로, 라우팅 키를 인자로 받지 않으면 기본 라우팅 키로 메시지를 전송한다.
void send(String exchange, String routingKey, Message message) |
원시 Message 객체 전송 |
void convertAndSend(String exchange, String routingKey, Object message) |
객체를 Message로 변환해서 전송 |
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) |
send()
@RequiredArgsConstructor
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private final RabbitTemplate rabbit;
@Override
public void sendOrder(Order order) {
// 메시지 속성 지정
MessageProperties props = new MessageProperties();
// Order 객체를 Message 객체로 변환
MessageConverter converter = rabbit.getMessageConverter();
Message message = converter.toMessage(order, props);
// 메시지 전송
rabbit.send("라우팅키", message);
}
}
- MessageConverter로 객체를 Message 객체로 변환하고, MessageProperties를 사용해서 메시지 속성을 제공한다.
- send()를 호출하면 메시지와 함께 거래소 및 라우팅 키를 인자로 전달하고, 거래소를 생략하면 기본 거래소를 사용한다.
- 만약 거래소와 라우팅 키의 기본값을 바꾸고 싶으면 spring.rabbitmq.template.exchange와 spring.rabbitmq.template.routing-key 속성을 이용하여 변경할 수 있다.
convertAndSend()
- converAndSend() 메서드를 사용하면 모든 변환 작업을 RabbitTemplate이 처리하도록 할 수 있다.
@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("라우팅키", order);
}
메시지 변환기
- 기본적으로 메시지 변환은 SimpleMessageConverter로 수행되며, String과 같은 간단한 타입과 Serializable 객체를 Message 객체로 변환할 수 있다.
- 스프링은 이 외에도 아래와 같이 RabbitTemplate에 사용할 수 있는 여러 개의 메시지 변환기를 제공한다.
Jackson2JsonMessageConverter | Jackson2JSONProcessor를 사용해서 객체를 JSON으로 상호 변환한다. |
MarshallingMessageConverter | 스프링 Marshaller와 Unmarshaller를 사용해서 변환한다. |
SerializerMessageConverter | 스프링의 Serializer와 Deserializer를 사용해서 String과 객체를 변환한다. |
SimpleMessageConverter | String, byte 배열, Serializable 타입을 변환한다. |
ContentTypeDelegatingMessageConverter | contentType 헤더를 기반으로 다른 메시지 변환기에 변환을 위임한다. |
- 만약 메시지 변환기를 변경해야 할 때는 MessageConverter 타입의 빈을 구성하면, 스프링 부트 자동-구성에서 이 빈을 찾아 기본 메시지 변환기 대신 이 빈을 RabbitTemplate으로 주입한다.
@Configuration
public class MessagingConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
메시지 속성 설정
- 전송하는 메시지의 일부 헤더를 설정해야 하는 경우, Message 객체를 생성할 때 메시지 변환기에 제공하는 MessageProperties 인스턴스를 이용한다.
@RequiredArgsConstructor
@Service
public class RabbitOrderMessagingService implements OrderMessagingService {
private final RabbitTemplate rabbit;
@Override
public void sendOrder(Order order) {
MessageConverter converter = rabbit.getMessageConverter();
// 메시지 속성 지정
MessageProperties props = new MessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB")
Message message = converter.toMessage(order, props);
rabbit.send("라우팅키", message);
}
}
- 그러나 convertAndSend()를 사용할 때는 MessageProperties 객체를 직접 사용할 수 없으므로 아래와 같이 MessagePostProcessor를 이용해야 한다.
@Override
public void sendOrder(Order order) {
rabbit.convertAndSend("라우팅키", order,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties props = message.getMessageProperties();
props.setHeader("X_ORDER_SOURCE", "WEB");
return message;
}
});
}
메시지 수신
- JMS에서처럼 RabbitMQ도 아래와 같이 두 가지 방식을 선택할 수 있다.
- RabbitTemplate을 사용해서 큐로부터 메시지를 가져온다. (풀 모델)
- @RabbitListener가 지정된 메서드로 메시지가 푸시된다. (푸시 모델)
RabbitTemplate
- RabbitTemplate은 큐로부터 메시지를 가져오는 여러 메서드를 제공한다.
Message receive(String queueName, long timeoutMillis) | 메시지를 수신한다. |
Object receiveAndConvert(String queueName, long timeoutMillis) | 메시지로부터 변환된 객체를 수신한다. |
<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameteriaedTypeReference<T> type) |
메시지로부터 변환된 타입-안전 객체를 수신한다. |
- receive()는 큐로부터 원시 Message 객체를 수신하며, receiveAndConvert()는 메시지를 수신한 후 메시지 변환기를 사용하여 수신 메시지를 도메인 객체로 변환하여 반환한다.
- 또한 대부분의 수신 메서드는 메시지의 수신 타임아웃을 나타내기 위해 long 타입의 매개변수를 가지며, 기본값은 0밀리초
- 타임아웃 값을 인자로 전달하면 메시지가 도착하거나 타임아웃에 걸릴 때까지 receive()와 receiveAndConvert() 메서드가 대기하게 된다.
- 한가지 주의할 점은 수신 메서드는 거래소나 라우팅 키를 매개변수로 갖지 않는다는 것으로, 이는 일단 메시지가 큐에 들어가면 다음 메시지 도착지는 큐로부터 메시지를 소비하는 컨슈머이기 때문이다.
- 따라서, 메시지를 소비하는 애플리케이션은 거래소 및 라우팅 키를 신경 쓸 필요가 없고 큐만 알면 된다.
receive()
@Component
@RequiredArgsConstructor
public class RabbitOrderReceive implements OrderReceiver {
private final RabbitTemplate rabbit;
private final MessageConverter converter;
@Override
public Order receiveOrder() {
Message message = rabbit.receive("큐이름", 30000);
return message != null ? (Order) converter.fromMessage(message) : null;
}
}
- 주입된 RabbitTemplate의 receive() 메서드를 호출하여 큐로부터 데이터를 가져오고, Message 객체가 반환되면 MessageConverter를 사용하여 Message 객체를 Order 객체로 변환한다.
- 만약 구성을 통해 타임아웃을 설정하고자 한다면, receive() 호출 코드의 타임아웃 값을 제거하고 spring.rabbitmq.template.receive-timeout 속성에 타임아웃 값을 설정할 수도 있다.
receiveAndConvert()
- receiveAndConvert()를 사용하면 메시지를 자동으로 변환해주므로 아래와 같이 작성할 수 있다.
@Override
public Order receiveOrder() {
return (Order) rabbit.receiveAndConvert("큐이름");
}
- 여기서 ParameterizedTypeReference를 receiveAndConvert()의 인자로 전달하면 직접 Order 객체를 수신할 수 있다.
@Override
public Order receiveOrder() {
return rabbit.receiveAndConvert("큐이름",
new ParameterizedTypeReference<Order>() {});
}
- 이 방법은 타입-안전 측면에서는 캐스팅보다 좋지만, 메시지 변환기가 SmartMessageConverter 인터페이스를 구현한 클래스여야 한다. (예를 들어 Jackson2JsonMessageConverter)
@RabbitListener
- 메시지 기반의 RabbitMQ 빈을 위해 스프링은 RabbitListener를 제공한다.
- 메시지가 큐에 도착할 때 메서드가 자동 호출되도록 지정하기 위해서는 @RabbitListener 어노테이션을 RabbitMQ 빈의 메서드에 지정해야 한다.
@Component
public class OrderListener {
@RabbitListener(queues = "큐이름")
public void receiveOrder(Order order) {
// 생략
}
}
'Spring > Rest API' 카테고리의 다른 글
아파치 카프카 (Kafka) (0) | 2022.02.09 |
---|---|
REST 서비스 사용 (0) | 2022.01.30 |
스프링 데이터 REST (0) | 2022.01.30 |
REST 엔드포인트 정의 (0) | 2022.01.30 |
RestContoller 요청과 응답 방법 (0) | 2022.01.20 |