Reading and Writing Apache ORC Files in Java


Background

This is a follow-up to the previous article, Apache ORC Format: Introduction and Reading/Writing with Tools.

Java Class Design

VectorizedRowBatch

In memory, ORC data is represented by org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch:

public class VectorizedRowBatch implements Writable {
  public int numCols;           // number of columns
  public ColumnVector[] cols;   // a vector for each column
  public int size;              // number of rows that qualify (i.e. haven't been filtered out)
  ...
}

ColumnVector

Each column's contents are stored in an org.apache.hadoop.hive.ql.exec.vector.ColumnVector:

public abstract class ColumnVector {

  public final Type type;
  
  // If the whole column vector has no nulls, this is true, otherwise false.
  public boolean noNulls;

  /*
   * If hasNulls is true, then this array contains true if the value
   * is null, otherwise false. The array is always allocated, so a batch can be re-used
   * later and nulls added.
   */
  public boolean[] isNull;

  /*
   * True if same value repeats for whole column vector.
   * If so, vector[0] holds the repeating value.
   */
  public boolean isRepeating;
  ...
}

As you can see, two cases get special treatment: columns that contain no nulls, and columns whose value repeats for every row.
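
For example, writing a null into one row touches both of these flags. A minimal sketch (setNull is a hypothetical helper, not part of the API):

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;

public class NullExample {
	// Mark one row of a column as null.
	static void setNull(ColumnVector col, int row) {
		col.noNulls = false;     // the vector now contains at least one null
		col.isNull[row] = true;  // this particular row is null
	}
}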

Its subclasses include:

ORC type     ColumnVector subclass
array        ListColumnVector
binary       BytesColumnVector
bigint       LongColumnVector
boolean      LongColumnVector
char         BytesColumnVector
date         LongColumnVector
decimal      DecimalColumnVector
double       DoubleColumnVector
float        DoubleColumnVector
int          LongColumnVector
map          MapColumnVector
smallint     LongColumnVector
string       BytesColumnVector
struct       StructColumnVector
timestamp    TimestampColumnVector
tinyint      LongColumnVector
uniontype    UnionColumnVector
varchar      BytesColumnVector

LongColumnVector

LongColumnVector handles all integer types: boolean, bigint, date, int, smallint, and tinyint.

public class LongColumnVector extends ColumnVector {
  public long[] vector;
  ...
}

TimestampColumnVector

TimestampColumnVector handles the timestamp type:

public class TimestampColumnVector extends ColumnVector {

  /*
   * The values from Timestamp.getTime().
   */
  public long[] time;

  /*
   * The values from Timestamp.getNanos().
   */
  public int[] nanos;
  ...      
}
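
Instead of filling time and nanos by hand, TimestampColumnVector offers a set(row, Timestamp) method that splits the value for you. A minimal sketch (fillRow is a hypothetical helper):

import java.sql.Timestamp;

import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;

public class TimestampExample {
	// Write the current time into one row of a timestamp column.
	static void fillRow(TimestampColumnVector ts, int row) {
		// set() stores Timestamp.getTime() into time[row] and
		// Timestamp.getNanos() into nanos[row].
		ts.set(row, new Timestamp(System.currentTimeMillis()));
	}
}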

DoubleColumnVector

DoubleColumnVector handles all floating-point types: double and float.

public class DoubleColumnVector extends ColumnVector {
  public double[] vector;
  ...
}

DecimalColumnVector

DecimalColumnVector handles the decimal type. Its current implementation is not designed for performance and may change in the future:

public class DecimalColumnVector extends ColumnVector {

  /**
   * A vector of HiveDecimalWritable objects.
   *
   * For high performance and easy access to this low-level structure,
   * the fields are public by design (as they are in other ColumnVector
   * types).
   */
  public HiveDecimalWritable[] vector;
  public short scale;
  public short precision;
  ...
}
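
Each slot of vector comes pre-allocated with a HiveDecimalWritable, so values are written by mutating the writable in place rather than replacing it. A minimal sketch (fillRow is a hypothetical helper):

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;

public class DecimalExample {
	// Write 12345.67 into one row of a decimal column.
	static void fillRow(DecimalColumnVector dec, int row) {
		// set() reuses the pre-allocated HiveDecimalWritable in this slot.
		dec.vector[row].set(HiveDecimal.create("12345.67"));
	}
}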

BytesColumnVector

BytesColumnVector handles all binary-like types: binary, char, string, and varchar.

public class BytesColumnVector extends ColumnVector {
  public byte[][] vector;
  public int[] start;          // start offset of each field

  /*
   * The length of each field. If the value repeats for every entry, then it is stored
   * in vector[0] and isRepeating from the superclass is set to true.
   */
  public int[] length;
  ...
}

StructColumnVector

StructColumnVector handles struct types. Its implementation is simply an array of ColumnVectors, one per field; clever, yet entirely natural.

public class StructColumnVector extends ColumnVector {

  public ColumnVector[] fields;
  ...
}
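
Filling a struct row simply means writing to each child vector at the same row index. A minimal sketch for a struct<a:int,b:string> column (fillRow is a hypothetical helper):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;

public class StructExample {
	// Fill one row of a struct<a:int,b:string> column.
	static void fillRow(StructColumnVector struct, int row) {
		LongColumnVector a = (LongColumnVector) struct.fields[0];
		BytesColumnVector b = (BytesColumnVector) struct.fields[1];
		a.vector[row] = 42;
		b.setVal(row, "hello".getBytes(StandardCharsets.UTF_8));
	}
}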

UnionColumnVector

UnionColumnVector handles union types, whose alternatives share storage. The implementation looks like this:

public class UnionColumnVector extends ColumnVector {

  public int[] tags;
  public ColumnVector[] fields;
  ...
}

Here tags identifies which alternative each row holds, and fields[tag] contains the corresponding value.
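
For example, for a uniontype<int,string> column, a row holding the int alternative could be filled like this (a sketch; fillRow is a hypothetical helper, and alternative 0 is assumed to be the int):

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;

public class UnionExample {
	// Fill one row of a uniontype<int,string> column with its int alternative.
	static void fillRow(UnionColumnVector union, int row) {
		union.tags[row] = 0;  // alternative 0 is the int
		((LongColumnVector) union.fields[0]).vector[row] = 7;
	}
}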

MultiValuedColumnVector

MultiValuedColumnVector is the base class for multi-valued columns, which need to record each value's starting offset and length:

public abstract class MultiValuedColumnVector extends ColumnVector {

  public long[] offsets;
  public long[] lengths;
  // the number of children slots used
  public int childCount;
  ...
}

ListColumnVector

ListColumnVector handles array columns:

public class ListColumnVector extends MultiValuedColumnVector {

  public ColumnVector child;
  ...
}

The elements of list[i] occupy positions offsets[i] through offsets[i] + lengths[i] - 1 of child, inclusive.
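
Appending a row therefore means reserving a slice of child via offsets, lengths, and childCount, then filling that slice; this is the same bookkeeping the map example later in this article uses. A minimal sketch for an array<int> column (appendRow is a hypothetical helper; it assumes child has already been grown with ensureSize):

import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

public class ListExample {
	// Append one row whose value is the 3-element list {base, base+1, base+2}.
	static void appendRow(ListColumnVector list, int row, int base) {
		list.offsets[row] = list.childCount;  // this row's elements start here
		list.lengths[row] = 3;
		list.childCount += 3;

		LongColumnVector child = (LongColumnVector) list.child;
		for (int i = 0; i < 3; ++i) {
			child.vector[(int) list.offsets[row] + i] = base + i;
		}
	}
}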

MapColumnVector

MapColumnVector handles map columns:

public class MapColumnVector extends MultiValuedColumnVector {

  public ColumnVector keys;
  public ColumnVector values;
  ...
}

The remaining types are not enumerated here; for details, see the official documentation, Apache ORC: Using Core Java.

Writing ORC Files in Java

Quick-Start Example

Steps

  1. Define the schema
  2. Create a Writer with the OrcFile class

POM dependency

		<dependency>
			<groupId>org.apache.orc</groupId>
			<artifactId>orc-core</artifactId>
			<version>1.5.1</version>
		</dependency>

Example:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class WriteExample {
	public static void main(String[] args) throws IllegalArgumentException, IOException {
		Configuration conf = new Configuration();

		// Define the schema
		TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");

		// Create the writer
		Writer writer = OrcFile.createWriter(new Path("my-file.orc"), OrcFile.writerOptions(conf).setSchema(schema));

		// Write the file
		VectorizedRowBatch batch = schema.createRowBatch();
		LongColumnVector x = (LongColumnVector) batch.cols[0];
		LongColumnVector y = (LongColumnVector) batch.cols[1];

		// Simulate 10,000 rows of data
		for (int r = 0; r < 10000; ++r) {
			int row = batch.size++;
			x.vector[row] = r;
			y.vector[row] = r * 3;

			// A batch holds 1024 rows by default; when it fills up, flush it and start a new one.
			if (batch.size == batch.getMaxSize()) {
				writer.addRowBatch(batch);
				batch.reset();
			}
		}
		if (batch.size != 0) {
			writer.addRowBatch(batch);
			batch.reset();
		}
		writer.close();
	}
}

Advanced Example

Writing a Map Column

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class AdvancedWriteExample {
	public static void main(String[] args) throws IllegalArgumentException, IOException {
		Configuration conf = new Configuration();

		// Define the schema
		TypeDescription schema = TypeDescription.fromString("struct<first:int,second:int,third:map<string,int>>");

		// Create the writer
		Writer writer = OrcFile.createWriter(new Path("advanced-example.orc"),
				OrcFile.writerOptions(conf).setSchema(schema));

		// Write the file
		VectorizedRowBatch batch = schema.createRowBatch();

		// Set up the first two columns
		LongColumnVector first = (LongColumnVector) batch.cols[0];
		LongColumnVector second = (LongColumnVector) batch.cols[1];

		// Set up the map column; keys and values must be cast to their concrete vector types
		MapColumnVector map = (MapColumnVector) batch.cols[2];
		BytesColumnVector mapKey = (BytesColumnVector) map.keys;
		LongColumnVector mapValue = (LongColumnVector) map.values;

		// Each map holds 5 entries
		final int MAP_SIZE = 5;
		final int BATCH_SIZE = batch.getMaxSize();

		// Make sure the map's child vectors have enough room
		mapKey.ensureSize(BATCH_SIZE * MAP_SIZE, false);
		mapValue.ensureSize(BATCH_SIZE * MAP_SIZE, false);

		// Add 1,500 rows to the file
		for (int r = 0; r < 1500; ++r) {
			int row = batch.size++;

			// Fill the first two columns
			first.vector[row] = r;
			second.vector[row] = r * 3;

			// Record this row's offset and length in the map column
			map.offsets[row] = map.childCount;
			map.lengths[row] = MAP_SIZE;
			map.childCount += MAP_SIZE;

			// Fill the map's keys and values
			for (int mapElem = (int) map.offsets[row]; mapElem < map.offsets[row] + MAP_SIZE; ++mapElem) {
				String key = "row " + r + "." + (mapElem - map.offsets[row]);
				mapKey.setVal(mapElem, key.getBytes(StandardCharsets.UTF_8));
				mapValue.vector[mapElem] = mapElem;
			}

			// A batch holds 1024 rows by default; when it fills up, flush it and start a new one.
			if (row == BATCH_SIZE - 1) {
				writer.addRowBatch(batch);
				batch.reset();
			}
		}
		if (batch.size != 0) {
			writer.addRowBatch(batch);
			batch.reset();
		}
		writer.close();
	}
}

Reading ORC Files in Java

Steps

  1. Create a Reader with OrcFile
  2. Read the file

Example

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class ReaderExample {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();

		// Create a Reader with OrcFile
		Reader reader = OrcFile.createReader(new Path("my-file.orc"), OrcFile.readerOptions(conf));

		// Read the file batch by batch
		RecordReader rows = reader.rows();
		VectorizedRowBatch batch = reader.getSchema().createRowBatch();

		while (rows.nextBatch(batch)) {
			System.out.println(batch.size);
			for (int r = 0; r < batch.size; ++r) {
				// ... process row r from batch
				// System.out.println(r);
			}
		}
		rows.close();
	}
}

Output

1024
1024
1024
1024
1024
1024
1024
1024
1024
784
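
The loop body in the read example is only a placeholder. For the struct<x:int,y:int> file written earlier, the values could be pulled out of each batch along these lines (a sketch; printBatch is a hypothetical helper that also honors the isRepeating and isNull flags from ColumnVector):

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class RowPrinter {
	// Print every row of a struct<x:int,y:int> batch.
	static void printBatch(VectorizedRowBatch batch) {
		LongColumnVector x = (LongColumnVector) batch.cols[0];
		LongColumnVector y = (LongColumnVector) batch.cols[1];
		for (int r = 0; r < batch.size; ++r) {
			// When isRepeating is set, every row's value lives in vector[0].
			int xi = x.isRepeating ? 0 : r;
			int yi = y.isRepeating ? 0 : r;
			String xs = (!x.noNulls && x.isNull[xi]) ? "null" : Long.toString(x.vector[xi]);
			String ys = (!y.noNulls && y.isNull[yi]) ? "null" : Long.toString(y.vector[yi]);
			System.out.println("x=" + xs + ", y=" + ys);
		}
	}
}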
