[Improvement][AQE] Avoid calling getShuffleResult multiple times by leixm · Pull Request #190 · apache/uniffle · GitHub
Skip to content

[Improvement][AQE] Avoid calling getShuffleResult multiple times#190

Merged
roryqi merged 12 commits into
apache:masterfrom
leixm:issue_136
Sep 8, 2022
Merged

[Improvement][AQE] Avoid calling getShuffleResult multiple times#190
roryqi merged 12 commits into
apache:masterfrom
leixm:issue_136

Conversation

@leixm

@leixm leixm commented Aug 27, 2022

Copy link
Copy Markdown
Contributor

###What changes were proposed in this pull request?
For issue #136 , When we use AQE, we may call shuffleWriteClient.getShuffleResult multiple times. But if both partition 1 and partition 2 are on the server A, we call getShuffleResult(partition 1) to get data form server A, and then we call getShuffleResult(partition 2) to get data form server A, it's not necassray. We can get getShuffleResult(partition 1, partition 2) instead.

###Why are the changes needed?
Improve getShuffleResult

###Does this PR introduce any user-facing change?
No

###How was this patch tested?
Added UT

@leixm

leixm commented Aug 27, 2022

Copy link
Copy Markdown
Contributor Author

@codecov-commenter

codecov-commenter commented Aug 27, 2022

Copy link
Copy Markdown

Codecov Report

Attention: Patch coverage is 42.57426% with 58 lines in your changes missing coverage. Please review.

Project coverage is 58.99%. Comparing base (5eb2758) to head (08c858f).
Report is 922 commits behind head on master.

Files with missing lines Patch % Lines
...pache/uniffle/server/ShuffleServerGrpcService.java 0.00% 29 Missing ⚠️
...he/uniffle/client/impl/ShuffleWriteClientImpl.java 0.00% 28 Missing ⚠️
.../java/org/apache/uniffle/common/util/RssUtils.java 95.45% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master     #190      +/-   ##
============================================
+ Coverage     58.40%   58.99%   +0.58%     
- Complexity     1273     1325      +52     
============================================
  Files           158      160       +2     
  Lines          8446     8699     +253     
  Branches        784      815      +31     
============================================
+ Hits           4933     5132     +199     
- Misses         3260     3303      +43     
- Partials        253      264      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@roryqi

roryqi commented Aug 27, 2022

Copy link
Copy Markdown
Contributor

Is it a compatible feature? If this is a breaking change, we should dicuss about it in the mail list first. But it seems to become a compatible feature if you adjust your implement.

@leixm

leixm commented Aug 27, 2022

Copy link
Copy Markdown
Contributor Author

I think it's a compatible feature, could you give me more detail? @jerqi

@leixm

leixm commented Aug 27, 2022

Copy link
Copy Markdown
Contributor Author

Or we should keep the original implementation getShuffleResult(partitionId), and then add a new implementation getShuffleResult(startPartition, endPartition)?

@roryqi

roryqi commented Aug 27, 2022

Copy link
Copy Markdown
Contributor

A compatible feature means that old version server can use the new version server, the new version client can use the old version server. You change the protobuf field name, it will cause incompatibility in my thought.

@leixm

leixm commented Aug 27, 2022

Copy link
Copy Markdown
Contributor Author

Thank you for your reminder, i will make it compatible.

@leixm

leixm commented Aug 28, 2022

Copy link
Copy Markdown
Contributor Author

@jerqi can you help review this pr plz?

@leixm

leixm commented Aug 28, 2022

Copy link
Copy Markdown
Contributor Author

org.apache.uniffle.common.security.HadoopSecurityContextTest test failed cause by Address already in use, It seems to be caused by running multiple tests at the same time

@roryqi

roryqi commented Aug 28, 2022

Copy link
Copy Markdown
Contributor

@zuston Could you help me solve the problem of the flaky kerberos test?

Comment thread proto/src/main/proto/Rss.proto Outdated
@zuston

zuston commented Aug 28, 2022

Copy link
Copy Markdown
Member

@zuston Could you help me solve the problem of the flaky kerberos test?

OK. Let me solve this tomorrow.

@roryqi

roryqi commented Aug 28, 2022

Copy link
Copy Markdown
Contributor

I think we should test this with some SQL query in integration test.

@leixm

leixm commented Aug 28, 2022

Copy link
Copy Markdown
Contributor Author

You're right , I will add some test in integration test.

@roryqi

roryqi commented Aug 29, 2022

Copy link
Copy Markdown
Contributor

Flaky test was fixed by #191 .Thanks @zuston

@leixm

leixm commented Aug 30, 2022

Copy link
Copy Markdown
Contributor Author

I think we should test this with some SQL query in integration test.

Already added integration test, Can you help review? Thank you @jerqi

@roryqi

roryqi commented Aug 30, 2022

Copy link
Copy Markdown
Contributor

If the new version client access the old version server, what will happen? Could we fallback to use old way to access the old version server when there are exceptions?

@leixm

leixm commented Aug 30, 2022

Copy link
Copy Markdown
Contributor Author

If the new version client access the old version server, what will happen? Could we fallback to use old way to access the old version server when there are exceptions?

I think the new version of the client does not need to be compatible with the old version of the server.

@roryqi

roryqi commented Aug 30, 2022

Copy link
Copy Markdown
Contributor

If the new version client access the old version server, what will happen? Could we fallback to use old way to access the old version server when there are exceptions?

I think the new version of the client does not need to be compatible with the old version of the server.

OK, I make a mistake. Maybe we just guarantee that old client can access the new server.

int32 shuffleId = 2;
repeated int32 partitions = 3;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a new message for response?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind

Comment thread common/src/main/java/org/apache/uniffle/common/util/RssUtils.java Outdated
return hostName.replaceAll("[\\.-]", "_");
}

public static Map<Integer, Roaring64NavigableMap> shuffleBitmapToPartitionBitmap(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need startPartition and endPartition?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the partition may not have a block, but we need to fill it to avoid NPE when used, See

for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {
      result.putIfAbsent(partitionId, Roaring64NavigableMap.bitmapOf());
    }

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the partition may not have a block, but we need to fill it to avoid NPE when used, See

for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {
      result.putIfAbsent(partitionId, Roaring64NavigableMap.bitmapOf());
    }

A little weird.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have a better name.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment isn't resolved.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind

public static Map<Integer, Roaring64NavigableMap> shuffleBitmapToPartitionBitmap(
Roaring64NavigableMap shuffleBitmap, int startPartition, int endPartition) {
Map<Integer, Roaring64NavigableMap> result = Maps.newHashMap();
for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need <= instead of <?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We think we need <= here.

@roryqi roryqi Sep 8, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We think we need <= here.

Sorry, my mistake. We need < here

@roryqi

roryqi commented Aug 30, 2022

Copy link
Copy Markdown
Contributor

Could you provide some data about benchmark or performance test?

@leixm

leixm commented Aug 30, 2022

Copy link
Copy Markdown
Contributor Author

Could you provide some data about benchmark or performance test?

Ok, I will do some performance tests to try to prove that the performance will improve in some extreme scenarios of AQE

@leixm

leixm commented Sep 7, 2022

Copy link
Copy Markdown
Contributor Author

Environment

Shuffle Server Num : 5
Shuffle Write: 48G
Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB

We measure the performance of get_shuffle_result by the following metrics:

  • get_shuffle_result_times: The number of calls of the get_shuffle_result interface
  • get_shuffle_result_cost: Time consumption of get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times:The number of calls of the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: Time consumption of get_shuffle_result_for_multi_part interface

Test Results

Before issue_136

serverId get_shuffle_result_times get_shuffle_result_cost(ms)
Server1 1000 157614
Server2 1000 426897
Server3 1000 269488
Server4 1000 906758
Server5 1001 123217
sum 5001 1883974

After issue_136

serverId get_shuffle_result_for_multi_part_times get_shuffle_result_for_multi_part_cost(ms)
Server1 833 870720
Server2 833 260865
Server3 834 333202
Server4 833 90277
Server5 835 94113
sum 4168 1649177

Summarize

The number of interface requests is reduced by 16%, and the total time is reduced by 12.5%. If we assign consecutive partitions to a server, the improvement will be more obvious.

@roryqi

roryqi commented Sep 7, 2022

Copy link
Copy Markdown
Contributor

Environment

Shuffle Server Num : 5 Shuffle Write: 48G Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB

We measure the performance of get_shuffle_result by the following metrics:

  • get_shuffle_result_times: The number of calls of the get_shuffle_result interface
  • get_shuffle_result_cost: Time consumption of get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times:The number of calls of the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: Time consumption of get_shuffle_result_for_multi_part interface

Test Results

Before issue_136

serverId get_shuffle_result_times get_shuffle_result_cost(ms)
Server1 1000 157614
Server2 1000 426897
Server3 1000 269488
Server4 1000 906758
Server5 1001 123217
sum 5001 1883974
After issue_136

serverId get_shuffle_result_for_multi_part_times get_shuffle_result_for_multi_part_cost(ms)
Server1 833 870720
Server2 833 260865
Server3 834 333202
Server4 833 90277
Server5 835 94113
sum 4168 1649177

Summarize

The number of interface requests is reduced by 16%, and the total time is reduced by 12.5%. If we assign consecutive partitions to a server, the improvement will be more obvious.

What's your test case? TPC-DS?

@leixm

leixm commented Sep 7, 2022

Copy link
Copy Markdown
Contributor Author

No, this is our production task.

@roryqi

roryqi commented Sep 7, 2022

Copy link
Copy Markdown
Contributor

Please resolve the comments left.

@leixm

leixm commented Sep 8, 2022

Copy link
Copy Markdown
Contributor Author

Can you help review please? @jerqi

return hostName.replaceAll("[\\.-]", "_");
}

public static Map<Integer, Roaring64NavigableMap> shuffleBitmapToPartitionBitmap(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment isn't resolved.

Comment thread common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java Outdated
roryqi
roryqi previously approved these changes Sep 8, 2022

@roryqi roryqi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some comments that is not resolved.

@roryqi roryqi dismissed their stale review September 8, 2022 09:29

sorry, i miss the unresolved comments.

@leixm

leixm commented Sep 8, 2022

Copy link
Copy Markdown
Contributor Author

Can you help review please? @jerqi

@roryqi roryqi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @leixm @zuston

@roryqi roryqi merged commit 6aa4379 into apache:master Sep 8, 2022
@leixm

leixm commented Sep 8, 2022

Copy link
Copy Markdown
Contributor Author

@jerqi @zuston Thank you very much, good brother.

@roryqi

roryqi commented Oct 26, 2022

Copy link
Copy Markdown
Contributor

Environment

Shuffle Server Num : 5 Shuffle Write: 48G Configuration: --conf spark.sql.shuffle.partitions=5000 --conf spark.sql.adaptive.enabled=true --conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=64MB

We measure the performance of get_shuffle_result by the following metrics:

  • get_shuffle_result_times: The number of calls of the get_shuffle_result interface
  • get_shuffle_result_cost: Time consumption of get_shuffle_result interface
  • get_shuffle_result_for_multi_part_times:The number of calls of the get_shuffle_result_for_multi_part interface
  • get_shuffle_result_for_multi_part_cost: Time consumption of get_shuffle_result_for_multi_part interface

Test Results

Before issue_136

serverId get_shuffle_result_times get_shuffle_result_cost(ms)
Server1 1000 157614
Server2 1000 426897
Server3 1000 269488
Server4 1000 906758
Server5 1001 123217
sum 5001 1883974
After issue_136

serverId get_shuffle_result_for_multi_part_times get_shuffle_result_for_multi_part_cost(ms)
Server1 833 870720
Server2 833 260865
Server3 834 333202
Server4 833 90277
Server5 835 94113
sum 4168 1649177

Summarize

The number of interface requests is reduced by 16%, and the total time is reduced by 12.5%. If we assign consecutive partitions to a server, the improvement will be more obvious.

Could you raise a new issue that Coordinator support to assign consecutive partitions to a server?

@leixm

leixm commented Oct 26, 2022

Copy link
Copy Markdown
Contributor Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants