
A workaround for Hive not supporting multi-character field delimiters

    Posted on 2021-6-1 16:34:31

    Preface:

      I have recently been working on a big-data project for a large bank. While handling their unstructured data, I found that the files they provided do not meet the input requirements of Hive or Pig: each row needs more than a single delimiter character before it can be parsed cleanly. I spent an afternoon without finding a good solution, and today I went back over the whole process. The Hive command line alone clearly cannot handle this, so the only way out is code.
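
    The core of the fix is a one-line transformation: before Hive ever parses a record, rewrite the line so that the irregular delimiters collapse into \001, the single-byte field separator Hive's default SerDe (LazySimpleSerDe) expects. A minimal sketch of that idea, using the first row of the sample data shown further below; the DelimiterSketch class exists only for illustration:

        // Illustration only: the normalization the custom RecordReader will perform.
        // Collapse every run of whitespace into \001, Hive's default field separator.
        public class DelimiterSketch {
            public static void main(String[] args) {
                String rawLine = "12 afd   fewf\tfewfe  we"; // fields separated by 1..n spaces/tabs
                String normalized = rawLine.replaceAll("\\s+", "\001");
                // Hive's default SerDe can now split the line into 5 clean fields
                System.out.println(normalized.split("\001").length); // prints 5
            }
        }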

    1. Re-implement Hive's InputFormat. Don't worry, the code is right here:

    package hiveStream;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobConfigurable;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;

    /**
     * Custom InputFormat for Hive. It behaves exactly like TextInputFormat,
     * except that each split is handed to MyRecordReader, which normalizes
     * the field delimiters of every line before Hive parses it.
     */
    public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable {

        @Override
        public RecordReader<LongWritable, Text> getRecordReader(
                InputSplit genericSplit, JobConf job, Reporter reporter)
                throws IOException {
            reporter.setStatus(genericSplit.toString());
            return new MyRecordReader((FileSplit) genericSplit, job);
        }
    }
    

    2. Now look carefully at the RecordReader below; the real work happens in next(), where each line is rewritten before Hive ever sees it.

    package hiveStream;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.util.LineReader;

    public class MyRecordReader implements RecordReader<LongWritable, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long pos;
        private long end;
        private LineReader lineReader;
        int maxLineLength;

        public MyRecordReader(InputStream in, long offset, long endOffset,
                int maxLineLength) {
            this.maxLineLength = maxLineLength;
            this.start = offset;
            this.lineReader = new LineReader(in);
            this.pos = offset;
            this.end = endOffset;
        }

        public MyRecordReader(InputStream in, long offset, long endOffset,
                Configuration job) throws IOException {
            this.maxLineLength = job.getInt(
                    "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
            this.lineReader = new LineReader(in, job);
            this.start = offset;
            this.pos = offset; // keep the read position in sync with the split offset
            this.end = endOffset;
        }

        // Constructor used by MyHiveInputFormat: opens the split and positions
        // the reader at the first complete line it owns.
        public MyRecordReader(FileSplit inputSplit, Configuration job)
                throws IOException {
            maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                    Integer.MAX_VALUE);
            start = inputSplit.getStart();
            end = start + inputSplit.getLength();
            final Path file = inputSplit.getPath();
            // Look up a compression codec for the file, if any
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);
            // Open the file on the underlying file system
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(file);
            boolean skipFirstLine = false;

            if (codec != null) {
                // Compressed input cannot be split, so read through to the end of the stream
                lineReader = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            } else {
                if (start != 0) {
                    // Not the first split: back up one byte and discard the partial
                    // first line, which belongs to the previous split
                    skipFirstLine = true;
                    --start;
                    fileIn.seek(start);
                }
                lineReader = new LineReader(fileIn, job);
            }

            if (skipFirstLine) {
                start += lineReader.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        @Override
        public void close() throws IOException {
            if (lineReader != null) {
                lineReader.close();
            }
        }

        @Override
        public LongWritable createKey() {
            return new LongWritable();
        }

        @Override
        public Text createValue() {
            return new Text();
        }

        @Override
        public long getPos() throws IOException {
            return pos;
        }

        @Override
        public float getProgress() throws IOException {
            if (start == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }

        @Override
        public boolean next(LongWritable key, Text value) throws IOException {
            while (pos < end) {
                key.set(pos);
                int newSize = lineReader.readLine(value, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                maxLineLength));
                if (newSize == 0) {
                    return false;
                }
                // Normalize the delimiters: collapse every run of whitespace
                // (spaces and tabs) into \001, Hive's default field separator
                String strReplace = value.toString().replaceAll("\\s+", "\001");
                Text txtReplace = new Text();
                txtReplace.set(strReplace);
                value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
                pos += newSize;
                if (newSize < maxLineLength) {
                    return true;
                }
            }
            return false;
        }
    }
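
    If the data uses a literal multi-character delimiter (for example "##") rather than irregular whitespace, only the replacement line in next() needs to change. A hedged sketch, assuming a hypothetical "##"-delimited input; the MultiCharDelimiterSketch class is for illustration only:

        import java.util.regex.Pattern;

        // Illustration only: the same normalization idea for a literal
        // multi-character delimiter (hypothetical "##") instead of whitespace.
        public class MultiCharDelimiterSketch {
            public static void main(String[] args) {
                String rawLine = "12##afd##fewf##fewfe##we";
                // Pattern.quote keeps any regex metacharacters in the delimiter literal
                String normalized = rawLine.replaceAll(Pattern.quote("##"), "\001");
                System.out.println(normalized.split("\001").length); // prints 5
            }
        }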
    

    3. A complete example:

         

    The data to be processed:
        
        12 afd   fewf	fewfe  we
        76 vee   ppt	wfew  wefw
        83 tyutr   ppt	wfew  wefw
        45 vbe   ppt	wfew  wefw
        565 wee   ppt	wfew  wefw
        12 sde   ppt	wfew  wefw
    Note: the amount of whitespace between fields is not consistent.
    
    1. Create the table:
        create table micmiu_blog(
            author int, category string, url string, town string, oop string)
        stored as inputformat 'hiveStream.MyHiveInputFormat'
        outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
    Note: only the input format is custom; the output side still uses Hive's stock HiveIgnoreKeyTextOutputFormat.
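
    One step worth calling out: before running the CREATE TABLE (and before querying the table), the two classes above have to be on Hive's classpath. A minimal sketch, assuming they are packaged into a jar; the jar name and path are placeholders:

        -- run in the Hive CLI before creating or querying the table
        add jar /path/to/hiveStream.jar;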
    
    2. Load the data:
        LOAD DATA LOCAL INPATH '/mnt/test' OVERWRITE INTO TABLE micmiu_blog;
    
    3. Check the result:
        select * from micmiu_blog;
    
    Try it out for yourself.
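
    For reference, once the whitespace runs are collapsed to \001, the first sample row above should map onto the table columns (author, category, url, town, oop) roughly as:

        author=12, category=afd, url=fewf, town=fewfe, oop=we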
    

      
