EngineClient: EnginePort improvements by thiell · Pull Request #481 · clustershell/clustershell · GitHub
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions lib/ClusterShell/Defaults.py
2 changes: 1 addition & 1 deletion lib/ClusterShell/Engine/Engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ def start_ports(self):
for port in self._ports:
if not port.registered:
self._debug("START PORT %s" % port)
self.register(port)
self.register(port._start())

def start_clients(self):
"""Start and register regular engine clients in respect of fanout."""
Expand Down
39 changes: 27 additions & 12 deletions lib/ClusterShell/Event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#
# Copyright (C) 2007-2015 CEA/DAM
# Copyright (C) 2015-2017 Stephane Thiell <sthiell@stanford.edu>
# Copyright (C) 2015-2022 Stephane Thiell <sthiell@stanford.edu>
#
# This file is part of ClusterShell.
#
Expand All @@ -23,18 +23,20 @@

This module contains the base class :class:`.EventHandler` which defines
a simple interface to handle events generated by :class:`.Worker`,
:class:`.EventTimer` and :class:`.EventPort` objects.
:class:`.EngineTimer` and :class:`.EnginePort` objects.
"""


class EventHandler(object):
"""ClusterShell EventHandler interface.

Derived class should implement any of the following methods to listen for
:class:`.Worker`, :class:`.EventTimer` or :class:`.EventPort` selected
events. If not implemented, the default behavior is to do nothing.
:class:`.Worker`, :class:`.EnginePort` or :class:`.EngineTimer` events.
If not implemented, the default behavior is to do nothing.
"""

### Worker events

def ev_start(self, worker):
"""
Called to indicate that a worker has just started.
Expand Down Expand Up @@ -142,25 +144,38 @@ def ev_close(self, worker, timedout):
:param timedout: boolean set to True if the worker has timed out
"""

def _ev_routing(self, worker, arg):
"""
Routing event (private). Called to indicate that a (meta)worker has just
updated one of its route path. You can safely ignore this event.
"""

### EnginePort events

def ev_port_start(self, port):
"""
Called to indicate that a :class:`.EnginePort` object has just started.

:param port: :class:`.EnginePort` derived object
"""

def ev_msg(self, port, msg):
"""
Called to indicate that a message has been received on an EnginePort.
Called to indicate that a message has been received on an
:class:`.EnginePort`.

Used to deliver messages reliably between tasks.

:param port: EnginePort object on which a message has been received
:param port: :class:`.EnginePort` object on which a message has been
received
:param msg: the message object received
"""

### EngineTimer events

def ev_timer(self, timer):
"""
Called to indicate that a timer is firing.

:param timer: :class:`.EngineTimer` object that is firing
"""

def _ev_routing(self, worker, arg):
"""
Routing event (private). Called to indicate that a (meta)worker has just
updated one of its route path. You can safely ignore this event.
"""
13 changes: 8 additions & 5 deletions lib/ClusterShell/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,16 @@ class _SyncMsgHandler(EventHandler):
"""Special task control port event handler.
When a message is received on the port, call appropriate
task method."""
def __init__(self, task):
EventHandler.__init__(self)
self.task = task

def ev_msg(self, port, msg):
"""Message received: call appropriate task method."""
# pull out function and its arguments from message
func, (args, kwargs) = msg[0], msg[1:]
# call task method
func(port.task, *args, **kwargs)
func(self.task, *args, **kwargs)

class tasksyncmethod(object):
"""Class encapsulating a function that checks if the calling
Expand Down Expand Up @@ -320,9 +324,8 @@ def __init__(self, thread=None, defaults=None):
self._reset()

# special engine port for task method dispatching
self._dispatch_port = EnginePort(self,
handler=Task._SyncMsgHandler(),
autoclose=True)
self._dispatch_port = EnginePort(handler=Task._SyncMsgHandler(self),
autoclose=True)
self._engine.add(self._dispatch_port)

# set taskid used as Thread name
Expand Down Expand Up @@ -703,7 +706,7 @@ def port(self, handler=None, autoclose=False):
is not set, the task can only receive messages on the port by
calling port.msg_recv().
"""
port = EnginePort(self, handler, autoclose)
port = EnginePort(handler, autoclose)
self._add_port(port)
return port

Expand Down
15 changes: 7 additions & 8 deletions lib/ClusterShell/Worker/EngineClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@

import threading

from ClusterShell.Defaults import DEFAULTS
from ClusterShell.Worker.fastsubprocess import Popen, PIPE, STDOUT, \
set_nonblock_flag

from ClusterShell.Engine.Engine import EngineBaseTimer, E_READ, E_WRITE


Expand Down Expand Up @@ -251,7 +251,7 @@ def _fire(self):
def _start(self):
"""
Starts client and returns client instance as a convenience.
Derived classes (except EnginePort) must implement.
Derived classes must implement.
"""
raise NotImplementedError("Derived classes must implement.")

Expand Down Expand Up @@ -470,18 +470,17 @@ def sync(self):
if self._sync_msg:
self.reply_lock.acquire()

def __init__(self, task, handler=None, autoclose=False):
def __init__(self, handler=None, autoclose=False):
"""
Initialize EnginePort object.
"""
EngineClient.__init__(self, None, None, False, -1, autoclose)
self.task = task
self.eh = handler
# ports are no subject to fanout
self.delayable = False

# Port messages queue
self._msgq = queue.Queue(self.task.default("port_qlimit"))
self._msgq = queue.Queue(DEFAULTS.port_qlimit)

# Request pipe
(readfd, writefd) = os.pipe()
Expand All @@ -505,6 +504,8 @@ def __repr__(self):

def _start(self):
"""Start port."""
if self.eh is not None:
self.eh.ev_port_start(self)
return self

def _close(self, abort, timeout):
Expand All @@ -514,9 +515,7 @@ def _close(self, abort, timeout):
try:
while not self._msgq.empty():
pmsg = self._msgq.get(block=False)
if self.task.info("debug", False):
self.task.info("print_debug")(self.task,
"EnginePort: dropped msg: %s" % str(pmsg.get()))
LOGGER.debug('%r: dropped msg: %s', self, pmsg.get())
except queue.Empty:
pass
self._msgq = None
Expand Down
42 changes: 42 additions & 0 deletions tests/DefaultsTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,48 @@ def test_002_changed(self):
self.assertFalse(self.defaults.stdout_msgtree)
self.assertFalse(self.defaults.stderr_msgtree)
self.assertEqual(self.defaults.engine, 'select')
self.assertEqual(self.defaults.port_qlimit, 1000) # 1.8 compat
self.assertFalse(self.defaults.auto_tree)
self.assertEqual(self.defaults.local_workername, 'none')
self.assertEqual(self.defaults.distant_workername, 'pdsh')
# task_info
self.assertTrue(self.defaults.debug)
self.assertEqual(self.defaults.fanout, 256)
self.assertEqual(self.defaults.grooming_delay, 0.5)
self.assertEqual(self.defaults.connect_timeout, 12.5)

def test_003_engine(self):
"""test Defaults config file (engine section)"""
conf_test = make_temp_file(dedent("""
[nodeset]
fold_axis: -1

[task.default]
stderr: true
stdout_msgtree: false
stderr_msgtree: false
engine: select
auto_tree: false
local_workername: none
distant_workername: pdsh

[task.info]
debug: true
fanout: 256
grooming_delay: 0.5
connect_timeout: 12.5
command_timeout: 30.5

[engine]
port_qlimit: 1000""").encode('ascii'))
self.defaults = Defaults(filenames=[conf_test.name])
# nodeset
self.assertEqual(self.defaults.fold_axis, (-1,))
# task_default
self.assertTrue(self.defaults.stderr)
self.assertFalse(self.defaults.stdout_msgtree)
self.assertFalse(self.defaults.stderr_msgtree)
self.assertEqual(self.defaults.engine, 'select')
self.assertEqual(self.defaults.port_qlimit, 1000)
self.assertFalse(self.defaults.auto_tree)
self.assertEqual(self.defaults.local_workername, 'none')
Expand Down
9 changes: 7 additions & 2 deletions tests/TaskPortTest.py