[#808] feat(spark): ensure thread safe and data consistency when spilling by zuston · Pull Request #848 · apache/uniffle · GitHub
[#808] feat(spark): ensure thread safe and data consistency when spilling #848

Merged: zuston merged 2 commits into apache:master from zuston:issue-808-2 on Jul 22, 2023

zuston (Member) commented Apr 28, 2023

What changes were proposed in this pull request?

  1. Guarantee thread safety by only allowing spills to be triggered by the current thread
  2. Use the same block-processing logic in RssShuffleWriter and WriteBufferManager to ensure data consistency

Why are the changes needed?

Fix: #808

In this PR, we handle the concurrency problem between the addRecord and spill functions in two ways:

  1. Within the same thread, the spill is invoked while adding records when memory is insufficient. This case is thread safe, so the spill is done synchronously.
  2. When the spill is invoked by other consumers, it does nothing in that thread and only sets a signal so that the owner releases memory the next time it adds a record.

After this, we can avoid a lock (which may cause a performance regression, as #811 did) and still stay thread safe.
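
A minimal sketch of the two cases above, under stated assumptions: `SketchBufferManager`, its fields, and the `flush` helper are hypothetical names for illustration and are not the Uniffle implementation. Only the owner thread touches the buffered data, and the single piece of shared state is an atomic signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for a write buffer manager (not Uniffle code).
class SketchBufferManager {
    // Touched only by the owner thread, so no lock is needed.
    private final List<byte[]> buffered = new ArrayList<>();
    // The only state shared with other threads: a "please spill" signal.
    private final AtomicBoolean spillRequested = new AtomicBoolean(false);
    private final long spillThresholdBytes;
    private long usedBytes = 0;
    private long flushedBytes = 0;

    SketchBufferManager(long spillThresholdBytes) {
        this.spillThresholdBytes = spillThresholdBytes;
    }

    // Called by the owner thread only.
    void addRecord(byte[] record) {
        buffered.add(record);
        usedBytes += record.length;
        // Read and clear the signal before the size check so it is always consumed.
        boolean signalled = spillRequested.compareAndSet(true, false);
        if (signalled || usedBytes > spillThresholdBytes) {
            flush();
        }
    }

    // May be called on behalf of any consumer.
    long spill(Object trigger) {
        if (trigger != this) {
            // Case 2: triggered by another consumer, so only leave a signal.
            spillRequested.set(true);
            return 0L;
        }
        // Case 1: triggered by this manager's own memory request, same thread.
        return flush();
    }

    private long flush() {
        long released = usedBytes;
        flushedBytes += released;
        buffered.clear();
        usedBytes = 0;
        return released;
    }

    long usedBytes() { return usedBytes; }

    long flushedBytes() { return flushedBytes; }

    public static void main(String[] args) {
        SketchBufferManager manager = new SketchBufferManager(100);
        manager.addRecord(new byte[10]);
        manager.spill(new Object());    // another consumer asks: nothing is flushed yet
        manager.addRecord(new byte[5]); // the owner sees the signal and flushes
        System.out.println("flushed bytes: " + manager.flushedBytes());
    }
}
```

The trade-off in this sketch is that a request from another consumer is answered with 0 bytes and the memory is only released on the owner's next `addRecord` call.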

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  1. UTs

codecov-commenter commented Apr 28, 2023

@zuston zuston marked this pull request as draft April 28, 2023 03:11
@zuston zuston marked this pull request as ready for review April 28, 2023 03:43
zuston (Member Author) commented Apr 28, 2023

I propose a new way to keep thread safety and data consistency. PTAL @jerqi

roryqi (Contributor) commented Apr 28, 2023

Have you verified the patch in your production environment?

zuston (Member Author) commented Apr 28, 2023

> Have you verified the patch in your production environment?

No. I hope the community can review this first. If it is OK, I will put it into our prod env.

  private final String appId;
  private final int shuffleId;
- private final WriteBufferManager bufferManager;
+ private WriteBufferManager bufferManager;

Contributor

Why do we remove final?

Member Author

Because some test cases use the original RssShuffleWriter constructor. To reduce the test changes, I removed the final modifier.

Contributor

You can raise a refactor PR after this PR. I prefer using final here.

Member Author

No. This is not a refactor.

private List<WrappedBuffer> buffers = Lists.newArrayList();
private int dataLength = 0;
private int memoryUsed = 0;
private boolean isAvailable = false;

Contributor

Will it be accessed by multiple threads?

Contributor

Is it necessary?

Member Author

No. The buffer will only be accessed in a single thread.

Member Author

> Is it necessary?

Yes. This is a mark to indicate that the buffer has been processed.

Contributor

Why do we need this?

Member Author

When a record is added and memory is not sufficient, the spill operation is triggered. Once the buffer is flushed to the remote server, the current buffer held by the writer should be dropped. So this is a mark that lets the writer check whether the buffer is still available.

Contributor

Do we change this value in the spill thread?

Member Author

No, it won't.

Any buffer flush operation in the spill function is only done in the thread that adds records.

Contributor

When do we change this value?

Member Author

Remove this part.

assertEquals(2, partitionToBlockIds.get(0).size());
assertEquals(2, partitionToBlockIds.get(2).size());
partitionToBlockIds.clear();
sc.stop();

Contributor

Why do we remove this?

zuston (Member Author), Apr 28, 2023

No need.

Contributor

Is it necessary?

Member Author

No. I could remove this change.

return sentBlocks;
}

/**

Member Author

Do you have any thoughts on this part? @jerqi This is the main logic.

roryqi (Contributor) commented May 4, 2023

We had better have a config option to control whether to use this function. It seems a little complex.

zuston (Member Author) commented May 4, 2023

> We had better have a config option to control whether to use this function. It seems a little complex.

It's OK.

if (wb.isAvailable()) {
// The readonly of writer buffer means the data has been sent when spilling. To avoid data lost,
// we should re-insert the record.
requestMemoryAndInsert(partitionId, serializedData, serializedDataLength, sentBlocks);

Contributor

This seems like bad code. It may cause more concurrency problems.

Member Author

This is done in a single thread. It's thread safe.

Contributor

Could we simplify the logic here? It's weird that we use recursion here.

Contributor

+1. This logic should be simplified.

It seems obvious that it could be split into two functions:
request memory: wait until enough memory is spilled.
insert records: only insert records when memory is satisfied.
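
One way the suggested split could look, as a rough sketch only. The class `SplitInsertSketch`, the `memorySource` callback, and the `maxEmptyGrants` limit are hypothetical and do not reflect the code that was finally merged. The point is that the waiting happens in a loop inside `requestMemory`, and `insert` never has to call back into it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical sketch of "request memory, then insert"; not the merged Uniffle code.
class SplitInsertSketch {
    // Asked for N bytes, returns the number of bytes actually granted (assumption).
    private final LongUnaryOperator memorySource;
    private final int maxEmptyGrants;
    private final List<byte[]> records = new ArrayList<>();
    private long allocated = 0;
    private long used = 0;

    SplitInsertSketch(LongUnaryOperator memorySource, int maxEmptyGrants) {
        this.memorySource = memorySource;
        this.maxEmptyGrants = maxEmptyGrants;
    }

    // Step 1: loop (no recursion) until enough memory is available.
    void requestMemory(long required) {
        int emptyGrants = 0;
        while (allocated - used < required) {
            long granted = memorySource.applyAsLong(required - (allocated - used));
            if (granted <= 0) {
                if (++emptyGrants >= maxEmptyGrants) {
                    throw new IllegalStateException("Could not acquire " + required + " bytes");
                }
                continue;
            }
            allocated += granted;
        }
    }

    // Step 2: insert only when the memory is already there.
    void insert(byte[] record) {
        if (allocated - used < record.length) {
            throw new IllegalStateException("insert called without enough memory");
        }
        records.add(record);
        used += record.length;
    }

    void addRecord(byte[] record) {
        requestMemory(record.length);
        insert(record);
    }

    long used() { return used; }

    int recordCount() { return records.size(); }

    public static void main(String[] args) {
        // The source grants at most 4 bytes per call, so a 10-byte record needs three calls.
        SplitInsertSketch sketch = new SplitInsertSketch(need -> Math.min(need, 4L), 3);
        sketch.addRecord(new byte[10]);
        System.out.println("records: " + sketch.recordCount() + ", used bytes: " + sketch.used());
    }
}
```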

Member Author

Got it. I will check this PR in our internal env. Let's freeze this until all tests finish.

Member Author

Done.

requestMemoryAndInsert(partitionId, serializedData, serializedDataLength, sentBlocks);
shuffleWriteMetrics.incRecordsWritten(1L);

if (usedBytes.get() - inSendListBytes.get() > spillSize || spillTriggered.compareAndSet(true, false)) {

Contributor

It's weird synchronization.

Member Author

Hmm, sorry, I don't get your point.

Contributor

If spillTriggered is a synchronization value, why do we put the variable after ||? It's strange.

Member Author

Nice catch. It's a bug.
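
One plausible reading of this bug, sketched under assumptions: because `||` short-circuits, `compareAndSet` is never evaluated when the size check on its left is already true, so the signal stays set and can cause an extra spill on a later record. The class and method names below are hypothetical, and the thread does not show which form the final patch used.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of operand order with a short-circuiting ||.
class ShortCircuitSketch {
    // Size check first: the CAS is skipped whenever overThreshold is true.
    static boolean sizeFirst(boolean overThreshold, AtomicBoolean spillTriggered) {
        return overThreshold || spillTriggered.compareAndSet(true, false);
    }

    // Flag first: the signal is always consumed, whatever the size check says.
    static boolean flagFirst(boolean overThreshold, AtomicBoolean spillTriggered) {
        return spillTriggered.compareAndSet(true, false) || overThreshold;
    }

    public static void main(String[] args) {
        AtomicBoolean a = new AtomicBoolean(true);
        AtomicBoolean b = new AtomicBoolean(true);
        sizeFirst(true, a);
        flagFirst(true, b);
        System.out.println("flag after sizeFirst: " + a.get()); // still set
        System.out.println("flag after flagFirst: " + b.get()); // cleared
    }
}
```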

roryqi (Contributor), May 4, 2023

You designed too complex a synchronization mechanism. Such mechanisms are usually buggy.

Member Author

A lock would bring too much cost. One lock for every record is unreasonable.

Contributor

> A lock would bring too much cost. One lock for every record is unreasonable.

If we don't have lock conflicts, it won't bring too much cost. It is about several nanoseconds.

Member Author

An optimistic lock will. But I still think this is not complex. @advancedxy If you have time, could you help check this?

Member Author

> If spillTriggered is a synchronization value, why do we put the variable after ||? It's strange.

Done.

Contributor

> An optimistic lock will. But I still think this is not complex. @advancedxy If you have time, could you help check this?

In my opinion, the fact that there is a bug shows that the mechanism is too complex.

return 0L;
// If it is triggered by other consumers, it will spilled async by current consumer.
if (trigger != this) {
spillTriggered.set(true);

Contributor

Will the variable be accessed by multiple threads?

Member Author

Yes, it will.

zuston (Member Author) commented May 4, 2023

> We had better have a config option to control whether to use this function. It seems a little complex.

Done.

@zuston zuston requested a review from roryqi May 4, 2023 07:41

roryqi (Contributor) left a comment

In general, the PR doesn't propose a simple or clear method to solve the conflicts between multiple threads. You can't use several atomic variables to guarantee thread safety unless the approach is learned from other successful projects and has been verified in many production environments.

private List<WrappedBuffer> buffers = Lists.newArrayList();
private int dataLength = 0;
private int memoryUsed = 0;
private boolean isAvailable = false;

Contributor

When do we change this value?

// A best effort strategy to wait.
// If timeout exception occurs, the underlying tasks won't be cancelled.
} finally {
long releasedSize = futures.stream().filter(x -> x.isDone()).mapToLong(x -> {

Contributor

How do we handle an RPC failure?
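
To make the question concrete, here is a hedged sketch of one possible policy: count only futures that completed normally, and treat a failed or cancelled send as releasing nothing. The class `ReleasedSizeSketch` is hypothetical, and the thread does not show how the PR actually treats RPC failures.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; the real failure handling in the PR is not shown in this thread.
class ReleasedSizeSketch {
    static long releasedSize(List<CompletableFuture<Long>> futures) {
        return futures.stream()
            // Pending futures are skipped: their memory has not been released yet.
            .filter(CompletableFuture::isDone)
            .mapToLong(future -> {
                if (future.isCompletedExceptionally()) {
                    // Assumption: a failed or cancelled send releases nothing.
                    return 0L;
                }
                return future.getNow(0L);
            })
            .sum();
    }

    public static void main(String[] args) {
        CompletableFuture<Long> succeeded = CompletableFuture.completedFuture(10L);
        CompletableFuture<Long> pending = new CompletableFuture<>();
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("rpc failed"));
        System.out.println(releasedSize(Arrays.asList(succeeded, pending, failed)));
    }
}
```

Whether a failed send should instead fail the task is a separate decision that this sketch does not make.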

return 0L;
}

List<CompletableFuture<Long>> futures = spillFunc.apply(clear());

Contributor

You will call the method clear in different threads. Is it thread safe?

Member Author

If I'm correct, I think you don't understand this design. Please see the explanation below.

zuston (Member Author) commented May 4, 2023

I want to re-explain this design so that more people can understand it. @jerqi

As we know, in the master codebase of WriteBufferManager, concurrency doesn't exist. The buffer flushing operation is only performed while adding records, which is owned by one thread.

But when we want to flush buffers to release memory in the spill method, it is thread unsafe, because the spill function is invoked by TaskMemoryManager. So does that mean the spill method and addRecord run in different threads?
I think not. The reasons are as follows.

  1. When adding records, if memory is tight, it requests memory from TaskMemoryManager. If there is no free memory, the manager spills the underlying MemoryConsumer, so the current WriteBufferManager's spill method is invoked in this process. In this example, the two methods are invoked in the same thread, so this is thread safe. Right?
  2. If the current WriteBufferManager's spill method is invoked by TaskMemoryManager because other memory consumers have insufficient memory, this is thread unsafe.

Based on the above examples, for one WriteBufferManager, if the triggering consumer is not itself, it should avoid flushing buffers because of the concurrency problem with adding records. If the triggering consumer is itself, it can flush buffers, and that is thread safe.
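
The call chain described above can be sketched with a toy memory manager. Everything here is a hypothetical stand-in: `SketchMemoryManager`, `SketchConsumer`, and `SketchWriter` only mimic the shape of a manager that asks its consumers to spill and passes the requesting consumer along as the trigger. They are not Spark's TaskMemoryManager or MemoryConsumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a memory consumer that can be asked to spill.
abstract class SketchConsumer {
    abstract long spill(long size, SketchConsumer trigger);
}

// Hypothetical stand-in for a per-task memory manager.
class SketchMemoryManager {
    private final List<SketchConsumer> consumers = new ArrayList<>();
    private long free;

    SketchMemoryManager(long free) { this.free = free; }

    void register(SketchConsumer consumer) { consumers.add(consumer); }

    // Grants up to `required` bytes, asking consumers to spill when short.
    long acquire(long required, SketchConsumer requester) {
        for (SketchConsumer consumer : consumers) {
            if (free >= required) {
                break;
            }
            // The requester is passed along as the trigger of the spill.
            free += consumer.spill(required - free, requester);
        }
        long granted = Math.min(required, free);
        free -= granted;
        return granted;
    }
}

class SketchWriter extends SketchConsumer {
    private final AtomicBoolean deferredSpill = new AtomicBoolean(false);
    private long held;

    SketchWriter(long held) { this.held = held; }

    @Override
    long spill(long size, SketchConsumer trigger) {
        if (trigger != this) {
            // Another consumer needs memory: do not touch the buffers here.
            deferredSpill.set(true);
            return 0L;
        }
        // Own request, hence the same thread as addRecord: flush directly.
        long released = held;
        held = 0;
        return released;
    }

    long held() { return held; }

    boolean hasDeferredSpill() { return deferredSpill.get(); }
}

class SpillTriggerDemo {
    public static void main(String[] args) {
        SketchMemoryManager manager = new SketchMemoryManager(0);
        SketchWriter a = new SketchWriter(50);
        SketchWriter b = new SketchWriter(30);
        manager.register(a);
        manager.register(b);
        long granted = manager.acquire(40, b); // b asks for memory
        System.out.println("granted=" + granted + ", a deferred=" + a.hasDeferredSpill());
    }
}
```

In this sketch the request is only partly satisfied, because the writer that did not trigger the spill defers its release instead of freeing memory immediately.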

@roryqi roryqi self-requested a review May 4, 2023 11:02
requestMemory(Math.max(bufferSegmentSize, serializedDataLength));
requestMemory(required);
if (wb.isAvailable()) {
// The readonly of writer buffer means the data has been sent when spilling. To avoid data lost,

Contributor

The comment isn't clear enough. You should explain in more detail.

Member Author

Got it.

advancedxy (Contributor) commented May 8, 2023

The idea seems simple enough and should work.
I'm a bit concerned about whether it would work well in production cases, since it doesn't respond to other tasks' spill requests.

But it's better than the current impl.

@roryqi roryqi changed the title from "[#808] fix(spark): ensure thread safe and data consistency when spilling" to "[#808] feat(spark): ensure thread safe and data consistency when spilling" May 10, 2023
@zuston zuston requested review from advancedxy and roryqi July 20, 2023 07:45
zuston (Member Author) commented Jul 20, 2023

Sorry for the late reply. I have updated this code! PTAL @jerqi @advancedxy Thanks!

zuston (Member Author) commented Jul 21, 2023

Although this has a long history, would you mind taking a look? @leixm @smallzhongfeng @xianjingfeng

roryqi (Contributor) left a comment

LGTM. Now it's ok for me.

zuston (Member Author) commented Jul 22, 2023

@zuston zuston merged commit 03b04b9 into apache:master Jul 22, 2023
@zuston zuston deleted the issue-808-2 branch July 22, 2023 13:10
zuston pushed a commit that referenced this pull request Mar 7, 2024
…sure data correctness (#1558)

### What changes were proposed in this pull request?

Verify the number of written records to enhance data accuracy.
Make sure all data records are sent by clients.
Make sure bugs like #714 will never be introduced into the code.

### Why are the changes needed?

A follow-up PR for #848.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.
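
As an illustration of the record-count verification this follow-up describes, here is a minimal sketch. The class `RecordCountSketch`, its `Block` type, and the method names are hypothetical and are not taken from #1558. The idea is only that the number of records handed to the writer must equal the number of records contained in the blocks that were sent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a written-records check; not the code from #1558.
class RecordCountSketch {
    static final class Block {
        final int records;

        Block(int records) { this.records = records; }
    }

    private final List<Block> sentBlocks = new ArrayList<>();
    private long recordsAdded = 0;
    private int pendingRecords = 0;

    void addRecord() {
        recordsAdded++;
        pendingRecords++;
    }

    // Packs all pending records into one block and marks it as sent.
    void flush() {
        if (pendingRecords > 0) {
            sentBlocks.add(new Block(pendingRecords));
            pendingRecords = 0;
        }
    }

    // Fails if any added record is missing from the sent blocks.
    void verify() {
        long recordsInBlocks = sentBlocks.stream().mapToLong(block -> block.records).sum();
        if (recordsInBlocks != recordsAdded) {
            throw new IllegalStateException(
                "Record count mismatch: added " + recordsAdded + ", sent " + recordsInBlocks);
        }
    }

    public static void main(String[] args) {
        RecordCountSketch writer = new RecordCountSketch();
        for (int i = 0; i < 3; i++) {
            writer.addRecord();
        }
        writer.flush();
        writer.verify(); // passes: every added record is in a sent block
        System.out.println("record counts match");
    }
}
```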
Development

Successfully merging this pull request may close these issues.

[Bug] NullPointerException of WriterBuffer.getData due to race condition

4 participants