First commit on supporting parquet #650
Open
unical1988 wants to merge 49 commits into apache:main from unical1988:parquet
Changes from 1 commit
Commits
49 commits
74cbc83 First commit on supporting parquet
79bd222 catch file not found exception
2143c99 executed mvn spotless:apply
4f1ea77 added byte_array data type
f71610b added ParquetStatsExtractor
c57a42f added InternalDataFile population from parquet metadata
1557ea3 added col stats for parquet
24c474a set todos
e1a3f35 integrated ParquetPartitionExtractor.java
fbbd1eb added partitionValues to StatsExtractor builder
40c5e67 added the parquet conversion source provider
ec222de run mvn spotless:apply
e0fbca8 edited ParquetSchemaExtractor to include some other LogicalTypes and …
6e2fc66 ParquetSchemaExtractor few fixes
b4c49b7 ParquetSchemaExtractor NULL type added
cac552a ParquetSchemaExtractor Numeric and time types OK, TODO : Arrays and Maps
004d763 ParquetSchemaExtractor added groupTypes Map and List: TODO: tests
4b4593b added -write parquet- to test Parquet types
9d56c21 added first test for primitive types
18ef037 cleanups
bd11c67 added timestamp metadata (millis, micros, nanos)
0dbedb0 added else type for each switch case
0233d54 added string type
8fc6a95 added Time type
c88fb25 added metadata for ENUM and FIXED
6c04cc7 adjusted primitive type detection
9bdd972 adjusted primitive types for fromInternalSchema sync, TODO: ENUM, LIS…
924db34 logic for partitionFields (from user configuration) and updated Conve…
271756e adjusted data class for reading user config
f7db318 removed unacessary class
1323f63 added alternative methods for ParquetSchemaExtractor: to test
5c87799 fixed small error in the previous commit
c53b7c5 fixed small errors
80b9300 partitions are read from config
c54d038 conversion source and schema extractor link fixed, TODO: split into t…
c49dbaa Schema Extractor: List and Map and Fixed are converted Avro Types
f95f87a read config source bug fix
9df1c42 FIXED type conversion ok
60fdc8a fixed few compilation errors
eb7f60f few other compilation errors fixed
96e91cd few other compilation errors fixed 2
f1b7524 code compiling
a79af62 cleanups for ParquetStatsExtractor and ParquetSchemaExtractor: compil…
a27e172 cleanups for Parquet partition methods: compiling but not tested
c58fe53 cleanups for Parquet Conversion Source and adjust StatsExtractor for …
429581a ENUM conversion using avro-parquet: compiling but not tested
0f3c60b LIST and RECORD for parquet data conversion
60dced9 Boolean for parquet data conversion, code compiling, not tested
1b698bd StatsExtractor adjusted to remove encodings and add min max vals
xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java (249 additions, 0 deletions)
package org.apache.xtable.parquet;

import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Builder;
import lombok.NonNull;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.xtable.model.*;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.*;
import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
public class ParquetConversionSource implements ConversionSource<Long> {
  // Inline review comment here: "Few clarification questions to ensure we are on the same page."
  private final String tableName;
  private final String basePath;
  @NonNull private final Configuration hadoopConf;

  @Builder.Default
  private static final ParquetSchemaConverter schemaExtractor =
      ParquetSchemaConverter.getInstance();

  @Builder.Default
  private static final ParquetMetadataExtractor parquetMetadataExtractor =
      ParquetMetadataExtractor.getInstance();

  @Builder.Default
  private static final ParquetPartitionHelper parquetPartitionHelper =
      ParquetPartitionHelper.getInstance();

  private Map<String, List<String>> initPartitionInfo() {
    return getPartitionFromDirectoryStructure(hadoopConf, basePath, Collections.emptyMap());
  }

  /**
   * To infer the schema we read the latest file, on the assumption that the latest file will
   * contain any newly added fields.
   *
   * @param modificationTime the commit to consider for reading the table state
   * @return the table state derived from the latest parquet file
   */
  @Override
  public InternalTable getTable(Long modificationTime) {

    Optional<LocatedFileStatus> latestFile =
        getParquetFiles(hadoopConf, basePath)
            .max(Comparator.comparing(FileStatus::getModificationTime));

    ParquetMetadata parquetMetadata =
        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
    Schema tableSchema =
        new org.apache.parquet.avro.AvroSchemaConverter()
            .convert(parquetMetadata.getFileMetaData().getSchema());

    Set<String> partitionKeys = initPartitionInfo().keySet();

    // merge partition columns into the schema since they are not stored in the parquet file itself
    if (!partitionKeys.isEmpty()) {
      tableSchema = mergeParquetSchema(tableSchema, partitionKeys);
    }
    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);

    List<InternalPartitionField> partitionFields =
        partitionKeys.isEmpty()
            ? Collections.emptyList()
            : parquetPartitionHelper.getInternalPartitionField(partitionKeys, schema);
    DataLayoutStrategy dataLayoutStrategy =
        partitionFields.isEmpty()
            ? DataLayoutStrategy.FLAT
            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
    return InternalTable.builder()
        .tableFormat(TableFormat.PARQUET)
        .basePath(basePath)
        .name(tableName)
        .layoutStrategy(dataLayoutStrategy)
        .partitioningFields(partitionFields)
        .readSchema(schema)
        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
        .build();
  }

  /**
   * To build the current snapshot we list all files, hence -1 is passed to getTable.
   *
   * @return the current snapshot of the table
   */
  @Override
  public InternalSnapshot getCurrentSnapshot() {

    List<LocatedFileStatus> latestFile =
        getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
    Map<String, List<String>> partitionInfo = initPartitionInfo();
    InternalTable table = getTable(-1L);
    List<InternalDataFile> internalDataFiles =
        latestFile.stream()
            .map(
                file ->
                    InternalDataFile.builder()
                        .physicalPath(file.getPath().toString())
                        .fileFormat(FileFormat.APACHE_PARQUET)
                        .fileSizeBytes(file.getLen())
                        .partitionValues(
                            parquetPartitionHelper.getPartitionValue(
                                basePath,
                                file.getPath().toString(),
                                table.getReadSchema(),
                                partitionInfo))
                        .lastModified(file.getModificationTime())
                        .columnStats(
                            parquetMetadataExtractor.getColumnStatsForaFile(
                                hadoopConf, file, table))
                        .build())
            .collect(Collectors.toList());

    return InternalSnapshot.builder()
        .table(table)
        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
        .build();
  }

  /**
   * Whenever a new file is added, it is picked up by listing files whose modification time is
   * greater than the previous sync time.
   *
   * @param modificationTime commit to capture table changes for.
   * @return the table change containing the files added since the given modification time
   */
  @Override
  public TableChange getTableChangeForCommit(Long modificationTime) {
    List<FileStatus> tableChanges =
        getParquetFiles(hadoopConf, basePath)
            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
            .collect(Collectors.toList());
    // TODO avoid doing a full directory listing to get the schema; the modification time argument
    // needs to be tweaked
    InternalTable internalTable = getTable(-1L);
    Set<InternalDataFile> internalDataFiles = new HashSet<>();
    Map<String, List<String>> partitionInfo = initPartitionInfo();
    for (FileStatus tableStatus : tableChanges) {
      internalDataFiles.add(
          InternalDataFile.builder()
              .physicalPath(tableStatus.getPath().toString())
              .partitionValues(
                  parquetPartitionHelper.getPartitionValue(
                      basePath,
                      tableStatus.getPath().toString(),
                      internalTable.getReadSchema(),
                      partitionInfo))
              .lastModified(tableStatus.getModificationTime())
              .fileSizeBytes(tableStatus.getLen())
              .columnStats(
                  parquetMetadataExtractor.getColumnStatsForaFile(
                      hadoopConf, tableStatus, internalTable))
              .build());
    }

    return TableChange.builder()
        .tableAsOfChange(internalTable)
        .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build())
        .build();
  }

  @Override
  public CommitsBacklog<Long> getCommitsBacklog(
      InstantsForIncrementalSync instantsForIncrementalSync) {

    List<Long> commitsToProcess =
        Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli());

    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
  }

  // TODO need to understand how this should be implemented; whether _SUCCESS or a .staging dir
  // needs to be checked
  @Override
  public boolean isIncrementalSyncSafeFrom(Instant instant) {
    return true;
  }

  @Override
  public void close() throws IOException {}

  private Schema mergeParquetSchema(Schema internalSchema, Set<String> partitionFields) {

    SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
        SchemaBuilder.record(internalSchema.getName()).fields();
    for (Schema.Field field : internalSchema.getFields()) {
      fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
    }

    for (String partitionKey : partitionFields) {
      fieldAssembler = fieldAssembler.name(partitionKey).type().stringType().noDefault();
    }

    return fieldAssembler.endRecord();
  }

  public Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
    try {
      FileSystem fs = FileSystem.get(hadoopConf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
      return remoteIteratorToStream(iterator)
          .filter(file -> file.getPath().getName().endsWith("parquet"));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // Walks hive-style directories: basePath/year=2024/month=01/... yields {year=[2024], month=[01]}
  public Map<String, List<String>> getPartitionFromDirectoryStructure(
      Configuration hadoopConf, String basePath, Map<String, List<String>> partitionMap) {

    try {
      FileSystem fs = FileSystem.get(hadoopConf);
      FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath));
      Map<String, List<String>> currentPartitionMap = new HashMap<>(partitionMap);

      for (FileStatus dirStatus : baseFileStatus) {
        if (dirStatus.isDirectory()) {
          String partitionPath = dirStatus.getPath().getName();
          if (partitionPath.contains("=")) {
            String[] partitionKeyValue = partitionPath.split("=");
            currentPartitionMap
                .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList<>())
                .add(partitionKeyValue[1]);
            currentPartitionMap =
                getPartitionFromDirectoryStructure(
                    hadoopConf, dirStatus.getPath().toString(), currentPartitionMap);
          }
        }
      }
      return currentPartitionMap;

    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
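A note on the listing above: it calls remoteIteratorToStream(...), which is not defined in this file, so this first commit would not compile as shown (later commits in the list address compilation errors). A minimal sketch of such a helper, assuming it only needs to adapt Hadoop's RemoteIterator (whose methods throw IOException) into a lazy java.util.stream.Stream, could look like the following; it would also need java.io.UncheckedIOException and java.util.stream.StreamSupport imports:

  // Hypothetical helper, not part of the diff: wraps a Hadoop RemoteIterator in a
  // java.util.Iterator and exposes it as a Stream, translating IOException as it goes.
  private static <T> Stream<T> remoteIteratorToStream(RemoteIterator<T> remoteIterator) {
    Iterator<T> iterator =
        new Iterator<T>() {
          @Override
          public boolean hasNext() {
            try {
              return remoteIterator.hasNext();
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }

          @Override
          public T next() {
            try {
              return remoteIterator.next();
            } catch (IOException e) {
              throw new UncheckedIOException(e);
            }
          }
        };
    return StreamSupport.stream(
        Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
  }

For context, a hedged usage sketch of the class, assuming the Lombok-generated builder and the xtable model types referenced in the diff (the table name and path below are placeholders):

  Configuration conf = new Configuration();
  ParquetConversionSource source =
      ParquetConversionSource.builder()
          .tableName("my_table")
          .basePath("file:///tmp/my_table")
          .hadoopConf(conf)
          .build();

  // Full snapshot built from every parquet file under basePath.
  InternalSnapshot snapshot = source.getCurrentSnapshot();

  // Incremental view: files whose modification time is later than the given epoch millis.
  TableChange change = source.getTableChangeForCommit(1700000000000L);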
APL header is missing. Please run spotless plugin on the code.
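For reference, the standard ASF source header that a spotless license-header step typically enforces looks like the following (the project's spotless configuration is the authority for the exact text):

  /*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
   * regarding copyright ownership.  The ASF licenses this file
   * to you under the Apache License, Version 2.0 (the
   * "License"); you may not use this file except in compliance
   * with the License.  You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

Running mvn spotless:apply, as in the commit history above, should add this header automatically when the plugin's license-header step is configured.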
I just did, but on the other hand I'm not sure what an RFC is; are there docs that explain it?
Here is an example: #634
@unical1988 Here's the template; I can help if you have more clarifications, and we can discuss in Slack.
https://github.com/apache/incubator-xtable/blob/main/rfc/template.md
@vinishjail97 ok