Kafka는 Apache Software Foundation에서 개발한 분산 스트리밍 플랫폼으로, 실시간 데이터 스트리밍을 처리하고 관리하기 위해 설계되었습니다. Kafka는 데이터 처리, 전송, 저장을 효율적으로 수행하며, 주로 다음과 같은 용도로 사용됩니다:
주요 특징
- 메시지 브로커 역할
Kafka는 대량의 데이터를 생산자(producer)로부터 소비자(consumer)에게 전달하는 메시지 브로커로 작동합니다. 데이터는 **토픽(topic)**이라는 논리적 단위로 분류되어 전송됩니다. - 분산 시스템
Kafka는 여러 서버(브로커)로 구성된 클러스터에서 실행되며, 고가용성과 확장성이 뛰어납니다. - 내구성과 데이터 보존
데이터를 디스크에 영구 저장할 수 있어 필요할 때 다시 읽을 수 있습니다. - 고성능
초당 수백만 건의 메시지를 처리할 수 있는 높은 처리량을 제공합니다.
주요 구성 요소
- 프로듀서(Producer)
데이터를 생성하고 Kafka에 전송하는 역할을 합니다. - 컨슈머(Consumer)
Kafka에서 데이터를 가져가서 처리하는 역할을 합니다. - 브로커(Broker)
데이터를 저장하고 전달하는 Kafka 서버의 구성 요소입니다. - 토픽(Topic)
메시지가 저장되는 논리적 데이터 카테고리입니다. 프로듀서가 메시지를 토픽에 쓰고, 컨슈머가 토픽에서 읽습니다. - 파티션(Partition)
토픽은 여러 파티션으로 나뉘어 분산 저장되며 병렬 처리가 가능합니다.
주요 활용 사례
- 실시간 로그 및 이벤트 수집
애플리케이션 로그나 사용자 이벤트를 실시간으로 수집 및 분석. - 데이터 스트리밍 처리
실시간 데이터 파이프라인 구축 및 분석. - 메시지 큐 시스템 대체
RabbitMQ나 ActiveMQ와 같은 전통적인 메시지 큐 시스템을 대체. - 마이크로서비스 통신
서비스 간 비동기 메시징을 지원.
Kafka는 높은 처리량과 확장성을 필요로 하는 대규모 시스템에서 널리 사용되며, LinkedIn에서 시작되어 Netflix, Uber, Twitter 같은 회사에서 주로 사용됩니다.
AWS EC2 SSH 접속
Unix 계열의 OS를 Local Host PC로 사용하려면, chmod 커맨드로 mykafkakey.pem을 실행할 수 있는 권한을 부여해야 한다.
chmod 400
다음의 ssh 커맨드의 54.180.96.36은 Secure Shell로 접속한 AWS EC2 인스턴스에게 할당된 Public IP Address다.
$ ssh -i ./mykafkakey.pem ubuntu@54.180.96.36
OpenJDK 설치하기
1. 인스턴스 업데이트
우선, EC2 인스턴스의 패키지 목록을 최신 상태로 업데이트한다.
sudo apt upgrade
sudo apt update
2. Java 설치 가능한 버전 확인
사용 가능한 OpenJDK 버전을 검색합니다. 이를 통해 원하는 Java 버전을 선택할 수 있다.
apt search openjdk
3. Java 설치
원하는 버전의 OpenJDK를 설치한다. 예를 들어, OpenJDK 11을 설치하고 싶다면, 다음과 같이 입력한다.
sudo apt install -y openjdk-17-jdk
-y 옵션은 설치 중 나타나는 모든 질문에 자동으로 yes로 답하는 옵션이다, 이를 통해 설치 과정을 자동화할 수 있다.
4. Java 버전 확인
Java 설치가 성공적으로 완료되었는지 확인하기 위해 설치된 Java의 버전을 확인한다.
java -version
환경변수 설정
Java를 설치한 후, 특정 애플리케이션에서 Java 환경변수를 요구하는 경우가 있다. 예를 들어, JAVA_HOME 환경변수를 설정해야 하는 경우, .bashrc 또는 .profile 파일에 다음과 같이 추가할 수 있다.
1. 홈 디렉토리의 .bashrc 파일을 연다.
vi ~/.bashrc
2. 파일의 맨 아래에 JAVA_HOME 환경변수를 추가한다. 예를 들어, OpenJDK 11을 설치한 경우 다음과 같이 추가한다.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin
...
fi
# colored GCC warnings and errors
#export GCC_COLORS='error=01;31:warning=01;35:note=01;36:caret=01;32:locus=01:quote=01'
# some more ls aliases
alias ll='ls -alF'
alias la='ls -A'
alias l='ls -CF'
# Add an "alert" alias for long running commands. Use like so:
# sleep 10; alert
alias alert='notify-send --urgency=low -i "$([ $? = 0 ] && echo terminal || echo error)" "$(history|tail -n1|sed -e '\''s/^\s*[0-9]\+\s*//;s/[;&|]\s*alert$//'\'')"'
# Alias definitions.
# You may want to put all your additions into a separate file like
# ~/.bash_aliases, instead of adding them here directly.
# See /usr/share/doc/bash-doc/examples in the bash-doc package.
if [ -f ~/.bash_aliases ]; then
. ~/.bash_aliases
fi
# enable programmable completion features (you don't need to enable
# this, if it's already enabled in /etc/bash.bashrc and /etc/profile
# sources /etc/bash.bashrc).
if ! shopt -oq posix; then
if [ -f /usr/share/bash-completion/bash_completion ]; then
. /usr/share/bash-completion/bash_completion
elif [ -f /etc/bash_completion ]; then
. /etc/bash_completion
fi
fi
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin
3. 변경 사항을 저장하고 .bashrc 파일을 닫은 다음, 변경 사항을 적용한다.
source ~/.bashrc
이렇게 하면 AWS의 Ubuntu EC2 인스턴스에 Java를 설치하고 설정하는 방법을 완료할 수 있다.
kafka 다운로드
1. wget 커맨드로 아파치 카프카를 다운로드 받는다.
wget https://downloads.apache.org/kafka/3.7.1/kafka_2.12-3.7.1.tgz
2. 압축 파일을 해제한다.
tar xvf ./kafka_2.12-3.7.1.tgz
server.properties 수정
1. kafka_2.13-3.7.1/config/server.properties 내용중, 아래 빨간색 하이라트 부분을 수정한다.
vi ./server.properties
2. ZooKepper를 실행한다.
$ ./bin/zookeeper-server-start.sh -daemon /home/ubuntu/kafka_2.12-3.7.1/config/zookeeper.properties
Zookeeper 실행 확인 명령어는 아래와 같다.
$ jps -m
4954 프로세스는 Zookeeper 서버를 실행하는 Java 프로세스다. QuorumPeerMain 클래스는 Zookeeper 서버를 시작하는 메인 클래스다.
Kafka 실행
$ ./bin/kafka-server-start.sh -daemon /home/ubuntu/kafka_2.12-3.7.1/config/server.properties
Kafka 실행 확인 명령어는 아래와 같다.
$ jps -m
Kafka의 heap memory는 디폴트로 Kafka = 1GByte, Zookeeper=512MByte다.이는 현재 선택한 AWS EC2 프리티어 메모리가 1GByte 이기 때문에 위와 같은 에러가 발생한다. 그래서 Kafka와 Zookeeper의 힙 메모리를 .bashrc 파일에서 재설정해야 한다.
export KAFKA_HEAP_OPTS="-Xms300m -Xmx300m"
KAFKA_HEAP_OPTS 환경 변수를 시스템에 등록한다.
$ source ./.bashrc
Topic 생성 (카프카 브로커를 실행 후, 토픽을 생성해야 함)
$ bin/kafaka-topics.sh \
> --create \
> --bootstrap-server 54.180.100.188:9092 \
> --topic tacocloudorders
Topic 목록 확인하기
$ bin/kafka-topics.sh --bootstrap-server 54.180.100.188:9092 --list
Topic 정보 확인하기
$ bin/kafka-topics.sh --bootstrap-server 54.180.100.188:9092 --topic tacocloudorders --describe
Kafka의 토픽에 저장된 메시지 확인
Kafka의 토픽에 저장된 메시지를 확인하는 가장 일반적인 방법은 Kafka의 커맨드 라인 툴인 `kafka-console-consumer`를 사용하는 것이다. 이 도구를 사용하면 실시간으로 메시지를 소비하거나, 특정 시간에서부터의 메시지를 조회하는 등 다양한 방식으로 메시지를 확인할 수 있다.
kafka-console-consumer 사용법
1. 실시간으로 메시지 소비하기:
Kafka 설치 디렉토리의 bin 폴더 안에서 다음 명령어를 실행하여, 특정 토픽의 메시지를 실시간으로 확인할 수 있다. 이 명령은 새로운 메시지가 토픽에 도착할 때마다 해당 메시지를 출력한다.
./kafka-console-consumer.sh --bootstrap-server <broker-address>:<port> --topic <topic-name> --from-beginning
./kafka-console-consumer.sh --bootstrap-server 54.180.96.36:9092 --topic tacocloudorders --from-beginning
- <broker-address>:<port>: Kafka 브로커의 주소와 포트번호입니다. 예를 들어, 54.180.96.36:9092.
- <topic-name>: 메시지를 확인하고 싶은 토픽의 이름입니다. 예를 들어, tacocloud_orders.
- --from-beginning: 이 옵션을 사용하면 토픽의 시작부터 현재까지의 모든 메시지를 확인할 수 있다. 이 옵션 없이 명령어를 실행하면, 명령어를 실행한 시점 이후에 도착하는 메시지만 확인할 수 있다.
2. 특정 시간부터의 메시지 소비하기
Kafka 2.1.0 이상에서는 --offset 대신 --from-beginning 또는 --partition과 함께 --offset 옵션을 사용하여 특정 시점부터 메시지를 소비할 수 있다. 또한 kafka-console-consumer는 --timestamp 옵션을 지원하지 않으므로, 특정 시간으로부터 메시지를 조회하고자 할 때는 다른 도구나 API를 사용해야 한다.
topic 삭제
Kafka 토픽을 삭제하는 커맨드는 Kafka가 설치된 디렉토리 내의 bin 폴더에서 실행할 수 있는 kafka-topics.sh 스크립트를 사용하여 수행할 수 있다. 기본적인 커맨드 형식은 다음과 같다.
./kafka-topics.sh --delete --topic <토픽 이름> --bootstrap-server <브로커 목록>
여기서 <토픽 이름>에는 삭제하고자 하는 Kafka 토픽의 이름을, <브로커 목록>에는 Kafka 클러스터의 브로커들의 주소 목록을 콤마로 구분하여 입력한다. 예를 들어, 브로커가 localhost:9092에 실행 중이고, 삭제하고자 하는 토픽 이름이 my-topic이라면, 다음과 같이 명령을 실행할 수 있다.
./kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092
이 커맨드를 실행하기 전에 Kafka 설정에서 토픽 삭제가 활성화되어 있는지 확인해야 한다. Kafka의 server.properties 파일에서 delete.topic.enable=true 설정을 확인하거나 추가해야 할 수 있다. 이 옵션이 활성화되어 있지 않으면, 토픽을 삭제해도 실제로는 삭제되지 않고, 삭제 마킹만 되어 있게 된다.
또한, 토픽 삭제 후에는 해당 토픽에 관련된 데이터가 Kafka 클러스터의 브로커에서 실제로 삭제될 때까지 약간의 시간이 소요될 수 있다. 삭제 프로세스는 비동기적으로 수행됩니다.
Kafka 종료
./bin/kafka-server-stop.sh
해당 코드를 을 실행시키면 종료된다
주의사항
- kafka-console-consumer.sh 스크립트는 Kafka의 설치 경로 내 bin 디렉토리에 위치한다. 따라서 해당 스크립트를 실행하기 전에 Kafka 설치 디렉토리로 이동하거나, 전체 경로를 명시해야 한다.
- 실제 운영 환경에서는 토픽에 저장된 대량의 메시지를 모두 소비하는 것이 시간이 많이 걸릴 수 있으므로, 필요한 정보를 효율적으로 검색하기 위해 적절한 옵션을 선택하여 사용하는 것이 중요하다.