Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 701|回复: 0

一个flume agent异常的解决过程记录

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-4-24 03:20:05 | 显示全部楼层 |阅读模式

    今天在使用flume agent的时候,遇到了一个异常,  现把解决的过程记录如下:

    问题的背景:

    我使用flume agent 来接收从storm topology发送下来的accesslog , 做本地文件落盘。flume配置文件如下:

    #用于syslog和accesslog的本地文件滚动。
     
    a1.sources=r1
    a1.sinks = sink1
    a1.channels = c1
     
    #thrift source;
    a1.sources.r1.type= thrift
    a1.sources.r1.channels = c1
    a1.sources.r1.bind = 127.0.0.1
    a1.sources.r1.port = 8081
     
     
     
    #先用内存通道测试。
    #a1.channels.c1.type = memory
    #a1.channels.c1.capacity = 10000000
    #a1.channels.c1.byteCapacity = 524288000
     
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir=/Users/dongqingswt/workdir/logs/checkpoint
    a1.channels.c1.dataDirs = /Users/dongqingswt/workdir/logs/datadir
     
     
     
    #使用自定义的文件滚动sink
    a1.sinks.sink1.type=  com.beibei.datahamal.flume.extend.sink.roll.FileRollSink
    a1.sinks.sink1.channel = c1
     
     
     
     
    启动flume agent以后, thrift source在本机建立套接字监听,然后用flume-ng-sdk提供的RpcClientFactory 创建thrift rpc client , 进行消息发送的压测。
    发现rpcclient 发送消息到一定量(100000) 时, FileRollSink就无法从memory channel拉取数据了, 而thrift rpc client 也得到了如下异常:
    Exception in thread "main" org.apache.flume.EventDeliveryException: Failed to send event.
    at org.apache.flume.api.ThriftRpcClient.append(ThriftRpcClient.java:155)
    at com.beibei.datahamal.flume.extend.sink.rolling.FilerRollSinkSender.main(FilerRollSinkSender.java:42)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
    Caused by: java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:201)
    at org.apache.flume.api.ThriftRpcClient.append(ThriftRpcClient.java:134)
    ... 6 more

     查看源码, 发现thrift client有一个20s的请求超时,这时候 , 查看thrift 错误日志,发现了如下异常:

     

    java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method) ~[na:1.7.0_45]
    at java.lang.Thread.start(Thread.java:713) ~[na:1.7.0_45]
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:949) ~[na:1.7.0_45]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1360) ~[na:1.7.0_45]
    at org.apache.thrift.server.TThreadedSelectorServer.requestInvoke(TThreadedSelectorServer.java:306) ~[libthrift-0.9.3.jar:0.9.3]
    at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:210) ~[libthrift-0.9.3.jar:0.9.3]
    at org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.select(TThreadedSelectorServer.java:586) ~[libthrift-0.9.3.jar:0.9.3]
    at org.apache.thrift.server.TThreadedSelectorServer$SelectorThread.run(TThreadedSelectorServer.java:541) ~[libthrift-0.9.3.jar:0.9.3]

     

     

    直观的看这个异常,会让人想到是堆内存不够用, 调节-Xmx1024m 参数以后,问题依旧。

    接着用jconsole attach到flume agent这个进程,查看堆内存的使用量, 远没有达到预设的xmx阈值。

    但是·有一个表现就是线程数猛涨到2000以后,直接掉到了20左右,为什么会创建这么多线程呢, 都是什么线程呢?

     

    结合thrift source 的源码, 发现thrift server 采用的也是java nio   , 有SelectorThread做socket的read/write 就绪select 

    AcceptorThread做socket的accept的select ,而socket读就绪以后,收到的FrameBuffer会被包装成一个Runnable丢到线程池处理(查看

    TThreadedSelectorServer的304 行),代码如下:
    protected boolean requestInvoke(FrameBuffer frameBuffer) {
    Runnable invocation = getRunnable(frameBuffer);
    if (invoker != null) {
    try {
    invoker.execute(invocation);
    return true;
    } catch (RejectedExecutionException rx) {
    LOGGER.warn("ExecutorService rejected execution!", rx);
    return false;
    }
    } else {
    // Invoke on the caller's thread
    invocation.run();
    return true;
    }
    }

    这个任务invocation的处理逻辑是在ThriftSource的ThriftSourceHandler定义的,也就是把thrift flume event直接丢到memchannel以后返回。起初 ,我怀疑是不是flume event丢到memchannel处理太慢(比如有线程死锁),导致线程堆积, 但后面换成file channel
    问题依旧,于是继续看jconsole上thrift 的Flume 线程,因为线程工厂在创建线程的时候,指定了线程名:
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
    "Flume Thrift IPC Thread %d").build();
    if (maxThreads == 0) {
    sourceService = Executors.newCachedThreadPool(threadFactory);
    } else {
    sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
    }

    所以, 查看这些线程发现这些线程:
    堆栈跟踪:
    sun.misc.Unsafe.park(Native Method)
    java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
    java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:744)
     
    说明线程池中的这些线程都在等任务工作, 既然大量的线程都在等任务,为什么还要创建这么多线程呢?是不是因为超过了单进程最大可建立的线程数呢,
    结合ThriftSource源码,发现不指定最大线程数时, thrift server的线程池的确是不停的新建线程,而maxThreads又是一个Integer.MAX_VALUE,
    if (maxThreads == 0) {
    sourceService = Executors.newCachedThreadPool(threadFactory);
    } else {
    sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
    }


    试着在flume 配置文件中指定最大线程数:
    a1.channels.r1.threads=10

    问题解决。
     这个问题的解决也让我们反思线程池的使用,不要设定太大的最大线程数。
     
     
    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-8 18:50 , Processed in 0.075466 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表