您现在的位置是:首页 > 正文

随笔——消息队列线程池模型如何保证重启时消息不丢

2024-02-01 03:15:51阅读 1

背景

今天在脉脉上面看到了一个帖子,比较有意思:

这个帖子的意思是:在使用Kafka的时候,我们已经设置了多个分区,如何去提升消费能力?如果使用线程池的方式去提升如何保证重启时消息不丢。

这个题其实问了两个点,第一个是如何提升消费能力,第二个是如果选择线程池,我们如何做到消息不丢。

这里先解释一下这两个问题到底是怎么回事,在很多消息队列中都有一个概念叫partion,代表着分区,分区是我们提高消息队列消费的关键,我们的消费者消费的渠道就是从每个分区中来的,一个分区只能被一个消费者持有,如下图所示:

有点类似银行排队,队列的个数越多,排队的时间相对来说就会越少,当然也可以通过异步的方式去处理,比如线程池,把所有的消息都扔到线程池中去执行,这就引出了作者说的第二个问题,首先我们来看看同步消费为什么不会丢消息呢?

如果我们使用的是同步模型,当我们消费了之后会将offset ack回去,如果我们出现了重启,没有成功offset,那么这部分数据将会再次消费,如果是用线程池进行消费,那么我们如何进行ack呢,比如我们用线程池消费了 10,11,12 三条消息如果12先消费完,那么我们ack 13吗?如果这样做的话,这个时候重启,kafka就会认为你已经处理了10,11的消息,这个时候消息就会出现丢失,而发这个帖子的同学就是对于这一块是比较疑惑。

网友的回答

我们来看看网友的一些回答:

网友A:


这名网友的回答本质还是使用线程池,作者也回复了,并没有解决线程池的问题。

网友B:

这个方法类似银行排队,只要队列多,那么处理速度就会加快,的确是第一个问题的解决办法之一。

网友C:


这一类主要解决了第二个问题,通过外部维护offset,比如通过offset入库的方式,我们就能找到正确的应该消费的offset,这个相对来说比较复杂,使用一个MQ还得配套一个数据库,万一我使用MQ的服务根本都没有数据库,还得单独去申请。

网友D:

还有另外一种观点就是,代码写好一点,让消费的速度提高,那消费能力自然就上去了,这个的确是一个很重要的点,通常被其他人给忽略,有时候消费比较慢,很多人可能一上来就是考虑中间件应该怎么设置,往往会忽略自己的代码。

看了这么多帖子的一个回复,感觉没有真正能让我满意的答案,下面来说说我心中的一些思路。

我的想法

对于第一个问题的话,如何提升消费能力?这个问题其实可以总结为三个办法:

  1. 如果每台消费者机器消费线程是固定的,那么我们可以扩容消费机器和partion,类似银行排队增加排队窗口一样。

  2. 如果机器和partion是固定的,增加消费线程就是一个比较好的办法,但是如果是顺序消费,就不能通过增加线程数的方式来提升消费能力,因为顺序消费每个partion都是一个单独的线程,只能通过第一种方式去解决。

  3. 增加自身代码的消费能力,你想想如果银行办事,如果柜员的办事效率能提升的非常高,那么整个排队速度肯定也是很快的。

对于第二个问题,如果我们使用线程池模型,如何去解决消息丢失问题,这里我比较推荐的是RocketMQ中的做法,我们之前说了用数据库去保存offset比较复杂,性能还比较差,在RocketMQ中使用了一个TreeMap的结构做了我们上面提到的数据库的事:

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

这个TreeMap的key是每个message的offset,value就是这条消息的一些信息,TreeMap的底层是使用红黑树去实现的,我们可以很快获取其中的最小值和最大值,当我们每次处理完某一条消息的时候我们会将这条消息从msgTreeMap中移除,

public long removeMessage(final List<MessageExt> msgs) {
        long result = -1;
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!msgTreeMap.isEmpty()) {
                    result = this.queueOffsetMax + 1;
                    int removedCnt = 0;
                    for (MessageExt msg : msgs) {
                        MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                        if (prev != null) {
                            removedCnt--;
                            msgSize.addAndGet(0 - msg.getBody().length);
                        }
                    }
                    msgCount.addAndGet(removedCnt);

                    if (!msgTreeMap.isEmpty()) {
                        result = msgTreeMap.firstKey();
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (Throwable t) {
            log.error("removeMessage exception", t);
        }
        return result;
    }

removeMessage这个方法就是移除已经消费过的消息,并且返回当前最新的消费offset,这里返回的结果就是msgTreeMap.firstKey(),我们ack给消息队列server的值其实也是这个,回到我们这个问题上,如果我们发生重启,那么其实也不需要担心我们会出现消息丢失。

最后

这里只是简单的对消息队列提升消息能力做了一些介绍,如果大家对消息队列有兴趣的,可以看我之前的一些文章:

如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O:

网站文章

  • springcloud — 微服务鉴权管理Spring Security OAuth2原理解析(四)

    springcloud — 微服务鉴权管理Spring Security OAuth2原理解析(四)

    回顾之前文章:1. 微服务鉴权管理之OAuth2原理解析(一)2. 微服务鉴权管理Spring Security原理解析(二)3. 微服务鉴权管理Spring Security OAuth2原理解析(...

    2024-02-01 03:15:22
  • Java报错ClassNotFoundException或者NoSuchMethodError

    Java报错ClassNotFoundException或者NoSuchMethodError

    出现这种报错99%为包版本不同,依赖冲突导致。

    2024-02-01 03:15:15
  • 利用bind9架设智能DNS——postgreSQL数据库篇

    智能DNS工作原理: 在用户解析一个域名的时候,判断一下用户的IP,然后跟DNS服务器内部的IP表匹配一下,看看用户是电信还是网通用户,然后给用户返回对应的IP地址。目前的域名服务运营商不提供智能DNS服务,所以必须自行架设DNS服务或者使用网上免费的智能DNS

    2024-02-01 03:15:08
  • 计算机网络:网络层——数据平面

    计算机网络:网络层——数据平面

    概述 两种重要的网络层功能:转发和路由选择。(forwarding and routing) 数据层面(Data plane): local, per-router function determin...

    2024-02-01 03:15:02
  • 兴趣篇——用C语言写打字母游戏

    昨天说的,以后会写和小游戏相关的博客,事不宜迟,今天就先开始用最简单的C语言编写一个简单的打字母小游戏吧。 应今天我的软件工程老师的一句话:当做一件事时,如果能很快的从中得到正反馈,我们将会有成就感,我们将会提高对它的兴趣。 我们编程的入门语言是C语言,是一门相对简单的编程语言,但是大多数大学生或者其他刚刚接触编程学完C语言的人,往往会产生一个疑问:我们学习了C语言到底能干什么?更有甚者,像计

    2024-02-01 03:14:32
  • 带宽和时延究竟有没有关系

    一、严格意义上来说是不一样的,简单点说,网络为什么会出现延时,一定程度上就是带宽时延和数据量造成的;举个例子来说就很清楚了:带宽,bandwidth,是指每秒钟传输的最大字节数。带宽本来是指某个信号具...

    2024-02-01 03:14:26
  • python django框架orm_利用Python的Django框架中的ORM建立查询API

    摘要在这篇文章里,我将以反模式的角度来直接讨论Django的低级ORM查询方法的使用。作为一种替代方式,我们需要在包含业务逻辑的模型层建立与特定领域相关的查询API,这些在Django中做起来不是非常...

    2024-02-01 03:14:20
  • Php Jquery Load FadeIn FadeOut 无刷新分页

    Php Jquery Load FadeIn FadeOut 无刷新分页index.php<?phpdefine('IN_LOVE',true);require_once('includes/load.php');$smarty->display('index.html');?>index.html<html>&lt...

    2024-02-01 03:13:49
  • java中如何将url地址文件流上传到sftp的某个目录下面?(亲测)

    java中如何将url地址文件流上传到sftp的某个目录下面?(亲测)

    java中如何将url地址文件流上传到sftp的某个目录下面?

    2024-02-01 03:13:43
  • 什么是分布式数据库?我不信,看完这篇你还不懂!

    什么是分布式数据库?我不信,看完这篇你还不懂!

    原文来源: https://tidb.net/blog/eb3cb609 ...

    2024-02-01 03:13:37