Apache Kafka 是由 LinkedIn 于 2011 年开源、后成为 Apache 顶级项目的分布式消息队列系统,由 Jay Kreps、Jun Rao 和 Neha Narkhede 创建。Kafka 是 实时数据流处理的事实标准。
Kafka 的核心定位是 高吞吐量的分布式消息系统。它提供了:
LinkedIn 在 2010 年面临活动流数据处理挑战,需要一种高吞吐、低延迟的消息系统。Jay Kreps、Jun Rao 和 Neha Narkhede 开发了 Kafka,2011 年开源,2012 年成为 Apache 顶级项目。
# 创建 Topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2
# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 Topic 详情
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
# 生产者发送消息
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic my-topic
# 消费者消费消息
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning
# 查看消费者组
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
# 查看消费者组详情
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe
// Kafka 生产者 Java 示例
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 等待所有副本确认
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
// 创建生产者
Producer producer = new KafkaProducer<>(props);
// 发送消息
for (int i = 0; i < 100; i++) {
ProducerRecord record =
new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("发送成功: offset=%d, partition=%d%n",
metadata.offset(), metadata.partition());
} else {
exception.printStackTrace();
}
});
}
producer.close();
}
}
// Kafka 消费者 Java 示例
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
// 创建消费者
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 订阅 Topic
consumer.subscribe(Arrays.asList("my-topic"));
// 消费消息
try {
while (true) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord record : records) {
System.out.printf("收到消息: offset=%d, key=%s, value=%s%n",
record.offset(), record.key(), record.value());
// 手动提交偏移量
consumer.commitSync();
}
}
} finally {
consumer.close();
}
}
}
# 使用 kafka-python 库
from kafka import KafkaProducer, KafkaConsumer
import json
# 生产者
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all',
retries=3
)
# 发送消息
producer.send('my-topic', {'id': 1, 'name': 'Alice'})
producer.flush()
# 消费者
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
enable_auto_commit=False
)
# 消费消息
for msg in consumer:
print(f"收到消息: key={msg.key}, value={msg.value}")
consumer.commit()
// Spring Boot Kafka 配置
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory producerFactory() {
Map config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory consumerFactory() {
Map config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
// 使用
@Service
public class KafkaService {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("my-topic", message);
}
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("收到消息: " + message);
}
}
// Kafka Streams 示例
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;
public class WordCountStream {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
// 读取输入流
KStream textLines = builder.stream("input-topic");
// 单词计数
KTable wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((key, word) -> word)
.count();
// 输出到 Topic
wordCounts.toStream().to("output-topic");
// 启动流处理
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
| 对比项 | Kafka | RabbitMQ | Pulsar |
|---|---|---|---|
| 吞吐量 | 极高 | 高 | 极高 |
| 延迟 | 低 | 极低 | 低 |
| 持久化 | ✅ 磁盘 | ✅ 可选 | ✅ 磁盘 |
| 消息顺序 | 分区内保证 | 队列内保证 | 分区内保证 |
| 流处理 | ✅ Kafka Streams | ❌ | ✅ Pulsar Functions |
| 适用场景 | 大数据流 | 通用消息 | 云原生流 |
Kafka 核心概念(Topic/Partition/Consumer Group)、安装部署
生产者/消费者开发、消息顺序、分区策略、副本机制
Kafka Streams、Kafka Connect、Exactly-Once 语义、KRaft
集群部署、性能调优、监控告警、生产环境最佳实践
Apache Kafka 是实时数据流的高速公路。
它用 高吞吐、持久化、分布式架构 重新定义了消息队列和流处理的标准。Kafka 是现代数据基础设施的核心组件,支撑着日志收集、实时监控、数据管道、流处理等关键场景。
无论你是后端开发者、数据工程师还是 DevOps,Kafka 都是数据流领域必须掌握的技术。
"Kafka 是实时数据生态的基石。" 🔄