[#1472][part-2] fix(server): Reuse ByteBuf when decoding shuffle blocks instead of reallocating it#1521
Conversation
…e blocks instead of reallocating it
45ace37 to
f85291a
Compare
| int dataLength = byteBuf.readInt(); | ||
| ByteBuf data = NettyUtils.getNettyBufferAllocator().directBuffer(dataLength); | ||
| data.writeBytes(byteBuf, dataLength); | ||
| ByteBuf data = byteBuf.retain().readSlice(dataLength); |
There was a problem hiding this comment.
Will byteBuf be spitted into muliple parts? Every part will released multiple times? Will it bring errors?
There was a problem hiding this comment.
ByteBuf will not be splitted into multiple parts. It will be used by a SendShuffleDataRequest as a whole.
It will not bring errors. Because we retain the ByteBuf(refCnf++) everytime when we do a readSlice.
public static SendShuffleDataRequest decode(ByteBuf byteBuf) {
long requestId = byteBuf.readLong();
String appId = ByteBufUtils.readLengthAndString(byteBuf);
int shuffleId = byteBuf.readInt();
long requireId = byteBuf.readLong();
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = decodePartitionData(byteBuf);
long timestamp = byteBuf.readLong();
return new SendShuffleDataRequest(
requestId, appId, shuffleId, requireId, partitionToBlocks, timestamp);
}
But it might slow down the flushing process.
Because it will not trigger the actual flushing process util all the ShufflePartitionedData is flushed(refCnt decreased to 0):
List<ShufflePartitionedData> shufflePartitionedData = toPartitionedData(sendShuffleDataRequest);
...
for (ShufflePartitionedData spd : shufflePartitionedData) {
...
ret = manager.cacheShuffleData(appId, shuffleId, isPreAllocated, spd);
...
}
ByteBuf cannot be splitted, once splitted we have to allocate new ByteBufs.
So maybe we can hold this PR and find a better way to do this.
But it will speed up the decoding process on the other hand.
|
I reopened this PR. After stress testing the shuffle server without this PR, we will easily encounter [epollEventLoopGroup-3-45] [WARN] TransportChannelHandler.exceptionCaught - Exception in connection from /127.0.0.1:58767 We can see that each time an out-of-direct-memory error occurs, it is caused by the code Because when a large number of requests arrive simultaneously, there might be a brief period (before the So, we need this PR anyway. It might slow down the flushing process a little bit, but the shuffle server will at least remain available during the whole stress test. From the results of my stress tests, there doesn't seem to be any impact on performance. In fact, it may even be faster, as it can speed up the decoding process by not reallocating new PTAL @jerqi |
Maybe we should modify our flush strategy, too. Now we will flush a larger reduce partition. But if the map partition contains a smaller reduce partition. The memory won't be released, too. |
|
I prefer adding a config option for this improvement. |
Flushing strategy will be changed in the final PR. |
…mory issue causing OOM (#1534) ### What changes were proposed in this pull request? When we use `UnpooledByteBufAllocator` to allocate off-heap `ByteBuf`, Netty directly requests off-heap memory from the operating system instead of allocating it according to `pageSize` and `chunkSize`. This way, we can obtain the exact `ByteBuf` size during the pre-allocation of memory, avoiding distortion of metrics such as `usedMemory`. Moreover, we have restored the code submission of the PR [#1521](#1521). We ensure that there is sufficient direct memory for the Netty server during decoding `sendShuffleDataRequest` by taking into account the `encodedLength` of `ByteBuf` in advance during the pre-allocation of memory, thus avoiding OOM during decoding `sendShuffleDataRequest`. Since we are not using `PooledByteBufAllocator`, the PR [#1524](#1524) is no longer needed. ### Why are the changes needed? A sub PR for: #1519 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs.


What changes were proposed in this pull request?
Reuse ByteBuf when decoding shuffle blocks instead of reallocating it
Why are the changes needed?
A sub PR for: #1519
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing UTs.