ParquetFileReader
ParquetFileReader.java
/**
 * @param conf the Hadoop Configuration
 * @param file Path to a parquet file
 * @param filter a {@link MetadataFilter} for selecting row groups
 * @throws IOException if the file can not be opened
 */
public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
  this.converter = new ParquetMetadataConverter(conf);
  this.conf = conf;
  FileSystem fs = file.getFileSystem(conf);
  this.fileStatus = fs.getFileStatus(file);
  this.f = HadoopStreams.wrap(fs.open(file));
  // read the footer metadata at the end of the file
  this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
  // file-level metadata
  this.fileMetaData = footer.getFileMetaData();
  // row-group-level metadata; blocks is a list of BlockMetaData
  this.blocks = footer.getBlocks();
  for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
    paths.put(ColumnPath.get(col.getPath()), col);
  }
  // the page size parameter isn't meaningful when only using
  // the codec factory to get decompressors
  this.codecFactory = new CodecFactory(conf);
}
// main members of BlockMetaData
public class BlockMetaData {
  private List<ColumnChunkMetaData> columns = new ArrayList<ColumnChunkMetaData>();
  private long rowCount;
  private long totalByteSize;
  private String path;
  // ...
}
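The constructor reads the footer once and caches both the file-level metadata and one BlockMetaData per row group. As a rough usage sketch (not from the original post; it assumes the same parquet-hadoop API as the excerpt above and a hypothetical input file /tmp/data.parquet), the footer can be inspected on its own to see each row group's row count, byte size, and column chunk layout:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class FooterDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/data.parquet");   // hypothetical input file
    // readFooter only parses the footer; no row-group data is read yet
    ParquetMetadata footer =
        ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER);
    System.out.println("schema: " + footer.getFileMetaData().getSchema());
    for (BlockMetaData block : footer.getBlocks()) {
      System.out.println("row group: rows=" + block.getRowCount()
          + ", totalByteSize=" + block.getTotalByteSize());
      for (ColumnChunkMetaData col : block.getColumns()) {
        System.out.println("  column " + col.getPath()
            + " starts at " + col.getStartingPos()
            + ", totalSize=" + col.getTotalSize());
      }
    }
  }
}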
Reading a row group
/**
 * Reads all the columns requested from the row group at the current file position.
 * @throws IOException if an error occurs while reading
 * @return the PageReadStore which can provide PageReaders for each column.
 */
public PageReadStore readNextRowGroup() throws IOException {
  if (currentBlock == blocks.size()) {
    return null;
  }
  // fetch the metadata for this row group
  BlockMetaData block = blocks.get(currentBlock);
  if (block.getRowCount() == 0) {
    throw new RuntimeException("Illegal row group of 0 rows");
  }
  this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount());
  // prepare the list of consecutive chunks to read them in one scan
  // column chunks that are adjacent in the file are grouped into one ConsecutiveChunkList
  List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
  ConsecutiveChunkList currentChunks = null;
  for (ColumnChunkMetaData mc : block.getColumns()) {
    ColumnPath pathKey = mc.getPath();
    BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
    ColumnDescriptor columnDescriptor = paths.get(pathKey);
    if (columnDescriptor != null) {
      long startingPos = mc.getStartingPos();
      // first chunk or not consecutive => new list
      if (currentChunks == null || currentChunks.endPos() != startingPos) {
        currentChunks = new ConsecutiveChunkList(startingPos);
        allChunks.add(currentChunks);
      }
      currentChunks.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
    }
  }
  // actually read all the chunks
  // each consecutive run is read in one scan, then split back into individual chunks
  for (ConsecutiveChunkList consecutiveChunks : allChunks) {
    final List<Chunk> chunks = consecutiveChunks.readAll(f);
    for (Chunk chunk : chunks) {
      currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
    }
  }
  // avoid re-reading bytes: the dictionary reader is used after this call
  if (nextDictionaryReader != null) {
    nextDictionaryReader.setRowGroup(currentRowGroup);
  }
  advanceToNextBlock();
  return currentRowGroup;
}
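To put readNextRowGroup() in context, the returned PageReadStore is normally handed to the record-assembly layer (MessageColumnIO / RecordReader), which stitches the column pages back into records. A minimal sketch, not from the original post, assuming a ParquetFileReader opened as in the constructor above and using the example Group API:

import java.io.IOException;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

public class RowGroupScan {
  // 'reader' is a ParquetFileReader opened as shown in the constructor above
  public static void readAllRows(ParquetFileReader reader) throws IOException {
    MessageType schema = reader.getFileMetaData().getSchema();
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
    PageReadStore rowGroup;
    // each call returns one row group's pages, or null once the file is exhausted
    while ((rowGroup = reader.readNextRowGroup()) != null) {
      RecordReader<Group> recordReader =
          columnIO.getRecordReader(rowGroup, new GroupRecordConverter(schema));
      long rows = rowGroup.getRowCount();
      for (long i = 0; i < rows; i++) {
        Group record = recordReader.read();   // assemble one record from the column pages
        System.out.println(record);
      }
    }
  }
}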
ChunkDescriptor
/**
 * information needed to read a column chunk
 */
private static class ChunkDescriptor {

  private final ColumnDescriptor col;
  private final ColumnChunkMetaData metadata;
  private final long fileOffset;
  private final int size;

  /**
   * @param col column this chunk is part of
   * @param metadata metadata for the column
   * @param fileOffset offset in the file where this chunk starts
   * @param size size of the chunk
   */
  private ChunkDescriptor(
      ColumnDescriptor col,
      ColumnChunkMetaData metadata,
      long fileOffset,
      int size) {
    super();
    this.col = col;
    this.metadata = metadata;
    this.fileOffset = fileOffset;
    this.size = size;
  }
}
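ChunkDescriptor, together with ConsecutiveChunkList (not shown here), is what drives the "one scan per consecutive run" behavior in readNextRowGroup(): a chunk whose startingPos equals the end position of the previous chunk joins the same list and is fetched by the same sequential read. The grouping rule in isolation, as a standalone hypothetical sketch (the Run class below is illustrative and not part of Parquet):

import java.util.ArrayList;
import java.util.List;

public class ConsecutiveRuns {

  // illustrative stand-in for ConsecutiveChunkList: a run of byte ranges that
  // are adjacent in the file and can be read with a single sequential scan
  static class Run {
    final long start;
    long end;
    int chunks;
    Run(long start) { this.start = start; this.end = start; }
  }

  // group (offset, size) pairs the same way the loop in readNextRowGroup() does:
  // start a new run whenever the next chunk does not begin where the last one ended
  static List<Run> group(long[][] offsetAndSize) {
    List<Run> runs = new ArrayList<Run>();
    Run current = null;
    for (long[] chunk : offsetAndSize) {
      long start = chunk[0], size = chunk[1];
      if (current == null || current.end != start) {
        current = new Run(start);
        runs.add(current);
      }
      current.end = start + size;
      current.chunks++;
    }
    return runs;
  }

  public static void main(String[] args) {
    // three chunks laid out back-to-back, then a gap before the fourth
    long[][] layout = { {4, 100}, {104, 50}, {154, 200}, {500, 80} };
    for (Run r : group(layout)) {
      System.out.println("scan [" + r.start + ", " + r.end + ") covering " + r.chunks + " chunk(s)");
    }
  }
}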