Bigtable: 10. Implementation: integrate ReadRows by igorbernstein2 · Pull Request #2940 · googleapis/google-cloud-java · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.data.v2.models.RowAdapter;

/**
* Remove the special marker rows generated by {@link RowMergingCallable}.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class FilterMarkerRowsCallable<RowT> extends ServerStreamingCallable<ReadRowsRequest, RowT> {
private final ServerStreamingCallable<ReadRowsRequest, RowT> innerCallable;
private final RowAdapter<RowT> rowAdapter;

public FilterMarkerRowsCallable(
ServerStreamingCallable<ReadRowsRequest, RowT> inner, RowAdapter<RowT> rowAdapter) {
this.rowAdapter = rowAdapter;
this.innerCallable = inner;
}

@Override
public void call(
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
FilteringResponseObserver innerObserver = new FilteringResponseObserver(responseObserver);
innerCallable.call(request, innerObserver, context);
}

private class FilteringResponseObserver implements ResponseObserver<RowT> {
private final ResponseObserver<RowT> outerObserver;
private StreamController innerController;
private boolean autoFlowControl = true;

FilteringResponseObserver(ResponseObserver<RowT> outerObserver) {
this.outerObserver = outerObserver;
}

@Override
public void onStart(final StreamController controller) {
innerController = controller;

outerObserver.onStart(
new StreamController() {
@Override
public void cancel() {
controller.cancel();
}

@Override
public void disableAutoInboundFlowControl() {
autoFlowControl = false;
controller.disableAutoInboundFlowControl();
}

@Override
public void request(int count) {
controller.request(count);
}
});
}

@Override
public void onResponse(RowT response) {
if (rowAdapter.isScanMarkerRow(response)) {
if (!autoFlowControl) {
innerController.request(1);
}
} else {
outerObserver.onResponse(response);
}
}

@Override
public void onError(Throwable t) {
outerObserver.onError(t);
}

@Override
public void onComplete() {
outerObserver.onComplete();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.bigtable.data.v2.models.Query;

/**
* Enhancement for `readRowsCallable().first()` to gracefully limit the row count instead of
* cancelling the RPC
*/
class ReadRowsFirstCallable<RowT> extends UnaryCallable<Query, RowT> {
private final UnaryCallable<Query, RowT> inner;

ReadRowsFirstCallable(UnaryCallable<Query, RowT> inner) {
this.inner = inner;
}

@Override
public ApiFuture<RowT> futureCall(Query query, ApiCallContext context) {
return inner.futureCall(query.limit(1), context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.readrows;

import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.models.Query;

/**
* Simple wrapper for ReadRows to wrap the request protobufs.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class ReadRowsUserCallable<RowT> extends ServerStreamingCallable<Query, RowT> {
private final ServerStreamingCallable<ReadRowsRequest, RowT> inner;
private final RequestContext requestContext;
private final ReadRowsFirstCallable<RowT> firstCallable;

public ReadRowsUserCallable(
ServerStreamingCallable<ReadRowsRequest, RowT> inner, RequestContext requestContext) {
this.inner = inner;
this.requestContext = requestContext;

this.firstCallable = new ReadRowsFirstCallable<>(super.first());

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

}

@Override
public void call(Query request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
ReadRowsRequest innerRequest = request.toProto(requestContext);
inner.call(innerRequest, responseObserver, context);
}

// Optimization: since the server supports row limits, override the first callable.
// This way unnecessary data doesn't need to be buffered and the number of CANCELLED request
// statuses is minimized
@Override
public UnaryCallable<Query, RowT> first() {
return firstCallable;
}
}
Loading