티스토리 뷰
이도원님의 'Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA)' 듣고 정리한 내용입니다.
Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) - 인프런 | 강의
Spring framework의 Spring Cloud 제품군을 이용하여 마이크로서비스 애플리케이션을 개발해 보는 과정입니다. Cloud Native Application으로써의 Spring Cloud를 어떻게 사용하는지, 구성을 어떻게 하는지에 대해
www.inflearn.com
1. Kafka란?
실시간으로 데이터 피드를 관리하기 위해 사용되어 처리량이 높고 지연시간이 낮다. RabbitMQ와 같이 메시지를 다른 쪽으로 전달시켜주는 서버이지만 대용량 처리가 가능하고 안정성이 높다.
특징
- Producer(메시지를 보내는) Consumer(메시지를 받는) 분리
- 메시지를 여러 Consumer에게 허용
- 높은 처리량을 위한 메시지 최적화
- Scale - out 가능
- Eco -sysetem
Kafka 서버는 Kafka Broker라 하고 3대 이상의 Broker Cluster로 구성하는 것을 권장한다. Kafka Broker들을 컨트롤해주는 것은 Zookepter이다. 여러 Broker들 중 1개는 Controller 기능을 수행해야 한다. 문제가 생겼을 때 Controller 역할을 교체한다.
2. Kafka 설치

3. Kafka Producer/Consumer
Kafka를 어떻게 기동하고 메시지를 어떻게 저장하고 보내주는지 살펴보자. 순서는 Zookepter, Kafka 서버, topic 순으로 진행되고 Kafka 파일 밑에서 명령어를 입력해주면 된다.
먼저 Zookepter를 실행시켜주어야 한다.
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
기본 포트번호는 2181이다.
Kafka 서버를 실행시키자.
./bin/kafka-server-start.sh ./config/server.properties
기본 포트번호는 9092이다.
topic을 생성해보자. topic에 관련된 명령어는 다음과 같다.
- topic 생성
./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 \ --partions 1
- topic 목록 확인
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
topic에 아무것도 없을 경우 빈 값으로 나온다.
- topic 정보 확인
./bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
이제 Producer와 Consumer을 실행시켜보자. 마찬가지로 Kafka 파일 밑에서 실행하면 된다.
메시지를 보내는 Producer이다.
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
데이터를 받아오는 Consumer이다. topic을 지정하여 topic에 변동사항이 생길 경우 데이터를 가져오도록 설정할 수 있고 --from-beginning을 설정하면 모든 메시지를 처음부터 받아올 수 있다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
Producer가 직접 Consumer에게 전달하는 것이 아니라 Consumer가 topic을 등록을 해두면 등록되어 있던 메시지를 topic으로부터 받는 것이다. 즉, Producer는 topic에 데이터를 전달하는 것이다.

새로운 Consumer를 등록한다면?

Consumer가 여러개여도 Producer가 보낸 메시지를 모두 확인할 수 있다.
4. Kafka Connect
Kafka Connect를 이용하면 데이터를 자유롭게 import, export 할 수 있다는 것을 의미한다.
저는 mysql을 사용해보겠습니다 - 😎
<!-- https://mvnrepository.com/artifact/com.mysql/mysql-connector-j -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency>
order service에 mysql 의존성을 추가해준다.

Kafka Connector는 users 데이터베이스에 데이터가 삽입될 때 데이터를 감지했다가 새로운 데이터베이스에 옮기는 작업을 한다.
먼저 topic 목록을 조회해보자.

Kafka Connector를 설치하고 Kafka Connector 서버를 기동시킨다. confluent 파일 밑에서 명령어를 입력해주면 된다.
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
topic을 다시 조회해보면?

connect와 관련된 파일 3개가 생긴 것을 볼 수 있다.
다음은 Kafka Connector JDBC를 설치하자.
kafka-connect-jdbc-10.6.3.jar 파일을 kafka connector 기동할 때 설정해 줄 것이다.
confluent 파일 내에서 다음 명령어를 입력하여 설정파일을 수정한다.
code ./etc/kafka/connect-distributed.properties

마지막으로 mysql jar파일을 kafka connector 파일에서 share/java/kafka 폴더에 복붙하기

{
"name": "my-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/msaDB",
"connection.user": "root",
"connection.password": "pwd",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "msaDB.users",
"topic.prefix": "my_topic_",
"tasks.max": "1"
}
}
데이터값을 전달해보자.
(이때 kafka connect 잘 실행되어있는지 확인하기!)

get 방식으로 호출하면 등록한 connector를 볼 수 있는데, /connector 이름/status로 호출하면 상세한 정보 확인이 가능하다.

이 오류 해결했습니다 .... 하
connection.url으로 써야하는데..... connector.url로 해서 생겼던 오류였습니다... ......

insert into users(user_id, pwd, name) values('user1', 'test1111', "User name");
쿼리문을 이용하여 데이터를 넣어주고 topic 목록을 다시 확인하면 my_topic_users가 추가되어 있다.


insert 쿼리문을 2번 실행시켜서 데이터가 2개가 있다.
이제까지 Source Connect에서 데이터를 보내면 topic으로 데이터가 쌓이는 것을 확인하였다. Sinck Connect의 역할은 topic에 전달된 데이터를 가지고 와서 사용하는 것이다.
{
"name": "my-sink-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://localhost:3306/msaDB",
"connection.user": "root",
"connection.password": "pwd",
"auto.create": "true",
"auto.evlove": "true",
"delete.enabled": "false",
"tasks.max": "1",
"topics": "my_topic_users"
}
}


뒤에꺼 하다가 다시 돌아온거라 orders 테이블이 있다.
my_topic_users 테이블을 조회하면 users 테이블과 동일한 데이터를 가져온다.
'Spring' 카테고리의 다른 글
Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) #13 (0) | 2023.09.09 |
---|---|
Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) #12 (0) | 2023.09.02 |
Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) #10 (0) | 2023.08.18 |
Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) #9 (0) | 2023.08.11 |
Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) #8 (0) | 2023.08.11 |