1. Create an ORC-format Hive table:
create table test_orc(name string, age int) stored as orc;
2. View the table structure: show create table test_orc;
CREATE TABLE `test_orc`(
`name` string,
`age` int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://localhost:9000/user/work/warehouse/test_orc'
TBLPROPERTIES (
'transient_lastDdlTime'='1502868725')
3. Insert some test data:
insert into table test_orc select name, age from test limit 10;
4. Read the ORC table with MapReduce:
1) pom.xml dependencies:
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
2) Code:
package com.fan.hadoop.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import java.io.IOException;
public class OrcReaderMR {

    public static class OrcMap extends Mapper<NullWritable, OrcStruct, Text, IntWritable> {
        // Assume the ORC file has type: struct<name:string,age:int>
        public void map(NullWritable key, OrcStruct value,
                        Context output) throws IOException, InterruptedException {
            // take the first field as the key and the second field as the value
            output.write((Text) value.getFieldValue(0),
                    (IntWritable) value.getFieldValue(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrcReaderMR.class);
        job.setJobName("OrcReaderMR");

        String in = "hdfs://localhost:9000/user/work/warehouse/test_orc";
        String out = "hdfs://localhost:9000/test/orc";

        job.setMapperClass(OrcMap.class);
        OrcInputFormat.addInputPath(job, new Path(in));
        job.setInputFormatClass(OrcInputFormat.class);
        // map-only job: mapper output goes straight to the text files
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(out));
        job.waitForCompletion(true);
    }
}
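The mapper above pulls the columns out by position. orc-mapred's OrcStruct can also be addressed by field name, which is less brittle if the column order ever changes; a minimal variant of the same mapper, assuming a getFieldValue(String) overload is available in the orc-mapred version on the classpath:

public static class OrcMapByName
        extends Mapper<NullWritable, OrcStruct, Text, IntWritable> {
    public void map(NullWritable key, OrcStruct value,
                    Context output) throws IOException, InterruptedException {
        // look the fields up by the column names declared in the Hive table
        // (assumes OrcStruct#getFieldValue(String) exists in this ORC version)
        output.write((Text) value.getFieldValue("name"),
                (IntWritable) value.getFieldValue("age"));
    }
}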
3) View the generated file: hadoop dfs -cat /test/orc/part-m-00000
kafka 14
tensflow 98
hadoop 34
hbase 68
flume 57
kafka 99
kafka 28
flume 24
tensflow 35
flume 44
5. Write ORC files with MapReduce:
1) Code:
package com.fan.hadoop.orc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;
import java.io.IOException;
public class OrcWriterMR {

    public static class OrcWriterMapper
            extends Mapper<LongWritable, Text, NullWritable, OrcStruct> {

        // schema of the rows to write; must match the Hive table definition
        private TypeDescription schema =
                TypeDescription.fromString("struct<name:string,age:int>");
        private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
        private final NullWritable nada = NullWritable.get();
        private Text name = new Text();
        private IntWritable age = new IntWritable();

        public void map(LongWritable key, Text value,
                        Context output) throws IOException, InterruptedException {
            if (!"".equals(value.toString())) {
                // input lines are tab-separated: name \t age
                String[] arr = value.toString().split("\t");
                name.set(arr[0]);
                age.set(Integer.valueOf(arr[1]));
                pair.setFieldValue(0, name);
                pair.setFieldValue(1, age);
                output.write(nada, pair);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // tell OrcOutputFormat which schema to write
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<name:string,age:int>");
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrcWriterMR.class);
        job.setJobName("OrcWriterMR");

        String in = "hdfs://localhost:9000/user/work/warehouse/test/ddd.txt";
        String out = "hdfs://localhost:9000/test/orc2";

        job.setMapperClass(OrcWriterMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(OrcOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(in));
        OrcOutputFormat.setOutputPath(job, new Path(out));
        job.waitForCompletion(true);
    }
}
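The driver above writes with ORC's default codec (normally ZLIB). If a different compression codec is wanted for the generated files, it can be set on the Configuration through OrcConf before the job is created; a small sketch, with SNAPPY used purely as an example value:

// Optional: pick the compression codec for the ORC files this job writes.
// SNAPPY is only an example; ZLIB is the usual default.
Configuration conf = new Configuration();
OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<name:string,age:int>");
OrcConf.COMPRESS.setString(conf, "SNAPPY");
Job job = Job.getInstance(conf);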
2) View the generated ORC file:
hadoop dfs -ls /test/orc2
-rw-r--r-- 3 work supergroup 0 2017-08-16 17:45 /test/orc2/_SUCCESS
-rw-r--r-- 3 work supergroup 6314874 2017-08-16 17:45 /test/orc2/part-m-00000.orc
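Before copying the file into the Hive warehouse directory, the footer schema and a few rows can be checked directly with the orc-core reader API. A minimal sketch (the class name OrcDump and the hard-coded file path are just placeholders for this example; only the first row batch is printed):

package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class OrcDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(
                new Path("hdfs://localhost:9000/test/orc2/part-m-00000.orc"),
                OrcFile.readerOptions(conf));
        // print the schema recorded in the file footer
        System.out.println(reader.getSchema());

        // read the first batch of rows and print name/age
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        if (rows.nextBatch(batch)) {
            BytesColumnVector name = (BytesColumnVector) batch.cols[0];
            LongColumnVector age = (LongColumnVector) batch.cols[1];
            for (int r = 0; r < batch.size; r++) {
                System.out.println(name.toString(r) + "\t" + age.vector[r]);
            }
        }
        rows.close();
    }
}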
3) Import into Hive by copying the file into the table's warehouse directory:
hadoop fs -cp /test/orc2/part-m-00000.orc /user/work/warehouse/test_orc/
hive> select * from test_orc limit 13;
OK
kafka 14
tensflow 98
hadoop 34
hbase 68
flume 57
kafka 99
kafka 28
flume 24
tensflow 35
flume 44
flume 44
tensflow 35
flume 24
Time taken: 0.045 seconds, Fetched: 13 row(s)