fix: correct lastChunk retry logic in BlobWriteChannel by BenWhitehead · Pull Request #918 · googleapis/java-storage · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.Maps;
import java.math.BigInteger;
import java.net.URL;
import java.util.Map;
Expand Down Expand Up @@ -78,12 +77,6 @@ private long getRemotePosition() {
return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
}

private StorageObject getRemoteStorageObject() {
return getOptions()
.getStorageRpcV1()
.get(getEntity().toPb(), Maps.newEnumMap(StorageRpc.Option.class));
}

private static StorageException unrecoverableState(
String uploadId,
int chunkOffset,
Expand Down Expand Up @@ -212,8 +205,12 @@ public void run() {
if (uploadAlreadyComplete && lastChunk) {
// Case 6
// Request object metadata if not available
long totalBytes = getPosition() + length;
if (storageObject == null) {
storageObject = getRemoteStorageObject();
storageObject =
getOptions()
.getStorageRpcV1()
.queryCompletedResumableUpload(getUploadId(), totalBytes);
}
// the following checks are defined here explicitly to provide a more
// informative if either storageObject is unable to be resolved or it's size is
Expand All @@ -239,7 +236,7 @@ public void run() {
remotePosition,
lastChunk);
}
if (size.longValue() != getPosition() + length) {
if (size.longValue() != totalBytes) {
throw unrecoverableState(
getUploadId(),
chunkOffset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,25 @@ public long getCurrentUploadOffset(String uploadId) {
}
}

@Override
public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) {
try {
GenericUrl url = new GenericUrl(uploadId);
HttpRequest req = storage.getRequestFactory().buildPutRequest(url, new EmptyContent());
req.getHeaders().setContentRange(String.format("bytes */%s", totalBytes));
req.setParser(storage.getObjectParser());
HttpResponse response = req.execute();
// If the response is 200
if (response.getStatusCode() == 200) {
return response.parseAs(StorageObject.class);
} else {
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this means that the caller would have made a request to do an upload, but they get an error from the request to check the upload status which is a separate RPC. This might be confusing-- is there a way to wrap this in an error that corresponds to the method they originally called?

Also, if the response is a 308, I would assume that instead of failing we should retry from the last offset, no?

(Ignore me if I'm just missing how this works).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, when a resumable upload completes the shallow Object will be returned in the response. If the response is not received for any reason (network failure usually), this method allows for querying for that shallow Object. This method is used as part of the retry evaluation logic to determine if a final chunk must be retransmitted or not. This method will only return cleanly if the resumable upload returns 200, if a 308 is returned due to an incomplete upload a StorageException will be thrown here to signal that fact. In the case of BlobWriteChannel which uses this method, the exception would be caught by the retry handler and be evaluated there.

I've added a javadoc to the method on there interface with an explanation of why it's there.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, the javadoc really helps, thank you.

So, in the 308 case-- would we then (if in BlobWriteChannel) make another identical PUT request to query the correct offset to retry at? If so, is there a way to preserve the information from the 308 response here instead?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of BlobWriteChannel it performs a pre-check to try and determine the remote offset, if the remote offset indicates that the data has been written but for some reason it wasn't able to get the object metadata as part of the final PUT, it will then use this method to try and get that object metadata.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a 308 happens, that would land us in the normal retry flow, where right now unfortunately we don't have the ability to rewind and retry a chunk to try and recover.

In practice, with BlobWriteChannel any call to this new method is guarded by a check to see if the resumable session is complete or not (in fact I would have prefered to make this check an internal implementation detail but unfortunately BlobWriteChannel does not have the means of accessing the http client directly in order to be able to make this call). So, for all intents and purposes we side step the need to be able to rewind a chunk.

}
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down Expand Up @@ -875,10 +894,7 @@ public StorageObject writeWithResponse(
if (exception != null) {
throw exception;
}
GoogleJsonError error = new GoogleJsonError();
error.setCode(code);
error.setMessage(message);
throw translate(error);
throw buildStorageException(code, message);
}
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
Expand Down Expand Up @@ -925,10 +941,7 @@ public String open(StorageObject object, Map<Option, ?> options) {
setEncryptionHeaders(requestHeaders, "x-goog-encryption-", options);
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 200) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
Expand Down Expand Up @@ -958,10 +971,7 @@ public String open(String signedURL) {
requestHeaders.set("x-goog-resumable", "start");
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 201) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
throw buildStorageException(response.getStatusCode(), response.getStatusMessage());
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
Expand Down Expand Up @@ -1621,4 +1631,11 @@ public ServiceAccount getServiceAccount(String projectId) {
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

private static StorageException buildStorageException(int statusCode, String statusMessage) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(statusCode);
error.setMessage(statusMessage);
return translate(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,24 @@ void write(
*/
long getCurrentUploadOffset(String uploadId);

/**
* Attempts to retrieve the StorageObject from a completed resumable upload. When a resumable
* upload completes, the response will be the up-to-date StorageObject metadata. This up-to-date
* metadata can then be used to validate the total size of the object along with new generation
* and other information.
*
* <p>If for any reason, the response to the final PUT to a resumable upload is not received, this
* method can be used to query for the up-to-date StorageObject. If the upload is complete, this
* method can be used to access the StorageObject independently from any other liveness or
* conditional criteria requirements that are otherwise applicable when using {@link
* #get(StorageObject, Map)}.
*
* @param uploadId resumable upload ID URL
* @param totalBytes the total number of bytes that should have been written.
* @throws StorageException if the upload is incomplete or does not exist
*/
StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes);

/**
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
* returns metadata of the updated object, otherwise returns null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public long getCurrentUploadOffset(String uploadId) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public StorageObject queryCompletedResumableUpload(String uploadId, long totalBytes) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.cloud.storage.spi.StorageRpcFactory;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.math.BigInteger;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -334,10 +333,10 @@ public void testWriteWithRetryAndObjectMetadata() throws IOException {
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
Expand Down Expand Up @@ -487,7 +486,7 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
eq(true)))
.andThrow(socketClosedException);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
expect(storageRpcMock.get(BLOB_INFO.toPb(), Maps.newEnumMap(StorageRpc.Option.class)))
expect(storageRpcMock.queryCompletedResumableUpload(eq(UPLOAD_ID), eq((long) MIN_CHUNK_SIZE)))
.andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE)));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
Expand Down