Kafka
- 아파치 카프카는 가장 새로운 메시징 시스템으로, ActiveMQ, Artemis, RabbitMQ와 유사하면서도 특유의 아키텍처를 가지고 있다.
- 카프카는 높은 확장성을 제공하는 클러스터로 실행되도록 설계되었으며, 클러스터의 모든 카프카 인스턴스에 걸쳐 토픽을 파티션으로 분할하여 메시지를 관리한다.
- 카프카의 토픽은 클러스터의 모든 브로커에 걸쳐 복제된다.
- 클러스터의 각 노드는 하나 이상의 토픽에 대한 리더로 동작하고, 토픽 데이터를 관리하고, 클러스터의 다른 노드로 데이터를 복제한다.
- 각 토픽은 여러 개의 파티션으로 분할될 수 있으며, 클러스터의 각 노드는 한 토픽의 하나 이상의 파티션의 리더가 된다.
설정
implementation 'org.springframework.kafka:spring-kafka'
- 해당 의존성을 추가하면 스프링 부트가 카프카 사용을 위한 자동-구성을 해주므로, KafkaTemplate을 통해 메시지를 전송, 수신할 수 있다.
- KafkaTemplate은 기본적으로 localhost에서 실행되면서 9092 포트를 리스닝하는 카프카 브로커를 사용한다.
- 애플리케이션 개발 시에는 로컬의 카프카 브로커를 사용하면 되지만, 실무 환경에서는 다른 호스트와 포트로 구성해야 한다.
- spring.kafka.bootstrap-servers 속성에는 카프카 클러스터로의 초기 연결에 사용되는 하나 이상의 카프카 서버들의 위치를 설정한다.
메시지 전송
@Override
public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {
return send(this.defaultTopic, data);
}
@Override
public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {
return send(this.defaultTopic, key, data);
}
@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
return send(this.defaultTopic, partition, key, data);
}
@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {
return send(this.defaultTopic, partition, timestamp, key, data);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
@Nullable V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
return doSend(producerRecord);
}
@Override
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
Assert.notNull(record, "'record' cannot be null");
return doSend(record);
}
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<SendResult<K, V>> send(Message<?> message) {
ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);
if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jackson
byte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
if (correlationId != null) {
producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);
}
}
return doSend((ProducerRecord<K, V>) producerRecord);
}
- KafkaTemplate은 제네릭 타입을 사용하고, 메시지를 전송할 때 직접 도메인 타입을 처리할 수 있기 때문에 convertAndSend() 메서드가 없으며, 대신 모든 send() 메서드가 convertAndSend()의 기능을 가지고 있다.
- send()와 sendDefault()에는 메시지가 전송되는 방법을 알려주는 아래와 같은 매개변수를 지정할 수 있다.
메시지가 전송될 토픽 | send()에 필요 |
토픽 데이터를 쓰는 파티션 | Optional |
레코드 전송 키 | Optional |
타임스탬프 | Optional, 기본값은 System.currentTimeMillis() |
페이로드 | 메시지에 적재된 순수한 데이터 |
send(), sendDefault()
@Service
@RequiredArgsConstructor
public class KafkaOrderMessagingService implements OrderMessagingService {
private final KafkaTemplate<String, Order> kafkaTemplate;
@Override
public void sendOrder(Order order) {
kafkaTemplate.send("토픽", order);
// kafkaTemplate.sendDefault(order);
}
}
- 만약 기본 토픽을 설정하려면 spring.kafka.template.default-topic 속성에 토픽을 지정하고 send() 대신에 sendDefault() 메서드를 사용하면 된다.
카프카 리스너
- KafkaTemplate은 메시지를 수신하는 메서드를 제공하지 않기 때문에, 스프링을 이용해서 카프카 토픽의 메시지를 가져오려면 메시지 리스너를 작성해야 한다.
- 메시지 리스너는 @KafkaListener 어노테이션이 지정된 메서드에 정의되며, @KafkaListener가 지정된 메서드는 해당 이름의 토픽에 메시지가 도착할 때 자동 호출된다.
@Component
public class OrderListener {
@KafkaListener(topics = "토픽")
public void handle(Order order) {
// 생략
}
}
- 만약 메시지의 추가적인 메타데이터가 필요하다면 ConsumerRecord나 Message 객체도 인자로 받을 수 있다.
@KafkaListener(topics = "토픽")
public void handle(Order order, ConsumerRecord<Order> record) {
// 수신된 메시지의 파티션
log.info(record.partition());
// 타임스탬프
log.info(record.timestamp());
}
@KafkaListener(topics = "토픽")
public void handle(Order order, Message<Order> message) {
MessageHeaders headers = message.getHeaders();
// 수신된 메시지의 파티션
log.info(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID));
// 타임스탬프
log.info(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
}
- 메시지 페이로드는 ConsumerRecold.value()나 Message.getPayload()를 사용해서 받을 수 있는데, 이는 매개변수로 직접 객체를 요청하는 대신 ConsumerRecold나 Message 객체를 통해 객체를 요청할 수 있음을 의미한다.
'Spring > Rest API' 카테고리의 다른 글
비동기 메시지 전송 (JMS, RabbitMQ) (0) | 2022.02.09 |
---|---|
REST 서비스 사용 (0) | 2022.01.30 |
스프링 데이터 REST (0) | 2022.01.30 |
REST 엔드포인트 정의 (0) | 2022.01.30 |
RestContoller 요청과 응답 방법 (0) | 2022.01.20 |