xds: gRFC A88 - Changes to XdsClient Watcher APIs (#12446) · grpc/grpc-java@f385add · GitHub
Skip to content

Commit f385add

Browse files
authored
1 parent 02e98a8 commit f385add

17 files changed

Lines changed: 1225 additions & 723 deletions

xds/src/main/java/io/grpc/xds/XdsDependencyManager.java

Lines changed: 24 additions & 24 deletions

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 62 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.grpc.ServerServiceDefinition;
4343
import io.grpc.Status;
4444
import io.grpc.StatusException;
45+
import io.grpc.StatusOr;
4546
import io.grpc.SynchronizationContext;
4647
import io.grpc.SynchronizationContext.ScheduledHandle;
4748
import io.grpc.internal.GrpcUtil;
@@ -401,18 +402,30 @@ private DiscoveryState(String resourceName) {
401402
}
402403

403404
@Override
404-
public void onChanged(final LdsUpdate update) {
405+
public void onResourceChanged(final StatusOr<LdsUpdate> update) {
405406
if (stopped) {
406407
return;
407408
}
408-
logger.log(Level.FINEST, "Received Lds update {0}", update);
409-
if (update.listener() == null) {
410-
onResourceDoesNotExist("Non-API");
409+
410+
if (!update.hasValue()) {
411+
Status status = update.getStatus();
412+
StatusException statusException = Status.UNAVAILABLE.withDescription(
413+
String.format("Listener %s unavailable: %s", resourceName, status.getDescription()))
414+
.withCause(status.asException())
415+
.asException();
416+
handleConfigNotFoundOrMismatch(statusException);
411417
return;
412418
}
413419

414-
String ldsAddress = update.listener().address();
415-
if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
420+
final LdsUpdate ldsUpdate = update.getValue();
421+
logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate);
422+
if (ldsUpdate.listener() == null) {
423+
handleConfigNotFoundOrMismatch(
424+
Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException());
425+
return;
426+
}
427+
String ldsAddress = ldsUpdate.listener().address();
428+
if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP
416429
|| !ipAddressesMatch(ldsAddress)) {
417430
handleConfigNotFoundOrMismatch(
418431
Status.UNKNOWN.withDescription(
@@ -421,16 +434,15 @@ public void onChanged(final LdsUpdate update) {
421434
listenerAddress, ldsAddress)).asException());
422435
return;
423436
}
437+
424438
if (!pendingRds.isEmpty()) {
425439
// filter chain state has not yet been applied to filterChainSelectorManager and there
426-
// are two sets of sslContextProviderSuppliers, so we release the old ones.
427440
releaseSuppliersInFlight();
428441
pendingRds.clear();
429442
}
430443

431-
filterChains = update.listener().filterChains();
432-
defaultFilterChain = update.listener().defaultFilterChain();
433-
// Filters are loaded even if the server isn't serving yet.
444+
filterChains = ldsUpdate.listener().filterChains();
445+
defaultFilterChain = ldsUpdate.listener().defaultFilterChain();
434446
updateActiveFilters();
435447

436448
List<FilterChain> allFilterChains = filterChains;
@@ -469,43 +481,33 @@ public void onChanged(final LdsUpdate update) {
469481
}
470482
}
471483

472-
private boolean ipAddressesMatch(String ldsAddress) {
473-
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
474-
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
475-
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
476-
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
477-
return false;
478-
}
479-
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
480-
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
481-
return listenerIp.equals(ldsIp);
482-
}
483-
484-
@Override
485-
public void onResourceDoesNotExist(final String resourceName) {
486-
if (stopped) {
487-
return;
488-
}
489-
StatusException statusException = Status.UNAVAILABLE.withDescription(
490-
String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
491-
xdsClient.getBootstrapInfo().node().getId())).asException();
492-
handleConfigNotFoundOrMismatch(statusException);
493-
}
494-
495484
@Override
496-
public void onError(final Status error) {
485+
public void onAmbientError(final Status error) {
497486
if (stopped) {
498487
return;
499488
}
500489
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
501490
Status errorWithNodeId = error.withDescription(
502491
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
503492
logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
493+
504494
if (!isServing) {
505495
listener.onNotServing(errorWithNodeId.asException());
506496
}
507497
}
508498

499+
private boolean ipAddressesMatch(String ldsAddress) {
500+
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
501+
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
502+
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
503+
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
504+
return false;
505+
}
506+
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
507+
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
508+
return listenerIp.equals(ldsIp);
509+
}
510+
509511
private void shutdown() {
510512
stopped = true;
511513
cleanUpRouteDiscoveryStates();
@@ -794,54 +796,42 @@ private RouteDiscoveryState(String resourceName) {
794796
}
795797

796798
@Override
797-
public void onChanged(final RdsUpdate update) {
798-
syncContext.execute(new Runnable() {
799-
@Override
800-
public void run() {
801-
if (!routeDiscoveryStates.containsKey(resourceName)) {
802-
return;
803-
}
804-
if (savedVirtualHosts == null && !isPending) {
805-
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
806-
}
807-
savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
808-
updateRdsRoutingConfig();
809-
maybeUpdateSelector();
799+
public void onResourceChanged(final StatusOr<RdsUpdate> update) {
800+
syncContext.execute(() -> {
801+
if (!routeDiscoveryStates.containsKey(resourceName)) {
802+
return; // Watcher has been cancelled.
810803
}
811-
});
812-
}
813804

814-
@Override
815-
public void onResourceDoesNotExist(final String resourceName) {
816-
syncContext.execute(new Runnable() {
817-
@Override
818-
public void run() {
819-
if (!routeDiscoveryStates.containsKey(resourceName)) {
820-
return;
805+
if (update.hasValue()) {
806+
if (savedVirtualHosts == null && !isPending) {
807+
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
821808
}
822-
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
809+
savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts);
810+
} else {
811+
logger.log(Level.WARNING, "Rds {0} unavailable: {1}",
812+
new Object[]{resourceName, update.getStatus()});
823813
savedVirtualHosts = null;
824-
updateRdsRoutingConfig();
825-
maybeUpdateSelector();
826814
}
815+
// In both cases, a change has occurred that requires a config update.
816+
updateRdsRoutingConfig();
817+
maybeUpdateSelector();
827818
});
828819
}
829820

830821
@Override
831-
public void onError(final Status error) {
832-
syncContext.execute(new Runnable() {
833-
@Override
834-
public void run() {
835-
if (!routeDiscoveryStates.containsKey(resourceName)) {
836-
return;
837-
}
838-
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
839-
Status errorWithNodeId = error.withDescription(
840-
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
841-
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
842-
new Object[]{resourceName, errorWithNodeId});
843-
maybeUpdateSelector();
822+
public void onAmbientError(final Status error) {
823+
syncContext.execute(() -> {
824+
if (!routeDiscoveryStates.containsKey(resourceName)) {
825+
return; // Watcher has been cancelled.
844826
}
827+
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
828+
Status errorWithNodeId = error.withDescription(
829+
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
830+
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
831+
new Object[]{resourceName, errorWithNodeId});
832+
833+
// Per gRFC A88, ambient errors should not trigger a configuration change.
834+
// Therefore, we do NOT call maybeUpdateSelector() here.
845835
});
846836
}
847837

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,9 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
262262
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
263263
if (serverFeatures != null) {
264264
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
265-
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
265+
if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
266+
ignoreResourceDeletion = true;
267+
}
266268
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
267269
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
268270
}

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,10 +457,10 @@ private void handleRpcStreamClosed(Status status) {
457457
if (responseReceived) {
458458
// A closed ADS stream after a successful response is not considered an error. Servers may
459459
// close streams for various reasons during normal operation, such as load balancing or
460-
// underlying connection hitting its max connection age limit (see gRFC A9).
460+
// underlying connection hitting its max connection age limit (see gRFC A9).
461461
if (!status.isOk()) {
462462
newStatus = Status.OK;
463-
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
463+
logger.log(XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
464464
+ "response was received, so this will not be treated as an error. Cause: {2}",
465465
status.getCode(), status.getDescription(), status.getCause());
466466
} else {

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 16 additions & 16 deletions

0 commit comments

Comments
 (0)