In our production systems we observe sporadic IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber when an EssentialFilter uses Accumulator.flatten and the resulting accumulator is composed with other filters.
Accumulator.flatten uses Sink.asPublisher(fanout = false) internally, which creates a VirtualPublisher that allows only a single Reactive Streams subscriber. When materialized, Pekko schedules a subscription timeout (StreamSubscriptionTimeout, default 5s). The actual subscriber only connects when the inner Future[Accumulator] resolves and triggers a nested Source.fromPublisher(publisher).run(). If that future resolves after the timeout (due to GC pauses, thread pool saturation, or slow filter composition), the timeout handler subscribes a CancellingSubscriber first, and the real subscription is then rejected as a duplicate with the IllegalStateException.
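The ordering problem can be modelled without Pekko at all. The sketch below is a deliberately simplified stand-in, not Pekko's VirtualPublisher: SingleSubscriberSlot and SubscriptionRaceDemo are hypothetical names, and real timers are replaced by comparing two durations. It only illustrates the "first subscriber wins, second is rejected" rule described above.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified model of a single-subscriber publisher.
// This is NOT Pekko's VirtualPublisher; it only models "first subscriber wins".
final class SingleSubscriberSlot {
  private val holder = new AtomicReference[Option[String]](None)

  // Right(name) if this caller claimed the slot, Left(error) if it was already taken.
  def subscribe(name: String): Either[IllegalStateException, String] =
    if (holder.compareAndSet(None, Some(name))) Right(name)
    else Left(new IllegalStateException("only one subscriber is supported"))
}

object SubscriptionRaceDemo {
  // Whichever event happens first (timeout or future resolution) subscribes first.
  def run(timeoutMs: Long, futureResolvesAfterMs: Long): Either[IllegalStateException, String] = {
    val slot = new SingleSubscriberSlot
    if (futureResolvesAfterMs < timeoutMs) {
      // Future resolved in time: the real subscriber claims the slot.
      slot.subscribe("real-subscriber")
    } else {
      // Timeout fired first: a cancelling placeholder occupies the slot...
      slot.subscribe("cancelling-subscriber")
      // ...so the real subscriber arrives second and is rejected.
      slot.subscribe("real-subscriber")
    }
  }
}
```

With a 300ms timeout, a future resolving after 100ms yields Right("real-subscriber"), while one resolving after 500ms yields a Left carrying the IllegalStateException.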
The following code reproduces the issue consistently:
import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.SystemMaterializer
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString
import play.api.libs.streams.Accumulator
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._

// Shorten the subscription timeout so we can trigger it without waiting 5s
val config = ConfigFactory.parseString(
  "pekko.stream.materializer.subscription-timeout.timeout = 300ms"
).withFallback(ConfigFactory.load())
val system = ActorSystem("demo", config)
val mat = SystemMaterializer(system).materializer
val promise = Promise[Accumulator[ByteString, ByteString]]()
val result = Source.single(ByteString("hello"))
  .runWith(Accumulator.flatten(promise.future)(mat).toSink)(mat)
// Wait past the timeout, so a CancellingSubscriber now occupies the VirtualPublisher
Thread.sleep(500)
// Resolving the future triggers a second subscribe(), which is rejected
promise.success(Accumulator(Sink.fold(ByteString.empty)(_ ++ _)))
println(Await.result(result, 5.seconds))
This will throw:
Exception in thread "main" java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.11)
    at org.apache.pekko.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:72)
    at org.apache.pekko.stream.impl.VirtualPublisher.rec$6(StreamLayout.scala:491)
    at org.apache.pekko.stream.impl.VirtualPublisher.subscribe(StreamLayout.scala:496)
    at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:177)
    at org.apache.pekko.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:316)
    at org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:631)
    at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:740)
    at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:789)
    at org.apache.pekko.actor.Actor.aroundPreStart(Actor.scala:558)
    at org.apache.pekko.actor.Actor.aroundPreStart$(Actor.scala:481)
    at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:729)
    at org.apache.pekko.actor.ActorCell.create(ActorCell.scala:654)
    at org.apache.pekko.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
    at org.apache.pekko.actor.ActorCell.systemInvoke(ActorCell.scala:545)
    at org.apache.pekko.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:305)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:240)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
We can eliminate Sink.asPublisher(fanout = false) by using Pekko's Sink.futureSink in our futureToSink helper:
private[streams] def futureToSink[E, A](
    future: Future[Accumulator[E, A]]
)(implicit ec: ExecutionContext): Sink[E, Future[A]] = {
  Sink.futureSink(future.map(_.toSink)).mapMaterializedValue(_.flatten)
}
This has a problem though: Sink.futureSink is backed by LazySink, which fails the materialized future with NeverMaterializedException if the upstream completes before any element arrives (e.g., a GET request with no body). This is because LazySink only materializes the inner sink on the first element push, and if no elements arrive, there's no materialized value to return. LazySink's factory function is built to receive the first element as input, but Sink.futureSink ignores that element and doesn't actually need one to resolve the inner sink, which is arguably a design gap in Pekko's Sink.futureSink implementation.
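A minimal sketch of that empty-upstream case, using only pekko-stream. The object name FutureSinkEmptyUpstream and the summing sink are illustrative assumptions; the inner sink is already available when the stream starts, yet the materialized future is expected to fail because no element is ever pushed.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.Try

// Illustrative demo object, not part of Play or Pekko.
object FutureSinkEmptyUpstream {
  def run()(implicit system: ActorSystem): Try[Int] = {
    // The future is already completed, so resolving the inner sink needs no element.
    val inner: Future[Sink[Int, Future[Int]]] =
      Future.successful(Sink.fold[Int, Int](0)(_ + _))

    // Empty upstream: completes before any element is pushed into the lazy sink.
    val result: Future[Int] =
      Source.empty[Int].runWith(Sink.futureSink(inner)).flatten

    // Per the behaviour described above, this should be a Failure
    // wrapping NeverMaterializedException rather than Success(0).
    Try(Await.result(result, 3.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("future-sink-demo")
    try println(run())
    finally system.terminate()
  }
}
```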
To work around this, we can check whether the future is already resolved at materialization time (the common case), and only fall back to Sink.futureSink when the future is still pending:
private[streams] def futureToSink[E, A](
    future: Future[Accumulator[E, A]]
)(implicit ec: ExecutionContext): Sink[E, Future[A]] = {
  Sink
    .fromMaterializer { (mat, _) =>
      future.value match {
        // Future already completed: use the accumulator's sink directly,
        // or cancel upstream and surface the failure.
        case Some(result) =>
          result.fold(
            e => Sink.cancelled[E].mapMaterializedValue(_ => Future.failed[A](e)),
            acc => acc.toSink,
          )
        // Future still pending: defer to futureSink. If upstream completes without
        // any element, run the resolved sink against an empty source instead.
        case None =>
          Sink
            .futureSink(future.map(_.toSink))
            .mapMaterializedValue(_.flatten.recoverWith {
              case _: NeverMaterializedException =>
                future.flatMap(acc => Source.empty[E].runWith(acc.toSink)(mat))
            })
      }
    }
    .mapMaterializedValue(_.flatten)
}
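To exercise the pending-future fallback in isolation, here is a sketch that mirrors only the case None branch with plain Pekko sinks instead of Play's Accumulator. The names PendingFutureFallbackDemo and pendingFutureSink are hypothetical, and the summing sink is just an example; it assumes pekko-stream is on the classpath.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.NeverMaterializedException
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._

// Illustrative demo object, not part of Play or Pekko.
object PendingFutureFallbackDemo {
  // Hypothetical helper: the same recoverWith fallback, applied to a Future[Sink].
  def pendingFutureSink[E, A](future: Future[Sink[E, Future[A]]])(
      implicit ec: ExecutionContext): Sink[E, Future[A]] =
    Sink
      .fromMaterializer { (mat, _) =>
        Sink
          .futureSink(future)
          .mapMaterializedValue(_.flatten.recoverWith {
            case _: NeverMaterializedException =>
              // Upstream finished with no elements: run the resolved sink on an empty source.
              future.flatMap(sink => Source.empty[E].runWith(sink)(mat))
          })
      }
      .mapMaterializedValue(_.flatten)

  def run()(implicit system: ActorSystem): Int = {
    import system.dispatcher
    val promise = Promise[Sink[Int, Future[Int]]]()
    // The empty upstream completes while the future is still pending.
    val result = Source.empty[Int].runWith(pendingFutureSink(promise.future))
    // Resolve the sink afterwards; the fallback should still produce the fold's zero value.
    promise.success(Sink.fold[Int, Int](0)(_ + _))
    Await.result(result, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("fallback-demo")
    try println(run())
    finally system.terminate()
  }
}
```

The fallback re-runs the resolved sink against Source.empty, so a sink that is not safe to materialize more than once would need separate care.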
It may be worth trying to get a fix into Pekko so we don't need the workaround though.