[ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed by zuston · Pull Request #293 · apache/uniffle · GitHub
Skip to content

[ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed#293

Merged
roryqi merged 16 commits into
apache:masterfrom
zuston:aqe
Nov 5, 2022
Merged

[ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed#293
roryqi merged 16 commits into
apache:masterfrom
zuston:aqe

Conversation

@zuston

@zuston zuston commented Nov 1, 2022

Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

Introduce a new mechanism to determine how to write data to file, including directly append or append after sort.

Why are the changes needed?

In our internal uniffle deployment, 200+ shuffle-servers are in service. A single shuffle-server uses 4 SATA SSDs to be used as the localfile storage, the max network bandwidth is limited to 5G/s. The storageType of the shuffle-server is MEMORY_LOCALFILE.

After monitoring the read_data_rate metric, I found it always will reach the max network bandwidth. However, at that time, the number of apps running was low. And only single disk usage is 100%.

After digging into the shuffle-server’s log, I found almost all requests with the same AppId and the same Partition to get the shuffle data from the same partition data file. This indicates the reason for high disk utilization due to the hotspots of reading.

It was found that this App’s shuffle-read was optimized by AQE skew data split, which causes the Uniffle shuffle-server high-pressure of network and diskIO.

After catching this point, I analyzed the performance of historical tasks using different shuffle-services briefly.

And in current implementation, one partition’s buffer will be flushed to disk once the size reaches the threshold of 64M. And the spark/mr uniffle client will fetch one batch data of 14M size(default value). That means for one buffer of one partition, the client needs to have 5 network interactions with the shuffle-server if the data with MapId is relatively discrete.

To solve this problem, we could make the 64M buffer’s data sorted by MapId. That means for the uniffle client, ideally it will read one time in a single buffer.

Does this PR introduce any user-facing change?

  1. Introduce the shuffle-data distribution type, NORMAL or LOCAL_ORDER, which can have other implementations, like GLOBAL_ORDER.
  2. Make the segment split strategy as a general interface for above different data distribution type.

How was this patch tested?

  1. UTs
  2. Spark Tests on offline hadoop cluster

Benchmark

Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)). And all columns of k1 have the same value (value = 10)

Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)). And it has the only one record of k2=10

Environment: 100 executors(1core2g)
SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx")

  • Uniffle without patch: cost 12min
  • Uniffle with patch: cost 4min

Reference

  1. Design doc: https://docs.google.com/document/d/1G0cOFVJbYLf2oX1fiadh7zi2M6DlEcjTQTh4kSkb0LA/edit?usp=sharing

@codecov-commenter

codecov-commenter commented Nov 2, 2022

Copy link
Copy Markdown

@zuston zuston changed the title [WIP][Improvement][AQE] Sort MapId before the data are flushed [Improvement][AQE] Sort MapId before the data are flushed Nov 3, 2022
@roryqi roryqi linked an issue Nov 3, 2022 that may be closed by this pull request
@roryqi roryqi changed the title [Improvement][AQE] Sort MapId before the data are flushed [ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed Nov 3, 2022
@roryqi

roryqi commented Nov 4, 2022

Copy link
Copy Markdown
Contributor

cc @leixm . Maybe you have interest about this feature.

import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;

public class LocalOrderSegmentSplitter implements SegmentSplitter {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me think twice.

@roryqi roryqi Nov 4, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between LocalOrderSegmentSplitter and FixedSizeSegementSplitter?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the startMapId and endMapId.

What's the difference between LocalOrderSegmentSplitter and FixedSizeSegementSplitter?

LocalOrderSegmentSplitter is to get the segments according to the index file local order.

FixedSizeSegementSplitter is to keep the consistent with the original segment split logic

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future, maybe we could introduce the global order strategy which will be implemented in current abstraction easier.

Comment thread server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java Outdated
fileOffset = -1;
}

if (expectTaskIds.contains(taskAttemptId)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data of ShuffleDataSegment seems to be continuous. It will be wrong when you filter one record.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This point need to be ensured.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you describe more in detail ?

@roryqi roryqi Nov 4, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have 4 blocks, block 1, block 2, block 3, block4, block 5. Block 3 is filtered. Block 1, Block 2, Block 4, Block 5 can't be ShuffleDataSegment. The blocks in one ShuffleDataSegment must be continous. You can see https://github.com/apache/incubator-uniffle/blob/c69f1735e5bf93852a82de6202e1e5b7c1bd7092/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java#L182 for details.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds we still need to use startMapId to get the data.

startMapId will also filter the data. It's the same.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that my worried case won't happen because our data is sorted. But we also need to check this point again and add some comment.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This splitter is bound to the LOCAL_ORDER. I don't quite understand what you mean.

@roryqi roryqi Nov 4, 2022

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExpectTaskId should be a range. The range is between startMapId and endMapId. If a blockId A isn't in this range, there won't be any other blockId after blockId A in this ShuffleDataSegement. Because data is local order in the ShuffleDataSegment. So there won't discontinuous blockIds in a ShuffleDataSegement. The discontinuous blockIds in a ShuffleDataSegment isn't permitted in our system. You can see related logic for details in https://github.com/apache/incubator-uniffle/blob/c69f1735e5bf93852a82de6202e1e5b7c1bd7092/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java#L182

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. This won’t happen, the index data will be split to multi ordered parts. And then to get the segments in the range by expectTaskIds.

Maybe I need to add more comment on this class header.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rethink the discontinuous problem you mentioned will only occur if the block's taskId is not in this reader partition and this taskId is in the expectTaskIds range in this index file. like this index file, its blocks are as follow:

Block-A (partition: 1, taskId: 1)
Block-B (partition: 2, taskId: 1)
Block-C (partition: 1, taskId: 2)

If the reader want to get the partition-1 data and the taskId range is in [1, 3). And so the Block-B will be filtered, which will cause discontinuous problem.

I think we need to sort the index file by two factors, first one is partitionId, second one is the taskId.

@roryqi roryqi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for some minors.

@roryqi roryqi left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, wait for CI. Great work! thanks @zuston

@roryqi roryqi merged commit 74949f5 into apache:master Nov 5, 2022
@leixm

leixm commented Nov 7, 2022

Copy link
Copy Markdown
Contributor

cc @leixm . Maybe you have interest about this feature.

It's a pity that I was busy and didn't participate in the review.

@zuston

zuston commented Nov 8, 2022

Copy link
Copy Markdown
Member Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Improvement][AQE] Sort MapId before the data are flushed

4 participants