[Improvement] Add timeout reconnection when DelegationRssShuffleManager sends the request of AccessCluster #139
public static final ConfigEntry<Long> RSS_CLIENT_FALLBACK_RETRY_INTERVAL = createLongBuilder(
    new ConfigBuilder("spark.rss.client.fallback.retry.interval")
Could we use spark.rss.client.access.retry.interval.ms? We are not attempting to fall back; we want to access RSS. The variable's name should also include the unit, which improves readability.
This is a user-facing change. It adds some config options, so you should add documentation explaining how to use them.
    .createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);

public static final ConfigEntry<Long> RSS_CLIENT_FALLBACK_RETRY_INTERVAL = createLongBuilder(
Could we use spark.rss.client.access.retry.interval.ms? We want to access RSS rather than fall back, and a name that contains the unit improves readability.
Could we change RSS_CLIENT_FALLBACK_RETRY_INTERVAL to RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS as well?
        .doc("Interval between retries fallback to SortShuffleManager"))
    .createWithDefault(20000L);

public static final ConfigEntry<Integer> RSS_CLIENT_FALLBACK_RETRY_TIMES = createIntegerBuilder(
    new ConfigBuilder("spark.rss.client.fallback.retry.times")
        .doc("Number of retries fallback to SortShuffleManager"))
    .createWithDefault(3);
Could we use a default value of 0? That keeps the behaviour consistent with the previous behaviour.
Agreed; users can change it themselves if necessary.
Added.
    .createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);

public static final ConfigEntry<Long> RSS_CLIENT_FALLBACK_RETRY_INTERVAL = createLongBuilder(
    .createWithDefault(20000L);

public static final ConfigEntry<Integer> RSS_CLIENT_FALLBACK_RETRY_TIMES = createIntegerBuilder(
    new ConfigBuilder("spark.rss.client.access.retry.times")
Could we change RSS_CLIENT_FALLBACK_RETRY_TIMES to RSS_CLIENT_ACCESS_RETRY_TIMES as well?
I forgot to change it. Updated.
Could we add some test cases for this PR? Could you also add documentation in https://github.com/apache/incubator-uniffle/blob/master/docs/client_guide.md?
This PR is mostly retry logic, so I think the test class RetryUtilsTest is enough. Or do you have a better suggestion?
Could we add a test case in DelegationRssShuffleManagerTest? If you change the logic, we'd better have a test case for it.
I added a simple test. Do you have any other ideas?
@Test
public void testTryAccessCluster() {
  SparkConf conf = new SparkConf();
Could we mock the coordinator client? Could we use the method tryAccessCluster in this case?
I thought so at first, but tryAccessCluster is not called directly by the CoordinatorClient, and the tryAccessCluster method depends on the global list coordinatorClients, so I don't have a better way for the time being.
coordinatorClients is created by RssSparkShuffleUtils.createCoordinatorClients(sparkConf). Could we imitate https://github.com/apache/incubator-uniffle/blob/79804c544b560ae3e872964a428d328dd71489a7/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java#L79
Could we add a case where the access fails 4 times and we need to create the sort shuffle manager?
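For reference, the scenario requested above (access denied 4 times, then fall back to the sort shuffle manager) could be modelled roughly as follows. This is only a sketch in plain Java, not the test that was added in this PR: `FallbackScenarioSketch`, `FakeCoordinator`, and `chooseManager` are hypothetical names, the manager is represented by its name as a string, and it assumes that the retry count means retries after the first attempt (so 3 retries give 4 requests in total).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical stand-alone model of the requested test case; it does not use Uniffle or Spark classes.
final class FallbackScenarioSketch {

  /** Stand-in for a mocked coordinator client: denies the first `failures` access requests. */
  static final class FakeCoordinator implements BooleanSupplier {
    private final int failures;
    final AtomicInteger requests = new AtomicInteger();

    FakeCoordinator(int failures) {
      this.failures = failures;
    }

    @Override
    public boolean getAsBoolean() {
      // Request number N succeeds only once N exceeds the configured number of failures.
      return requests.incrementAndGet() > failures;
    }
  }

  /**
   * Returns the name of the shuffle manager a delegate would pick.
   * Assumption: retryTimes counts retries after the first attempt.
   */
  static String chooseManager(BooleanSupplier access, int retryTimes) {
    for (int attempt = 0; attempt <= retryTimes; attempt++) {
      if (access.getAsBoolean()) {
        return "RssShuffleManager";
      }
    }
    return "SortShuffleManager";
  }

  public static void main(String[] args) {
    // Four denials with three retries: every attempt fails, so the delegate falls back.
    FakeCoordinator denyFour = new FakeCoordinator(4);
    System.out.println(chooseManager(denyFour, 3) + " after " + denyFour.requests.get() + " requests");

    // Two denials with three retries: the third request is accepted.
    FakeCoordinator denyTwo = new FakeCoordinator(2);
    System.out.println(chooseManager(denyTwo, 3) + " after " + denyTwo.requests.get() + " requests");
  }
}
```

In the real test the fake would be replaced by a mocked coordinator client injected the way the linked DelegationRssShuffleManagerTest does it; the retry interval is left out here so the sketch runs instantly.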
roryqi left a comment:
Thanks @smallzhongfeng, LGTM

What changes were proposed in this pull request?
To solve issue #127
Why are the changes needed?
To ride out temporary situations such as memory shortage, and to retry so that tasks run in the RSS cluster as much as possible.
Does this PR introduce any user-facing change?
Two new parameters are added on the client side: spark.rss.client.access.retry.times, the number of reconnection retries, and spark.rss.client.access.retry.interval.ms, the interval between reconnections. Users can set these two parameters to fit the time they are willing to wait, so that tasks run in the RSS cluster as much as possible.
How was this patch tested?
No need.
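
To illustrate how the two client-side parameters described under the user-facing change interact, here is a minimal retry loop in plain Java. It is a sketch under stated assumptions, not the code merged in this PR: `AccessRetrySketch` and `tryAccess` are hypothetical names, the access request is modelled as a `BooleanSupplier`, and the retry count is assumed to mean retries after the first attempt, so a value of 0 gives a single attempt as before.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper showing one reading of access.retry.times and access.retry.interval.ms.
final class AccessRetrySketch {

  /**
   * Calls accessOnce until it succeeds or the retry budget is used up.
   * Assumption: retryTimes counts retries after the first attempt, so 0 means exactly one attempt.
   */
  static boolean tryAccess(BooleanSupplier accessOnce, int retryTimes, long retryIntervalMs) {
    for (int attempt = 0; attempt <= retryTimes; attempt++) {
      if (accessOnce.getAsBoolean()) {
        return true;
      }
      if (attempt < retryTimes) {
        try {
          // Wait before the next attempt; no sleep after the last one.
          Thread.sleep(retryIntervalMs);
        } catch (InterruptedException e) {
          // Preserve the interrupt flag and stop retrying.
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Simulated cluster that rejects the first two requests, e.g. while it is short on memory.
    BooleanSupplier busyAtFirst = () -> ++calls[0] > 2;

    // No retries: a single rejected attempt.
    System.out.println("retryTimes=0 -> " + tryAccess(busyAtFirst, 0, 10L));

    calls[0] = 0;
    // Three retries, 10 ms apart: the third request is accepted.
    System.out.println("retryTimes=3 -> " + tryAccess(busyAtFirst, 3, 10L));
  }
}
```

Under this reading, the longest time spent waiting between attempts is roughly the retry count multiplied by the interval, which is the budget a user would tune with the two parameters.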