2.x: Add MulticastProcessor#6002
Conversation
vanniktech
left a comment
There was a problem hiding this comment.
Somehow I missed this PR
| * upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be | ||
| * immediately completed. | ||
| * <p> | ||
| * Because of {@code MulticastProcessor} implements the {@link Subscriber} interface, calling |
| * {@code onSubscribe} is mandatory (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>). | ||
| * If {@code MulticastProcessor} shoud run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher}, | ||
| * use {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer. | ||
| * Failing to do so will lead to {@link NullPointerException} at runtime. |
| public static <T> MulticastProcessor<T> create() { | ||
| return new MulticastProcessor<T>(bufferSize(), false); | ||
| } | ||
|
|
| * @param <T> the input and output value type | ||
| * @return the new MulticastProcessor instance | ||
| */ | ||
| public static <T> MulticastProcessor<T> create(int bufferSize) { |
There was a problem hiding this comment.
let's add the @CheckReturnValue annotation here
| * is cancelled | ||
| * @return the new MulticastProcessor instance | ||
| */ | ||
| public static <T> MulticastProcessor<T> create(boolean refCount) { |
There was a problem hiding this comment.
let's add the @CheckReturnValue annotation here
| * @param <T> the input and output value type | ||
| * @return the new MulticastProcessor instance | ||
| */ | ||
| public static <T> MulticastProcessor<T> create(int bufferSize, boolean refCount) { |
There was a problem hiding this comment.
let's add the @CheckReturnValue annotation here
| } | ||
|
|
||
| /** | ||
| * Constructs a fresh instance with the given prefetch amount and the optional |
There was a problem hiding this comment.
There's no real need of having the documentation here, is there? It's not a public method and the create method already cover everything
There was a problem hiding this comment.
It may help those who dig into the source code for some reason (learning, debugging, etc.). I'll leave it there.
| @Override | ||
| public void onError(Throwable t) { | ||
| if (t == null) { | ||
| throw new NullPointerException("t is null"); |
There was a problem hiding this comment.
Didn't we also have another second sentence that was stating that nulls are not allowed per definition?
|
|
||
| mp.test().assertResult(); | ||
| } | ||
|
|
There was a problem hiding this comment.
Could you just use assertValuesOnly and then save the assertNotComplete call?

This PR adds the
MulticastProcessorfrom the extensions project to be a standard processor option.This type of processor fills the gap of having a backpressure-coordinating processor type as
PublishProcessordoesn't coordinate backpressure on its own andFlowable.publish()often can't be used because the upstream may not yet exist when the dowstream consumers are setup.Example:
Resolves: #5999