안녕하세요 F4팀의 혁잔디입니다. Kafka를 활용한 프로젝트를 설계하면서 Client의 요청에 대한 응답을 Kafka로 어떻게 처리할 수 있을까에 대한 궁금증이 생겨 관련 자료를 찾아보았고 이해를 위해 간단한 예제를 만들어보았습니다.

예제 간단 소개

Database에 User, Order 테이블 존재
User 테이블에는 UserId와 Money(보유한 금액)이 존재
Order 테이블에는 UserId, Paid(결제 금액), Result(결과) 존재
User가 결제 금액을 요청했을 때 보유 금액이 결제 금액보다 크거나 같은 경우 “성공했습니다.” 반환받고
그렇지 않은 경우 “실패했습니다.” 반환받음

Apache Kafka를 사용한 동기식 요청-응답

Apache Kafka는 스트리밍 데이터와 다양한 생산자와 소비자 간의 디커플링을 위해 구축되었기 때문에 정말 필요한 상황이 아니면 Kafka와 요청-응답 개념을 사용하지 않는 것이 좋습니다.
하지만 필요한 상황에서는 Spring kafka 템플릿을 사용하여 동기식 요청-응답을 구현할 수 있습니다.
간단한 예제를 통해 사용 방법을 익혀보았습니다.

flow
PostMan을 통해 Controller에 JSON타입으로 요청을 보내면
Controller에서 ProducerRecord를 통해 헤더에 Reply_Topic을 “order”로 설정해 주고 “user” topic에 이벤트 발행
“user” topic을 구독하던 Consumer에서 간단한 로직 처리 후 @SendTo에 의해 헤더에 있는 Reply_Topic에 결과 반환
Controller에서 ConsumerRecord를 통해 결과값 받은 후 Client에 반환

Controller

@PostMapping(value="/order",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE)
	public String sum(@RequestBody OrderEntity request) throws InterruptedException, ExecutionException {
		// create producer record
		ProducerRecord<String, OrderEntity> record = new ProducerRecord<String, OrderEntity>(requestTopic, request);
		// set reply topic in header
		record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
		// post in kafka topic
		RequestReplyFuture<String, OrderEntity, OrderEntity> sendAndReceive = kafkaTemplate.sendAndReceive(record);

		// confirm if producer produced successfully
		SendResult<String, OrderEntity> sendResult = sendAndReceive.getSendFuture().get();

		//print all headers
		sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));
		// get consumer record
		ConsumerRecord<String, OrderEntity> consumerRecord = sendAndReceive.get();

		// return consumer value
		return consumerRecord.value().getResult();
	}

Consumer

@KafkaListener(topics = "${kafka.topic.request-topic}")
	@SendTo
	public OrderEntity listen(OrderEntity request) throws InterruptedException {
		if(uRepository.findById(request.getUserId()).isPresent()) {
			UserEntity user = uRepository.findById(request.getUserId()).get();
			if(user.getMoney() < request.getPaid()) {
				request.setResult("Failure");
			}else {
				user.setMoney(user.getMoney()-request.getPaid());
				request.setResult("Success");
			}
			uRepository.save(user);
			oRepository.save(request);
			return request;
		}else {
			request.setResult("없는 사용자");
			return request;
		}
	}



Git

Configuration 등 설정이나 전체 코드는
https://github.com/rlagurnws/kafkatest.git v2.1 tag에서 확인 가능합니다.