返回主页 学习路径
Apache Kafka
高吞吐 · 持久化 · 分布式流处理
Apache Kafka 是由 LinkedIn 于 2011 年开源、后成为 Apache 顶级项目的分布式消息队列系统,由 Jay Kreps、Jun Rao 和 Neha Narkhede 创建。Kafka 最初用于 LinkedIn 的活动流数据处理,如今已成为实时数据流处理的事实标准。Kafka 以高吞吐、低延迟、持久化、分布式和水平扩展能力著称,被超过 80% 的财富 500 强企业用于日志收集、实时监控、数据管道和流处理等场景。
分布式消息队列之王 · 实时数据流
📅 诞生时间2011年 · LinkedIn / Apache
🧩 类型分布式消息系统 · 流处理
📊 语言Java / Scala / Python / Go
⚡吞吐量
9/10
📦生态
10/10
🧠易用
6/10
🚀扩展性
8/10

📑 本文目录

📌 第一部分:Apache Kafka 概览与定位

1.1 定义与全称

Apache Kafka 是由 LinkedIn 于 2011 年开源、后成为 Apache 顶级项目的分布式消息队列系统,由 Jay Kreps、Jun Rao 和 Neha Narkhede 创建。Kafka 是 实时数据流处理的事实标准

1.2 核心定位

Kafka 的核心定位是 高吞吐量的分布式消息系统。它提供了:

1.3 主要应用领域

1.4 知名案例


📜 第二部分:Apache Kafka 的历史与发展演进

2.1 诞生背景(2011年)

LinkedIn 在 2010 年面临活动流数据处理挑战,需要一种高吞吐、低延迟的消息系统。Jay Kreps、Jun Rao 和 Neha Narkhede 开发了 Kafka,2011 年开源,2012 年成为 Apache 顶级项目。

2.2 关键版本里程碑

2.3 核心概念


⚙️ 第三部分:核心操作

3.1 基础命令

# 创建 Topic
kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --topic my-topic \
    --partitions 3 \
    --replication-factor 2

# 查看 Topic 列表
kafka-topics.sh --list --bootstrap-server localhost:9092

# 查看 Topic 详情
kafka-topics.sh --describe \
    --bootstrap-server localhost:9092 \
    --topic my-topic

# 生产者发送消息
kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic my-topic

# 消费者消费消息
kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic my-topic \
    --from-beginning

# 查看消费者组
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --list

# 查看消费者组详情
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-group \
    --describe

3.2 Java 生产者示例

// Kafka 生产者 Java 示例
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");  // 等待所有副本确认
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);

        // 创建生产者
        Producer producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord record =
                new ProducerRecord<>("my-topic", "key-" + i, "message-" + i);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("发送成功: offset=%d, partition=%d%n",
                        metadata.offset(), metadata.partition());
                } else {
                    exception.printStackTrace();
                }
            });
        }

        producer.close();
    }
}

3.3 Java 消费者示例

// Kafka 消费者 Java 示例
import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");

        // 创建消费者
        KafkaConsumer consumer = new KafkaConsumer<>(props);

        // 订阅 Topic
        consumer.subscribe(Arrays.asList("my-topic"));

        // 消费消息
        try {
            while (true) {
                ConsumerRecords records =
                    consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord record : records) {
                    System.out.printf("收到消息: offset=%d, key=%s, value=%s%n",
                        record.offset(), record.key(), record.value());

                    // 手动提交偏移量
                    consumer.commitSync();
                }
            }
        } finally {
            consumer.close();
        }
    }
}

3.4 Python 示例

# 使用 kafka-python 库
from kafka import KafkaProducer, KafkaConsumer
import json

# 生产者
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',
    retries=3
)

# 发送消息
producer.send('my-topic', {'id': 1, 'name': 'Alice'})
producer.flush()

# 消费者
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=False
)

# 消费消息
for msg in consumer:
    print(f"收到消息: key={msg.key}, value={msg.value}")
    consumer.commit()

3.5 Spring Boot 集成

// Spring Boot Kafka 配置
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory producerFactory() {
        Map config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        Map config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public KafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

// 使用
@Service
public class KafkaService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send("my-topic", message);
    }

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("收到消息: " + message);
    }
}

3.6 Kafka Streams 流处理

// Kafka Streams 示例
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

import java.util.Properties;

public class WordCountStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // 读取输入流
        KStream textLines = builder.stream("input-topic");

        // 单词计数
        KTable wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((key, word) -> word)
            .count();

        // 输出到 Topic
        wordCounts.toStream().to("output-topic");

        // 启动流处理
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // 优雅关闭
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

⚖️ 第四部分:Kafka vs RabbitMQ vs Pulsar

对比项 Kafka RabbitMQ Pulsar
吞吐量极高极高
延迟极低
持久化✅ 磁盘✅ 可选✅ 磁盘
消息顺序分区内保证队列内保证分区内保证
流处理✅ Kafka Streams✅ Pulsar Functions
适用场景大数据流通用消息云原生流

🧠 第五部分:学习建议

1
基础入门

Kafka 核心概念(Topic/Partition/Consumer Group)、安装部署

2
核心进阶

生产者/消费者开发、消息顺序、分区策略、副本机制

3
高级特性

Kafka Streams、Kafka Connect、Exactly-Once 语义、KRaft

4
实战与运维

集群部署、性能调优、监控告警、生产环境最佳实践

推荐学习资源


🎯 总结升华

Apache Kafka 是实时数据流的高速公路。

它用 高吞吐、持久化、分布式架构 重新定义了消息队列和流处理的标准。Kafka 是现代数据基础设施的核心组件,支撑着日志收集、实时监控、数据管道、流处理等关键场景。

无论你是后端开发者、数据工程师还是 DevOps,Kafka 都是数据流领域必须掌握的技术

"Kafka 是实时数据生态的基石。" 🔄

🔖 相关标签
#消息队列 #流处理 #分布式 #Kafka #大数据 #实时数据 #日志收集
📄 本文档为 Apache Kafka 完整白皮书 · 最后更新于 2026年06月28日