This module provides a high-level integration layer for the Calcite-based query engine, enabling external systems such as Apache Spark or command-line tools to parse and analyze queries without exposing low-level internals.
Its components are organized into two main areas aligned with the Unified Query API architecture.

Query processing:
- UnifiedQueryParser: Parses PPL (Piped Processing Language) or SQL queries and returns the native parse result (UnresolvedPlan for PPL, SqlNode for Calcite SQL).
- UnifiedQueryPlanner: Accepts PPL or SQL queries and returns Calcite RelNode logical plans as an intermediate representation.
- UnifiedQueryTranspiler: Converts Calcite logical plans (RelNode) into SQL strings for various target databases using different SQL dialects.
- UnifiedQueryCompiler: Compiles Calcite logical plans (RelNode) into executable JDBC PreparedStatement objects, separating compilation from execution.

Function abstraction:
- UnifiedFunction: Engine-agnostic function interface that lets functions be evaluated across different execution engines without engine-specific code duplication.
- UnifiedFunctionRepository: Repository for discovering and loading functions as UnifiedFunction instances, bridging function definitions and external execution engines.
Together, these components enable complete workflows: parse PPL or SQL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines.
This API is currently experimental. The design intentionally exposes Calcite abstractions (Schema for catalogs, RelNode as IR, SqlDialect for dialects) rather than creating custom wrapper interfaces. This avoids over-design in the short term by leveraging Calcite's already flexible interfaces. If a more abstract API becomes necessary in the future, the new abstraction layer may introduce breaking changes.
UnifiedQueryContext is a reusable abstraction shared across unified query components (planner, compiler, etc.). It bundles CalcitePlanContext and Settings into a single object, centralizing configuration for all unified query operations.
Create a context with catalog configuration, query type, and optional settings:
```java
UnifiedQueryContext context = UnifiedQueryContext.builder()
    .language(QueryType.PPL) // or QueryType.SQL for SQL
    .catalog("opensearch", opensearchSchema)
    .catalog("spark_catalog", sparkSchema)
    .defaultNamespace("opensearch")
    .cacheMetadata(true)
    .setting("plugins.query.size_limit", 200)
    .build();
```

Use UnifiedQueryParser to parse queries into their native parse tree. The parser is owned by UnifiedQueryContext and returns the native parse result for each language.
```java
// PPL parsing
UnresolvedPlan ast = (UnresolvedPlan) context.getParser().parse("source = logs | where status = 200");

// SQL parsing (sqlContext is a context built with QueryType.SQL)
SqlNode sqlNode = (SqlNode) sqlContext.getParser().parse("SELECT * FROM logs WHERE status = 200");
```

Callers can then apply each language's native visitor infrastructure (AbstractNodeVisitor for PPL, SqlBasicVisitor for Calcite SQL) to the typed result for further analysis.
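The shape of such a visitor-based analysis can be sketched with a toy tree. The classes below (Node, Relation, Filter, NodeVisitor, TableNameCollector) are hypothetical stand-ins, not the real UnresolvedPlan or SqlNode hierarchies; they only illustrate the classic double-dispatch visitor pattern, here used to collect the table names a query references.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in AST; the real PPL AST and Calcite SqlNode classes differ.
interface Node {
    <R, C> R accept(NodeVisitor<R, C> visitor, C context);
}

// A table reference, e.g. the "logs" in "source = logs".
record Relation(String tableName) implements Node {
    public <R, C> R accept(NodeVisitor<R, C> visitor, C context) {
        return visitor.visitRelation(this, context);
    }
}

// A filter on top of a child plan, e.g. "| where status = 200".
record Filter(String condition, Node child) implements Node {
    public <R, C> R accept(NodeVisitor<R, C> visitor, C context) {
        return visitor.visitFilter(this, context);
    }
}

// One visit method per node type; accept() dispatches to the right one.
interface NodeVisitor<R, C> {
    R visitRelation(Relation node, C context);
    R visitFilter(Filter node, C context);
}

// Example analysis: collect every table name referenced by the plan.
class TableNameCollector implements NodeVisitor<Void, List<String>> {
    @Override
    public Void visitRelation(Relation node, List<String> tables) {
        tables.add(node.tableName());
        return null;
    }

    @Override
    public Void visitFilter(Filter node, List<String> tables) {
        // Recurse into the child plan
        return node.child().accept(this, tables);
    }

    static List<String> collect(Node root) {
        List<String> tables = new ArrayList<>();
        root.accept(new TableNameCollector(), tables);
        return tables;
    }
}
```

For example, `TableNameCollector.collect(new Filter("status = 200", new Relation("logs")))` returns `[logs]`. With the real visitors, the same idea applies: override only the visit methods relevant to the analysis and let the base class handle the rest.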
Use UnifiedQueryPlanner to parse and analyze PPL or SQL queries into Calcite logical plans. The planner accepts a UnifiedQueryContext and can be reused for multiple queries.
```java
// Create planner with context
UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);

// Plan multiple queries (context is reused)
RelNode plan1 = planner.plan("source = logs | where status = 200");
RelNode plan2 = planner.plan("source = metrics | stats avg(cpu)");

// SQL queries are also supported, using a planner created from a QueryType.SQL context
UnifiedQueryPlanner sqlPlanner = new UnifiedQueryPlanner(sqlContext);
RelNode plan3 = sqlPlanner.plan("SELECT * FROM logs WHERE status = 200");
```

Use UnifiedQueryTranspiler to convert Calcite logical plans into SQL strings for target databases. The transpiler supports various SQL dialects through Calcite's SqlDialect abstraction.
```java
UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
    .dialect(SparkSqlDialect.DEFAULT)
    .build();
String sql = transpiler.toSql(plan);
```

Supported SQL dialects include:
- SparkSqlDialect.DEFAULT - Apache Spark SQL
- PostgresqlSqlDialect.DEFAULT - PostgreSQL
- MysqlSqlDialect.DEFAULT - MySQL
- Other Calcite-supported dialects
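One visible difference between these dialects is identifier quoting: Spark SQL and MySQL quote identifiers with backticks, while PostgreSQL uses double quotes. The hypothetical DialectQuoting enum below sketches only that single rule with the standard library; it is not Calcite's SqlDialect, which also handles keywords, function names, type names, and much more.

```java
// Hypothetical illustration of one dialect rule (identifier quoting); not Calcite's SqlDialect.
enum DialectQuoting {
    SPARK("`"),
    MYSQL("`"),
    POSTGRESQL("\"");

    private final String quote;

    DialectQuoting(String quote) {
        this.quote = quote;
    }

    // Quote one identifier, escaping embedded quote characters by doubling them.
    String quoteIdentifier(String name) {
        return quote + name.replace(quote, quote + quote) + quote;
    }

    // Quote each part of a qualified name such as catalog.table.
    String quoteQualified(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                sb.append('.');
            }
            sb.append(quoteIdentifier(parts[i]));
        }
        return sb.toString();
    }
}
```

For example, `DialectQuoting.SPARK.quoteQualified("catalog", "employees")` yields `` `catalog`.`employees` ``, while the POSTGRESQL constant yields `"catalog"."employees"`. Delegating such rules to a dialect object is what lets one logical plan be rendered for several target databases.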
Use UnifiedQueryCompiler to compile Calcite logical plans into executable JDBC statements. This separates compilation from execution and returns standard JDBC types.
```java
UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context);
try (PreparedStatement statement = compiler.compile(plan);
     ResultSet rs = statement.executeQuery()) {
    while (rs.next()) {
        // Standard JDBC ResultSet access
    }
}
```

The Unified Function API provides an engine-agnostic abstraction for functions, enabling them to be evaluated across different execution engines (Spark, Flink, Calcite, etc.) without engine-specific code duplication.
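To make the idea concrete, here is a minimal sketch of what an engine-agnostic function contract can look like: metadata expressed as SQL type name strings plus an eval method over plain Java values, so any engine able to convert its values to Java objects can call it. SimpleFunction and UpperFunction are hypothetical names chosen to mirror the metadata calls used later in this README; the actual UnifiedFunction interface may declare different methods or signatures.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical engine-agnostic function contract; the real UnifiedFunction may differ.
interface SimpleFunction {
    String getFunctionName();
    List<String> getInputTypes();   // SQL type names, e.g. ["VARCHAR"]
    String getReturnType();         // SQL type name, e.g. "VARCHAR"
    Object eval(List<Object> args); // plain Java values in, plain Java value out
}

// Example implementation: UPPER(VARCHAR) -> VARCHAR.
final class UpperFunction implements SimpleFunction {
    @Override
    public String getFunctionName() {
        return "UPPER";
    }

    @Override
    public List<String> getInputTypes() {
        return List.of("VARCHAR");
    }

    @Override
    public String getReturnType() {
        return "VARCHAR";
    }

    @Override
    public Object eval(List<Object> args) {
        if (args.size() != 1) {
            throw new IllegalArgumentException("UPPER expects 1 argument, got " + args.size());
        }
        Object value = args.get(0);
        // SQL-style null handling: NULL in, NULL out
        return value == null ? null : value.toString().toUpperCase(Locale.ROOT);
    }
}
```

Keeping both the type metadata and the arguments engine-neutral means an adapter per engine (for example, a Spark UDF wrapper) only has to convert values at the boundary; the function body itself is written once.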
Types are represented as SQL type name strings for engine-agnostic serialization:
- Primitive types: "VARCHAR", "INTEGER", "BIGINT", "DOUBLE", "BOOLEAN", "DATE", "TIMESTAMP"
- Array types: "ARRAY<ELEMENT_TYPE>" (e.g., "ARRAY<INTEGER>")
- Struct types: "STRUCT<field1:TYPE1, field2:TYPE2>" (e.g., "STRUCT<name:VARCHAR, age:INTEGER>")
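Because types travel as strings, an engine consuming them may need to parse them back into a structured form. The following is a minimal recursive-descent sketch for exactly the three shapes listed above; TypeName and TypeNameParser are hypothetical helpers, not part of the module, and the sketch assumes uppercase ARRAY/STRUCT keywords and field names made of letters, digits, and underscores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical structured form of a type name string.
record TypeName(String base, List<TypeName> elements, Map<String, TypeName> fields) {
    @Override
    public String toString() {
        if (base.equals("ARRAY")) {
            return "ARRAY<" + elements.get(0) + ">";
        }
        if (base.equals("STRUCT")) {
            List<String> parts = new ArrayList<>();
            fields.forEach((name, type) -> parts.add(name + ":" + type));
            return "STRUCT<" + String.join(", ", parts) + ">";
        }
        return base;
    }
}

// Recursive-descent parser for "INTEGER", "ARRAY<T>", and "STRUCT<f1:T1, f2:T2>".
final class TypeNameParser {
    private final String input;
    private int pos;

    private TypeNameParser(String input) {
        this.input = input;
    }

    static TypeName parse(String text) {
        TypeNameParser parser = new TypeNameParser(text);
        TypeName type = parser.parseType();
        parser.skipSpaces();
        if (parser.pos != text.length()) {
            throw new IllegalArgumentException("Trailing input at " + parser.pos + ": " + text);
        }
        return type;
    }

    private TypeName parseType() {
        String base = readName();
        if (base.equals("ARRAY")) {
            expect('<');
            TypeName element = parseType();
            expect('>');
            return new TypeName(base, List.of(element), Map.of());
        }
        if (base.equals("STRUCT")) {
            expect('<');
            Map<String, TypeName> fields = new LinkedHashMap<>(); // keeps declaration order
            do {
                String field = readName();
                expect(':');
                fields.put(field, parseType());
            } while (tryConsume(','));
            expect('>');
            return new TypeName(base, List.of(), fields);
        }
        return new TypeName(base, List.of(), Map.of());
    }

    // Read a name made of letters, digits, or underscores, skipping leading spaces.
    private String readName() {
        skipSpaces();
        int start = pos;
        while (pos < input.length()
                && (Character.isLetterOrDigit(input.charAt(pos)) || input.charAt(pos) == '_')) {
            pos++;
        }
        if (start == pos) {
            throw new IllegalArgumentException("Expected a name at " + pos + ": " + input);
        }
        return input.substring(start, pos);
    }

    private boolean tryConsume(char c) {
        skipSpaces();
        if (pos < input.length() && input.charAt(pos) == c) {
            pos++;
            return true;
        }
        return false;
    }

    private void expect(char c) {
        if (!tryConsume(c)) {
            throw new IllegalArgumentException("Expected '" + c + "' at " + pos + ": " + input);
        }
    }

    private void skipSpaces() {
        while (pos < input.length() && input.charAt(pos) == ' ') {
            pos++;
        }
    }
}
```

Parsing "STRUCT<name:VARCHAR, age:INTEGER>" yields a STRUCT whose fields map name to VARCHAR and age to INTEGER, and toString() renders it back in the same canonical form, so the sketch round-trips the documented examples.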
Use UnifiedFunctionRepository to discover and load unified functions:
```java
// Create repository with context
UnifiedFunctionRepository repository = new UnifiedFunctionRepository(context);

// Load all available functions
List<UnifiedFunctionDescriptor> allFunctions = repository.loadFunctions();
for (UnifiedFunctionDescriptor descriptor : allFunctions) {
    String name = descriptor.getFunctionName();
    UnifiedFunctionBuilder builder = descriptor.getBuilder();
    // Use builder to create function instances
}

// Load a specific function by name
UnifiedFunctionDescriptor upperDescriptor = repository.loadFunction("UPPER").orElseThrow();
```

Functions are created using builders with specific input types:
```java
// Get function descriptor
UnifiedFunctionDescriptor descriptor = repository.loadFunction("UPPER").orElseThrow();

// Build function with specific input types
UnifiedFunction upperFunc = descriptor.getBuilder().build(List.of("VARCHAR"));

// Get function metadata
String name = upperFunc.getFunctionName();           // "UPPER"
List<String> inputTypes = upperFunc.getInputTypes(); // ["VARCHAR"]
String returnType = upperFunc.getReturnType();       // "VARCHAR"

// Evaluate function
Object result = upperFunc.eval(List.of("hello"));    // "HELLO"
```

Combining all components for a complete PPL query workflow:
```java
// Step 1: Create reusable context (shared across all components)
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
        .language(QueryType.PPL)
        .catalog("catalog", schema)
        .defaultNamespace("catalog")
        .build()) {
    // Step 2: Create planner with context
    UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);

    // Step 3: Plan PPL query into logical plan
    RelNode plan = planner.plan("source = employees | where age > 30");

    // Option A: Transpile to target SQL
    UnifiedQueryTranspiler transpiler = UnifiedQueryTranspiler.builder()
        .dialect(SparkSqlDialect.DEFAULT)
        .build();
    String sparkSql = transpiler.toSql(plan);
    // Result: SELECT * FROM `catalog`.`employees` WHERE `age` > 30

    // Option B: Compile and execute directly
    UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context);
    try (PreparedStatement statement = compiler.compile(plan);
         ResultSet rs = statement.executeQuery()) {
        while (rs.next()) {
            // Process results with standard JDBC
        }
    }
}
```

The unified query API supports the same profiling capability as the PPL REST endpoint. When profiling is enabled, each unified query component automatically collects per-phase timing metrics. For code outside unified query components (e.g., PreparedStatement.executeQuery() or response formatting), context.measure() records custom phases into the same profile.
```java
try (UnifiedQueryContext context = UnifiedQueryContext.builder()
        .language(QueryType.PPL)
        .catalog("catalog", schema)
        .defaultNamespace("catalog")
        .profiling(true)
        .build()) {
    // Auto-profiled: ANALYZE (query is supplied by the caller)
    RelNode plan = new UnifiedQueryPlanner(context).plan(query);

    // Auto-profiled: OPTIMIZE
    try (PreparedStatement stmt = new UnifiedQueryCompiler(context).compile(plan)) {
        // User-profiled via measure() (formatter is supplied by the caller)
        ResultSet rs = context.measure(MetricName.EXECUTE, stmt::executeQuery);
        String json = context.measure(MetricName.FORMAT, () -> formatter.format(rs));
    }

    // Retrieve profile snapshot
    QueryProfile profile = context.getProfile();
}
```

The returned QueryProfile follows the same JSON structure as the REST API:
```json
{
  "summary": {
    "total_time_ms": 33.34
  },
  "phases": {
    "analyze": { "time_ms": 8.68 },
    "optimize": { "time_ms": 18.2 },
    "execute": { "time_ms": 4.87 },
    "format": { "time_ms": 0.05 }
  },
  "plan": {
    "node": "EnumerableCalc",
    "time_ms": 4.82,
    "rows": 2,
    "children": [
      { "node": "CalciteEnumerableIndexScan", "time_ms": 4.12, "rows": 2 }
    ]
  }
}
```

When profiling is disabled (the default), all components execute with zero overhead.
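The measure() pattern and the disabled-by-default behavior can be sketched with the standard library alone. PhaseProfiler is a hypothetical class, not the module's implementation: when disabled, it runs the supplied work behind a single boolean check without reading a clock or touching a map; when enabled, it accumulates elapsed wall-clock milliseconds per phase name in first-recorded order.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical per-phase profiler illustrating a measure(phase, work) helper.
final class PhaseProfiler {
    private final boolean enabled;
    private final Map<String, Double> phaseMillis = new LinkedHashMap<>();

    PhaseProfiler(boolean enabled) {
        this.enabled = enabled;
    }

    // Runs the work; when enabled, adds its elapsed time to the named phase.
    <T> T measure(String phase, Callable<T> work) throws Exception {
        if (!enabled) {
            return work.call(); // disabled: no clock reads, no bookkeeping
        }
        long start = System.nanoTime();
        try {
            return work.call();
        } finally {
            double elapsedMs = (System.nanoTime() - start) / 1_000_000.0;
            phaseMillis.merge(phase, elapsedMs, Double::sum); // repeated phases accumulate
        }
    }

    // Read-only copy of the recorded phases, in first-recorded order.
    Map<String, Double> snapshot() {
        return Collections.unmodifiableMap(new LinkedHashMap<>(phaseMillis));
    }

    double totalMillis() {
        return phaseMillis.values().stream().mapToDouble(Double::doubleValue).sum();
    }
}
```

Callable is used instead of Supplier so that work throwing checked exceptions, such as a JDBC `stmt::executeQuery`, can be passed directly; timing in a `finally` block records the phase even when the work fails.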
A set of unit tests is provided to validate planner behavior.
To run the tests:

```bash
./gradlew :api:test
```
This guide walks through how to integrate the unified query planner into your application.
The module is currently published as a snapshot to the AWS Sonatype Snapshots repository, so that repository must be declared in your build. To include the module as a Maven dependency, add the following to your pom.xml (for Gradle, declare the same coordinates in build.gradle):
```xml
<dependency>
    <groupId>org.opensearch.query</groupId>
    <artifactId>unified-query-api</artifactId>
    <version>YOUR_VERSION_HERE</version>
</dependency>
```

You must implement the Calcite Schema interface (for example, by extending AbstractSchema) and register each schema using the fluent catalog() method on the builder.
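```java
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;

// Minimal schema exposing one table, "test_table", with a single INTEGER column "id"
public class MySchema extends AbstractSchema {
    @Override
    protected Map<String, Table> getTableMap() {
        return Map.of(
            "test_table",
            new AbstractTable() {
                @Override
                public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                    return typeFactory.createStructType(
                        List.of(typeFactory.createSqlType(SqlTypeName.INTEGER)),
                        List.of("id"));
                }
            });
    }
}
```

Future work:
- Extend the planner to generate optimized physical plans using Calcite's optimization frameworks.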
