티스토리 뷰

인프런 이도원님의 'Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)' 듣고 정리한 내용입니다.

 

Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) - 인프런 | 강의

Spring framework의 Spring Cloud 제품군을 이용하여 마이크로서비스 애플리케이션을 개발해 보는 과정입니다. Cloud Native Application으로써의 Spring Cloud를 어떻게 사용하는지, 구성을 어떻게 하는지에 대해

www.inflearn.com

 

1. Orders Microservice와 Catalogs Microservice에 Kafka Topic의 적용

각 마이크로서비스는 각각의 데이터베이스를 가지고 있다. order-service에서 재고 수량의 변동이 생길 경우 catalog-service에도 적용이 되어야한다. 이때 데이터 전달에 Kafka를 사용해보겠다.

order-service는 kafka topic으로 메시지를 전송하므로 produecer 역할을 하고, catalog-service는 kafka topic에 전송된 메시지를 가지고 와서 업데이트시키는 consumer 역할을 한다.

 

pom.xml - Kafka를 사용하겠다는 dependency 추가

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

 

catalog-service에 KafkaConsumerConfig 파일을 만들어서 Kafka 접속을 하기 위한 configuration 정보를 가질 수 있도록 구성을 해준다.

 

1. topic에 접속을 하기 위한 정보가 들어가 있는 ConsumerFactory

2. topic에 변동사항을 감지할 수 있는 Listener 등록 ConcurrentKafkaListenerConatinerFactory

 

KafkaConsumerConfig.java

@EnableKafka // Kafka에서 사용할 수 있도록 설정
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(properties);
    } // 접속하고자 하는 정보

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
                = new ConcurrentKafkaListenerContainerFactory<>();

        kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

        return kafkaListenerContainerFactory;
    } // 접속 정보를 이용한 Listener
}

 

다음은 이 configuration 정보를 가지고 사용할 수 있는 consumer를 만들어준다. 여기서는 topic에 변경된 데이터 값을 가지고 실제 데이터베이스에 반영해주는 작업을 한다.

 

KafkaConsumer.java

@Service
@Slf4j
public class KafkaConsumer {
    CatalogRepository repository;

    @Autowired
    public KafkaConsumer(CatalogRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "example-catalog-topic")
    public void upadateQty(String kafkaMessage) {
        log.info("Kafka Message: -> " + kafkaMessage);

        // 메시지가 직렬화 되어 저장이 들어오므로 역직렬화를 해서 사용해야 함
        Map<Object, Object> map = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        // String을 JSON 타입으로 볼 수 있도록!
        try {
            map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        CatalogEntity entity = repository.findByProductId((String)map.get("productId")); // map으로 get하면 JSON 타입일테니 String으로 다시 바꿔줘야 함!
        if (entity != null) {
            entity.setStock(entity.getStock() - (Integer)map.get("qty"));
            repository.save(entity);
        }
    } // 설정해둔 topic으로부터 데이터가 전달이 되면 가지고 와서 메서드를 실행
}

직렬화: 객체 또는 데이터를 외부의 자바 시스템에서 사용할 수 있도록 바이트 형태로 데이터 변환

역직렬화: 바이트로 변환된 데이터를 다시 객체로 변환

 

다음은 order-service에서 configuration 정보를 가질 수 있는 KafkaProducerConfig를 만들어준다.

 

KafkaProduecerConfig.java

@EnableKafka // Kafka에서 사용할 수 있도록 설정
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // consumer가 아니기 때문에 group id 필요 없음
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(properties);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    } // 데이터 전달하는 용의 인스턴스
}

 

KafkaProducer.java

@Service
@Slf4j
public class KafkaProducer {

    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto) {
        ObjectMapper mapper = new ObjectMapper();
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(orderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic,  jsonInString);
        log.info("Kafka Producer sent data from the Order Microservice : " + orderDto);

        return orderDto;
    }
}

 

OrderController.java

@RestController
@RequestMapping("/order-service")
public class OrderController {

    Environment env;
    OrderService orderService;
    KafkaProducer kafkaProducer;

    @Autowired
    public OrderController(Environment env, OrderService orderService, KafkaProducer kafkaProducer) {
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
    }

...

    // 사용자 별 상품 주문
    @PostMapping("{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable String userId, @RequestBody RequestOrder orderDetails) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        /* JPA */
        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);
        OrderDto createdOrder = orderService.createOrder(orderDto);

        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        // Kafka에게 메시지를 전달하는 동작을 추가해야 함

        /* send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

...

}

 

이제 실행시켜보자.

<실행해야할 것들>

1. Zookepter

2. Kafka Server

3. Eureka Server

4. apigateway-service

5. config-service

6. catalog-service

7. order-service

 

catalog-service 기동하면서 생겼던 오류

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'dataSourceScriptDatabaseInitializer'

https://fftl.tistory.com/m/15 여기 참고

-> jpa 설정에

defer-datasource-initialization: true

추가해주었다.

 

실행시켜서 확인해보니..

catalog service
order service

 

각각 독립적인 데이터베이스를 사용하고 있다는 것을 볼 수 있다.

 

다음과 같은 주문이 들어왔다고 하자.

CATALOG-001 10개 주문

order service에는 주문 내역 잘 들어왔고 ~

원래 catalog 내역

주문 후에는?

CATALOG-001 개수가 90으로 줄어듬

order service의 데이터가 catalog service에게 전달되면서 데이터의 동기화가 이루어졌다.

 

order service에서는 데이터가 전달된 것을 로그로 확인이 가능하고

catalog service에서는 listener가 데이터를 받은 것을 로그로 확인이 가능하다.

데이터를 받은 후에는 update 쿼리문을 사용하여 변경된 데이터를 다시 저장한다.

 

2. Multiple Orders Service에서의 데이터 동기화

문제점

order service를 2개를 기동시킬 경우 각각의 단일 데이터베이스를 사용하게 된다. 그러면 user service에서 order service의 데이터를 다 잘 불러올 수 있을까? orders의 데이터가 분산되어 저장되기 때문에 동기화의 문제가 생긴다.

단일 데이터베이스를 사용할 수 있도록 설정해보겠다.

 

order service를 2개 기동시켜서 주문을 5개 등록하자. (user service도 기동시켜서 회원가입하기!)

데이터가 2개의 마이크로서비스를 번갈아가면서 등록되었다.

데이터가 이렇게 분산되어 저장되다보니 어떤 문제가 생길까!

사용자의 주문 목록을 조회하면 모든 데이터가 한번에 보이는 것이 아니라 나눠서 보인다.

 

어떻게 해결하냐!

order servcie의 주문 정보를 DB가 아닌 Kafka topic으로 전송하여 Kafka Sink Connect를 이용해서 단일 DB에 저장하도록 하겠다.

 

source connect = kafka 서비스에서 메시지를 topic에 전달해주는 역할

sink connect = topic에 전달된 데이터를 가져와서 사용하는 역할

 

위에서 order service에서 데이터를 전달하는 producer 설정을 해주었다. 지금 해야할 것은... topic에 전달된 데이터를 단일 데이터베이스에 반영 시켜주는 작업을 해주자.

 

일단 데이터베이스를 h2가 아닌 mysql로 변경해주자.

mysql DB에 orders 테이블 추가

order-service에서 datasource를 mysql로 변경하기!

 

다시 주문을 등록하는 작업을 진행해보자.

mysql에 주문내역 데이터가 들어온 것을 확인할 수 있다.

 

이제 order-service에서 Kafka 관련 설정을 하자.

 

OrderController.java

@RestController
@RequestMapping("/order-service")
public class OrderController {

    Environment env;
    OrderService orderService;
    KafkaProducer kafkaProducer;

...

    // 사용자 별 상품 주문
    @PostMapping("{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable String userId, @RequestBody RequestOrder orderDetails) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);
        
        /* JPA */
//        OrderDto createdOrder = orderService.createOrder(orderDto);
//        ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);

        /* Kafka */
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(orderDto.getQty() * orderDto.getUnitPrice());


        // Kafka에게 메시지를 전달하는 동작을 추가해야 함
        /* send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto);

        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

...

}

 

주문 정보를 topic에 보낼 때는 다음과 같은 포멧을 유지하며 보낸다.

{
  "schema":"{"
    "type": "struct",
    "fields": [
      {"type": "string", "optional": true, "field":"order_id"},
      {"type": "string", "optional": true, "field":"user_id"},
      {"type": "int32", "optional": true, "field":"product_id"}
      {"type": "int32", "optional": true, "field":"qty"}
      {"type": "int32", "optional": true, "field":"total_price"}
      {"type": "int32", "optional": true, "field":"unit_price"}
    ],
    "optionl": false,
    "name": "orders"
  },
  "payload": {
    "order_id": "1234567",
    "user_id": "abcd",
    "product_id": "CATALOG-001",
    "qty": 5
    "total_price": 6000,
    "unit_price": 1200
  }
}

1. Schema: 테이블의 구조

2. Field: 각각의 데이터베이스에 저장될 필드값 (type, optional, field)

3. Payload: 저장될 데이터

 

Field.java

@Data
@AllArgsConstructor
public class Field { // 데이터를 저장하기 위해서 어떤 필드가 사용되는지 지정
    private String type;
    private boolean optional;
    private String field;
    // 이 값들을 임의로 정하는 것이 아니라 source connect에서 sink connect를 데이터를 보낼 때 어떤 정보를 저장해야할지 가지고 있어야 할 필드 구성
}

 

Payload.java

@Data
@Builder
public class Payload {
    // 여기 정보들은 mysql orders 테이블에 있는 정보랑 동일
    private String order_id;
    private String user_id;
    private String product_id;
    private int qty;
    private int unit_price;
    private int total_price;
}

 

Schema.java

@Data
@Builder
public class Schema {
    private String type;
    private List<Field> fields;
    private boolean optional;
    private String name;

}

 

KafkaOrderDto.java

@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
    private Schema schema;
    private Payload payload;
}

 

Kafka에 전달하기 위한 OrderProducer.java

@Service
@Slf4j
public class OrderProducer {

    private KafkaTemplate<String, String> kafkaTemplate;

    // topic에 전달하기 위한 내용을 생성해야 함
    List<Field> fields = Arrays.asList(new Field("string", true, "order_id"),
            new Field("string", true, "user_id"),
            new Field("string", true, "product_id"),
            new Field("int32", true, "qty"),
            new Field("int32", true, "unit_price"),
            new Field("int32", true, "total_price"));

    Schema schema = Schema.builder()
            .type("struct")
            .fields(fields)
            .optional(false)
            .name("orders")
            .build();

    @Autowired
    public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public OrderDto send(String topic, OrderDto orderDto) {
        Payload payload = Payload.builder()
                .order_id(orderDto.getOrderId())
                .user_id(orderDto.getUserId())
                .product_id(orderDto.getProductId())
                .qty(orderDto.getQty())
                .unit_price(orderDto.getUnitPrice())
                .total_price(orderDto.getTotalPrice())
                .build(); // 실제 전달되는 값들은 Payload에 담겨있음

        // Kafka에 전달할 객체
        KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);

        ObjectMapper mapper = new ObjectMapper(); // schema와 payload 형태로 바꿔야함
        String jsonInString = "";
        try {
            jsonInString = mapper.writeValueAsString(kafkaOrderDto);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }

        kafkaTemplate.send(topic,  jsonInString);
        log.info("Order Producer sent data from the Order Microservice : " + kafkaOrderDto);

        return orderDto;
    }
}

 

OrderController.java

@RestController
@RequestMapping("/order-service")
public class OrderController {

    Environment env;
    OrderService orderService;
    KafkaProducer kafkaProducer;
    OrderProducer orderProducer;

    @Autowired
    public OrderController(Environment env, OrderService orderService,
                           KafkaProducer kafkaProducer, OrderProducer orderProducer) {
        this.env = env;
        this.orderService = orderService;
        this.kafkaProducer = kafkaProducer;
        this.orderProducer = orderProducer;
    }

...

    // 사용자 별 상품 주문
    @PostMapping("{userId}/orders")
    public ResponseEntity<ResponseOrder> createOrder(@PathVariable String userId, @RequestBody RequestOrder orderDetails) {
        ModelMapper mapper = new ModelMapper();
        mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);

        OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
        orderDto.setUserId(userId);

        /* Kafka */
        orderDto.setOrderId(UUID.randomUUID().toString());
        orderDto.setTotalPrice(orderDto.getQty() * orderDto.getUnitPrice());


        // Kafka에게 메시지를 전달하는 동작을 추가해야 함
        /* send this order to the kafka */
        kafkaProducer.send("example-catalog-topic", orderDto);
        orderProducer.send("orders", orderDto);

        ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);

        return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
    }

...

}

 

데이터를 사용하기 위해 sink connect를 추가해보자.

 

다음과 같은 sink를 생성한다.

{
    "name": "my-order-sink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/msaDB",
        "connection.user": "root",
        "connection.password": "pwd",
        "auto.create": "true",
        "auto.evlove": "true",
        "delete.enabled": "false",
        "tasks.max": "1",
        "topics": "orders"
    }
}

 

조회하면 총 3개의 connector가 있다.

 

마지막으로, 여러 개의 order-service에서 발생했던 메시지가 topic에 잘 전달이 되고 메시지가 하나의 데이터베이스에 들어가는지 테스트 해보자.

 

order-service를 2개 기동시켜서 주문을 여러번 등록한다. 각 주문마다 다른 마이크로서비스에서 실행이된다.

하지만 하나의 데이터베이스에 저장이 된다.

 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함