Skip to content
Navigation Menu
{{ message }}
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.py
More file actions
497 lines (437 loc) · 21.3 KB
/
Copy pathqueue.py
File metadata and controls
497 lines (437 loc) · 21.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
"""Main module with the DistributedQueue class and the task Register."""
import logging
import six
from time import sleep
import traceback
from distributed_queue import core, serializers, routers
from .backends import BackendConnectionError
from .utils import StoppableThread
# Logger used by DistributedQueue and Register for task lifecycle messages.
logger = logging.getLogger('distributed_queue')
class DistributedQueueError(Exception):
    """Base class for all errors raised by the distributed queue."""


class RegisterError(DistributedQueueError):
    """Raised when the task Register is misconfigured.

    For example: registering a processor twice for the same task uid, or
    registering a processor before its task was declared as available.
    """


class NotRegisteredError(DistributedQueueError):
    """Raised when a received task has no registered local processor."""


# Settings group used whenever a caller passes ``backend_settings_group=None``.
DEFAULT_BACKEND_SETTINGS_GROUP = 'default'
class DistributedQueue(object):
    """DistributedQueue - interface for distributed queues.

    Every settings group owns one backend together with the serializer,
    router and queue names used with it. Every public method accepts
    ``backend_settings_group=None`` to mean ``DEFAULT_BACKEND_SETTINGS_GROUP``.
    """

    def __init__(self, queue_settings):
        """Create DistributedQueue by specifying queue settings.

        QUEUES_SETTINGS = {
            'default': {
                'backend': 'redis',  # required, now available dummy/redis
                'serializer': CustomSerializer(),  # optional, specify custom serializer
                                                   # instead of JSON default
                'router': CustomRouter(),  # optional, specify custom router
                                           # instead of dummy returning constant `default_queue`
                'queues': ['tasks', ],  # backend-specific, but required for all currently
                                        # available backends
                'default_queue': 'tasks',  # required for default router `DefaultRouter`
                'host': 'localhost',  # backend-specific
                'port': 6379,  # backend-specific
                'database': 0,  # backend-specific
            }
        }

        Raises:
            DistributedQueueError: if a settings group lacks a required option,
                names an unknown backend, or passes a backend object without
                ``send`` and ``receive`` methods.
        """
        # The module-level registry sends outgoing tasks through this queue.
        register.attach_task_queue(self)
        self.backends = {}
        for settings_group, backend_preferences in six.iteritems(queue_settings):
            self.backends[settings_group] = self._build_backend_settings(backend_preferences)

    @staticmethod
    def _build_backend_settings(backend_preferences):
        """Validate one settings group and build its backend settings dict.

        Returns a dict with ``serializer``, ``router``, ``queues`` and
        ``backend`` keys. Options not consumed here are forwarded as keyword
        arguments to ``core.create_backend`` when the backend is given by name.
        """
        if 'backend' not in backend_preferences:
            raise DistributedQueueError("`backend` is required option for backend settings")
        if 'queues' not in backend_preferences:
            raise DistributedQueueError("`queues` is required option for backend settings")
        # Work on a copy so the caller's settings dict is never mutated.
        preferences = backend_preferences.copy()
        router = preferences.pop('router', None)
        if router is None:
            # The default router needs to know which queue to route to.
            if 'default_queue' not in preferences:
                raise DistributedQueueError(
                    "`default_queue` is required option for backend settings")
            if preferences['default_queue'] not in preferences['queues']:
                raise DistributedQueueError("`default_queue` should be listed in `queues`")
            router = routers.DefaultRouter(preferences.pop('default_queue'))
        backend_settings = {
            'serializer': preferences.pop('serializer', serializers.JsonSerializer()),
            'router': router,
            'queues': preferences.pop('queues'),
        }
        backend = preferences.pop('backend')
        # six.string_types also accepts unicode backend names on Python 2.
        if isinstance(backend, six.string_types):
            if backend not in core.BACKEND_LIST:
                raise DistributedQueueError("Unknown backend name `%s`. Expected: %s"
                                            % (backend, ','.join(core.BACKEND_LIST)))
            backend = core.create_backend(backend, **preferences)
        elif not (hasattr(backend, 'send') and hasattr(backend, 'receive')):
            raise DistributedQueueError("Backend is incompatible: "
                                        "should have send and receive methods.")
        backend_settings['backend'] = backend
        return backend_settings

    def _get_backend_settings(self, backend_settings_group):
        """Return the settings of a group; ``None`` selects the default group.

        Raises KeyError for an unknown group name.
        """
        # Not a default argument value: callers pass None explicitly to mean
        # "use the DEFAULT group".
        if backend_settings_group is None:
            backend_settings_group = DEFAULT_BACKEND_SETTINGS_GROUP
        return self.backends[backend_settings_group]

    # pylint: disable=R0913
    def send_custom(self, task, args, kwargs, backend_settings_group=None,
                    queue_name=None, retries=1):
        """Send task to distributed queue, with serializing to string.

        `.send_custom(
            'build_model',
            (arg1, arg2),
            {'kw1': 1, 'kw2': 2},
            queue_name='custom_queue',
            retries=0
        )`

        If ``queue_name`` is None, the group's router picks the queue.
        ``retries`` is the number of send attempts made on
        BackendConnectionError (one second apart) before the error is
        re-raised. If `retries` is 0 then retry until success.

        Returns whatever the backend's ``send`` returns.
        """
        backend_settings = self._get_backend_settings(backend_settings_group)
        if queue_name is None:
            queue_name = backend_settings['router'].get_queue_name(
                self.backends, task, args, kwargs)
        item = backend_settings['serializer'].dumps((task, args, kwargs))
        while True:
            try:
                return backend_settings['backend'].send(queue_name, item)
            except BackendConnectionError:
                # retries == 0 means "retry forever"; otherwise re-raise once
                # the attempt budget is used up.
                if retries > 0:
                    retries -= 1
                    if retries <= 0:
                        raise
                sleep(1)

    def send(self, task, *args, **kwargs):
        """Send task shortcut for pythonic execution style.

        `.send('build_model', arg1, arg2, kw1=1, kw2=2)`
        """
        return self.send_custom(task, args, kwargs)

    def receive(self, backend_settings_group=None, queue_name=None, timeout=0):
        """Receive task from distributed queue, with deserializing from string.

        Listens on ``queue_name`` if given, otherwise on every queue of the
        settings group. If timeout is 0, then block indefinitely.

        Returns ``(task_id, deserialized_payload)``, or None if the backend
        returned nothing or connection retries ran out.
        """
        backend_settings = self._get_backend_settings(backend_settings_group)
        if queue_name is not None:
            queue_name_list = [queue_name]
        else:
            queue_name_list = backend_settings['queues']
        received_data = None
        while True:
            try:
                received_data = backend_settings['backend'].receive(
                    queue_name_list, timeout=timeout)
            except BackendConnectionError:
                # Each one-second retry consumes one unit of a non-zero
                # timeout; timeout == 0 retries forever.
                if timeout != 0:
                    timeout -= 1
                    if timeout <= 0:
                        break
                sleep(1)
            else:
                break
        if received_data is None:
            return None
        task_id, serialized_task = received_data  # pylint: disable=W0633
        return task_id, backend_settings['serializer'].loads(serialized_task)

    def keep_alive(self, task_id, backend_settings_group=None, queue_name=None):
        """Send keep-alive message and return False if the task has been canceled.

        Returns True when the backend is unreachable.
        """
        backend_settings = self._get_backend_settings(backend_settings_group)
        try:
            return backend_settings['backend'].keep_alive(task_id, queue_name=queue_name)
        except BackendConnectionError:
            # Assume that task is still marked alive if backend is unreachable.
            # It's naive, but there is no better signal available here.
            logger.warning("Backend is unreachable, assuming task with id = %s "
                           "is still alive.", task_id)
            return True

    def acknowledge(self, task_id, backend_settings_group=None,
                    queue_name=None, retries=5):
        """Acknowledge a task: mark it as done on the backend queue.

        Makes up to ``retries`` attempts on BackendConnectionError, one second
        apart, and logs an error if every attempt fails (nothing is raised).
        """
        backend_settings = self._get_backend_settings(backend_settings_group)
        for attempt in range(1, retries + 1):
            try:
                backend_settings['backend'].acknowledge(task_id, queue_name=queue_name)
            except BackendConnectionError:
                # Pause between attempts, like send_custom/receive, instead of
                # hammering an unreachable backend; no pause after the last one.
                if attempt < retries:
                    sleep(1)
            else:
                return
        logger.error("Task with id = %s could not be acknowledged after %s attempt(s).",
                     task_id, retries)

    def reject(self, task_id, backend_settings_group=None, queue_name=None):
        """Reject a task: ask the backend to ignore/delete it.

        Best effort: connection errors are logged, not raised.
        """
        backend_settings = self._get_backend_settings(backend_settings_group)
        try:
            backend_settings['backend'].reject(task_id, queue_name=queue_name)
        except BackendConnectionError:
            logger.warning("Backend is unreachable, task with id = %s was not rejected.",
                           task_id)

    def process(self, backend_settings_group=None):
        """Infinite task processing loop; stops on KeyboardInterrupt.

        Each received task is dispatched to the module-level ``register``.
        Tasks without a registered processor are rejected. Any other error is
        logged and the loop continues.
        """
        if backend_settings_group is None:
            backend_settings_group = DEFAULT_BACKEND_SETTINGS_GROUP
        while True:
            try:
                received_data = self.receive(backend_settings_group=backend_settings_group)
                if received_data is None:
                    sleep(1)
                    continue
                task_id, (task, args, kwargs) = received_data  # pylint: disable=W0633
                logger.debug("Received task '%s' with id = %s.", task, task_id)
                try:
                    register.process(task_id, task, args, kwargs)
                except NotRegisteredError:
                    self.reject(task_id, backend_settings_group=backend_settings_group)
                    logger.info("Task '%s' has been rejected.", task)
                except Exception:  # pylint: disable=W0703
                    logger.exception("Unexpected error.")
                else:
                    logger.info("Task '%s' is finished.", task)
            except KeyboardInterrupt:
                logger.info("Keyboard interrupted")
                break
            except Exception:  # pylint: disable=W0703
                logger.exception("Unexpected error")
class Register(object):
    """Registry of sendable tasks and of local task processor functions.

    ``registered_available_tasks`` maps a task uid to its settings (tasks that
    can be sent), ``registered_task_processors`` maps a task uid to the
    function that processes it locally, and ``task_queue`` is the queue used
    for outgoing tasks. For every entry of ``CALLBACK_TYPE_LIST`` an instance
    also exposes a ``task_<type>_callback(task_uid)`` decorator factory.
    """
    CT_STARTED = 'started'
    CT_FINISHED = 'finished'
    CT_ERROR = 'error'
    CALLBACK_TYPE_LIST = [CT_STARTED, CT_FINISHED, CT_ERROR]
    # NOTE(review): these dicts are class attributes, so they are shared by
    # every Register instance.
    registered_task_processors = {}
    registered_available_tasks = {}
    task_queue = None

    def __init__(self):
        def register_callback_type_decorator(callback_type):
            """Install the ``task_<callback_type>_callback`` decorator factory."""
            # Binding ``callback_type`` in this helper's own scope gives each
            # lambda its own value instead of the loop's last one.
            self.__dict__['task_%s_callback' % callback_type] = lambda *args, **kwargs: \
                self._task_CALLBACK_TYPE_callback(callback_type, *args, **kwargs)
        for callback_type in self.CALLBACK_TYPE_LIST:
            register_callback_type_decorator(callback_type)

    def attach_task_queue(self, task_queue):
        """Attach 'task_queue' to registry as queue to use for sending outgoing tasks."""
        self.task_queue = task_queue

    def register(self, task_uid, func):
        """Register ``func`` as the local processor of task ``task_uid``.

        Raises:
            RegisterError: if a processor for ``task_uid`` is already
                registered, or ``task_uid`` has not been registered as an
                available task yet.
        """
        if task_uid in self.registered_task_processors:
            raise RegisterError("Task processor with task_uid '%s' is already "
                                "registered" % task_uid)
        if task_uid not in self.registered_available_tasks:
            raise RegisterError("Task processor with task_uid '%s' is not in "
                                "available tasks. Register available tasks before registering "
                                "processors." % task_uid)
        self.registered_task_processors[task_uid] = func
        logger.info("Task processor for task '%s' is registered.", task_uid)

    def _send(self, task_settings, *args, **kwargs):
        """Send a task through ``task_queue`` using its registered settings.

        ``backend_settings`` selects the settings group; ``static_route_queue``
        (if present) is used as the queue name instead of asking the router.
        """
        logger.debug("Sending task '%s'", task_settings['task_uid'])
        self.task_queue.send_custom(task_settings['task_uid'], args, kwargs,
                                    backend_settings_group=task_settings.get('backend_settings'),
                                    queue_name=task_settings.get('static_route_queue'))
        logger.debug("Task '%s' is sent", task_settings['task_uid'])

    def send(self, task_uid, *args, **kwargs):
        """Send a registered available task; log an error if it is unknown."""
        if task_uid in self.registered_available_tasks:
            self._send(self.registered_available_tasks[task_uid], *args, **kwargs)
        else:
            logger.error("The task '%s' is not registered", task_uid)

    def _keep_alive(self, task_settings, task_id):
        """Send a keep-alive message; return False if the task has been canceled."""
        logger.debug("Keeping alive task with id = %s", task_id)
        return self.task_queue.keep_alive(task_id,
                                          backend_settings_group=task_settings.get('backend_settings'),
                                          queue_name=task_settings.get('static_route_queue'))

    def _acknowledge(self, task_settings, task_id):
        """Acknowledge ``task_id``, i.e. mark the task as done on its backend queue."""
        logger.debug("Acknowledging task with id = %s", task_id)
        return self.task_queue.acknowledge(task_id,
                                           backend_settings_group=task_settings.get('backend_settings'),
                                           queue_name=task_settings.get('static_route_queue'))

    def register_available_task(self, func_name, task_settings=None):
        """Register available remote (or local) task to run it like
        `register.send_build_model(arg1, arg2)`.

        ``task_uid`` defaults to ``func_name``. ``func_name`` may be None (used
        for callback tasks), in which case no ``send_<name>`` shortcut is
        created. If ``callbacks_queue_settings`` is given, the
        ``<uid>:started``, ``<uid>:finished`` and ``<uid>:error`` callback
        tasks are registered as well.

        Raises:
            RegisterError: if ``task_uid`` is already registered.
        """
        if task_settings is None:
            task_settings = {'task_uid': func_name}
        elif 'task_uid' not in task_settings:
            task_settings['task_uid'] = func_name
        task_uid = task_settings['task_uid']
        if task_uid in self.registered_available_tasks:
            raise RegisterError("Task with task_uid '%s' is already registered" % task_uid)
        self.registered_available_tasks[task_uid] = task_settings
        if func_name is not None:
            self.__dict__['send_' + func_name] = lambda *args, **kwargs: \
                self._send(task_settings, *args, **kwargs)
        logger.info("Task %s is available now.",
                    "'%s' (%s)" % (task_uid, func_name) if func_name else "'%s'" % task_uid)
        if 'callbacks_queue_settings' in task_settings:
            for callback_type in self.CALLBACK_TYPE_LIST:
                try:
                    self.register_available_task(None, task_settings={
                        'task_uid': '%s:%s' % (task_uid, callback_type),
                        'backend_settings': task_settings['callbacks_queue_settings'],
                    })
                except RegisterError:
                    logger.debug("Callback '%s' couldn't be registered for "
                                 "task uid '%s'. Exception:\n%s",
                                 callback_type, task_uid, traceback.format_exc())

    def register_available_tasks(self, available_tasks):
        """Register available remote tasks. We need only task id to get
        started, although you can specify backend settings group and
        queue_name.

        For example:

            T_BUILD_MODEL = 'build_model'
            register.register_available_tasks([T_BUILD_MODEL])

            T_PROCESS_NEW_DATASOURCE = 1
            T_TRANSFORM_DATASET = 2
            available_tasks = {
                'process_new_datasource': {
                    # you can specify custom task id, i.e. number constant
                    'task_uid': T_PROCESS_NEW_DATASOURCE,
                },
                'make_dataset': {
                    'task_uid': T_TRANSFORM_DATASET,
                    'backend_settings': 'default',
                    'callbacks_queue_settings': 'hadoop_results',
                    # you can even specify the queue_name to apply static routing
                    'static_route_queue': 'hadoop_workers',
                }
            }
            register.register_available_tasks(available_tasks)
            ...
            register.send(T_PROCESS_NEW_DATASOURCE, link)
            register.send_make_dataset(link)
            register.send_build_model(settings)
        """
        if isinstance(available_tasks, list):
            available_tasks = {name: {'task_uid': name} for name in available_tasks}
        for func_name, task_settings in six.iteritems(available_tasks):
            self.register_available_task(func_name, task_settings)

    def process(self, task_id, task, args, kwargs):
        """Run the registered processor for a received task.

        The task settings and id are passed to the processor as the
        ``dq_task_settings`` and ``dq_task_id`` keyword arguments.

        Raises:
            NotRegisteredError: if no processor is registered for ``task``.
        """
        if task not in self.registered_task_processors:
            raise NotRegisteredError
        kwargs['dq_task_settings'] = self.registered_available_tasks[task]
        kwargs['dq_task_id'] = task_id
        self.registered_task_processors[task](*args, **kwargs)

    # pylint: disable=R0912
    def task(self, task_uid=None, ignore_result=False, preserve_args=None):
        """Decorator that registers the decorated function as a task processor.

        The decorated function is returned unchanged, so calling it directly
        keeps its normal behaviour.

        `task_uid` is a unique task function identifier, which is synced
        between master and workers. It's suggested to use integers, but you can
        use strings as well.
        `ignore_result` - when true, no status callbacks (started / finished /
        error) are sent for this task.
        `preserve_args` - names of keyword arguments that the task receives and
        that are sent back with every status callback; None means none.
        """
        if isinstance(task_uid, int):
            _task_str_uid = six.text_type(task_uid)
        else:
            _task_str_uid = task_uid
        task_started = _task_str_uid + ':' + self.CT_STARTED
        task_finished = _task_str_uid + ':' + self.CT_FINISHED
        task_error = _task_str_uid + ':' + self.CT_ERROR
        # The default None must mean "no preserved args"; iterating over None
        # used to raise TypeError for every task that sends results.
        preserve_args = tuple(preserve_args or ())

        def wrapper(func):
            """Register ``func`` under ``task_uid`` and return it unchanged."""
            def wrapped_func(*args, **kwargs):
                """Run ``func`` for a received task, with keep-alives and callbacks."""
                task_settings = kwargs.pop('dq_task_settings')
                task_id = kwargs.pop('dq_task_id')
                preserved_args = {}
                if not ignore_result:
                    preserved_args = {arg: kwargs[arg] for arg in preserve_args}
                    self.send(task_started, **preserved_args)
                try:
                    # Run the user's function in a separate thread so this one
                    # can send keep-alives and notice cancellation.
                    func_async = StoppableThread(target=func, args=args, kwargs=kwargs)
                    func_async.start()
                    keep_alive_ticks = 0
                    is_stopped = False
                    while func_async.is_alive():
                        sleep(0.1)
                        keep_alive_ticks += 1
                        # Keep-alive every (0.1 * 200) == 20 seconds
                        if keep_alive_ticks == 200:
                            if not self._keep_alive(task_settings, task_id):
                                is_stopped = True
                                func_async.stop()
                            keep_alive_ticks = 0
                    if is_stopped:
                        logger.info("Task with id = %s was stopped.", task_id)
                    func_result = func_async.get_result()
                    # NOTE(review): a str result appears to be how StoppableThread
                    # reports an exception raised by ``func``; confirm in .utils.
                    if isinstance(func_result, str):
                        logger.error("Exception was raised while calling "
                                     "the task '%s'.\n%s", task_uid, func_result)
                    if not ignore_result:
                        if isinstance(func_result, str):
                            self.send(task_error, exception=func_result,
                                      **preserved_args)
                        else:
                            # A function without a return value yields None;
                            # report no extra fields instead of failing here
                            # (which also skipped the acknowledge below).
                            self.send(task_finished, _was_stopped=is_stopped,
                                      **dict(preserved_args, **(func_result or {})))
                    self._acknowledge(task_settings, task_id)
                except Exception:  # pylint: disable=W0703
                    logger.exception("Exception was raised while calling the task '%s'.", task_uid)
                    if not ignore_result:
                        self.send(task_error, exception=traceback.format_exc(),
                                  **preserved_args)
            self.register(task_uid, wrapped_func)
            return func
        return wrapper

    def _task_CALLBACK_TYPE_callback(self, callback_type, task_uid):
        """Return a decorator registering a ``<task_uid>:<callback_type>`` handler.

        Handlers are registered with ``ignore_result=True``. Error handlers
        additionally log the received ``exception`` text before the user's
        function runs.
        """
        def default_error_handler_decorator(task_wrapper):
            """Make ``task_wrapper`` log the remote exception before the handler."""
            def wrapper(func):
                def wrapped_func(*args, **kwargs):
                    logger.error("Task crashed on a backend with exception:\n%s", kwargs['exception'])
                    func(*args, **kwargs)
                task_wrapper(wrapped_func)
                # Hand the user's own function back, as ``task`` does, so a
                # direct call does not log a spurious error.
                return func
            return wrapper
        if isinstance(task_uid, int):
            _task_str_uid = six.text_type(task_uid)
        else:
            _task_str_uid = task_uid
        callback_task_uid = _task_str_uid + ':' + callback_type
        wrapper = self.task(task_uid=callback_task_uid, ignore_result=True)
        # Set default error handler
        if callback_type == self.CT_ERROR:
            return default_error_handler_decorator(wrapper)
        return wrapper
# Module-level registry instance. DistributedQueue attaches itself to it in
# __init__ and dispatches received tasks to it in process().
register = Register()
You can’t perform that action at this time.
