feat: add BaileysMessageProcessor for improved message handling by Santosl2 · Pull Request #1726 · EvolutionAPI/evolution-api · GitHub
Skip to content

feat: add BaileysMessageProcessor for improved message handling#1726

Merged
DavidsonGomes merged 3 commits intoEvolutionAPI:developfrom
Santosl2:feat/add-async-lock
Jul 16, 2025
Merged

feat: add BaileysMessageProcessor for improved message handling#1726
DavidsonGomes merged 3 commits intoEvolutionAPI:developfrom
Santosl2:feat/add-async-lock

Conversation

@Santosl2
Copy link
Copy Markdown
Contributor

@Santosl2 Santosl2 commented Jul 16, 2025

PT

Essa implementação adiciona uma fila reativa para o processamento de mensagens recebidas pelo Baileys, utilizando RxJS. O objetivo é garantir que os lotes de mensagens sejam processados de forma sequencial, evitando concorrência e race conditions que poderiam ocorrer em fluxos com alto volume.

Com o uso do Subject em conjunto com concatMap, garantimos que uma nova mensagem só será processada após a conclusão da anterior. Isso é especialmente importante em cenários onde o onMessageReceive realiza operações assíncronas que não devem ocorrer em paralelo (ex: gravação em banco, chamadas de API, etc).

Além disso, o tratamento de erros impede que falhas isoladas interrompam o fluxo completo de mensagens.

Essa abordagem torna o processamento mais previsível, seguro e resiliente, principalmente sob carga.

ENHere’s the English version for the PR:


This implementation introduces a reactive queue for processing incoming Baileys messages using RxJS. The goal is to ensure that message batches are processed sequentially, avoiding concurrency issues and race conditions that could occur under high message volume.

By leveraging a Subject combined with concatMap, we ensure that each batch is only processed after the previous one completes. This is critical in scenarios where onMessageReceive performs asynchronous operations that must not run in parallel (e.g., database writes, external API calls).

Error handling is also included to prevent a single failure from breaking the entire message stream.

Overall, this approach improves the reliability, safety, and predictability of message processing, especially under heavy load.

Summary by Sourcery

Introduce BaileysMessageProcessor leveraging RxJS to asynchronously batch and process incoming WhatsApp messages, integrate it into BaileysStartupService for message handling and cleanup

New Features:

  • Introduce BaileysMessageProcessor class to asynchronously process incoming WhatsApp messages using RxJS streams
  • Mount BaileysMessageProcessor in BaileysStartupService to handle messages and teardown on logout

Enhancements:

  • Refactor message upsert handling in BaileysStartupService to route through the new processor instead of direct invocation

Build:

  • Add rxjs as a dependency in package.json

@sourcery-ai
Copy link
Copy Markdown
Contributor

sourcery-ai Bot commented Jul 16, 2025

@Santosl2 Santosl2 changed the base branch from main to develop July 16, 2025 00:35
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Santosl2 - I've reviewed your changes - here's some feedback:

  • Consider guarding in processMessage to log or throw if mount wasn’t called first, so you don’t emit into an uninitialized Subject.
  • To prevent unbounded queue growth under load, think about adding a concurrency/backpressure strategy (e.g. mergeMap with a concurrency limit) instead of using an unbounded Subject with concatMap.
  • Once BaileysMessageProcessor is fully in place, remove the commented-out direct call to messageHandle to keep the event loop code clean.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Consider guarding in processMessage to log or throw if mount wasn’t called first, so you don’t emit into an uninitialized Subject.
- To prevent unbounded queue growth under load, think about adding a concurrency/backpressure strategy (e.g. mergeMap with a concurrency limit) instead of using an unbounded Subject with concatMap.
- Once BaileysMessageProcessor is fully in place, remove the commented-out direct call to messageHandle to keep the event loop code clean.

## Individual Comments

### Comment 1
<location> `src/api/integrations/channel/whatsapp/baileysMessage.processor.ts:44` </location>
<code_context>
+
+  processMessage(payload: MessageUpsertPayload, settings: any) {
+    const { messages, type, requestId } = payload;
+    this.messageSubject.next({ messages, type, requestId, settings });
+  }
+
</code_context>

<issue_to_address>
No backpressure or queue size limit is enforced on messageSubject.

This could cause memory issues if many messages are processed rapidly. Consider implementing a queue size limit or backpressure mechanism.

Suggested implementation:

```typescript
  // Limit the queue to 100 messages (adjust as needed)
  protected readonly MESSAGE_QUEUE_LIMIT = 100;
  protected messageQueue: Array<{ messages: any; type: any; requestId: any; settings: any }> = [];
  protected messageSubject = new Subject<{

```

```typescript
  processMessage(payload: MessageUpsertPayload, settings: any) {
    const { messages, type, requestId } = payload;
    if (this.messageQueue.length >= this.MESSAGE_QUEUE_LIMIT) {
      // Drop the oldest message to make room for the new one (or log/drop new, as preferred)
      this.processorLogs.warn(
        `Message queue full (${this.MESSAGE_QUEUE_LIMIT}). Dropping oldest message.`
      );
      this.messageQueue.shift();
    }
    const message = { messages, type, requestId, settings };
    this.messageQueue.push(message);
    this.messageSubject.next(message);
  }

```
</issue_to_address>

### Comment 2
<location> `src/api/integrations/channel/whatsapp/baileysMessage.processor.ts:48` </location>
<code_context>
+  }
+
+  onDestroy() {
+    this.subscription?.unsubscribe();
+    this.messageSubject.complete();
+  }
+}
</code_context>

<issue_to_address>
Unsubscribing and completing the Subject may cause issues if processMessage is called after onDestroy.

Calling processMessage after onDestroy will throw an error due to the completed Subject. Please guard processMessage or document its usage constraints.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.


processMessage(payload: MessageUpsertPayload, settings: any) {
const { messages, type, requestId } = payload;
this.messageSubject.next({ messages, type, requestId, settings });
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): No backpressure or queue size limit is enforced on messageSubject.

This could cause memory issues if many messages are processed rapidly. Consider implementing a queue size limit or backpressure mechanism.

Suggested implementation:

  // Limit the queue to 100 messages (adjust as needed)
  protected readonly MESSAGE_QUEUE_LIMIT = 100;
  protected messageQueue: Array<{ messages: any; type: any; requestId: any; settings: any }> = [];
  protected messageSubject = new Subject<{
  processMessage(payload: MessageUpsertPayload, settings: any) {
    const { messages, type, requestId } = payload;
    if (this.messageQueue.length >= this.MESSAGE_QUEUE_LIMIT) {
      // Drop the oldest message to make room for the new one (or log/drop new, as preferred)
      this.processorLogs.warn(
        `Message queue full (${this.MESSAGE_QUEUE_LIMIT}). Dropping oldest message.`
      );
      this.messageQueue.shift();
    }
    const message = { messages, type, requestId, settings };
    this.messageQueue.push(message);
    this.messageSubject.next(message);
  }

Comment on lines +48 to +49
this.subscription?.unsubscribe();
this.messageSubject.complete();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Unsubscribing and completing the Subject may cause issues if processMessage is called after onDestroy.

Calling processMessage after onDestroy will throw an error due to the completed Subject. Please guard processMessage or document its usage constraints.

@Santosl2 Santosl2 changed the title feat: add BaileysMessageProcessor for improved message handling and i… feat: add BaileysMessageProcessor for improved message handling Jul 16, 2025
@DavidsonGomes
Copy link
Copy Markdown
Collaborator

@DavidsonGomes DavidsonGomes merged commit 9fd40a4 into EvolutionAPI:develop Jul 16, 2025
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants