一、Windows 搭建 Kafka 环境

先实践再学原理,配置流程见参考链接

示例中创建了一个名叫 aurora_test 的 Topic 主题,通过该主题模拟生产者 producer 生成消息、消费者 consumer 消费信息的完整过程。

先启动 zookeeper 服务,然后再启动 kafka 服务

创建一个名为 aurora_test 的主题指令

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 5 --topic aurora_test

模拟生产者生成消息指令

kafka-console-producer.bat --broker-list localhost:9092 --topic aurora_test

启动消费者消费数据指令

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic aurora_test --from-beginning

本地运行后结果如图:

消费中文字符会出现编码异常问题

参考链接:https://cloud.tencent.com/developer/article/2393694

二、Kafka 原理讲解

1. Kafka 的作用

Kafka 是一种消息队列,消息队列的任务就是将消息从 生产者方 producer 发送给 消费者方 consumer 。生产者方将消息生产完成后准备就绪,但是消费者方不一定准备好,此时需要一个中间件来存储这些消息,Kafka 在其中就担任了存储的功能。换言之,生产者方将消息写给 Kafka, 消费者方准备好再由 Kafka 一条一条的将消息推送给消费者方。

不管是 生产者方 producer消费者方 consumer ,还是存储中间件 Kafka 都支持分布式。它们三者都可以有多台服务器,且服务器的数目可以随时的增加或减少,都不影响正常的数据流通。

2. Kafka 集群内部讲解

上面提到 Kafka 支持分布式,可以部署多台服务器构建分布式集群。根据实际情况,选择每台服务器只启动一个 Kafka 实例,然后配合多台服务器搭建集群;也可以选择单台服务器启动多个 Kafka 实例搭建 Kafka 集群。上述两种部署方式根据情况而定,实践中一般是一台服务器上启动一个 Broker。下图展示的是 Kafka 集群内部情况。

Broker 名词解释:Broker就是 Kafka 实例、Kafka 进程的意思,每个 Kafka 进程单独占用一个端口号。

Topic 名词解释:某一种业务数据称为一个 Topic 。比方说,对于淘宝这样一个电商网站,用户进来后就是一个推荐流,推荐流上面用户的每个点击行为 user_click,这种点击数据称为 Topic 。用户点击行为 Topic 可以被多个团队来消费。推荐算法团队要实时地去消费读取这个数据,同时广告团队也要实时地去消费读取这个数据,所以此时我们用 Topic 来标记这种业务数据。

Partition 名词解释:一个 Topic 业务数据分成多个 Partition。分成多份的作用是为了分散负载,通过轮询等算法实现负载均衡。比方说,对于推荐算法团队内部起了三台服务器来消费用户点击数据,那么我们的 Kafka 集群可以起五台服务器。显然,起的服务器数量越多,消费者方消费消息的速度越快。随之而来的问题是原始的这份数据如何分发给这些消费者呢?解决方法就是将原始数据切分成一份一份的 Partition。消费者方从不同的 Partition 中读取数据,提高了消费者的吞吐量;生产者方也是向不同的 Partition 中写入数据,提高生产者的吞吐量。同一个 Partition 数据又有多个备份,一个 Leader(主 Partition) 配备多个 Follower(从 Partition)。在 Partition 的主从模式下,一个 Partition-Leader 读写失败,可以启动其它备份 Partition,提高可靠性。

深入 Partition 进行讲解。比方说,此时有 用户点击 这份 Topic 数据,然后分成了五份 Partition 数据,有两个 Consumer 来消费数据,示意图如下。

通过某种负载均衡算法,一个 Partition 只能由一个 Consumer 来消费,一个 Consumer 可以消费多个 Partition。所以 Consumer 的数量小于 Partition 数时才有意义。

Consumer 数量越多,吞吐越高,消费地越快。

Consumer 增加或减少时,Partition 和 Consumer 的对应关系会自动调整(例如使用 HashRing 算法)。

消费者方 Consumer 可以分组,一个 Group 对应一个使用数据的业务方。比如说浏览器推荐流用户的点击日志,推荐团队需要消费,广告团队也需要消费,这样推荐团队是 Group1,广告团队是 Group2。

注意:每个 Group 消费一份完整地 Topic 数据,见下图。

一个 Partition 内部的消息是有序的,越新的消息 offset 越大。通过 offset 只能比较同一个 Partition 内部的不同消息,但是 Partition 之间的消息无法通过 offset 来比较。

Consumer 顺序地消费 Partition 中的每一条消息,可以每读一条就向 Kafka 上报(commit)一次当前读到了哪个位置(offset),也可以间隔性上报(每读多少条上报一次,或每隔多长时间上报一次)。

Consumer 重启时 Kafka 根据该 Group 上一次 commit 的最大 offset,决定从哪个地方开始消费。

前面提到生产者方 Producer 向一个 Topic 中的多个 Partition 中写入数据,那么 Producer 写数据时如何选择写哪个 Partition ?

Producer 写数据时都是以 key-value 键值对的形式写入到 Kafka 中的。写入方式大致分为三种:

1.Producer 可以显式指定 Partition。

2.没有指定 Partition 时根据消息的 key 通过哈希算法选择一个 Partition。

3.既没指定 Partition,消息又没有 key 时,按照轮询的方式选择 Partition。

上图展示从生产者方向 Kafka 写入数据的流程:

  • 步骤1,Producer 1 向 Kafka 集群展开询问,获取 Topic A 的 Partition 0 的 Leader;
  • 步骤2,Producer 1 把数据写入 Leader,Leader 再把数据写入本地磁盘保存;
  • 步骤3,Follower 从 Leader 中拉取数据,然后写入对应 Follower 的本地磁盘中保证数据一致性;
  • 步骤4,Follower 向 Leader 返回 ACK,表示确认成功;
  • 步骤5,Leader 向 Producer 1 返回 ACK,表示确认成功。

上述 5 个步骤在实际使用中可以进行简化。比方说,可以对 Producer 进行设置,正常情况下需要 Leader 向 Producer 发送 ACK 确认消息;但是也可以不需要 ACK 确认消息,认为 Producer 的写入全部成功了,只需要继续进行下一次写入即可。

三、Golang 操作 Kafka

这部分内容主要讲解的是如何使用 Go 语言向 Kafka 写入数据和读取数据。使用的 package 是 github 上的

开源项目 kafa-go,链接为 https://github.com/segmentio/kafka-go 。除了写入和读取数据方法,该包还有很多其它的操作方法,可以进一步研究。

生产消息示例代码:

// 声明 Topic
var topic = "user_list"

// 生产消息
func writeKafka(ctx context.Context) {
	//创建一个 Writer 对象,配置相应参数
        writer := kafka.Writer{
		Addr:                   kafka.TCP("localhost:9092"),//本机 Kafka 地址和端口号
		Topic:                  topic, //Kafka 主题名
		Balancer:               &kafka.Hash{},//Producer与多个 Partition 之间采用哈希
		WriteTimeout:           1 * time.Second, //写入等待超时时间为 1 s
		RequiredAcks:           kafka.RequireNone, //不需要 ACK 返回
		AllowAutoTopicCreation: true, //一般情况下设置为 false,表示 Topic 不存在也不会主动创建 topic,创建 Topic 的工作是交给运维来做的。
	}
	defer writer.Close()

        // for 循环的作用是重试,写失败后最多重试 3 次
	for i := 0; i < 3; i++ {
		if err := writer.WriteMessages(
			ctx,
			kafka.Message{Key: []byte("1"), Value: []byte("梦")},
			kafka.Message{Key: []byte("1"), Value: []byte("塔")},
			kafka.Message{Key: []byte("1"), Value: []byte("世")},
			kafka.Message{Key: []byte("1"), Value: []byte("界")},
		); err != nil {
			if err == kafka.LeaderNotAvailable {
				time.Sleep(500 * time.Millisecond)
				continue
			} else {
				fmt.Printf("批量写入Kafka失败:%v\n", err)
			}
		} else {
			break
		}
	}
}

// 消费消息
func readKafka(ctx context.Context) {
        //创建一个 Reader 对象,配置相应参数
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{"localhost:9092"},
		Topic:          topic,
		CommitInterval: 1 * time.Second,
		GroupID:        "rec_team",
		StartOffset:    kafka.FirstOffset,
	})

        // for 死循环,保证消费者方一直在等待消费
	for {
		if message, err := reader.ReadMessage(ctx); err != nil {
			fmt.Printf("读Kafka失败:%v\n", err)
			break
		} else {
			fmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s\n", message.Topic, message.Partition, string(message.Key), string(message.Value))
		}
	}

}

// 需要监听信息 2 和 15 ,当收到信号时关闭 reader
func listenSignal() {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	sig := <-c
	fmt.Printf("接收到信号 %s", sig.String())
	if reader != nil {
		reader.Close()
	}
	os.Exit(0)
}

func main() {

	ctx := context.Background()
	// writeKafka(ctx)

	go listenSignal()
	readKafka(ctx)

}

输出结果:

执行 go run main.go 命令

跑程序之前忘了起 Kafka 服务,先把 Kafka 服务开启,按照本文第一部分内容开启服务。

先启动 ZooKeeper 服务:服务端启动指令 zkServer;客户端连接指令 zkCli;

再启动 Kafka 服务:启动服务指令 .\bin\windows\kafka-server-start.bat .\config\server.properties;

OK,服务部署好,执行写逻辑代码 go run main.go

查询主题指令:kafka-topics.bat –bootstrap-server localhost:9092 –list

显示内容中除了前文演示 windows 操作系统下连接 Kafka 服务时创建的 aurora_test 主题外,还有本次做测试创建的 user_click 主题。

注释掉写逻辑代码,执行读逻辑代码 go run main.go

结果表明,消费者方确实读取到了生产者方写入 Kafka 的数据,并且光标一直存在,等待生产者方继续写入消息。

Kafka 消费顺序、消息丢失和重复消费总结

  • Kafka 如何保证消息的消费顺序?

在实际的业务场景中,有时需要严格保证消息队列中消息的消费顺序。比如,同时发送两个消息,这两个消息对应的数据库操作为:

消息1:更改用户会员等级。

消息2:根据会员等级计算订单价格。

如果这两条消息消费顺序不一样,就会导致最终结果截然不同。

Kafka 中 的 Partition(分区) 就类似消息队列,是真正存放消息的地方。而 Partition(分区) 又存在于 Topic(主题) 中,一个 Topic 可以指定多个 Partition 。

生产方每次向 Partition 添加消息的时候都会加到其末尾,如上图所示。所以对于单个 Partition 中的消息, Kafka 是可以保证消息有序的。具体原理是:消息在被追加到 Partition 的时候都会分配一个特定的 offset(偏移量),Kafka 通过 offset 来保证消息在该 Partition 内的顺序性。

但是如果以这种指定某一个 Partition 的方式去生产消息,虽然解决了消费顺序的问题,但是此时所有的消息都灌到一个 Patition 中,忽略了 Kafka 可以给 Topic 指定多个 Partition ,通过负载均衡的思想提供的并发能力设计,违反了 Kafka 的设计初衷。

那么如何利用 Kafka ,既能考虑到多 Partition 的并发能力,又能保证消息的消费顺序?

首先来看,Kafka 是如何将消息写入 Partition 的。生产方通过 Kafka 发送消息的时候,Kafka 提供了四个参数可以指定,分别是 Topic,Partition,Key,Data 。

如果消息指定了某个 Partition ,所有消息都会被发送到指定的 Partition,此时的 Kafka 会完全忽略 Key 的分区计算逻辑。无论这批消息的 Key 是否相同,消息都会进入指定的 Partition,这种方式虽然保证了消息的消费顺序,但是没有考虑到多 Partition 的并发能力 。

如果不指定 Partition,Kafka 会根据 Key 的哈希算法自动选择 Partition。对于相同的 Key,这些消息会被分配到同一个 Partition,适用于按 Key 有序的场景;对于不同的 Key,这些消息会尽量均匀分布到所有的 Partition。所以发送一批消息的时候指定同一个 Key 可以满足消费顺序的需求。

如果既不指定 Partition,也不传入 Key,Kafka 就会按照轮询分区算法(Kafka 2.4 版本以前默认为轮询分区算法)或者粘性分区算法(Kafka 2.4 版本以后默认为粘性分区算法)均匀分配给 Partition。按轮询的方式将消息分配给 Partition 就无法保证消息的顺序消费了,而按粘性分区的方式还是可以保证消息的顺序性的。

轮询分区算法:生产者会将消息均匀的放入每个 partition 中。每个 patition 要求消息达到一定批次(batch.size)后或达到一定时间(linger.ms)后才会将整个批次的数据写入 kafka,导致消息写入 kafka 的性能比较差。
粘性分区算法:生产方随机选择一个 partition ,将所有消息都尽可能写入到该 partition 中,达到一定批次(batch.size)后或达到一定时间(linger.ms)后将该 partition 批次的数据写入 kafka。好处是提升消息写入 kafka 的性能。
batch.size与linger.ms参数:batch.size 默认为 16 kb,linger.ms 默认为 0 s。

以上所说的保证消息的消费顺序,都是针对一个 Partition 内的消息。对于不同 Partition 之间的消息顺序,Kafka 是没办法保证的,这一点要特意指出。

总结来说:如何保证 Kafka 中消息消费的顺序,主要有三个方法:

方法一:在使用 Kafka 中间件之初,就明确一个 Topic 只分配一个 Partition 。(虽解决问题,但是忽视了 Kafka 的多 Partition 机制带来的并发能力,违反了 Kafka 设计初衷)

方法二:发送消息的时候指定 Partition 。(虽解决问题,但是忽视了 Kafka 的多 Partition 机制带来的并发能力,违反了 Kafka 设计初衷)

方法三:(推荐)发送消息的时候指定 Key(按照 Key 的哈希分片算法分配 Partition )。(能考虑到多 Partition 的并发能力,又能保证消息的消费顺序

相关阅读:

Kafka常见问题总结 | JavaGuide

【kafka系列】Topic 与 Partition – 技术栈

  • Kafka 如何保证消息的不丢失?
  1. 生产者丢失消息

生产者发送消息之后,消息可能因为网络问题并没有发送成功。因此不能默认生产者发送消息之后消息肯定发送成功。为了确保消息一定发送成功,需要判断消息发送的结果。生产者发送消息实际上是异步的操作,我们可以设置一个回调函数,在保持异步的状态下获取消息发送失败的结果。这样一来,如果消息发送失败,检查失败原因后重新发送即可。一般一条消息发送失败后重试次数会设置一个比较合理的值,一般为 3。

2. 消费者丢失消息

消息在被追加到 Partition 的时候都会分配一个特定的 offset 。offset 表示 Consumer 当前消费到的 Partition 的所在的位置。因此 Kafka 是通过 offset 来保证消息在一个 Partition 内的顺序性的。这个知识点我们在上面讲 Kafka 如何保证消息的顺序性提过。

还要补充一点,当 Consumer 拉取到了某一个 Partition 的某个消息之后,Consumer 会有一个自动提交 offset 的操作,用来告诉 Kafka,这条消息 Consumer 已经拿到了。但是这里会有一个问题,如果 Consumer 拿到这个消息后正准备进行消费,结果突然挂掉了,消息实际上并没有被消费,但是因为 offset 自动提交机制导致 Partition 认为这条消息已经被消费了。出现了消费者丢失消息的情况。

参考网上给的解决办法是,关闭自动提交 offset 配置,在业务代码上每次真正消费完消息之后再手动提交 offset 。但是这又会出现一个问题,消息可能被重复消费。比如说,Consumer 成功消费完消息后,还没提交 offset,Consumer 就挂掉了,那么这条消息会被 Partition 认为没被消费,后续又会被 Consumer 消费,导致消费两次。后面介绍 Kafka 如何保证消息不重复消费还会细讲。

3. Kafka 丢失消息

Kafka 为 Partition 引入了多副本(Replica) 机制,Partition 中的多个副本中有一个叫做 leader 副本,其它副本叫做 follower 副本。生产者与消费者只与 Partition 的 leader 副本交互。生产者发送的消息只会发送到 leader 副本,然后其它的 follower 副本会从 leader 副本中拉取消息进行同步。因此这些 follower 只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。

follower 副本与 leader 副本之间的数据同步很容易出现不一致的问题。比如说,leader 副本所在的 broker 节点突然挂掉了,那么就要从 follower 副本中重新选出一个 leader 副本,但是原来挂掉的 leader 副本中还有一些数据没有同步给 follower 副本,这就导致消息的丢失,数据的不一致。

解决办法总结为:四个参数设置,一个等式设置

参数设置1:acks = all

acks 默认值为 1,代表消息被 leader 副本接收之后就算成功发送。当把 acks 配置为 acks = all 表示只有当 leader 副本以及所有 follower 副本都接收到消息时,才会被认为发送成功。这种模式虽然可以保证不止一个 Broker 接收到消息,但是延迟会很高。

参数设置2:replication.factor >= 3

replication.factor 是为了保证 leader 副本能有 follower 副本同步消息。配置 Topic 的参数 replication.factor >= 3,可以保证每个 Partition 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

参数设置3:min.insync.replicas > 1

min.insync.replicas 默认值为 1。一般情况将其设置为 min.insync.replicas > 1,表示消息至少写入到 2 个副本才算是发送成功。

参数设置4:unclean.leader.election.enable = false

unclean.leader.election.enable 默认值为 false。生产方发送的消息会被 leader 副本接收,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当配置了 unclean.leader.election.enable = false 后,当 leader 副本发生故障时就不会从 follower 副本中和 leader 副本同步程度达不到要求的副本中选择出 leader 副本,这样降低了消息丢失的可能性。

等式设置:replication.factor = min.insync.replicas + 1

为了保证整个 Kafka 服务的高可用,需要避免当有一个副本挂掉,整个分区就无法正常工作的情况。一般设置成 replication.factor = min.insync.replicas + 1。

  • Kafka 如何保证消息不重复消费?

Kafka 出现消息重复消费的原因:

1.服务端侧已经消费的数据没有成功提交 offset 。

2.Kafka 侧由于服务端处理业务时间长或者网络链接等原因让 Kafka 认为服务假死,触发了 Partition Rebalnce (分区再均衡)。

解决方案:

1.消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。

2.将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset 。那么这里会出现问题:什么时候提交 offset 合适?

两种情况:

情况1:处理完消息后再提交,依然会有消息重复消费的风险。因为处理完消息后,在提交 offset 前 Kafka 挂掉了,和自动提交情况类似。

情况2:拉取到消息立刻提交,会有丢消息的风险。因为虽然提交了 offset,但是拉取到消息后真正消费消息的时候 Kafka 挂掉了,会出现丢消息的可能。

情况 2 适合允许消息延时的场景,然后通过定时任务在业务不繁忙的时候做数据兜底。

Categories:

Tags:

No responses yet

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注