feat(avro): expose Avro reader via registerAvro and readAvro (#60) · coderfender/datafusion-java@fe4414a · GitHub
Skip to content

Commit fe4414a

Browse files
authored
feat(avro): expose Avro reader via registerAvro and readAvro (apache#60)
1 parent 333ce48 commit fe4414a

12 files changed

Lines changed: 705 additions & 1 deletion

File tree

core/pom.xml

Lines changed: 5 additions & 0 deletions
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.datafusion;
21+
22+
import org.apache.arrow.vector.types.pojo.Schema;
23+
import org.apache.datafusion.protobuf.AvroReadOptionsProto;
24+
25+
/**
26+
* Configuration knobs for Avro sources passed to {@link SessionContext#registerAvro(String, String,
27+
* AvroReadOptions)} and {@link SessionContext#readAvro(String, AvroReadOptions)}.
28+
*
29+
* <p>Mirrors the subset of DataFusion's {@code AvroReadOptions} that maps onto the Java surface
30+
* today: {@code fileExtension} (default {@code ".avro"}) and an explicit Arrow {@code schema} that
31+
* bypasses on-read schema inference. {@code tablePartitionCols} is intentionally deferred -- no
32+
* other Java reader exposes Hive-style partitioning yet.
33+
*
34+
* <p>Avro carries its own per-block compression (snappy, deflate, bzip2, xz, zstandard) inside the
35+
* object container itself, negotiated when the file is written, so unlike CSV / NDJSON there is no
36+
* {@code FileCompressionType} setter.
37+
*/
38+
public final class AvroReadOptions {
39+
40+
private String fileExtension = ".avro";
41+
private Schema schema;
42+
43+
public AvroReadOptions fileExtension(String ext) {
44+
this.fileExtension = ext;
45+
return this;
46+
}
47+
48+
public AvroReadOptions schema(Schema schema) {
49+
this.schema = schema;
50+
return this;
51+
}
52+
53+
byte[] toBytes() {
54+
return AvroReadOptionsProto.newBuilder().setFileExtension(fileExtension).build().toByteArray();
55+
}
56+
57+
Schema schema() {
58+
return schema;
59+
}
60+
}

core/src/main/java/org/apache/datafusion/SessionContext.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,70 @@ public DataFrame readArrow(String path, ArrowReadOptions options) {
362362
return new DataFrame(dfHandle);
363363
}
364364

365+
/** Register an Avro file (or directory of Avro files) as a table. */
366+
public void registerAvro(String name, String path) {
367+
registerAvro(name, path, new AvroReadOptions());
368+
}
369+
370+
/**
371+
* Register an Avro file (or directory of Avro files) as a table with the supplied {@link
372+
* AvroReadOptions}.
373+
*
374+
* @throws IllegalArgumentException if any of {@code name}, {@code path}, or {@code options} is
375+
* {@code null}.
376+
* @throws RuntimeException if registration fails (path not found, schema mismatch, etc.).
377+
*/
378+
public void registerAvro(String name, String path, AvroReadOptions options) {
379+
if (nativeHandle == 0) {
380+
throw new IllegalStateException("SessionContext is closed");
381+
}
382+
if (name == null) {
383+
throw new IllegalArgumentException("registerAvro name must be non-null");
384+
}
385+
if (path == null) {
386+
throw new IllegalArgumentException("registerAvro path must be non-null");
387+
}
388+
if (options == null) {
389+
throw new IllegalArgumentException("registerAvro options must be non-null");
390+
}
391+
registerAvroWithOptions(
392+
nativeHandle,
393+
name,
394+
path,
395+
options.toBytes(),
396+
options.schema() != null ? serializeSchemaIpc(options.schema()) : null);
397+
}
398+
399+
/** Read an Avro file as a {@link DataFrame} without registering it. */
400+
public DataFrame readAvro(String path) {
401+
return readAvro(path, new AvroReadOptions());
402+
}
403+
404+
/**
405+
* Read an Avro file as a {@link DataFrame} with the supplied {@link AvroReadOptions}.
406+
*
407+
* @throws IllegalArgumentException if {@code path} or {@code options} is {@code null}.
408+
* @throws RuntimeException if the read fails.
409+
*/
410+
public DataFrame readAvro(String path, AvroReadOptions options) {
411+
if (nativeHandle == 0) {
412+
throw new IllegalStateException("SessionContext is closed");
413+
}
414+
if (path == null) {
415+
throw new IllegalArgumentException("readAvro path must be non-null");
416+
}
417+
if (options == null) {
418+
throw new IllegalArgumentException("readAvro options must be non-null");
419+
}
420+
long dfHandle =
421+
readAvroWithOptions(
422+
nativeHandle,
423+
path,
424+
options.toBytes(),
425+
options.schema() != null ? serializeSchemaIpc(options.schema()) : null);
426+
return new DataFrame(dfHandle);
427+
}
428+
365429
/**
366430
* Register a Java-implemented scalar UDF. After registration, the function can be invoked by SQL
367431
* via the UDF's name or referenced in DataFusion plans deserialised with {@link #fromProto}.
@@ -443,6 +507,12 @@ private static native void registerArrowWithOptions(
443507
private static native long readArrowWithOptions(
444508
long handle, String path, byte[] optionsBytes, byte[] schemaIpcBytes);
445509

510+
private static native void registerAvroWithOptions(
511+
long handle, String name, String path, byte[] optionsBytes, byte[] schemaIpcBytes);
512+
513+
private static native long readAvroWithOptions(
514+
long handle, String path, byte[] optionsBytes, byte[] schemaIpcBytes);
515+
446516
private static native void registerJsonWithOptions(
447517
long handle, String name, String path, byte[] optionsBytes, byte[] schemaIpcBytes);
448518

Lines changed: 58 additions & 0 deletions

0 commit comments

Comments
 (0)