[ISSUE-137][Improvement][AQE] Sort MapId before the data are flushed #293
cc @leixm. You may be interested in this feature.
import org.apache.uniffle.common.ShuffleIndexResult;
import org.apache.uniffle.common.exception.RssException;

public class LocalOrderSegmentSplitter implements SegmentSplitter {
Our blockId contains the taskAttemptId, and we usually use the blockId to filter the data. See
https://github.com/apache/incubator-uniffle/blob/47effb25044780013ff51cc516261464435b5829/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java#L101
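To make the point about filtering by blockId concrete, here is a minimal sketch of how a task attempt ID could be recovered from a packed blockId and checked against the expected task IDs. The class name, the method names, and the bit widths (24 bits for the partition, 21 for the task attempt) are assumptions for illustration only; the real layout is defined in Uniffle's own code.

```java
import java.util.Set;

public final class BlockIdSketch {
    // Assumed bit widths, for illustration only; check Uniffle's code for the real layout.
    static final int PARTITION_BITS = 24;
    static final int TASK_ATTEMPT_BITS = 21;

    /** Packs the three fields into one long: [sequenceNo | partitionId | taskAttemptId]. */
    public static long pack(long sequenceNo, long partitionId, long taskAttemptId) {
        return (sequenceNo << (PARTITION_BITS + TASK_ATTEMPT_BITS))
            | (partitionId << TASK_ATTEMPT_BITS)
            | taskAttemptId;
    }

    /** Extracts the low TASK_ATTEMPT_BITS bits, which hold the task attempt ID. */
    public static long taskAttemptId(long blockId) {
        return blockId & ((1L << TASK_ATTEMPT_BITS) - 1);
    }

    /** A block is read only if its task attempt ID is one of the expected ones. */
    public static boolean wanted(long blockId, Set<Long> expectTaskIds) {
        return expectTaskIds.contains(taskAttemptId(blockId));
    }
}
```

With this layout, a reader can decide whether to keep a block from the blockId alone, without any extra per-block metadata.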
What's the difference between LocalOrderSegmentSplitter and FixedSizeSegmentSplitter?
I have removed startMapId and endMapId.
> What's the difference between LocalOrderSegmentSplitter and FixedSizeSegmentSplitter?
LocalOrderSegmentSplitter builds the segments according to the local order of the index file.
FixedSizeSegmentSplitter keeps the behavior consistent with the original segment split logic.
In the future, we could introduce a global order strategy, which should be easier to implement on top of the current abstraction.
fileOffset = -1;
}

if (expectTaskIds.contains(taskAttemptId)) {
The data in a ShuffleDataSegment seems to be contiguous. Filtering out a single record in the middle would produce a wrong result.
This point needs to be verified.
Could you describe this in more detail?
Suppose we have 5 blocks: block 1, block 2, block 3, block 4, and block 5. If block 3 is filtered out, then block 1, block 2, block 4, and block 5 can't form one ShuffleDataSegment, because the blocks in a ShuffleDataSegment must be contiguous. See https://github.com/apache/incubator-uniffle/blob/c69f1735e5bf93852a82de6202e1e5b7c1bd7092/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java#L182 for details.
It sounds like we still need to use startMapId to get the data.
startMapId would also filter the data, so it's the same.
It seems the case I was worried about won't happen because our data is sorted. But we should double-check this point and add a comment.
This splitter is bound to the LOCAL_ORDER. I don't quite understand what you mean.
expectTaskIds should be a range between startMapId and endMapId. If a blockId A isn't in this range, there won't be any other blockId after blockId A in this ShuffleDataSegment, because the data in a ShuffleDataSegment is in local order. So there won't be discontinuous blockIds in a ShuffleDataSegment. Discontinuous blockIds in a ShuffleDataSegment aren't permitted in our system. See the related logic in https://github.com/apache/incubator-uniffle/blob/c69f1735e5bf93852a82de6202e1e5b7c1bd7092/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java#L182
Got it. This won't happen: the index data will be split into multiple ordered parts, and the segments in the range are then selected by expectTaskIds.
Maybe I need to add more comments to this class header.
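The contiguity requirement discussed in this thread can be sketched as follows. This is not the PR's LocalOrderSegmentSplitter; it is a simplified illustration with hypothetical `Block` and `Segment` types and a hypothetical `split` method. It walks the index entries in file order and closes the current segment as soon as it hits a block outside the expected task range or the segment reaches a size limit, so every emitted segment covers one contiguous byte range.

```java
import java.util.ArrayList;
import java.util.List;

public final class LocalOrderSplitSketch {
    /** One index entry: where the block sits in the data file and which task wrote it. */
    public static final class Block {
        public final long offset;
        public final int length;
        public final long taskAttemptId;
        public Block(long offset, int length, long taskAttemptId) {
            this.offset = offset;
            this.length = length;
            this.taskAttemptId = taskAttemptId;
        }
    }

    /** A contiguous byte range of the data file covering blockCount blocks. */
    public static final class Segment {
        public final long offset;
        public final long length;
        public final int blockCount;
        public Segment(long offset, long length, int blockCount) {
            this.offset = offset;
            this.length = length;
            this.blockCount = blockCount;
        }
    }

    /** Splits blocks (in file order) into contiguous segments for tasks in [startTaskId, endTaskId). */
    public static List<Segment> split(List<Block> blocks, long startTaskId, long endTaskId,
                                      long maxSegmentBytes) {
        List<Segment> segments = new ArrayList<>();
        long fileOffset = -1; // -1 means no segment is currently open
        long bytes = 0;
        int count = 0;
        for (Block b : blocks) {
            boolean wanted = b.taskAttemptId >= startTaskId && b.taskAttemptId < endTaskId;
            if (wanted) {
                if (fileOffset == -1) {
                    fileOffset = b.offset; // open a new segment at this block
                }
                bytes += b.length;
                count++;
            }
            // Close the open segment on a gap (unwanted block) or when it is full.
            if (fileOffset != -1 && (!wanted || bytes >= maxSegmentBytes)) {
                segments.add(new Segment(fileOffset, bytes, count));
                fileOffset = -1;
                bytes = 0;
                count = 0;
            }
        }
        if (fileOffset != -1) {
            segments.add(new Segment(fileOffset, bytes, count));
        }
        return segments;
    }
}
```

Because a segment is never extended across an unwanted block, a filtered block can only split a range into two segments; it can never leave a hole inside one.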
On reflection, the discontinuity problem you mentioned will only occur if a block belongs to a partition other than the one this reader is reading while its taskId falls within the expectTaskIds range in this index file. For example, suppose the index file contains the following blocks:
Block-A (partition: 1, taskId: 1)
Block-B (partition: 2, taskId: 1)
Block-C (partition: 1, taskId: 2)
If the reader wants the partition-1 data and the taskId range is [1, 3), then Block-B will be filtered out, which causes the discontinuity problem.
I think we need to sort the index file by two keys: first by partitionId, then by taskId.
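A minimal sketch of the two-key ordering, using the three example blocks from this comment. The `Entry` type and its fields are hypothetical stand-ins for real index entries.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class IndexSortSketch {
    /** Hypothetical index entry carrying only the fields needed for ordering. */
    public static final class Entry {
        public final String name;
        public final int partitionId;
        public final long taskId;
        public Entry(String name, int partitionId, long taskId) {
            this.name = name;
            this.partitionId = partitionId;
            this.taskId = taskId;
        }
    }

    /** Orders by partitionId first, then by taskId within the same partition. */
    public static final Comparator<Entry> BY_PARTITION_THEN_TASK =
        Comparator.comparingInt((Entry e) -> e.partitionId)
                  .thenComparingLong(e -> e.taskId);

    /** Returns the entry names in sorted order, leaving the input untouched. */
    public static List<String> sortedNames(List<Entry> entries) {
        List<Entry> copy = new ArrayList<>(entries);
        copy.sort(BY_PARTITION_THEN_TASK);
        List<String> names = new ArrayList<>();
        for (Entry e : copy) {
            names.add(e.name);
        }
        return names;
    }
}
```

Sorting Block-A, Block-B, Block-C this way yields Block-A, Block-C, Block-B, so the two partition-1 blocks become adjacent and Block-B no longer sits between them.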
It's a pity that I was busy and didn't participate in the review.

What changes were proposed in this pull request?
Introduce a new mechanism that determines how data is written to file: either appended directly or appended after sorting.
Why are the changes needed?
In our internal Uniffle deployment, 200+ shuffle-servers are in service. Each shuffle-server uses 4 SATA SSDs as localfile storage, and the max network bandwidth is limited to 5G/s. The storageType of the shuffle-server is MEMORY_LOCALFILE.
While monitoring the read_data_rate metric, I found that it always reached the max network bandwidth, even though few apps were running at the time and only a single disk was at 100% usage.
After digging into the shuffle-server's log, I found that almost all requests had the same AppId and the same partition and fetched shuffle data from the same partition data file. This indicates that the high disk utilization was caused by read hotspots.
It turned out that this app's shuffle read had been optimized by AQE skewed-data splitting, which puts high network and disk I/O pressure on the Uniffle shuffle-server.
Having identified this, I briefly analyzed the performance of historical tasks using different shuffle services.
In the current implementation, a partition's buffer is flushed to disk once its size reaches the 64M threshold, and the Spark/MR Uniffle client fetches data in batches of 14M (the default value). That means that for one buffer of one partition, the client needs 5 network interactions with the shuffle-server if the data for its MapId is relatively scattered.
To solve this problem, we could sort the data in the 64M buffer by MapId. Ideally, the Uniffle client would then need only one read per buffer.
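The figure of 5 interactions follows from a ceiling division of the flushed buffer size by the client batch size. A small sketch, with a hypothetical helper name:

```java
public final class FetchCountSketch {
    /** Number of fixed-size reads needed to cover flushedBytes (ceiling division). */
    public static long fetchesNeeded(long flushedBytes, long readBatchBytes) {
        return (flushedBytes + readBatchBytes - 1) / readBatchBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // 64M buffer read in 14M batches: four full batches plus one partial batch.
        System.out.println(fetchesNeeded(64 * mb, 14 * mb));
    }
}
```

This is the worst case for a single buffer, where the reader has to walk through the whole 64M range to collect the scattered blocks of one MapId.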
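The effect of sorting can be illustrated by counting how many separate contiguous runs one MapId occupies in a buffer before and after the sort. This is a sketch with hypothetical names, where each list element stands for one block and holds only its MapId; it is not the flush code from this PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class SortBeforeFlushSketch {
    /** Returns a copy of the buffer's blocks (represented by MapId), stably sorted by MapId. */
    public static List<Long> sortByMapId(List<Long> arrivalOrder) {
        List<Long> sorted = new ArrayList<>(arrivalOrder);
        sorted.sort(Comparator.naturalOrder()); // List.sort is stable
        return sorted;
    }

    /** Counts the contiguous runs of blocks belonging to mapId in the given layout. */
    public static int contiguousRuns(List<Long> layout, long mapId) {
        int runs = 0;
        boolean inRun = false;
        for (long id : layout) {
            boolean match = id == mapId;
            if (match && !inRun) {
                runs++; // a new run starts here
            }
            inRun = match;
        }
        return runs;
    }
}
```

For an arrival order of 1, 2, 1, 3, 2, 1, MapId 1 is spread over three runs; after sorting, it occupies a single run, which is what lets a reader that wants only that MapId fetch it in one contiguous range.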
Does this PR introduce any user-facing change?
How was this patch tested?
Benchmark
Table1: 100g, dtypes: Array[(String, String)] = Array((v1,StringType), (k1,IntegerType)). All rows of column k1 have the same value (value = 10).
Table2: 10 records, dtypes: Array[(String, String)] = Array((k2,IntegerType), (v2,StringType)). Only one record has k2=10.
Environment: 100 executors (1 core, 2g each)
SQL: spark.sql("select * from Table1,Table2 where k1 = k2").write.mode("overwrite").parquet("xxxxxx")
Reference