Fix a race condition in OperatorMerge.InnerSubscriber#onError#5851
Conversation
Inner subscriber must queue the error first before setting done, so that after emitLoop() removes the subscriber, emitLoop is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0, and at the same time the error queue was empty.
There was a problem hiding this comment.
Please add an unit test that checks this race condition.
There was a problem hiding this comment.
How do I do this? This looks like a very rare / hard to trigger one. A thread would have to stop for a moment after setting done and before queuing the error at the time when emitLoop was running on another thread.
There was a problem hiding this comment.
That's what TestUtil.race is for. Have one subject onCompleted() and another onError(), mergeDelayError them. Loop this at least 1000 times, check a TestSubscriber for error termination. Remember to create the subjects and subscriber inside the loop due to the terminal event nature.
Codecov Report
@@ Coverage Diff @@
## 1.x #5851 +/- ##
============================================
+ Coverage 84.18% 84.25% +0.07%
- Complexity 2889 2891 +2
============================================
Files 290 290
Lines 18264 18264
Branches 2495 2495
============================================
+ Hits 15375 15389 +14
+ Misses 2006 1992 -14
Partials 883 883
Continue to review full report at Codecov.
|
|
Wow! I wrote a test and I'm surprised it reproduced the issue so easily. When I reverted the patch, the test failed with: |
|
@akarnokd Thanks, can you shed some light when it will be released to maven? |
|
Your other PR still incomplete without verifying the fix via a similar unit test. I usually announce the next version 3-5 days before release, however, since this affects a critical operator |
|
I know, working on it. Sorry, different time zone maybe. I've just started the day. ;) |
|
Thanks for the fixes. These were long-standing bugs you stumbled upon very close to the end-of-life of RxJava 1.x (March 31, 2018), after which no further development and bugfixes will be accepted. Are you in the process of moving to RxJava 2.x by any chance? |
|
We'd like to, however the limiting factor at the moment is lack of RxScala for 2.x. A large part of our project is already on RxJava 2.x, so using just one version everywhere would be much better for everyone. The options are either going to Scala 2.12 and using RxJava 2.x with no wrapper at all, thanks to much better Scala-Java lambda/SAM interop in Scala 2.12, or switching to something totally else. Monix maybe. |

Inner subscriber must queue the error first before setting done, so that after
emitLoopremoves the subscriber,emitLoopis guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0, and at the same time the error queue was empty.We observed that sometimes a result of flattening two observables doesn't properly mark the resulting observable as failed, despite the inner observable properly reporting onError. Instead, the error condition was ignored and the result observable was completing normally.