简历写着熟悉 Dubbo,居然连 Dubbo 线程池监控都不知道?

优采云 发布时间: 2022-07-03 23:26

  简历写着熟悉 Dubbo,居然连 Dubbo 线程池监控都不知道?

  Dubbo 是一款优秀的微服务框架,它以其高性能、简单易用、易扩展等特点,广泛应用于互联网、金融保险、科技公司、制造业、零售物流等多个领域。如今,Dubbo 框架已经成了互联网开发中比较常用的技术框架。

  在Dubbo框架中,当客户端调用服务端的时候,请求抵达了服务端之后,会有专门的线程池去接收参数并且处理。所以如果要实现Dubbo的线程池监控,就需要先了解下Dubbo底层对于业务线程池的实现原理。

  Dubbo底层对于线程池的查看

  这里我所使用的框架是 Dubbo 2.7.8 版本,它在底层对于线程池的管理是通过一个叫做ExecutorRepository 的类处理的,这个类负责创建并管理 Dubbo 中的线程池,通过该扩展接口,我们可以获取到Dubbo再实际运行中的业务线程池对象。

  具体的处理逻辑部分如下所示:

  package org.idea.dubbo.monitor.core.collect;<br />import org.apache.dubbo.common.extension.ExtensionLoader;<br />import org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository;<br />import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;<br />import java.lang.reflect.Field;<br />import java.util.concurrent.ConcurrentMap;<br />import java.util.concurrent.ExecutorService;<br />import java.util.concurrent.ThreadPoolExecutor;<br />/**<br /> * @Author idea<br /> * @Date created in 7:04 下午 2022/6/29<br /> */<br />public class DubboThreadPoolCollector {<br />    /**<br />     * 获取Dubbo的线程池<br />     * @return<br />     */<br />    public static ThreadPoolExecutor getDubboThreadPoolInfo(){<br />        //dubbo线程池数量监控<br />        try {<br />            ExtensionLoader executorRepositoryExtensionLoader = ExtensionLoader.getExtensionLoader(ExecutorRepository.class);<br />            DefaultExecutorRepository defaultExecutorRepository = (DefaultExecutorRepository) executorRepositoryExtensionLoader.getDefaultExtension();<br />            Field dataField = defaultExecutorRepository.getClass().getDeclaredField("data");<br />            dataField.setAccessible(true);<br />            ConcurrentMap data = (ConcurrentMap) dataField.get(defaultExecutorRepository);<br />            ConcurrentMap executorServiceConcurrentMap = data.get("java.util.concurrent.ExecutorService");<br />            //获取到默认的线程池模型<br />            ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorServiceConcurrentMap.get(9090);<br />            return threadPoolExecutor;<br />        } catch (Exception e) {<br />            e.printStackTrace();<br />        }<br />        return null;<br />    }<br />}

  好了,现在我们知道如何在代码中实时查看Dubbo线程池的信息了,那么接下来要做的就是如何采集这些线程池的数据,并且进行上报,最后将上报存储的数据通过统计图的方式展示出来。

  下边我们按照采集,上报,展示三个环节来展示数据。

  采集数据

  在采集数据这块,有两种思路去采集,分别如下:

  采用两种不同的模式采集出来的数据,可能会有些差异,下边是两种方式的比对:

  统计方式实现难度可能存在的问题

  定时任务采集数据

  简单

  定时任务执行间隙中的数据无法采集,导致数据失真。

  请求抵达是采集数据

  稍为复杂一些

  在每次请求的时候都需要采集数据,会对性能有一定损耗。

  通过对实际的业务场景分析,其实第二种方式对应用的性能损耗极微,甚至可以忽略,所以使用这种方式去采集数据的话会比较合适。

  

  下边让我们一起来看看这种方式采集数据的话,该如何实现。

  首先我们需要自己定义一个filter过滤器:

  package org.idea.dubbo.monitor.core.filter;<br />import org.apache.dubbo.common.constants.CommonConstants;<br />import org.apache.dubbo.common.extension.Activate;<br />import org.apache.dubbo.rpc.*;<br />import org.idea.dubbo.monitor.core.DubboMonitorHandler;<br />import java.util.concurrent.ThreadPoolExecutor;<br />import static org.idea.dubbo.monitor.core.config.CommonCache.DUBBO_INFO_STORE_CENTER;<br />/**<br /> * @Author idea<br /> * @Date created in 2:33 下午 2022/7/1<br /> */<br />@Activate(group = CommonConstants.PROVIDER)<br />public class DubboRecordFilter implements Filter {<br />    @Override<br />    public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {<br />        ThreadPoolExecutor threadPoolExecutor = DubboMonitorHandler.getDubboThreadPoolInfo();<br />        //请求的时候趣统计线程池,当请求量太小的时候,这块的数据可能不准确,但是如果请求量大的话,就接近准确了<br />        DUBBO_INFO_STORE_CENTER.reportInfo(9090,threadPoolExecutor.getActiveCount(),threadPoolExecutor.getQueue().size());<br />        return invoker.invoke(invocation);<br />    }<br />}<br />

  关于DUBBO_INFO_STORE_CENTER的代码如下所示:

  并且在dubbo的spi配置文件中指定好它们:

  dubboRecordFilter=org.idea.dubbo.monitor.core.filter.DubboRecordFilter<br />

  当provider加入了这个过滤器以后,若有请求抵达服务端,则会通过这个filter触发采集操作。

  package org.idea.dubbo.monitor.core.collect;<br />import org.idea.dubbo.monitor.core.bo.DubboInfoStoreBO;<br />import java.util.Map;<br />import java.util.concurrent.ConcurrentHashMap;<br />/**<br /> * Dubbo数据存储中心<br /> *<br /> * @Author idea<br /> * @Date created in 11:15 上午 2022/7/1<br /> */<br />public class DubboInfoStoreCenter {<br />    private static Map dubboInfoStoreBOMap = new ConcurrentHashMap();<br />    public void reportInfo(Integer port, Integer corePoolSize, Integer queueLength) {<br />        synchronized (this) {<br />            DubboInfoStoreBO dubboInfoStoreBO = dubboInfoStoreBOMap.get(port);<br />            if (dubboInfoStoreBO != null) {<br />                boolean hasChange = false;<br />                int currentMaxPoolSize = dubboInfoStoreBO.getMaxCorePoolSize();<br />                int currentMaxQueueLength = dubboInfoStoreBO.getMaxCorePoolSize();<br />                if (corePoolSize > currentMaxPoolSize) {<br />                    dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);<br />                    hasChange = true;<br />                }<br />                if (queueLength > currentMaxQueueLength) {<br />                    dubboInfoStoreBO.setMaxQueueLength(queueLength);<br />                    hasChange = true;<br />                }<br />                if (hasChange) {<br />                    dubboInfoStoreBOMap.put(port, dubboInfoStoreBO);<br />                }<br />            } else {<br />                dubboInfoStoreBO = new DubboInfoStoreBO();<br />                dubboInfoStoreBO.setMaxQueueLength(queueLength);<br />                dubboInfoStoreBO.setMaxCorePoolSize(corePoolSize);<br />                dubboInfoStoreBOMap.put(port, dubboInfoStoreBO);<br />            }<br />        }<br />    }<br />    public DubboInfoStoreBO getInfo(Integer port){<br />        return dubboInfoStoreBOMap.get(port);<br />    }<br />    public void cleanInfo(Integer port) {<br />        dubboInfoStoreBOMap.remove(port);<br />    }<br />}<br />

  注意这个采集类只会采集一段时间的数据,然后定期会清空重置。

  之所以这么做,是希望用这个map统计指定时间内的最大线程数和最大队列数,接着当这些峰值数据被上报到存储中心后就进行清空。

  关于DubboInfoStoreCenter对象的定义,我将它放置在了一个叫做CommonCache的类里面,具体如下:

  package org.idea.dubbo.monitor.core.config;<br />import org.idea.dubbo.monitor.core.store.DubboInfoStoreCenter;<br />/**<br /> * @Author idea<br /> * @Date created in 12:15 下午 2022/7/1<br /> */<br />public class CommonCache {<br />    public static DubboInfoStoreCenter DUBBO_INFO_STORE_CENTER = new DubboInfoStoreCenter();<br />}<br />

  所以在上边的过滤器中,我们才可以直接通过静态类引用去调用它的采集接口。

  好了,现在整体来看,我们已经实现了在过滤器中去实时采集线程池的数据,并且将它暂存在了一个Map表中,这个map的数据主要是记录了某段时间内的线程池峰值,供采集器角色去使用。

  那么接下来,我们就来看看上报器模块主要做了哪些操作。

  上报数据

  上报数据前,最重要的就是选择合适的存储组件了。首先上报的数据本身体量并不大,我们可以将采集时间短设置为15秒,那么设计一个上报任务,每隔15秒采集一次dubbo线程池的数据。那么一天的时间就需上报5760次,假设一次上报存储一条记录的话,那么一天下来所需要存储的数据也并不是特别多。

  并且存储下来的服务数据实际上也并不需要保留太长的时间,一般存储个一周时间也就足够了,所以最终我选用啦Redis进行这方面的存储。

  我们实际每次关注的数据字段主要有三个,关于它们的定义我整理成了下边这个对象:

  package org.idea.dubbo.monitor.core.bo;<br />/**<br /> * @Author idea<br /> * @Date created in 7:17 下午 2022/6/29<br /> */<br />public class ThreadInfoBO {<br /><br /><br />    private Integer activePoolSize;<br />    private Integer queueLength;<br />    private long saveTime;<br />    public Integer getActivePoolSize() {<br />        return activePoolSize;<br />    }<br />    public void setActivePoolSize(Integer activePoolSize) {<br />        this.activePoolSize = activePoolSize;<br />    }<br />    public Integer getQueueLength() {<br />        return queueLength;<br />    }<br />    public void setQueueLength(Integer queueLength) {<br />        this.queueLength = queueLength;<br />    }<br />    public long getSaveTime() {<br />        return saveTime;<br />    }<br />    public void setSaveTime(long saveTime) {<br />        this.saveTime = saveTime;<br />    }<br />    @Override<br />    public String toString() {<br />        return "ThreadInfoBO{" +<br />                ", queueLength=" + queueLength +<br />                ", saveTime=" + saveTime +<br />                '}';<br />    }<br />}<br />

  

  接着会开启一个线程任务,每间隔15秒就会执行一轮上报数据的动作:

  这类要注意下,Dubbo应用的线程池上报任务应当等整个SpringBoot应用启动成功之后再去触发,否则可能会有些许数据不准确性。所以再定义Bean初始化线程的时候,我选择了CommandLineRunner接口。

  细心查看代码的你可能会看到这么一个类:

  org.idea.dubbo.monitor.core.report.IReportTemplate<br />

  这个类定义了数据上报器的基本动作,下边是它的具体代码:

  package org.idea.dubbo.monitor.core.report;<br /><br /><br />/**<br /> * 上报模版<br /> *<br /> * @Author idea<br /> * @Date created in 7:10 下午 2022/6/29<br /> */<br />public interface IReportTemplate {<br />    /**<br />     * 上报数据<br />     *<br />     * @return<br />     */<br />    boolean reportData(String json);<br /><br /><br />}<br />

  实现类部分如下所示:

  package org.idea.dubbo.monitor.core.report.impl;<br />import org.idea.dubbo.monitor.core.report.IReportTemplate;<br />import org.idea.qiyu.cache.redis.service.IRedisService;<br />import org.springframework.stereotype.Component;<br />import javax.annotation.Resource;<br />import java.time.LocalDate;<br />import java.util.concurrent.TimeUnit;<br />/**<br /> * @Author idea<br /> * @Date created in 7:12 下午 2022/6/29<br /> */<br />@Component<br />public class RedisTemplateImpl implements IReportTemplate {<br />    @Resource<br />    private IRedisService redisService;<br />    private static String queueKey = "dubbo:threadpool:info:";<br />    @Override<br />    public boolean reportData(String json) {<br />        redisService.lpush(queueKey + LocalDate.now().toString(), json);<br />        redisService.expire(queueKey + LocalDate.now().toString(),7, TimeUnit.DAYS);<br />        return true;<br />    }<br /><br /><br />}<br />

  这里面我采用的是list的结构去存储这些数据指标,设定了一个过期时间为一周,最终存储到redis之后的格式如下所示:

  数据展示

  好了,现在我们已经完成了对线程池的监控,最后只需要设计一个管理台,从缓存中提取上报的数据并且进行页面的展示即可。

  实现的逻辑比较简单,只需要定义好统计图所需要的数据结构,然后在controller曾返回即可,例如下图所示:

  最终展现出来的效果如下图:

  随着请求dubbo接口的量发生变化,统计图可以展示出dubbo线程池的数据变动情况。如果希望统计图以实时的方式展示数据的话,其实只需要在js中写一个定时调用的函数即可。

  这里我是使用的是echart插件做的图表渲染,我选用的是最简单的统计图类型,大家也可以根据自己的具体所需在echart的官网上选择合适的模型进行渲染,下边这是echart的官网地址:

  推荐

  PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线