Add option in NIO for re-opening the channel to retry for some errors by jean-philippe-martin · Pull Request #1715 · googleapis/google-cloud-java · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ public CloudStorageFileSystemProvider() {

CloudStorageFileSystemProvider(@Nullable StorageOptions gcsStorageOptions) {
this.storageOptions = gcsStorageOptions;

}

// Initialize this.storage, once. This may throw an exception if default authentication
Expand Down Expand Up @@ -226,6 +225,7 @@ public SeekableByteChannel newByteChannel(
private SeekableByteChannel newReadChannel(Path path, Set<? extends OpenOption> options)
throws IOException {
initStorage();
int maxChannelReopens = ((CloudStorageFileSystem) path.getFileSystem()).config().maxChannelReopens();
for (OpenOption option : options) {
if (option instanceof StandardOpenOption) {
switch ((StandardOpenOption) option) {
Expand All @@ -247,6 +247,8 @@ private SeekableByteChannel newReadChannel(Path path, Set<? extends OpenOption>
default:
throw new UnsupportedOperationException(option.toString());
}
} else if (option instanceof OptionMaxChannelReopens) {
maxChannelReopens = ((OptionMaxChannelReopens) option).maxChannelReopens();
} else {
throw new UnsupportedOperationException(option.toString());
}
Expand All @@ -255,7 +257,7 @@ private SeekableByteChannel newReadChannel(Path path, Set<? extends OpenOption>
if (cloudPath.seemsLikeADirectoryAndUsePseudoDirectories()) {
throw new CloudStoragePseudoDirectoryException(cloudPath);
}
return CloudStorageReadChannel.create(storage, cloudPath.getBlobId(), 0);
return CloudStorageReadChannel.create(storage, cloudPath.getBlobId(), 0, maxChannelReopens);
}

private SeekableByteChannel newWriteChannel(Path path, Set<? extends OpenOption> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,5 +91,15 @@ public static CloudStorageOption.OpenCopy withBlockSize(int size) {
return OptionBlockSize.create(size);
}

/**
* Sets the max number of times that the channel can be reopen if reading
* fails because the channel unexpectedly closes.
*
* <p>The default is 0.
*/
public static CloudStorageOption.OpenCopy withChannelReopen(int count) {
return OptionMaxChannelReopens.create(count);
}

private CloudStorageOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -41,27 +42,42 @@
@ThreadSafe
final class CloudStorageReadChannel implements SeekableByteChannel {

private final ReadChannel channel;
private final Storage gcsStorage;
private final BlobId file;
// max # of times we may reopen the file
private final int maxChannelReopens;
// how many times we re-opened the file
private int reopens;
private ReadChannel channel;
private long position;
private long size;

/**
* @param maxChannelReopens max number of times to try re-opening the channel if it closes on us unexpectedly.
*/
@CheckReturnValue
@SuppressWarnings("resource")
static CloudStorageReadChannel create(Storage gcsStorage, BlobId file, long position)
static CloudStorageReadChannel create(Storage gcsStorage, BlobId file, long position, int maxChannelReopens)
throws IOException {
return new CloudStorageReadChannel(gcsStorage, file, position, maxChannelReopens);
}

private CloudStorageReadChannel(Storage gcsStorage, BlobId file, long position, int maxChannelReopens) throws IOException {
this.gcsStorage = gcsStorage;
this.file = file;
this.position = position;
this.reopens = 0;
this.maxChannelReopens = maxChannelReopens;
// XXX: Reading size and opening file should be atomic.
long size = fetchSize(gcsStorage, file);
ReadChannel channel = gcsStorage.reader(file);
this.size = fetchSize(gcsStorage, file);
innerOpen();
}

private void innerOpen() throws IOException {
this.channel = gcsStorage.reader(file);
if (position > 0) {
channel.seek((int) position);
}
return new CloudStorageReadChannel(position, size, channel);
}

private CloudStorageReadChannel(long position, long size, ReadChannel channel) {
this.position = position;
this.size = size;
this.channel = channel;
}

@Override
Expand All @@ -78,11 +94,36 @@ public void close() throws IOException {
}
}


@Override
public int read(ByteBuffer dst) throws IOException {
synchronized (this) {
checkOpen();
int amt = channel.read(dst);
int amt;
int retries = 0;
int maxRetries = 3;
dst.mark();
while (true) {
try {

This comment was marked as spam.

This comment was marked as spam.

dst.reset();
amt = channel.read(dst);
break;
} catch (StorageException exs) {
if (exs.getMessage().contains("Connection closed prematurely") && reopens < maxChannelReopens) {
// this error isn't marked as retryable since the channel is closed;
// but here at this higher level we can retry it.
reopens++;
sleepForAttempt(reopens);
innerOpen();
continue;
} else if ((exs.getCode() == 500 || exs.getCode() == 503) && retries < maxRetries) {
retries++;
sleepForAttempt(retries);
continue;
}
throw exs;
}
}
if (amt > 0) {
position += amt;
// XXX: This would only ever happen if the fetchSize() race-condition occurred.
Expand All @@ -94,6 +135,15 @@ public int read(ByteBuffer dst) throws IOException {
}
}

private void sleepForAttempt(int attempt) {
try {
Thread.sleep((attempt - 1) * 500);
} catch (InterruptedException iex) {
// reset interrupt flag
Thread.currentThread().interrupt();
}
}

@Override
public long size() throws IOException {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.contrib.nio;

import com.google.auto.value.AutoValue;

@AutoValue
abstract class OptionMaxChannelReopens implements CloudStorageOption.OpenCopy {

/**
* Re-open the channel if it's closed unexpectedly while we're reading it.
*/
static OptionMaxChannelReopens create(int retryCount) {
return new AutoValue_OptionMaxChannelReopens(retryCount);
}

abstract int maxChannelReopens();
}