Add auto-modifying cyclic tasks by bessman · Pull Request #703 · hardbyte/python-can · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions can/broadcastmanager.py
20 changes: 15 additions & 5 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, ABCMeta, abstractmethod
from enum import Enum, auto
from time import time
from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union, cast
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union, cast

import can
import can.typechecking
Expand Down Expand Up @@ -195,6 +195,7 @@ def send_periodic(
period: float,
duration: Optional[float] = None,
store_task: bool = True,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.

Expand All @@ -216,6 +217,10 @@ def send_periodic(
:param store_task:
If True (the default) the task will be attached to this Bus instance.
Disable to instead manage tasks manually.
:param modifier_callback:
Function which should be used to modify each message's data before
sending. The callback modifies the :attr:`~can.Message.data` of the
message and returns ``None``.
:return:
A started task instance. Note the task can be stopped (and depending on
the backend modified) by calling the task's
Expand All @@ -230,7 +235,7 @@ def send_periodic(

.. note::

For extremely long running Bus instances with many short lived
For extremely long-running Bus instances with many short-lived
tasks the default api with ``store_task==True`` may not be
appropriate as the stopped tasks are still taking up memory as they
are associated with the Bus instance.
Expand All @@ -247,9 +252,8 @@ def send_periodic(
# Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later
task = cast(
_SelfRemovingCyclicTask,
self._send_periodic_internal(msgs, period, duration),
self._send_periodic_internal(msgs, period, duration, modifier_callback),
)

# we wrap the task's stop method to also remove it from the Bus's list of tasks
periodic_tasks = self._periodic_tasks
original_stop_method = task.stop
Expand All @@ -275,6 +279,7 @@ def _send_periodic_internal(
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.

Expand All @@ -298,7 +303,12 @@ def _send_periodic_internal(
threading.Lock()
)
task = ThreadBasedCyclicSendTask(
self, self._lock_send_periodic, msgs, period, duration
bus=self,
lock=self._lock_send_periodic,
messages=msgs,
period=period,
duration=duration,
modifier_callback=modifier_callback,
)
return task

Expand Down
22 changes: 17 additions & 5 deletions can/interfaces/ixxat/canlib.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from typing import Optional
from typing import Callable, Optional, Sequence, Union

import can.interfaces.ixxat.canlib_vcinpl as vcinpl
import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2
from can import BusABC, Message
from can.bus import BusState
from can import (
BusABC,
BusState,
CyclicSendTaskABC,
Message,
)


class IXXATBus(BusABC):
Expand Down Expand Up @@ -145,8 +149,16 @@ def _recv_internal(self, timeout):
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
return self.bus.send(msg, timeout)

def _send_periodic_internal(self, msgs, period, duration=None):
return self.bus._send_periodic_internal(msgs, period, duration)
def _send_periodic_internal(
self,
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> CyclicSendTaskABC:
return self.bus._send_periodic_internal(
msgs, period, duration, modifier_callback
)

def shutdown(self) -> None:
super().shutdown()
Expand Down
56 changes: 41 additions & 15 deletions can/interfaces/ixxat/canlib_vcinpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
import functools
import logging
import sys
from typing import Callable, Optional, Tuple

from can import BusABC, CanProtocol, Message
from can.broadcastmanager import (
import warnings
from typing import Callable, Optional, Sequence, Tuple, Union

from can import (
BusABC,
BusState,
CanProtocol,
CyclicSendTaskABC,
LimitedDurationCyclicSendTaskABC,
Message,
RestartableCyclicTaskABC,
)
from can.bus import BusState
from can.ctypesutil import HANDLE, PHANDLE, CLibrary
from can.ctypesutil import HRESULT as ctypes_HRESULT
from can.exceptions import CanInitializationError, CanInterfaceNotImplementedError
Expand Down Expand Up @@ -785,17 +789,39 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
# Want to log outgoing messages?
# log.log(self.RECV_LOGGING_LEVEL, "Sent: %s", message)

def _send_periodic_internal(self, msgs, period, duration=None):
def _send_periodic_internal(
self,
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
modifier_callback: Optional[Callable[[Message], None]] = None,
) -> CyclicSendTaskABC:
"""Send a message using built-in cyclic transmit list functionality."""
if self._scheduler is None:
self._scheduler = HANDLE()
_canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler)
caps = structures.CANCAPABILITIES()
_canlib.canSchedulerGetCaps(self._scheduler, caps)
self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
return CyclicSendTask(
self._scheduler, msgs, period, duration, self._scheduler_resolution
if modifier_callback is None:
if self._scheduler is None:
self._scheduler = HANDLE()
_canlib.canSchedulerOpen(
self._device_handle, self.channel, self._scheduler
)
caps = structures.CANCAPABILITIES()
_canlib.canSchedulerGetCaps(self._scheduler, caps)
self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
return CyclicSendTask(
self._scheduler, msgs, period, duration, self._scheduler_resolution
)

# fallback to thread based cyclic task
warnings.warn(
f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
"when the `modifier_callback` argument is given."
)
return BusABC._send_periodic_internal(
self,
msgs=msgs,
period=period,
duration=duration,
modifier_callback=modifier_callback,
)

def shutdown(self):
Expand Down
56 changes: 41 additions & 15 deletions can/interfaces/ixxat/canlib_vcinpl2.py
Loading