Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 720|回复: 0

spark出现task不能序列化错误的解决方法

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-4-24 08:36:49 | 显示全部楼层 |阅读模式

    应用场景:使用JavaHiveContext执行SQL之后,希望能得到其字段名及相应的值,但却出现"Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField"的错误,代码如下:

    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaHiveContext sqlContext = new JavaHiveContext(sc);
    JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
    final StructField[] fields = schema.schema().getFields();
    JavaRDD<String> result = schema.map(new Function<Row, String>() {
    	private static final long serialVersionUID = 1L;
    	@Override
    	public String call(Row row) throws Exception {
    		StringBuffer out = new StringBuffer();
    		for (int i = 0; i < row.length(); i++) {
    			out.append(fields.getName() + "->" + row.get(i) + ";");
    		}
    		return out.toString();
    	}
    });
    System.out.println(result.collect());                    
    

    在spark官网上查找序列化方面的内容,看到可以通过注册的方式自定义类的序列化方式,故在conf上添加以下设置:

    conf.registerKryoClasses(new Class[] { org.apache.spark.sql.api.java.StructField.class });
    

    测试执行后,还是报相同的错误:java.io.NotSerializableException: org.apache.spark.sql.api.java.StructField,不知道为什么,如果有朋友知道,可在下面留言。

    上述方法测不通后,又再网上寻求方法,此时看到了下面的这篇文章,内容摘录见下:http://www.cnblogs.com/zwCHAN/p/4305156.html

    按照第一种方法,将依赖的变量StructField[]放到map内部定义,代码见下:

    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaHiveContext sqlContext = new JavaHiveContext(sc);
    JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
    JavaRDD<String> result = schema.map(new Function<Row, String>() {
    	private static final long serialVersionUID = 1L;
    	@Override
    	public String call(Row row) throws Exception {
                    StructField[] fields = schema.schema().getFields();
    		StringBuffer out = new StringBuffer();
    		for (int i = 0; i < row.length(); i++) {
    			out.append(fields.getName() + "->" + row.get(i) + ";");
    		}
    		return out.toString();
    	}
    });
    System.out.println(result.collect());            
    

      

    测试通过,但考虑到每次map都需要从JavaSchemaRDD中获取一次schema信息,比较耗时,而在map中有只需要String类型的字段名就可以了,故在原有基础上对代码进行优化,见下:

    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaHiveContext sqlContext = new JavaHiveContext(sc);
    JavaSchemaRDD schema = sqlContext.sql("select * from default.dual");
    StructField[] fields = schema.schema().getFields();
    final String[] fieldsName = new String[fields.length];
    for (int i = 0; i < fields.length; i++) {
    	fieldsName = fields.getName();
    }
    JavaRDD<String> result = schema.map(new Function<Row, String>() {
    	private static final long serialVersionUID = 1L;
    	@Override
    	public String call(Row row) throws Exception {
    		StringBuffer out = new StringBuffer();
    		for (int i = 0; i < row.length(); i++) {
    			out.append(fieldsName + "->" + row.get(i) + ";");
    		}
    		return out.toString();
    	}
    });
    System.out.println(result.collect());

    以下内容摘录自:http://www.cnblogs.com/zwCHAN/p/4305156.html

    出现“org.apache.spark.SparkException: Task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。特别是当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。解决这个问题最常用的方法有:

    1. 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
    2. 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
    3. 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。
    4. 将引用的类做成可序列化的。
    5. 以下这两个没试过。。
    • Make the NotSerializable object as a static and create it once per machine.
    • Call rdd.forEachPartition and create the NotSerializable object in there like this:
    ==================

     

    If you see this error:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
    

    The above error can be triggered when you intialize a variable on the driver (master), but then try to use it on one of the workers. In that case, Spark Streaming will try to serialize the object to send it over to the worker, and fail if the object is not serializable. Consider the following code snippet:

    NotSerializable notSerializable = new NotSerializable(); JavaRDD<String> rdd = sc.textFile("/tmp/myfile"); rdd.map(s -> notSerializable.doSomething(s)).collect(); 

    This will trigger that error. Here are some ideas to fix this error:

    • Serializable the class
    • Declare the instance only within the lambda function passed in map.
    • Make the NotSerializable object as a static and create it once per machine.
    • Call rdd.forEachPartition and create the NotSerializable object in there like this:
    rdd.forEachPartition(iter -> {
      NotSerializable notSerializable = new NotSerializable(); // ...Now process iter });

    另外, stackoverflow上http://stackoverflow.com/questions/25914057/task-not-serializable-exception-while-running-apache-spark-job 这个答的也很简明易懂。

     
    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-9 02:17 , Processed in 0.063754 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表