Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 860|回复: 0

解决flume运行中的一个异常问题!

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-4-11 09:17:12 | 显示全部楼层 |阅读模式

    今天在本地测试flume的exec  监控文件   分割的问题!!!遇到各种141异常问题!

     

    怀疑是在切割文件的时候超过了监控文本的时间,导致flume异常退出,,,所以增加了keep-alive 时长,,,他的默认值是3秒,,我把它设置为30秒,,,之后运行,,,,他不再异常!!!

    解决:设置agent1.channels.<channel_name>.keep-alive = 30

    参考文章:问题2,,,,虽然前边的agent,方式可能不一样,但是这个关键的时间是一样的。

    -------------------------------------------------以下是原文,原文地址:http://www.tuicool.com/articles/mmm2AvF

    flume 问题分析与处理

    问题一:

    org.apache.flume.EventDeliveryException:Failed to send events

           atorg.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)

           atorg.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)

           at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)

           at java.lang.Thread.run(Thread.java:722)

    Caused by:org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host:10.95.198.123, port: 44444 }: Failed to send batch

           at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:294)

           atorg.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:366)

           ... 3 more

    Caused by:org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host:10.95.198.123, port: 44444 }: Avro RPC call returned Status: FAILED

           atorg.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:370)

    分析:

    代码分析

        try { 
          appendBatch(events, requestTimeout, TimeUnit.MILLISECONDS); 
        } catch (Throwable t) { 
          // we mark as no longer active without trying to clean up resources 
          // client is required to call close() to clean up resources 
          setState(ConnState.DEAD); 
          if (t instanceof Error) { 
            throw (Error) t; 
          } 
          if (t instanceof TimeoutException) { 
            throw new EventDeliveryException(this + ": Failed to send event. " + 
                "RPC request timed out after " + requestTimeout + " ms", t); 
          } 
          throw new EventDeliveryException(this + ": Failed to send batch", t);

    请求超时,导致发送event失败

    解决:

    设置request-timeout长一点,默认20秒 

    问题二:

    org.apache.flume.ChannelException: Unableto put batch on required channel: org.apache.flume.channel.MemoryChannel{name:woStoreSoftWDownloadC2}

           atorg.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)

           atorg.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:376)

           atorg.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:336)

           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

           at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)

           at java.util.concurrent.FutureTask.run(FutureTask.java:166)

           atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

           atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

           at java.lang.Thread.run(Thread.java:722)

    Caused by:org.apache.flume.ChannelException: Space for commit to queue couldn't beacquired Sinks are likely not keeping up with sources, or the buffer size istoo tight

           atorg.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)

           atorg.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)

            atorg.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)

           ... 8 more

    30 Mar 2014 10:16:00,960 ERROR[timedFlushExecService18-0](org.apache.flume.source.ExecSource$ExecRunnable$1.run:322)  - Exception occured when processing eventbatch

    org.apache.flume.ChannelException: Unableto put batch on required channel: org.apache.flume.channel.MemoryChannel{name:woStoreSoftWDownloadC2}

           atorg.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)

           atorg.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:376)

           atorg.apache.flume.source.ExecSource$ExecRunnable.access$100(ExecSource.java:249)

           at org.apache.flume.source.ExecSource$ExecRunnable$1.run(ExecSource.java:318)

           atjava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

           atjava.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)

           at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)

            at  

    代码分析:

     protected void doCommit() throws InterruptedException { 
          int remainingChange = takeList.size() - putList.size(); 
          if(remainingChange < 0) { 
            if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { 
              throw new ChannelException("Space for commit to queue couldn't be acquired" + 
                  " Sinks are likely not keeping up with sources, or the buffer size is too tight"); 
            } 
          } 
          int puts = putList.size(); 
          int takes = takeList.size(); 
          synchronized(queueLock) { 
            if(puts > 0 ) { 
              while(!putList.isEmpty()) { 
                if(!queue.offer(putList.removeFirst())) { 
                  throw new RuntimeException("Queue add failed, this shouldn't be able to happen"); 
                } 
              } 
            } 
            putList.clear(); 
            takeList.clear(); 
          } 
          queueStored.release(puts); 
          if(remainingChange > 0) { 
            queueRemaining.release(remainingChange); 
          } 
          if (puts > 0) { 
            channelCounter.addToEventPutSuccessCount(puts); 
          } 
          if (takes > 0) { 
            channelCounter.addToEventTakeSuccessCount(takes); 
          } 
          channelCounter.setChannelSize(queue.size()); 
        } 

    等待keep-alive之后,还是没办法插入event

    解决方案:

    设置keep-alive(默认3秒) , capacity(100), transactionCapacity(100)大一点

    -----------------------------------------------------------------------------------------------

    之后,前边的异常少了,,后边的异常是,,,产生消息快,命令消费慢!!!导致管道满!!!!

    这个就需要对端加大管道的消费速度,来调整???,,,,,怀疑之,,,然后增大空间和传输空间!

     

    -----------更改结果如下!

    stage_nginx.channels.M1.capacity = 1000
    stage_nginx.channels.M1.transactionCapacity = 1000
    stage_nginx.channels.M1.keep-alive = 30

    ---------

     

     

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-4-29 11:14 , Processed in 0.064613 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表