Fix PubSub Iterator pullAsync: add callback to PubSubRpc.pull by mziccard · Pull Request #1048 · googleapis/google-cloud-java · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.Empty;
Expand Down Expand Up @@ -89,6 +91,30 @@ protected ExecutorFactory executorFactory() {
}
}

private static final class PullFutureImpl
extends ForwardingListenableFuture.SimpleForwardingListenableFuture<PullResponse>
implements PullFuture {

PullFutureImpl(ListenableFuture<PullResponse> delegate) {
super(delegate);
}

@Override
public void addCallback(final PullCallback callback) {
Futures.addCallback(delegate(), new FutureCallback<PullResponse>() {
@Override
public void onSuccess(PullResponse result) {
callback.success(result);
}

@Override
public void onFailure(Throwable error) {
callback.failure(error);
}
});
}
}

public DefaultPubSubRpc(PubSubOptions options) throws IOException {
executorFactory = new InternalPubSubOptions(options).executorFactory();
executor = executorFactory.get();
Expand Down Expand Up @@ -136,13 +162,13 @@ private static ApiCallSettings.Builder apiCallSettings(PubSubOptions options) {
return ApiCallSettings.newBuilder().setRetrySettingsBuilder(builder);
}

private static <V> Future<V> translate(ListenableFuture<V> from, final boolean idempotent,
int... returnNullOn) {
private static <V> ListenableFuture<V> translate(ListenableFuture<V> from,
final boolean idempotent, int... returnNullOn) {
final Set<Integer> returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length);
for (int value : returnNullOn) {
returnNullOnSet.add(value);
}
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
@Override
public V apply(ApiException exception) {
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
Expand Down Expand Up @@ -224,8 +250,8 @@ public Future<Empty> acknowledge(AcknowledgeRequest request) {
}

@Override
public Future<PullResponse> pull(PullRequest request) {
return translate(subscriberApi.pullCallable().futureCall(request), false);
public PullFuture pull(PullRequest request) {
return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,44 @@

public interface PubSubRpc extends AutoCloseable {

/**
* A callback that can be registered to {@link PullFuture} objects. Objects of this class allow
* to asynchronously react to the success or failure of a pull RPC.
*/
interface PullCallback {

/**
* This method is invoked with the result of a {@link PullFuture} when it was successful.
*
* @param response the pull response
*/
void success(PullResponse response);

/**
* This method is invoked when the {@link PullFuture} failed or was cancelled.
*
* @param error the execption that caused the {@link PullFuture} to fail
*/
void failure(Throwable error);
}

/**
* A {@link Future} implementation for pull RPCs. This class also allows users to register
* callbacks via {@link #addCallback(PullCallback)}.
*/
interface PullFuture extends Future<PullResponse> {

/**
* Registers a callback to be run on the given executor. The listener will run when the pull
* future completed its computation or, if the computation is already complete, immediately.
* There is no guaranteed ordering of execution of callbacks.
*
* <p>Registered callbacks are run using the same thread that run the RPC call. Only lightweight
* callbacks should be registered via this method.
*/
void addCallback(final PullCallback callback);
}

// in all cases root cause of ExecutionException is PubSubException
Future<Topic> create(Topic topic);

Expand All @@ -66,7 +104,7 @@ public interface PubSubRpc extends AutoCloseable {

Future<Empty> acknowledge(AcknowledgeRequest request);

Future<PullResponse> pull(PullRequest request);
PullFuture pull(PullRequest request);

Future<Empty> modify(ModifyPushConfigRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.client.util.Lists;
import com.google.cloud.AsyncPage;
import com.google.cloud.Page;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.junit.Ignore;
Expand Down