Kafka概述

Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

消息队列的两种模式

点对点模式

一对一,消费者主动拉取数据,消息收到后消息清除。消息生产者生产消息发送到 Queue 中,然后消息消费者从 Queue 中取出并且消费消息。消息被消费以后,Queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

image-20220425222529896

发布/订阅模式

一对多,消费者消费数据之后不会清除消息。消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。该模式具体可分为拉和推两种方式:

  • 队列主动推送:各个消费者处理的速率不同,可能会导致消费者资源浪费或不足。
  • 消费者主动拉取:长轮询

image-20220425222547643

Kafka 基础架构

image-20220426141607627

  • Producer:消息生产者,就是向 Kafka broker 发消息的客户端
  • Consumer:消息消费者,向 Kafka broker 取消息的客户端
  • Consumer Group(CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不 影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  • Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  • Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服 务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
  • Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower
  • Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数 据的对象都是 Leader。
  • Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

Kafka快速入门

单机部署

1)解压安装包

tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

2)创建logs日志文件夹

mkdir ~/Application/kafka_2.12-3.0.0/logs

3)修改配置文件

vim kafka_2.12-3.0.0/config/server.properties
#broker 的全局唯一编号,不能重复 broker.id=0
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量 
num.network.threads=3
#用来处理磁盘 IO 的现成数量 
num.io.threads=8 
#发送套接字的缓冲区大小 
socket.send.buffer.bytes=102400 
#接收套接字的缓冲区大小 
socket.receive.buffer.bytes=102400 
#请求套接字的缓冲区大小 
socket.request.max.bytes=104857600 
#kafka 运行数据和日志存放的路径 
log.dirs=~/Application/kafka_2.12-3.0.0/logs
#topic 在当前 broker 上的分区个数 
num.partitions=1
#用来恢复和清理 data 下数据的线程数量 
num.recovery.threads.per.data.dir=1 
#segment 文件保留的最长时间,超时将被删除 
log.retention.hours=168
#配置连接 Zookeeper 集群地址 
zookeeper.connect=localhost:2181/kafka

4)依次启动zookeeper和kafka(Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2182)

cd ~/Applications/ZooKeeper
bin/zkServer.sh start
~/Application/kafka_2.12-3.0.0
sh bin/kafka-server-start.sh  -daemon  config/server.properties # -daemon 后台启动kafka

如kafka启动报错:在 server.properties 找到 log.dirs 配置的路径。将该路径下的所有文件删除,或者编辑meta.properties文件修改里面的cluster.id即可。

ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID kVSgfurUQFGGpHMTBqBPiw doesn't match stored clusterId Some(0Qftv9yBTAmf2iDPSlIk7g) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
    at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
    at kafka.Kafka$.main(Kafka.scala:84)
    at kafka.Kafka.main(Kafka.scala)

主题命令行操作

1)查看操作主题命令参数

bin/kafka-topics.sh

image-20220426130141201

2)查看当前服务器中的所有 topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

3)创建 first topic

# --topic 定义topic名 --replication-factor 定义副本数 --partitions 定义分区数
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 3 --topic first

4)查看 first 主题的详情

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic first

5)修改分区数(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first --partitions 3

6)删除 topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic first

生产者命令行操作

1)查看操作生产者命令参数

bin/kafka-console-producer.sh

image-20220426140913384

2)发送消息

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
>hello world

消费者命令行操作

1)查看操作消费者命令参数

参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号
–topic <String: topic> 操作的 topic 名称
–from-beginning 从头开始消费
–group <String: consumer group id> 指定消费者组名称

2)消费消息

# 消费 first 主题中的数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
# 把主题中所有的数据都读取出来(包括历史数据)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first