Add support for Lambda asynchronous event retries by dominikschubert · Pull Request #7259 · localstack/localstack · GitHub
Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions localstack/config.py
1 change: 1 addition & 0 deletions localstack/services/awslambda/invocation/lambda_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class Invocation:
invoked_arn: str
client_context: Optional[str]
invocation_type: InvocationType
invoke_time: datetime


@dataclasses.dataclass(frozen=True)
Expand Down
2 changes: 2 additions & 0 deletions localstack/services/awslambda/invocation/lambda_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import random
import uuid
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from datetime import datetime
from hashlib import sha256
from threading import RLock
from typing import TYPE_CHECKING, Dict, Optional
Expand Down Expand Up @@ -229,6 +230,7 @@ def invoke(
invoked_arn=invoked_arn,
client_context=client_context,
invocation_type=invocation_type,
invoke_time=datetime.now(),
)
)

Expand Down
98 changes: 76 additions & 22 deletions localstack/services/awslambda/invocation/version_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import logging
import queue
import threading
import time
import uuid
from concurrent.futures import Future
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from queue import Queue
from threading import Thread
Expand Down Expand Up @@ -132,6 +133,8 @@ class LambdaVersionManager(ServiceEndpoint):
# TODO not sure about this backlink, maybe a callback is better?
lambda_service: "LambdaService"

destination_execution_pool: ThreadPoolExecutor

def __init__(
self,
function_arn: str,
Expand All @@ -150,6 +153,9 @@ def __init__(
self.shutdown_event = threading.Event()
self.state = None
self.log_handler = LogHandler()
self.destination_execution_pool = ThreadPoolExecutor(
thread_name_prefix=f"lambda-destination-processor-{function_version.id.function_name}"
)

def start(self) -> None:
try:
Expand Down Expand Up @@ -183,6 +189,10 @@ def stop(self) -> None:
state=State.Inactive, code=StateReasonCode.Idle, reason="Shutting down"
)
self.shutdown_event.set()
self.destination_execution_pool.shutdown(
cancel_futures=True
) # TODO: give it a grace period waiting, otherwise fail

self.queued_invocations.put(QUEUE_SHUTDOWN)
self.available_environments.put(QUEUE_SHUTDOWN)
if self.invocation_thread:
Expand Down Expand Up @@ -286,12 +296,16 @@ def invocation_loop(self) -> None:
except Exception as e:
queued_invocation.result_future.set_exception(e)

def invoke(self, *, invocation: Invocation) -> Future[InvocationResult] | None:
def invoke(
self, *, invocation: Invocation, current_retry: int = 0, invocation_id: str | None = None
) -> Future[InvocationResult] | None:
future = Future() if invocation.invocation_type == "RequestResponse" else None
if invocation_id is None:
invocation_id = str(uuid.uuid4())
invocation_storage = QueuedInvocation(
invocation_id=str(uuid.uuid4()),
invocation_id=invocation_id,
result_future=future,
retries=1,
retries=current_retry,
invocation=invocation,
)
self.queued_invocations.put(invocation_storage)
Expand Down Expand Up @@ -333,11 +347,11 @@ def store_logs(self, invocation_result: InvocationResult, executor: RuntimeEnvir
)

def process_event_destinations(
self, invocation_result: InvocationResult | InvocationError, original_payload: bytes
self,
invocation_result: InvocationResult | InvocationError,
original_invocation: RunningInvocation,
original_payload: bytes,
) -> None:
"""
TODO: handle this asynchronously, to not block the rest of the execution
"""
LOG.debug("Got event invocation with id %s", invocation_result.invocation_id)

# 1. Handle DLQ routing
Expand All @@ -360,25 +374,27 @@ def process_event_destinations(
if event_invoke_config is None:
return

# TODO: we need more information about the invocation event

if isinstance(invocation_result, InvocationResult):
LOG.debug("Handling success destination for %s", self.function_arn)
success_destination = event_invoke_config.destination_config.get("OnSuccess", {}).get(
"Destination"
)
if success_destination is None:
return
destination_payload = {
"version": "1.0",
"timestamp": timestamp_millis(), # TODO
"timestamp": timestamp_millis(),
"requestContext": {
"requestId": invocation_result.invocation_id,
"functionArn": self.function_version.qualified_arn,
"condition": "Success",
"approximateInvokeCount": 1,
"approximateInvokeCount": original_invocation.invocation.retries + 1,
},
"requestPayload": json.loads(to_str(original_payload)),
"responseContext": {"statusCode": 200, "executedVersion": "$LATEST"},
"responseContext": {
"statusCode": 200,
"executedVersion": self.function_version.id.qualifier,
},
"responsePayload": json.loads(to_str(invocation_result.payload or {})),
}

Expand All @@ -388,25 +404,61 @@ def process_event_destinations(
)

elif isinstance(invocation_result, InvocationError):
LOG.debug("Handling error destination for %s", self.function_arn)

failure_destination = event_invoke_config.destination_config.get("OnFailure", {}).get(
"Destination"
)

max_retry_attempts = event_invoke_config.maximum_retry_attempts
previous_retry_attempts = original_invocation.invocation.retries

if max_retry_attempts > 0 and max_retry_attempts > previous_retry_attempts:
delay_queue_invoke_seconds = config.LAMBDA_RETRY_BASE_DELAY_SECONDS * (
previous_retry_attempts + 1
)

time_passed = datetime.now() - original_invocation.invocation.invocation.invoke_time
enough_time_for_retry = (
event_invoke_config.maximum_event_age_in_seconds
and time_passed.seconds + delay_queue_invoke_seconds
<= event_invoke_config.maximum_event_age_in_seconds
)

if (
event_invoke_config.maximum_event_age_in_seconds is None
or enough_time_for_retry
):
time.sleep(delay_queue_invoke_seconds)
LOG.debug("Retrying lambda invocation for %s", self.function_arn)
self.invoke(
invocation=original_invocation.invocation.invocation,
current_retry=previous_retry_attempts + 1,
invocation_id=original_invocation.invocation.invocation_id,
)
return

failure_cause = "EventAgeExceeded"
else:
failure_cause = "RetriesExhausted"

if failure_destination is None:
return

destination_payload = {
"version": "1.0",
"timestamp": timestamp_millis(), # TODO
"timestamp": timestamp_millis(),
"requestContext": {
"requestId": invocation_result.invocation_id,
"functionArn": self.function_version.qualified_arn,
"condition": "RetriesExhausted", # TODO we don't know that here
"approximateInvokeCount": 1,
"condition": failure_cause,
"approximateInvokeCount": previous_retry_attempts + 1,
},
"requestPayload": json.loads(to_str(original_payload)),
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled", # TODO
"executedVersion": self.function_version.id.qualifier,
"functionError": "Unhandled",
},
"responsePayload": json.loads(to_str(invocation_result.payload)),
}
Expand All @@ -418,8 +470,6 @@ def process_event_destinations(
else:
raise ValueError("Unknown type for invocation result received.")

# TODO make async to not block other executions

def invocation_response(
self, invoke_id: str, invocation_result: Union[InvocationResult, InvocationError]
) -> None:
Expand All @@ -432,18 +482,22 @@ def invocation_response(
invocation_result.logs = running_invocation.logs
invocation_result.executed_version = self.function_version.id.qualifier
executor = running_invocation.executor

if running_invocation.invocation.invocation.invocation_type == "RequestResponse":
running_invocation.invocation.result_future.set_result(invocation_result)
else:
self.process_event_destinations(
self.destination_execution_pool.submit(
Comment thread
dominikschubert marked this conversation as resolved.
self.process_event_destinations,
invocation_result=invocation_result,
original_invocation=running_invocation,
original_payload=running_invocation.invocation.invocation.payload,
)

self.store_logs(invocation_result=invocation_result, executor=executor)

# mark executor available again
executor.invocation_done()
self.available_environments.put(executor)
self.store_logs(invocation_result=invocation_result, executor=executor)

# Service Endpoint implementation
def invocation_result(self, invoke_id: str, invocation_result: InvocationResult) -> None:
Expand Down
2 changes: 1 addition & 1 deletion localstack/services/awslambda/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -2399,7 +2399,7 @@ def list_function_event_invoke_configs(

event_invoke_configs = PaginatedList(event_invoke_configs)
page, token = event_invoke_configs.get_page(
lambda x: x,
lambda x: x["FunctionArn"],
marker,
max_items,
)
Expand Down
4 changes: 3 additions & 1 deletion localstack/testing/snapshots/transformer_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ def sqs_api():
"""
return [
TransformerUtility.key_value("ReceiptHandle"),
TransformerUtility.key_value("SenderId"),
TransformerUtility.key_value(
"SenderId"
), # TODO: flaky against AWS (e.g. /Attributes/SenderId '<sender-id:1>' → '<sender-id:2>' ... (expected → actual))
TransformerUtility.key_value("SequenceNumber"),
TransformerUtility.jsonpath("$..MessageAttributes.RequestID.StringValue", "request-id"),
KeyValueBasedTransformer(_resource_name_transformer, "resource"),
Expand Down
7 changes: 7 additions & 0 deletions tests/integration/awslambda/functions/lambda_echofail.py
Loading