Accumulator.flatten causes sporadic IllegalStateException in filter chains · Issue #13793 · playframework/playframework · GitHub

@gmethvin


In our production systems we observe sporadic IllegalStateExceptions ("Sink.asPublisher(fanout = false) only supports one subscriber") when an EssentialFilter uses Accumulator.flatten and the resulting accumulator is composed with other filters.

Accumulator.flatten uses Sink.asPublisher(fanout = false) internally, which creates a VirtualPublisher that accepts only a single Reactive Streams subscriber. When that sink is materialized, Pekko schedules a subscription timeout (pekko.stream.materializer.subscription-timeout.timeout, 5s by default). The real subscriber only connects once the inner Future[Accumulator] resolves and triggers a nested Source.fromPublisher(publisher).run(). If that future resolves after the timeout (due to GC pauses, thread pool saturation, or slow filter composition), the timeout has already attached a CancellingSubscriber, so the real subscription is rejected as a duplicate with the IllegalStateException above.
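
To make the two possible orderings concrete, here is a toy model in plain Scala (standard library only). OneSubscriberPublisher, subscriptionTimeoutFired and FlattenRace are hypothetical names for this sketch, not Pekko classes; the model keeps only the single-subscriber slot and a timeout that fills an empty slot with a cancelling subscriber, as described above.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.util.Try

// Toy model with hypothetical names; this is NOT Pekko's VirtualPublisher.
// It keeps only the property that matters here: a single subscriber slot.
final class OneSubscriberPublisher {
  private val slot = new AtomicReference[Option[String]](None)

  // The first subscriber wins; any later subscribe() is rejected,
  // mirroring Reactive Streams rule 1.11 as applied with fanout = false.
  def subscribe(name: String): Unit =
    if (!slot.compareAndSet(None, Some(name)))
      throw new IllegalStateException("only supports one subscriber")

  // Models the subscription timeout firing: if nobody has subscribed yet,
  // a cancelling subscriber takes the slot.
  def subscriptionTimeoutFired(): Unit = {
    slot.compareAndSet(None, Some("CancellingSubscriber"))
    ()
  }

  def current: Option[String] = slot.get
}

object FlattenRace {
  // Replays "inner Future[Accumulator] resolves and subscribes" either
  // before or after "subscription timeout fires".
  def replay(resolvesBeforeTimeout: Boolean): Try[Option[String]] = Try {
    val publisher = new OneSubscriberPublisher
    if (resolvesBeforeTimeout) {
      publisher.subscribe("inner accumulator")
      publisher.subscriptionTimeoutFired() // no effect: slot already taken
    } else {
      publisher.subscriptionTimeoutFired() // e.g. a GC pause delayed the future
      publisher.subscribe("inner accumulator") // rejected as a duplicate
    }
    publisher.current
  }
}
```

In this model, FlattenRace.replay(resolvesBeforeTimeout = false) ends in a Failure wrapping IllegalStateException, the same kind of rejection the real stream reports; with the opposite ordering the real subscriber keeps the slot and the timeout is harmless.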

The following code reproduces the issue consistently:

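import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.SystemMaterializer
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.util.ByteString
import play.api.libs.streams.Accumulator
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._

// Shorten the subscription timeout so we can trigger it without waiting 5s
val config = ConfigFactory.parseString(
  "pekko.stream.materializer.subscription-timeout.timeout = 300ms"
).withFallback(ConfigFactory.load())
val system = ActorSystem("demo", config)
val mat = SystemMaterializer(system).materializer
// The inner accumulator is supplied later, as when a filter's Future is slow to resolve
val promise = Promise[Accumulator[ByteString, ByteString]]()
val result = Source.single(ByteString("hello"))
  .runWith(Accumulator.flatten(promise.future)(mat).toSink)(mat)
// Wait past the timeout, so a CancellingSubscriber now occupies the VirtualPublisher
Thread.sleep(500)
// Resolving the future triggers a second subscribe(), which is rejected
promise.success(Accumulator(Sink.fold(ByteString.empty)(_ ++ _)))
println(Await.result(result, 5.seconds))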

This will throw:

Exception in thread "main" java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.11)
	at org.apache.pekko.stream.impl.ReactiveStreamsCompliance$.rejectAdditionalSubscriber(ReactiveStreamsCompliance.scala:72)
	at org.apache.pekko.stream.impl.VirtualPublisher.rec$6(StreamLayout.scala:491)
	at org.apache.pekko.stream.impl.VirtualPublisher.subscribe(StreamLayout.scala:496)
	at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter$BatchingActorInputBoundary.preStart(ActorGraphInterpreter.scala:177)
	at org.apache.pekko.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:316)
	at org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:631)
	at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:740)
	at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:789)
	at org.apache.pekko.actor.Actor.aroundPreStart(Actor.scala:558)
	at org.apache.pekko.actor.Actor.aroundPreStart$(Actor.scala:481)
	at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:729)
	at org.apache.pekko.actor.ActorCell.create(ActorCell.scala:654)
	at org.apache.pekko.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
	at org.apache.pekko.actor.ActorCell.systemInvoke(ActorCell.scala:545)
	at org.apache.pekko.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:305)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:240)
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)

We can eliminate Sink.asPublisher(fanout = false) by using Pekko's Sink.futureSink in our futureToSink helper:

private[streams] def futureToSink[E, A](
  future: Future[Accumulator[E, A]]
)(implicit ec: ExecutionContext): Sink[E, Future[A]] = {
  Sink.futureSink(future.map(_.toSink)).mapMaterializedValue(_.flatten)
}

This has a problem, though: Sink.futureSink is backed by LazySink, whose materialized Future fails with NeverMaterializedException if upstream completes before any element arrives (e.g., a GET request with no body). LazySink only materializes the inner sink when the first element is pushed, so if no elements arrive there is no materialized value to return. For Sink.lazySink, deferring creation until the first element is the whole point, but Sink.futureSink doesn't actually need an element to resolve the inner sink, so this is arguably a design gap in Pekko's Sink.futureSink implementation.
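
A simplified model of that gap, again in standard-library Scala with hypothetical names (LazyVsResolved, ToyNeverMaterializedException): a sink that only obtains its inner sink on the first element cannot produce a result for an empty stream, while running an inner sink that is already known works for empty input too. This sketches the semantics only, not how LazySink is implemented.

```scala
import scala.util.{Failure, Try}

// Hypothetical stand-in for Pekko's NeverMaterializedException.
final class ToyNeverMaterializedException
    extends RuntimeException("inner sink was never materialized")

// Toy model (not Pekko code): a "sink" is a function that consumes a finite,
// already-collected stream of elements and produces a result.
object LazyVsResolved {
  type ToySink[E, A] = Seq[E] => A

  // Mimics LazySink: the inner sink is only obtained when the first element
  // is pushed, so an empty stream leaves nothing to produce the result.
  def runLazily[E, A](innerSink: () => ToySink[E, A], elements: Seq[E]): Try[A] =
    if (elements.isEmpty) Failure(new ToyNeverMaterializedException)
    else Try(innerSink()(elements))

  // When the inner sink is already known (as with an already-completed
  // future), nothing requires waiting for an element: an empty stream is
  // just another input.
  def runResolved[E, A](innerSink: ToySink[E, A], elements: Seq[E]): Try[A] =
    Try(innerSink(elements))
}
```

With a concatenating sink, runLazily fails on an empty stream while runResolved returns an empty result; the workaround's fast path relies on the second behaviour whenever the future has already completed.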

To work around this, we can check whether the future is already resolved at materialization time (the common case), and only fall back to Sink.futureSink when the future is still pending:

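private[streams] def futureToSink[E, A](
  future: Future[Accumulator[E, A]]
)(implicit ec: ExecutionContext): Sink[E, Future[A]] = {
  Sink
    .fromMaterializer { (mat, _) =>
      future.value match {
        // Fast path: the future is already complete, so use its accumulator
        // (or its failure) directly, with no publisher and no lazy sink
        case Some(result) =>
          result.fold(
            e => Sink.cancelled[E].mapMaterializedValue(_ => Future.failed[A](e)),
            acc => acc.toSink,
          )
        // Slow path: the future is still pending, so defer to Sink.futureSink
        case None =>
          Sink
            .futureSink(future.map(_.toSink))
            .mapMaterializedValue(_.flatten.recoverWith {
              // e.g. upstream completed with no elements, so LazySink never
              // materialized the inner sink: run the accumulator on an empty source
              case _: NeverMaterializedException =>
                future.flatMap(acc => Source.empty[E].runWith(acc.toSink)(mat))
            })
      }
    }
    // fromMaterializer wraps the inner materialized value in a Future
    .mapMaterializedValue(_.flatten)
}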

It may still be worth getting a fix into Pekko's Sink.futureSink so that this workaround isn't needed.
