[#808] feat(spark): ensure thread safe and data consistency when spilling#848
Conversation
|
I propose a new way to keep thread-safe and data consistency. PTAL @jerqi |
|
Have you verify the patch in your production environment? |
No. I hope community could review this. If OK, I will put into prod env. |
| private final String appId; | ||
| private final int shuffleId; | ||
| private final WriteBufferManager bufferManager; | ||
| private WriteBufferManager bufferManager; |
There was a problem hiding this comment.
Because the partial test cases use the original RssShuffleWriter constructor. To reduce the test changes, remove the final modifier.
There was a problem hiding this comment.
You can raise a refactor pr after this pr. I prefer using final here.
There was a problem hiding this comment.
No. This is not a refactor.
| private List<WrappedBuffer> buffers = Lists.newArrayList(); | ||
| private int dataLength = 0; | ||
| private int memoryUsed = 0; | ||
| private boolean isAvailable = false; |
There was a problem hiding this comment.
WIll it be accessed by multi threads?
There was a problem hiding this comment.
No. Buffer will only be access in a single thread.
There was a problem hiding this comment.
Is it necessary?
Yes. This is a mark to indicate the buffer has been processed.
There was a problem hiding this comment.
When adding record, the memory is not sufficient. This will trigger the spill operation. Once buffer is flushed to remote server, the current buffer held by the writer should be dropped. So this is a mark for write to analyze whether it is available.
There was a problem hiding this comment.
Do we change this value in the spill thread?
There was a problem hiding this comment.
No, it won't
Any buffer flush operation in spill function only will do in the adding record thread.
There was a problem hiding this comment.
When do we change this value?
| assertEquals(2, partitionToBlockIds.get(0).size()); | ||
| assertEquals(2, partitionToBlockIds.get(2).size()); | ||
| partitionToBlockIds.clear(); | ||
| sc.stop(); |
There was a problem hiding this comment.
Not. I could remove this change.
| return sentBlocks; | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Do you have any thought on this part? @jerqi This is the main logic
|
We would better have a config option to control whether to use this function. It seems a little complex. |
It's OK. |
| if (wb.isAvailable()) { | ||
| // The readonly of writer buffer means the data has been sent when spilling. To avoid data lost, | ||
| // we should re-insert the record. | ||
| requestMemoryAndInsert(partitionId, serializedData, serializedDataLength, sentBlocks); |
There was a problem hiding this comment.
This seems bad code. It may cause more concurrency problems.
There was a problem hiding this comment.
This will do in a thread. It's thread safe
There was a problem hiding this comment.
Could we simplify the logic here? It's weird that we use recursion here.
There was a problem hiding this comment.
+1. This logic should be simplified.
It seems obvious that it could be split into two functions:
request memory: wait until enough memory is spilled.
insert records: only insert records when memory is satisfied.
There was a problem hiding this comment.
Got it. I will check this PR in our internal env. Let's freeze this until all test finish.
| requestMemoryAndInsert(partitionId, serializedData, serializedDataLength, sentBlocks); | ||
| shuffleWriteMetrics.incRecordsWritten(1L); | ||
|
|
||
| if (usedBytes.get() - inSendListBytes.get() > spillSize || spillTriggered.compareAndSet(true, false)) { |
There was a problem hiding this comment.
It's weird synchroinization.
There was a problem hiding this comment.
emm, sorry I don't get your point.
There was a problem hiding this comment.
if SpillTrigger is syncrhonization value, why do we put the variable after ||? It's strange.
There was a problem hiding this comment.
You design too complex synchronization mechnism. It will be buggy usually.
There was a problem hiding this comment.
Bring too much cost. Every record with one lock is unreasonable.
There was a problem hiding this comment.
Bring too much cost. Every record with one lock is unreasonable.
If we don't have lock conflicts, it won't bring too much cost. It is about several nano seconds.
There was a problem hiding this comment.
Optimistic lock will. But I still think this is not complex. @advancedxy If you have time, could you help check this?
There was a problem hiding this comment.
if
SpillTriggeris syncrhonization value, why do we put the variable after||? It's strange.
Done.
There was a problem hiding this comment.
Optimistic lock will. But I still think this is not complex. @advancedxy If you have time, could you help check this?
In my opinion, there is a bug, it reflects that the mechinsm is too complex.
| return 0L; | ||
| // If it is triggered by other consumers, it will spilled async by current consumer. | ||
| if (trigger != this) { | ||
| spillTriggered.set(true); |
There was a problem hiding this comment.
Will the variable accessed by multi threads?
Done |
roryqi
left a comment
There was a problem hiding this comment.
In general, the pr don't propose a simple or clear method to solve the conflicts of multi threads. You can't use several atomic variables to guarantee the thread safe unless it's learned from other successful projects. It is verified by many production environments.
| private List<WrappedBuffer> buffers = Lists.newArrayList(); | ||
| private int dataLength = 0; | ||
| private int memoryUsed = 0; | ||
| private boolean isAvailable = false; |
There was a problem hiding this comment.
When do we change this value?
| // A best effort strategy to wait. | ||
| // If timeout exception occurs, the underlying tasks won't be cancelled. | ||
| } finally { | ||
| long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x -> { |
There was a problem hiding this comment.
How to handle the rpc failure?
| return 0L; | ||
| } | ||
|
|
||
| List<CompletableFuture<Long>> futures = spillFunc.apply(clear()); |
There was a problem hiding this comment.
You will call the method clear in different threads. Is it thread safe?
There was a problem hiding this comment.
If I'm correct, I think you don't understand this design. Please see the below explaintion.
|
I want to re-explain this design to make more people understand. @jerqi As we know, in the master codebase of But when we want to flush buffer to release memory in spill method, it's thread unsafe. Because spill function will be invoked by
Based on above examples, for one |
| requestMemory(Math.max(bufferSegmentSize, serializedDataLength)); | ||
| requestMemory(required); | ||
| if (wb.isAvailable()) { | ||
| // The readonly of writer buffer means the data has been sent when spilling. To avoid data lost, |
There was a problem hiding this comment.
The comment isn't clear enough. You should explain more information.
|
The idea seems simple enough and should work. But it's better than current impl. |
|
Sorry for the late reply. I have updated this code! PTAL @jerqi @advancedxy Thanks! |
|
Although this has a long history, do you mind take a look? @leixm @smallzhongfeng @xianjingfeng |
roryqi
left a comment
There was a problem hiding this comment.
LGTM. Now it's ok for me.
…sure data correctness (#1558) ### What changes were proposed in this pull request? Verify the number of written records to enhance data accuracy. Make sure all data records are sent by clients. Make sure bugs like #714 will never be introduced into the code. ### Why are the changes needed? A follow-up PR for #848. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs.

What changes were proposed in this pull request?
RssShuffleWriterandWriteBufferManagerto ensure the data consistencyWhy are the changes needed?
Fix: #808
In this PR, we use the two ways to solve the concurrent problem for
addRecordandspillfunctionthread safe. So it will do the spill sync.
After this, we could avoid lock(may cause performance regression, like #811 did) to keep thread safe
Does this PR introduce any user-facing change?
No.
How was this patch tested?