Make plugin task first-class citizen by honnix · Pull Request #272 · flyteorg/flytekit-java · GitHub
Skip to content
This repository was archived by the owner on Apr 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions flytekit-api/src/main/java/org/flyte/api/v1/PluginTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2023 Flyte Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.api.v1;

/** A registrar that creates {@link PluginTask} instances. */
public abstract class PluginTaskRegistrar implements Registrar<TaskIdentifier, PluginTask> {}
3 changes: 3 additions & 0 deletions flytekit-api/src/main/java/org/flyte/api/v1/TaskTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
/**
* A Task structure that uniquely identifies a task in the system. Tasks are registered as a first
* step in the system.
*
* <p>FIXME: consider offering TaskMetadata instead of having everything in TaskTemplate, see
* https://github.com/flyteorg/flyte/blob/ea72bbd12578d64087221592554fb71c368f8057/flyteidl/protos/flyteidl/core/tasks.proto#L90
*/
@AutoValue
public abstract class TaskTemplate {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Flyte Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.examples;

import com.google.auto.service.AutoService;
import org.flyte.flytekit.SdkPluginTask;
import org.flyte.flytekit.SdkTypes;

@AutoService(SdkPluginTask.class)
public class NoopPluginTask extends SdkPluginTask<Void, Void> {

public NoopPluginTask() {
super(SdkTypes.nulls(), SdkTypes.nulls());
}

@Override
public String getType() {
return "noop";
}
}
112 changes: 112 additions & 0 deletions flytekit-java/src/main/java/org/flyte/flytekit/SdkPluginTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2023 Flyte Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.flyte.flytekit;

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.flyte.api.v1.PartialTaskIdentifier;

/**
* A task that is handled by a Flyte backend plugin instead of run as a container. Note that a
* plugin task template does not have a container defined, neither all the jars captured in
* classpath, so if this is a requirement, one should use SdkRunnableTask overriding run method to
* simply return null.
*/
public abstract class SdkPluginTask<InputT, OutputT> extends SdkTransform<InputT, OutputT> {

private final SdkType<InputT> inputType;
private final SdkType<OutputT> outputType;

/**
* Called by subclasses passing the {@link SdkType}s for inputs and outputs.
*
* @param inputType type for inputs.
* @param outputType type for outputs.
*/
public SdkPluginTask(SdkType<InputT> inputType, SdkType<OutputT> outputType) {
this.inputType = inputType;
this.outputType = outputType;
}

public abstract String getType();

@Override
public SdkType<InputT> getInputType() {
return inputType;
}

@Override
public SdkType<OutputT> getOutputType() {
return outputType;
}

/** Specifies custom data that can be read by the backend plugin. */
public SdkStruct getCustom() {
return SdkStruct.empty();
}

/**
* Number of retries. Retries will be consumed when the task fails with a recoverable error. The
* number of retries must be less than or equals to 10.
*
* @return number of retries
*/
public int getRetries() {
return 0;
}

/**
* Indicates whether the system should attempt to look up this task's output to avoid duplication
* of work.
*/
public boolean isCached() {
return false;
}

/** Indicates a logical version to apply to this task for the purpose of cache. */
public String getCacheVersion() {
return null;
}

/**
* Indicates whether the system should attempt to execute cached instances in serial to avoid
* duplicate work.
*/
public boolean isCacheSerializable() {
return false;
}

@Override
SdkNode<OutputT> apply(
SdkWorkflowBuilder builder,
String nodeId,
List<String> upstreamNodeIds,
@Nullable SdkNodeMetadata metadata,
Map<String, SdkBindingData<?>> inputs) {
PartialTaskIdentifier taskId = PartialTaskIdentifier.builder().name(getName()).build();
List<CompilerError> errors =
Compiler.validateApply(nodeId, inputs, getInputType().getVariableMap());

if (!errors.isEmpty()) {
throw new CompilerException(errors);
}

return new SdkTaskNode<>(
builder, nodeId, taskId, upstreamNodeIds, metadata, inputs, outputType);
}
}
Loading