Bigtable: add a separate callable for point reads by igorbernstein2 · Pull Request #4264 · googleapis/google-cloud-java · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.cloud.bigtable.data.v2.internal.RequestContext;
import com.google.cloud.bigtable.data.v2.internal.RowSetUtil;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -264,4 +266,34 @@ private static ByteString wrapKey(String key) {
}
return ByteString.copyFromUtf8(key);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Query query = (Query) o;
return Objects.equal(tableId, query.tableId)
&& Objects.equal(builder.build(), query.builder.build());
}

@Override
public int hashCode() {
return Objects.hashCode(tableId, builder.build());
}

@Override
public String toString() {
ReadRowsRequest request = builder.build();

return MoreObjects.toStringHelper(this)
.add("tableId", tableId)
.add("keys", request.getRows().getRowKeysList())
.add("ranges", request.getRows().getRowRangesList())
.add("filter", request.getFilter())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
private final RequestContext requestContext;

private final ServerStreamingCallable<Query, Row> readRowsCallable;
private final UnaryCallable<Query, Row> readRowCallable;
private final UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable;
private final UnaryCallable<RowMutation, Void> mutateRowCallable;
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
Expand Down Expand Up @@ -151,6 +152,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
RequestContext.create(settings.getInstanceName(), settings.getAppProfileId());

readRowsCallable = createReadRowsCallable(new DefaultRowAdapter());
readRowCallable = createReadRowCallable(new DefaultRowAdapter());
sampleRowKeysCallable = createSampleRowKeysCallable();
mutateRowCallable = createMutateRowCallable();
bulkMutateRowsCallable = createBulkMutateRowsCallable();
Expand All @@ -162,7 +164,7 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
// <editor-fold desc="Callable creators">

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
* Creates a callable chain to handle streaming ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
Expand All @@ -176,6 +178,48 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
*/
public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
RowAdapter<RowT> rowAdapter) {
return createReadRowsCallable(settings.readRowsSettings(), rowAdapter);
}

/**
* Creates a callable chain to handle point ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
* dispatch the RPC.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*/
public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT> rowAdapter) {
return createReadRowsCallable(
ServerStreamingCallSettings.<Query, Row>newBuilder()
.setRetryableCodes(settings.readRowSettings().getRetryableCodes())
.setRetrySettings(settings.readRowSettings().getRetrySettings())
.setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout())
.build(),
rowAdapter)
.first();
}

/**
* Creates a callable chain to handle ReadRows RPCs. The chain will:
*
* <ul>
* <li>Convert a {@link Query} into a {@link com.google.bigtable.v2.ReadRowsRequest} and
* dispatch the RPC.
* <li>Upon receiving the response stream, it will merge the {@link
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
* implementation can be configured in by the {@code rowAdapter} parameter.
* <li>Retry/resume on failure.
* <li>Filter out marker rows.
* </ul>
*/
private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<Query, Row> readRowsSettings, RowAdapter<RowT> rowAdapter) {

ServerStreamingCallable<ReadRowsRequest, RowT> merging =
new RowMergingCallable<>(stub.readRowsCallable(), rowAdapter);
Expand All @@ -185,9 +229,9 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
ServerStreamingCallSettings<ReadRowsRequest, RowT> innerSettings =
ServerStreamingCallSettings.<ReadRowsRequest, RowT>newBuilder()
.setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter))
.setRetryableCodes(settings.readRowsSettings().getRetryableCodes())
.setRetrySettings(settings.readRowsSettings().getRetrySettings())
.setIdleTimeout(settings.readRowsSettings().getIdleTimeout())
.setRetryableCodes(readRowsSettings.getRetryableCodes())
.setRetrySettings(readRowsSettings.getRetrySettings())
.setIdleTimeout(readRowsSettings.getIdleTimeout())
.build();

// Retry logic is split into 2 parts to workaround a rare edge case described in
Expand Down Expand Up @@ -356,10 +400,16 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
// </editor-fold>

// <editor-fold desc="Callable accessors">
/** Returns a streaming read rows callable */
public ServerStreamingCallable<Query, Row> readRowsCallable() {
return readRowsCallable;
}

/** Return a point read callable */
public UnaryCallable<Query, Row> readRowCallable() {
return readRowCallable;
}

public UnaryCallable<String, List<KeyOffset>> sampleRowKeysCallable() {
return sampleRowKeysCallable;
}
Expand Down
Loading