Toto 1.0.10 documentation

toto.workerconnection

Contents

Source code for toto.workerconnection

import toto
import cPickle as pickle
import zlib
import logging
from threading import Thread
from tornado.options import options
from tornado.gen import Task
from collections import deque
from time import time
from uuid import uuid4
from traceback import format_exc
from toto.options import safe_define

# Options configuring the default worker connection. They are registered
# through toto.options.safe_define and are read back via tornado's ``options``.
safe_define(
    "worker_compression_module",
    type=str,
    help="The module to use for compressing and decompressing messages to workers. The module must have 'decompress' and 'compress' methods. If not specified, no compression will be used. Only the default instance will be affected"
)
safe_define(
    "worker_serialization_module",
    type=str,
    help="The module to use for serializing and deserializing messages to workers. The module must have 'dumps' and 'loads' methods. If not specified, cPickle will be used. Only the default instance will be affected"
)
safe_define(
    "worker_serialization_mime",
    type=str,
    default='application/pickle',
    help="Used by HttpWorkerConnection in its Content-Type header."
)
safe_define(
    "worker_timeout",
    default=10.0,
    help="The default worker (instance()) will wait at least this many seconds before retrying a request (if retry is true), or timing out (if retry is false). Negative values will never retry or timeout. Note: This abs(value) is also the minimum resolution of any request-specific timeouts. Must not be 0."
)
safe_define(
    "worker_auto_retry",
    default=False,
    help="If True, the default timeout behavior of a worker RPC will be to retry instead of failing when the timeout is reached."
)
safe_define(
    "worker_retry_count",
    default=0,
    help="The maximum number of times to retry a request after timeout. Used by HttpWorkerConnection instead of worker_auto_retry."
)
safe_define(
    "worker_address",
    default='',
    help="This is the address that toto.workerconnection.invoke(method, params) will send tasks too (As specified in the worker conf file). A comma separated list may be used to round-robin load balance tasks between workers."
)
safe_define(
    "worker_transport",
    default='zmq',
    help="Either zmq or http to select which transport to use for worker communication."
)

# Socket event names. They are not referenced in the visible part of this
# module -- presumably consumed by the transport implementations (verify).
WORKER_SOCKET_CONNECT = 'CONNECT'
WORKER_SOCKET_DISCONNECT = 'DISCONNECT'

class WorkerConnection(object):
    '''Use a ``WorkerConnection`` to make RPCs to the remote worker service(s)
    or worker/router specified by ``address``. ``address`` may be either an
    enumerable of address strings or a string of comma separated addresses.
    RPC retries and timeouts will happen by at most every ``abs(timeout)``
    seconds when a periodic callback runs through all active messages and
    checks for prolonged requests. This is also the default timeout for any
    new calls. ``timeout`` must not be ``0``.

    Optionally pass any object or module with ``compress`` and ``decompress``
    methods as the ``compression`` parameter to compress messages. The module
    must implement the same algorithm used on the worker service. By default,
    messages are not compressed.

    Optionally pass any object or module with ``dumps`` and ``loads`` methods
    that convert an ``object`` to and from a ``str`` to replace the default
    ``cPickle`` serialization with a protocol of your choice.

    Use ``auto_retry`` to specify whether or not messages should be retried by
    default. Retrying messages can cause substantial congestion in your worker
    service. Use with caution.
    '''

    def __getattr__(self, path):
        '''Return a ``WorkerInvocation`` for ``path`` bound to this connection.

        Python only calls this for attributes that normal lookup does not
        find, so any otherwise-unknown attribute name is treated as the name
        of a remote method.
        '''
        return WorkerInvocation(path, self)

    def log_error(self, error):
        '''Log ``repr(error)`` at ERROR level through the root logger.'''
        logging.error(repr(error))

    def enable_traceback_logging(self):
        '''Replace this instance's ``log_error`` with a version that logs the
        formatted traceback instead of ``repr(error)``.

        Only this instance is affected; the class-level ``log_error`` is left
        untouched. The replacement ignores its argument and logs
        ``traceback.format_exc()``, i.e. the exception currently being
        handled, so it should be invoked from within an ``except`` block.
        '''
        from traceback import format_exc
        # ``types.MethodType`` builds the same bound method the removed
        # ``new.instancemethod`` did and exists on both Python 2 and 3.
        from types import MethodType

        def log_error(self, e):
            logging.error(format_exc())

        # Stored in the instance ``__dict__`` so it shadows the class method.
        self.log_error = MethodType(log_error, self)

    @classmethod
    def instance(cls):
        '''Returns the default instance of ``WorkerConnection`` as configured
        by the options prefixed with ``worker_``, instantiating it if
        necessary. Import the ``workerconnection`` module within your
        ``TotoService`` and run it with ``--help`` to see all available
        options.

        The instance is cached on the class as ``_instance``. The transport
        modules are imported lazily so only the selected one is loaded.
        '''
        if not hasattr(cls, '_instance'):
            if options.worker_transport == 'http':
                from toto.httpworkerconnection import HTTPWorkerConnection
                cls._instance = HTTPWorkerConnection.instance()
            else:
                # Any value other than 'http' selects the ZMQ transport.
                from toto.zmqworkerconnection import ZMQWorkerConnection
                cls._instance = ZMQWorkerConnection.instance()
        return cls._instance
class WorkerInvocation(object):
    '''Callable handle for one (possibly dotted) method path on a connection.

    Calling the handle forwards to ``connection.invoke``; reading an attribute
    of the handle asks the connection for the extended dotted path.
    '''

    def __init__(self, path, connection):
        '''Remember the method ``path`` and the ``connection`` that serves it.'''
        self._connection = connection
        self._path = path

    def __call__(self, *args, **kwargs):
        '''Invoke the stored path on the connection, forwarding all arguments.'''
        invoke = self._connection.invoke
        return invoke(self._path, *args, **kwargs)

    def __getattr__(self, path):
        '''Look up ``<stored path>.<path>`` as an attribute of the connection.'''
        dotted_path = '.'.join((self._path, path))
        return getattr(self._connection, dotted_path)

Contents