3030
3131from ._record_update import RecordUpdate
3232from ._utils .asyncio import get_running_loop , run_coro_with_timeout
33+ from ._utils .net import (
34+ InterfacesType ,
35+ IPVersion ,
36+ add_multicast_member ,
37+ drop_multicast_member ,
38+ new_respond_socket ,
39+ normalize_interface_choice ,
40+ )
3341from ._utils .time import current_time_millis
3442from .const import _CACHE_CLEANUP_INTERVAL
3543
4351_CLOSE_TIMEOUT = 3000 # ms
4452
4553
54+ def _strip_zone (address : str ) -> str :
55+ """Drop a ``%zone`` suffix from an IPv6 address string."""
56+ percent = address .find ("%" )
57+ return address [:percent ] if percent != - 1 else address
58+
59+
60+ def _interface_bind_address (interface : str | tuple [tuple [str , int , int ], int ]) -> str :
61+ """Return the bind address an interface choice maps to, for diffing."""
62+ if isinstance (interface , tuple ):
63+ return _strip_zone (interface [0 ][0 ])
64+ return interface
65+
66+
67+ def _wrapped_bind_address (wrapped : _WrappedTransport ) -> str :
68+ """Return the bound address of a sender transport, for diffing."""
69+ return _strip_zone (wrapped .sock_name [0 ])
70+
71+
72+ def _wrapped_interface (wrapped : _WrappedTransport ) -> str | tuple [tuple [str , int , int ], int ]:
73+ """Reconstruct the interface representation for a sender, for membership drops."""
74+ sock_name = wrapped .sock_name
75+ if wrapped .is_ipv6 :
76+ scope_id = cast (int , sock_name [3 ]) if len (sock_name ) > 3 else 0
77+ return ((_strip_zone (sock_name [0 ]), 0 , 0 ), scope_id )
78+ return cast (str , sock_name [0 ])
79+
80+
4681class AsyncEngine :
4782 """An engine wraps sockets in the event loop."""
4883
4984 __slots__ = (
5085 "_cleanup_timer" ,
5186 "_listen_socket" ,
87+ "_listen_transport" ,
5288 "_respond_sockets" ,
5389 "_setup_task" ,
5490 "loop" ,
@@ -72,6 +108,7 @@ def __init__(
72108 self .senders : list [_WrappedTransport ] = []
73109 self .running_future : asyncio .Future [bool | None ] | None = None
74110 self ._listen_socket = listen_socket
111+ self ._listen_transport : _WrappedTransport | None = None
75112 self ._respond_sockets = respond_sockets
76113 self ._cleanup_timer : asyncio .TimerHandle | None = None
77114 self ._setup_task : asyncio .Task [None ] | None = None
@@ -114,14 +151,19 @@ async def _async_create_endpoints(self) -> None:
114151 lambda : AsyncListener (self .zc ), # type: ignore[arg-type, return-value]
115152 sock = s ,
116153 )
154+ datagram_transport = cast (asyncio .DatagramTransport , transport )
155+ reader = make_wrapped_transport (datagram_transport )
117156 # Register the wrapped transport before releasing the engine's
118157 # handle so a concurrent shutdown always sees ``s`` in exactly
119158 # one place; do not add an ``await`` between these two steps.
120159 self .protocols .append (cast (AsyncListener , protocol ))
121- self .readers .append (make_wrapped_transport ( cast ( asyncio . DatagramTransport , transport )) )
160+ self .readers .append (reader )
122161 if s in sender_sockets :
123- self .senders .append (make_wrapped_transport (cast ( asyncio . DatagramTransport , transport ) ))
162+ self .senders .append (make_wrapped_transport (datagram_transport ))
124163 if s is self ._listen_socket :
164+ # Keep a handle to the shared listen socket so interface
165+ # rescans can add/drop multicast memberships on it.
166+ self ._listen_transport = reader
125167 self ._listen_socket = None
126168 if s in self ._respond_sockets :
127169 self ._respond_sockets .remove (s )
@@ -134,12 +176,93 @@ def _async_remove_listener(self, listener: AsyncListener) -> None:
134176 instead of raising ``EHOSTUNREACH`` on every send forever.
135177 """
136178 wrapped = listener .transport
137- transport = wrapped .transport if wrapped is not None else None
138179 if listener in self .protocols :
139180 self .protocols .remove (listener )
140- if transport is not None :
141- self .readers = [w for w in self .readers if w .transport is not transport ]
142- self .senders = [w for w in self .senders if w .transport is not transport ]
181+ if wrapped is not None :
182+ self ._async_drop_transport_wrappers (wrapped .transport )
183+
184+ def _async_drop_transport_wrappers (self , transport : asyncio .DatagramTransport ) -> None :
185+ """Drop every reader/sender wrapper that holds ``transport``."""
186+ self .readers = [w for w in self .readers if w .transport is not transport ]
187+ self .senders = [w for w in self .senders if w .transport is not transport ]
188+
189+ async def async_update_interfaces (
190+ self ,
191+ interfaces : InterfacesType ,
192+ ip_version : IPVersion ,
193+ apple_p2p : bool ,
194+ ) -> None :
195+ """Reconcile sender/reader sockets to the live interface set.
196+
197+ Adds a per-interface responder socket for each interface that
198+ appeared and tears down the socket for each interface that
199+ disappeared, diffing on the bound address. The shared listen
200+ socket (including the Default single-family dual-use socket) is
201+ never torn down here.
202+ """
203+ assert self .loop is not None
204+ normalized = normalize_interface_choice (interfaces , ip_version )
205+ desired = {_interface_bind_address (interface ): interface for interface in normalized }
206+ current = {_wrapped_bind_address (wrapped ): wrapped for wrapped in self .senders }
207+ listen_transport = self ._listen_transport
208+ listen_socket = listen_transport .sock if listen_transport is not None else None
209+
210+ for bind_address , wrapped in current .items ():
211+ if bind_address in desired :
212+ continue
213+ if listen_transport is not None and wrapped .transport is listen_transport .transport :
214+ # The shared listen / dual-use socket is not a per-interface
215+ # sender; leaving the group or closing it would break receive.
216+ continue
217+ self ._async_close_sender (wrapped , listen_socket )
218+
219+ for bind_address , interface in desired .items ():
220+ if bind_address in current :
221+ continue
222+ await self ._async_add_interface (interface , listen_socket , apple_p2p )
223+
224+ async def _async_add_interface (
225+ self ,
226+ interface : str | tuple [tuple [str , int , int ], int ],
227+ listen_socket : socket .socket | None ,
228+ apple_p2p : bool ,
229+ ) -> None :
230+ """Join the multicast group and adopt a responder socket for one interface."""
231+ # A unicast instance has no listen socket, so membership is only
232+ # ever managed when ``listen_socket`` is present.
233+ if listen_socket is not None and not add_multicast_member (listen_socket , interface ):
234+ return
235+ respond_socket = new_respond_socket (interface , apple_p2p = apple_p2p , unicast = self .zc .unicast )
236+ if respond_socket is None :
237+ if listen_socket is not None :
238+ drop_multicast_member (listen_socket , interface )
239+ return
240+ await self ._async_adopt_respond_socket (respond_socket )
241+
242+ async def _async_adopt_respond_socket (self , sock : socket .socket ) -> None :
243+ """Wrap a freshly created per-interface responder socket as reader + sender."""
244+ assert self .loop is not None
245+ transport , protocol = await self .loop .create_datagram_endpoint ( # type: ignore[type-var]
246+ lambda : AsyncListener (self .zc ), # type: ignore[arg-type, return-value]
247+ sock = sock ,
248+ )
249+ datagram_transport = cast (asyncio .DatagramTransport , transport )
250+ # No ``await`` between wrapping and registering so a concurrent
251+ # shutdown always sees the transport in exactly one place.
252+ self .protocols .append (cast (AsyncListener , protocol ))
253+ self .readers .append (make_wrapped_transport (datagram_transport ))
254+ self .senders .append (make_wrapped_transport (datagram_transport ))
255+
256+ def _async_close_sender (self , wrapped : _WrappedTransport , listen_socket : socket .socket | None ) -> None :
257+ """Drop a per-interface sender's wrappers/protocol and close its transport."""
258+ transport = wrapped .transport
259+ self .protocols = [
260+ p for p in self .protocols if p .transport is None or p .transport .transport is not transport
261+ ]
262+ self ._async_drop_transport_wrappers (transport )
263+ if listen_socket is not None :
264+ drop_multicast_member (listen_socket , _wrapped_interface (wrapped ))
265+ transport .close ()
143266
144267 def _async_cache_cleanup (self ) -> None :
145268 """Periodic cache cleanup."""
0 commit comments