개요
Kafka는 대규모 데이터 처리와 실시간 스트리밍을 위한 강력한 메시징 시스템으로 널리 사용됩니다. 이를 운영하고 관리하려면 Kafka 클러스터의 토픽, 브로커, ACL, 설정 등을 효율적으로 제어할 수 있는 관리 도구가 필요합니다.
흔히 카프카를 사용할 때, 토픽을 새로 생성하거나, 파티션 수를 증가시키는 작업은 카프카 클러스터 내부 브로커에서 직접 sh 파일을 실행시켜서 수정을 진행합니다. 또는 Consumer Group의 offset을 확인한다던지, Topic의 로그 압축 방식을 확인하여, 시스템 요구사항에 맞게 설정해야할 경우, 번거롭게 내부의 shell file을 실행해야합니다.
예를 들어, 애플리케이션이 특정한 토픽에 이벤트를 써야한다고 가정합니다.
이 가정은 이벤트를 쓰기전에 토픽이 존재해야한다는 것을 의미합니다.
Admin Client가 존재하기 전까지, 이것을 확인하는 방법은 거의 없었고, 사용자 친화적인 툴도 존재하지 않았습니다.
확인하려면, producer.send() 를 호출하고, UNKNOWN_TOPIC_OR_PARTITIONS 예외가 발생할 경우, 관리자에게 해당 토픽을 생성하라고 알리거나, 카프카 클러스터에 자동 토픽 생성이 켜져있기를 기대해야했습니다.
하지만 Admin Client가 추가되면서, 이러한 과정들을 쉽게 처리하게 될 수 있었습니다.
해당 글에서는 Admin Client의 정의와 역할, Java와 Spring에서 Admin Client를 다루기 위해서 사용하는 Kafka Admin의 정의와 역할, 간단한 코드와 함께 설명할 예정입니다.
Admin Client란?
Admin Client의 정의는 Kafka Cluster를 관리하기 위해 Kafka Client 라이브러리에서 제공하는 Java API입니다.
Kafka Cluster의 토픽, 브로커, ACL(Access Control List), ConfigResources 등을 관리하거나, 상태를 조회할 때 사용됩니다.
Kafka Cluster와 직접 통신하며, Kafka Client 개발시 세부적인 관리 작업을 수행할 수 있도록 도와줍니다.
Kafka의 Admin Client를 이해할 때 가장 중요한 점은 “비동기적(Asynchronous)”으로 동작한다는 점입니다.
각 메서드는 요청을 클러스터 컨트롤러로 전달한 후 바로 1개 이상의 Future 객체를 반환합니다.
Future 객체는 비동기 작업의 결과물로, 비동기 작업의 결과를 확인하거나, 취소하거나, 대기하거나, 응답이 왔을 때 실행할 함수를 지정하는 메서드를 갖고있습니다.
Kafka의 Admin Client는 Future 객체를 Result 객체 안에 감싸는데, Result 객체는 작업이 끝나는 것을 대기하거나, 결과에 대해서 일반적으로 뒤이어 쓰이는 작업을 수행하는 헬퍼 메서드를 갖고있습니다.
예를 들면 listTopics라는 메서드를 호출하면, ListTopicResult라는 결과값을 리턴하게됩니다.
해당 결과값은 비동기 함수의 결과값이므로, get()을 사용하여 완료될 때까지 대기한 후 결과값을 반환할 수 있습니다.
카프카 컨트롤러로부터 브로커로의 메타데이터 전파가 비동기적으로 일어나기때문에 AdminClient API가 리턴하는 객체들은 모든 브로커들의 메타데이터가 업데이트 된 이후에 작업이 완료된 것으로 간주됩니다.
만약 createTopics 메서드를 비동기적으로 호출한 후, listTopics를 호출하게 된 경우, 모든 브로커에 업데이트가 이루어지지 않았다면, listTopics는 최신상태가 아닌 브로커에 의해서 업데이트 되지 않은 topic들이 나오게 될 수 있습니다.
최종적으로는 모든 브로커에서 업데이트가 발생할 것이지만, 언제 될지에 대해서는 아무런 보장을 할 수 없습니다.
이러한 속성을 “최종적 일관성(Eventual Consistency)”이라고 합니다.
Kafka Admin이란?
Kafka Admin은 Spring Kafka 라이브러리에서 제공하는 Kafka Cluster 관리 도구로, Spring 애플리케이션에서 Kafka의 자원을 더 쉽게 생성하고 관리할 수 있도록 설계되었습니다.
내부적으로 Kafka 클라이언트의 Admin Client를 사용하지만, Spring의 설정과 통합하여 더 간단하고 직관적으로 사용할 수 있게 만들어졌습니다.
Kafka Admin은 애플리케이션 초기화 단계에서 필요한 Kafka 자원을 동적으로 생성하거나, 간단한 설정만으로 Kafka 클러스터와 상호작용할 수 있는 편의성을 제공합니다.
Kafka Admin을 통해서 다음과 같은 작업을 수행할 수 있습니다.
- 애플리케이션 시작 시, 미리 정의된 토픽을 자동으로 생성합니다.
- 미리 생성된 토픽의 설정 값을 변경합니다. ex) partition 수, replication factor, retention.ms 등등
- 필요하지 않은 토픽을 삭제할 수 있습니다.
하지만 아무리 spring 친화적이고, 잘 통합이 되어있더라도, Kafka Admin만 사용하는 것에는 한계가 있습니다.
고급 설정같은 내용들은 Admin Client를 사용해야하는데, 다행이도 Kafka Admin의 설정 값을 추출해서 Admin Client 객체를 쉽게 만들어서 사용할 수 있습니다.
예시 코드
설정
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}
기존에 Value 어노테이션으로 받던 bootstrapServers를 그대로 사용해서 KafkaAdmin Bean을 생성합니다.
서비스
@Component
@RequiredArgsConstructor
public class ProducerServiceImpl implements ProducerService {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final KafkaAdmin kafkaAdmin;
...
}
다음과 같이 서비스로직(?)에서 의존성을 주입받아서 사용합니다.
토픽 생성
@Override
public void createTopic(CreateTopicRequest request) {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
adminClient.createTopics(
List.of(new NewTopic(request.getTopic(), request.getPartitions(),
request.getReplicationFactor()))
);
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
}
Admin Client를 생성할 때, kafkaAdmin.getConfigurationProperties() 를 통해서 설정 값을 통해 static method인 create 메서드를 호출합니다.
이를 통해 매번 admin client에 대한 설정값을 넣어주지 않아도 됩니다.
하지만 이렇다고 해서 그냥 AdminClient를 하나만 만들어 놓고 사용해서는 안됩니다. 이는 다음 절에서 설명할 예정입니다.
adminClient의 createTopics()를 할 때, 메서드 명으로 유추할 수 있듯이, List 타입으로 파라미터를 제공해야합니다.
해당 토픽을 생성할 때, 토픽의 이름, 파티션 수, replicatation factor를 지정해서 생성할 수 있습니다.
토픽 리스트 확인
@Override
public Set<String> listTopics() {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
ListTopicsResult topics = adminClient.listTopics();
return topics.names().get(); // 토픽 이름 리스트 반환
} catch (InterruptedException | ExecutionException e) {
log.info("error : {}", e.getMessage());
}
return Collections.emptySet();
}
adminClient.listTopics() 를 통해서 연결된 카프카 클러스터에 존재하는 모든 토픽의 이름을 반환합니다.
여기서 살펴볼 점은 get() 입니다. ListTopicsResult는 비동기 메서드의 결과 값인데, 해당 객체는 Future 객체를 래핑하고 있으므로, get()메서드를 호출할 때, 예외 처리가 필요합니다.
해당 코드에서는 따로 예외를 발생시키지는 않고, log를 출력하였습니다.
토픽 삭제
@Override
public void deleteTopic(String topic) {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
adminClient.deleteTopics(List.of(topic));
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
}
adminClient.deleteTopics() 메서드를 통해서 토픽을 삭제합니다. listTopics와 마찬가지로, List 타입으로 입력을 받아서 처리합니다.
토픽 상세 조회
@Override
public Map<String, Object> describeTopic(String topic) {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(List.of(topic));
TopicDescription description = describeTopicsResult.topicNameValues().get(topic).get();
return Map.of(
"name", description.name(),
"isInternal", description.isInternal(),
"partitions", description.partitions().stream().map(partitionInfo -> Map.of(
"partition", partitionInfo.partition(),
"leader", partitionInfo.leader().id(),
"replicas", partitionInfo.replicas().stream().map(Node::id).toList(),
"isr", partitionInfo.isr().stream().map(Node::id).toList()
)).toList()
);
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
return Collections.emptyMap();
}
adminClient.describeTopics() 를 통해서 DescribeTopicsResult를 가져오고, topicNameValues() 메서드를 통해서 데이터를 추출합니다.
DescribeTopicsResult는 위의 사진과 같이, topicIdFutures와 nameFutures를 멤버 변수로 갖고있는데, 이름 뒤에 붙은 Future는 비동기 메서드의 결과 값임을 나타냅니다.
topicNameValues()를 통해서 nameFuture를 가져오게 되고, 확인하고자하는 토픽 명으로 get()을 호출하여 정보를 추출합니다.
실행 결과는 다음과 같습니다.
{
"isInternal": false,
"partitions": [
{
"isr": [
1,
2,
0
],
"leader": 1,
"partition": 0,
"replicas": [
1,
2,
0
]
},
{
"isr": [ // in-sync replication
2,
0,
1
],
"leader": 2,
"partition": 1,
"replicas": [
2,
0,
1
]
},
{
"isr": [
0,
1,
2
],
"leader": 0,
"partition": 2,
"replicas": [
0,
1,
2
]
}
],
"name": "create"
}
토픽 설정 상세 조회
@Override
public Map<String, String> describeTopicConfig(String topic) {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
List.of(configResource));
Config config = describeConfigsResult.all().get().get(configResource);
return config.entries().stream()
.collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
return Collections.emptyMap();
}
설정 관리는 ConfigResource를 통해서 사용할 수 있는데, 설정 가능한 자원에는 브로커, 브로커 로그, 토픽이 있습니다.
모두 비동기의 결과이기 때문에, get() 메서드를 사용해서 요청이 처리될 때까지 대기하다가 데이터를 가져오게됩니다.
실행결과는 다음과 같습니다.
{
"compression.type": "producer",
"leader.replication.throttled.replicas": "",
"remote.storage.enable": "false",
"message.downconversion.enable": "true",
"min.insync.replicas": "1",
"segment.jitter.ms": "0",
"local.retention.ms": "-2",
"cleanup.policy": "delete", //cleanup policy가 삭제로 되어있음.
"flush.ms": "9223372036854775807",
"follower.replication.throttled.replicas": "",
...
}
토픽 설정 변경
@Override
public void alterTopicConfig(String topic, Map<String, String> configs) {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
// 설정 키가 허용된 키인지 확인
if (!ALLOWED_CONFIG_KEYS.containsAll(configs.keySet())) {
throw new IllegalArgumentException("Not allowed config key");
}
// AlterConfigOp 리스트 생성
List<AlterConfigOp> configOps = configs.entrySet().stream()
.map(entry -> new AlterConfigOp(
new ConfigEntry(entry.getKey(), entry.getValue()),
AlterConfigOp.OpType.SET // 설정 추가/수정
))
.toList();
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, configOps)).all()
.get();
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
}
토픽 설정을 변경하는 부분은 카프카 클러스터에 큰 영향을 끼칠 수 있으므로, 변경할 수 있는 키를 따로 관리하여 하는 게 좋다고 생각했습니다. 해당 코드에서는 앞으로 변경할 수 있는 코드에 대해서만 추가하여, retention.ms, cleanup.policy, min.insync.replicas만 포함했습니다.
OpType에는 DELETE와 SET, APPEND, SUBTRACT가 존재합니다. 요구사항과 구현하려고 하는 목적에 따라 다르게 설정하면 될 것같습니다.
위와 같이 API를 설계해서 cleanup.policy를 delete에서 compact 로 변경하는 요청을 보내보면
{
"compression.type": "producer",
"leader.replication.throttled.replicas": "",
"remote.storage.enable": "false",
"message.downconversion.enable": "true",
"min.insync.replicas": "1",
"segment.jitter.ms": "0",
"local.retention.ms": "-2",
"cleanup.policy": "compact", // <- 변경된 Config 값
설정값이 compact로 변경된 것을 확인할 수 있습니다.
컨슈머 그룹 리스트 조회
@Override
public Map<String, Object> listConsumerGroups() {
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
return adminClient.listConsumerGroups()
.all().get().stream()
.collect(Collectors.toMap(
ConsumerGroupListing::groupId,
group -> Map.of(
"state", group.state()
)
));
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
return Collections.emptyMap();
}
Kafka Cluster에 어떤 Consumer Group이 존재하는지 조회합니다.
실행결과는 다음과 같습니다.
{
"my-group": {
"state": "STABLE"
}
}
컨슈머 그룹 상세 조회
@Override
public Map<String,Object> describeConsumerGroup(String groupId) {
try(AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())){
Map<String, ConsumerGroupDescription> groupDescriptionMap = adminClient.describeConsumerGroups(
Collections.singletonList(groupId)
).all().get();
ConsumerGroupDescription consumerGroupDescription = groupDescriptionMap.get(groupId);
return Map.of(
"groupId", consumerGroupDescription.groupId(),
"state", consumerGroupDescription.state(),
"is simple", consumerGroupDescription.isSimpleConsumerGroup(),
"members", consumerGroupDescription.members().stream().map(memberDescription -> Map.of(
"clientId", memberDescription.clientId(),
"host", memberDescription.host(),
"assignment", memberDescription.assignment().topicPartitions().stream().map(tp -> Map.of(
"topic", tp.topic(),
"partition", tp.partition()
)).toList()
)).toList()
);
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
return Collections.emptyMap();
}
특정 Consumer Group에 대해서 어떤 컨슈머들이 등록되어 있는지 조회합니다.
member에는 clusterId, hostIP와 topic과 partition에 대한 정보를 조회할 수 있습니다.
실행결과는 다음과 같습니다.
{
"is simple": false,
"state": "STABLE",
"groupId": "my-group",
"members": [
{
"assignment": [
{
"topic": "my-topic",
"partition": 0
}
],
"clientId": "consumer-my-group-1",
"host": "/192.168.65.1"
}
]
}
컨슈머 그룹 Offsets 리스트 조회
@Override
public Map<String, Object> listConsumerGroupOffsets(String groupId) {
try(AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())){
return adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get().entrySet().stream()
.collect(Collectors.toMap(
entry -> entry.getKey().topic(),
entry -> Map.of(
"offset", entry.getValue().offset(),
"partition", entry.getKey().partition(),
"metadata", entry.getValue().metadata()
)
));
} catch (Exception e) {
log.info("error : {}", e.getMessage());
}
return Collections.emptyMap();
}
특정 Consumer group의 metadata와 offset, partition에 대한 정보를 조회할 수 있습니다.
실행결과는 다음과 같습니다.
{
"my-topic": {
"metadata": "",
"offset": 8,
"partition": 0
}
}
주의할 점
설정 값을 수정하는 경우에는 상당히 조심스럽게 다뤄야합니다.
만약 옳지 않은 값을 넣게 되는 경우 Kafka가 비정상적으로 동작할 수도 있기때문에, 메서드 호출 시 매개변수에 대한 검증이 필요합니다.
예제 코드 중 토픽의 설정 값을 바꾸는 코드의 경우, 허용하고자하는 설정 key 값들을 지정해둠으로써 처리했습니다.
정리
Kafka의 Admin Client와 Spring에서 wrapping한 Kafka Admin에 대해서 살펴봤습니다.
시스템의 요구 사항에 따라서 보다 정밀하게 Kafka Cluster를 제어할 필요가 있을 때, 사용하면 매우 유용할 것같습니다.
'공부' 카테고리의 다른 글
[Elastic Search] Elastic Search란? (1) | 2024.10.24 |
---|---|
정규 표현식 Regular Expression (0) | 2024.06.23 |