2.x: Add MulticastProcessor by akarnokd · Pull Request #6002 · ReactiveX/RxJava · GitHub
Skip to content

2.x: Add MulticastProcessor#6002

Merged
akarnokd merged 3 commits into
ReactiveX:2.xfrom
akarnokd:MulticastProcessor
May 17, 2018
Merged

2.x: Add MulticastProcessor#6002
akarnokd merged 3 commits into
ReactiveX:2.xfrom
akarnokd:MulticastProcessor

Conversation

@akarnokd

@akarnokd akarnokd commented May 9, 2018

Copy link
Copy Markdown
Member

This PR adds the MulticastProcessor from the extensions project to be a standard processor option.

This type of processor fills the gap of having a backpressure-coordinating processor type as PublishProcessor doesn't coordinate backpressure on its own and Flowable.publish() often can't be used because the upstream may not yet exist when the dowstream consumers are setup.

MulticastProcessor

Example:

MulticastProcessor<Integer> mp = Flowable.range(1, 10)
    .subscribeWith(MulticastProcessor.create());

mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

// --------------------

MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
mp2.start();

assertTrue(mp2.offer(1));
assertTrue(mp2.offer(2));
assertTrue(mp2.offer(3));
assertTrue(mp2.offer(4));

assertFalse(mp2.offer(5));

mp2.onComplete();

mp2.test().assertResult(1, 2, 3, 4);

Resolves: #5999

@akarnokd akarnokd added this to the 2.2 milestone May 9, 2018
@codecov

codecov Bot commented May 9, 2018

Copy link
Copy Markdown

@vanniktech vanniktech left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somehow I missed this PR

* upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be
* immediately completed.
* <p>
* Because of {@code MulticastProcessor} implements the {@link Subscriber} interface, calling

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd nuke the of here

* {@code onSubscribe} is mandatory (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>).
* If {@code MulticastProcessor} shoud run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher},
* use {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer.
* Failing to do so will lead to {@link NullPointerException} at runtime.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lead to a

public static <T> MulticastProcessor<T> create() {
return new MulticastProcessor<T>(bufferSize(), false);
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double new line

* @param <T> the input and output value type
* @return the new MulticastProcessor instance
*/
public static <T> MulticastProcessor<T> create(int bufferSize) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the @CheckReturnValue annotation here

* is cancelled
* @return the new MulticastProcessor instance
*/
public static <T> MulticastProcessor<T> create(boolean refCount) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the @CheckReturnValue annotation here

* @param <T> the input and output value type
* @return the new MulticastProcessor instance
*/
public static <T> MulticastProcessor<T> create(int bufferSize, boolean refCount) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add the @CheckReturnValue annotation here

}

/**
* Constructs a fresh instance with the given prefetch amount and the optional

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no real need of having the documentation here, is there? It's not a public method and the create method already cover everything

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may help those who dig into the source code for some reason (learning, debugging, etc.). I'll leave it there.

@Override
public void onError(Throwable t) {
if (t == null) {
throw new NullPointerException("t is null");

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we also have another second sentence that was stating that nulls are not allowed per definition?


mp.test().assertResult();
}

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double new line

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, removed.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just use assertValuesOnly and then save the assertNotComplete call?

@akarnokd akarnokd merged commit f87879d into ReactiveX:2.x May 17, 2018
@akarnokd akarnokd deleted the MulticastProcessor branch May 17, 2018 21:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants