Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 601|回复: 0

kafkaStream解析json出错导致程序中断的解决方法

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-8-27 13:35:56 | 显示全部楼层 |阅读模式

    出错在 KStreamFlatMapValues 方法执行时,由于json异常数据无法解析,结果生成的值为null,报错信息如下:

    2018-04-18 19:21:04,776 ERROR [app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1] com.gw.stream.KStream103.lambda$main$1(100) | 捕获到异常:hello world hello world king
    Exception in thread "app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1" java.lang.NullPointerException
            at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
            at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
            at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
            at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
            at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
            at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
            at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
            at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
            at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
            at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
            at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
            at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
            at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
    
    

    问题解决方案:

    1. 对json解析的bean添加未知字段忽略

      
      import java.util.List;
      import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
      
      @JsonIgnoreProperties(ignoreUnknown = true)
      public class Bean103 {
      
          private List<String> key1;
          private List<List<String>> key2;
            
          public void setKey1(List<String> key1) {
              this.key1 = key1;
          }
          public List<String> getKey1() {
              return key1;
          }
          
          public void setKey2(List<List<String>> key2) {
              this.key2 = key2;
          }
          public List<List<String>> getKey2() {
              return key2;
          }
      }
      
      
    2. 由于报空指针错误,所以解决空指针问题,即判断为null时创建一个空对象.

      return list == null ? new ArrayList<String>():list;
      
    3. 完整的示例代码如下:

      package com.gw.stream;
      
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import java.util.stream.Collectors;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.KeyValue;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.KStream;
      import org.apache.kafka.streams.kstream.Produced;
      import org.apache.log4j.Logger;
      
      import com.alibaba.fastjson.JSONObject;
      
      public class KStream103 {
      
      	private static Logger log = Logger.getLogger(KStream103.class);
      
      	public static void main(String[] args) {
      		
      		if(args.length < 6){
      			log.error("错误:参数个数不正确[application_id bootstarp_server groupid source_topic target_topic auto_offset_reset]");
      			return ;
      		}
      		String application_id=args[0];
      		String bootstarp_server = args[1];
      		String groupid = args[2];
      		String source_topic = args[3];
      		String target_topic = args[4];
      		String auto_offset_reset = args[5];
      			
      		Properties props = new Properties();
      		// consumer group
      		// 指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件
      		props.put(StreamsConfig.APPLICATION_ID_CONFIG, application_id);
      		props.put(StreamsConfig.STATE_DIR_CONFIG, "./tmp/");
      		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,bootstarp_server);
      		// props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
      		props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
      		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
      		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);  //自动提交
      		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
      		//针对时间异常解决方法
      		props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
      		
      
      		final String splitChar = "\001";
      
      		StreamsBuilder builder = new StreamsBuilder();
      		KStream<String, String> textLines = builder.stream(source_topic); // 接收第一个topic
      		textLines.flatMapValues(value -> {
      			
      			Bean103 bean103 = null;
      			List<String> list = null;
      
      			try {
      				
      				//这里是value的业务处理逻辑...最终返回的是一个list
      								
      			} catch (Exception e) {
      				log.error("捕获到异常:" + value);
      				log.error("error message:" + e.getMessage());
      				
      			}
      			return list == null ? new ArrayList<String>():list;
      
      		}).filter((k,v)-> v !=null).map((k, v) -> new KeyValue<>(k, v))
      		.to(target_topic, Produced.with(Serdes.String(), Serdes.String()));
      		
      		KafkaStreams streams = new KafkaStreams(builder.build(), props);
      		streams.start();
      
      	}
      
      }
      
      
    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-3 16:27 , Processed in 0.070183 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表