import toto
import cPickle as pickle
import zlib
import logging
from toto.exceptions import *
from toto.workerconnection import WorkerConnection
from threading import Thread, Lock
from tornado.options import options
from tornado.gen import coroutine, Return
from tornado.ioloop import IOLoop
from collections import deque
from tornado.httpclient import HTTPRequest, AsyncHTTPClient, HTTPError
from tornado.concurrent import Future
from sys import exc_info
from time import time
from uuid import uuid4
from traceback import format_exc
from toto.options import safe_define
from random import shuffle

class _Request(object):
  '''Tracks a single in-flight RPC: the serialized payload, its timeout/retry state and the
  ``Future`` that will receive the result.
  '''

  def __init__(self, headers, body, timeout, retry_count, future, callback=None):
    self.headers = headers
    self.body = body
    self.timeout = timeout
    self.retry_count = retry_count
    self.callback = callback
    self.future = future
    self.request_id = uuid4()

  def request(self, url):
    return HTTPRequest(url=url, method='POST', headers=self.headers, body=self.body)

  def handle_response(self, response):
    self.callback(self, response)

  def run_request(self, url):
    client = AsyncHTTPClient()
    client.fetch(self.request(url), callback=self.handle_response, raise_error=False)

class HTTPWorkerConnection(WorkerConnection):
  '''Use a ``WorkerConnection`` to make RPCs to the remote worker service(s) or worker/router specified by ``address``.
  ``address`` may be either an enumerable of address strings or a string of comma separated addresses. Calls default to
  the connection's ``timeout``; a request that has not been answered within its timeout is retried against the next
  endpoint in the rotation until its retry count is exhausted.

  Optionally pass any object or module with ``compress`` and ``decompress`` methods as the ``compression`` parameter to
  compress messages. The module must implement the same algorithm used on the worker service. By default, messages are
  not compressed.

  Optionally pass any object or module with ``dumps`` and ``loads`` methods that convert an ``object`` to and from a
  ``str`` to replace the default ``cPickle`` serialization with a protocol of your choice.

  Pass ``serialization_mime`` to set the ``Content-Type`` header for worker requests.

  Use ``auto_retry_count`` to specify how many times messages should be retried by default. Retrying messages can cause
  substantial congestion in your worker service. Use with caution.
  '''
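
  # A minimal usage sketch (the endpoint URLs are hypothetical): connect to two worker
  # endpoints and let invocations round robin between them.
  #
  #   connection = HTTPWorkerConnection('http://10.0.0.1:8080,http://10.0.0.2:8080')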

  def __init__(self, address, timeout=10.0, compression=None, serialization=None, serialization_mime='application/pickle', auto_retry_count=0):
    if not address:
      self.active_connections = set()
    elif isinstance(address, str):
      self.active_connections = {i.strip() for i in address.split(',')}
    else:
      self.active_connections = set(address)
    self.__update_addresses()
    self.__connection_lock = Lock()
    self.__active_requests = {}
    self.mime = serialization_mime
    self.auto_retry_count = auto_retry_count
    self.timeout = timeout
    self.loads = serialization and serialization.loads or pickle.loads
    self.dumps = serialization and serialization.dumps or pickle.dumps
    self.compress = compression and compression.compress or (lambda x: x)
    self.decompress = compression and compression.decompress or (lambda x: x)
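
  # Sketch of custom compression and serialization, assuming the worker service is
  # configured with matching modules (``json`` shown in place of the default ``cPickle``):
  #
  #   import json, zlib
  #   connection = HTTPWorkerConnection('http://10.0.0.1:8080', compression=zlib,
  #       serialization=json, serialization_mime='application/json')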

  def handle_response(self, request, response):
    if request.request_id not in self.__active_requests:
      logging.info("Received response for unknown request %s - it has likely already been answered." % request.request_id)
      return
    try:
      del self.__active_requests[request.request_id]
      if response.error:
        if isinstance(response.error, HTTPError):
          if response.error.code == 599: #Tornado's catch-all code for connection and timeout errors - there is no body to deserialize
            request.future.set_exception(response.error)
          else:
            #the worker responded with an HTTP error status, but the body still carries a serialized result
            #decompress mirrors the compression applied to outbound bodies in invoke() and defaults to the identity function
            request.future.set_result(self.loads(self.decompress(response.body)))
        else:
          request.future.set_exception(response.error)
        return
      request.future.set_result(self.loads(self.decompress(response.body)))
    except Exception as e:
      self.log_error(e)
      request.future.set_exc_info(exc_info())

  def handle_timeout(self, request):
    if request.request_id not in self.__active_requests:
      return #already answered
    if request.retry_count <= 0:
      request.future.set_exception(TotoException(-1, "Timeout"))
      return
    request.retry_count -= 1
    request.run_request(self.__next_endpoint())
    #when the retry count reaches zero no further timeout is scheduled; the final attempt
    #is bounded by the HTTP client's own timeout (a 599 error)
    if request.retry_count and request.timeout:
      IOLoop.current().add_timeout(time() + request.timeout, self.handle_timeout, request)

  def invoke(self, method, parameters={}, timeout=None, auto_retry_count=None, **kwargs):
    '''Invoke a ``method`` to be run on a remote worker process with the given ``parameters``. By default the worker will
    timeout or retry based on the settings of the current ``WorkerConnection``, but ``timeout`` and ``auto_retry_count``
    can be used for invocation specific behavior.

    ``invoke()`` returns a ``Future`` that may be yielded to wait for the remote worker's response.

    Alternatively, you can invoke methods with ``WorkerConnection.<module>.<method>(*args, **kwargs)``
    where ``"<module>.<method>"`` will be passed as the ``method`` argument to ``invoke()``.
    '''
    headers = {'Content-Type': self.mime}
    body = self.compress(self.dumps({'method': method, 'parameters': parameters}))
    timeout = timeout if timeout is not None else self.timeout
    auto_retry_count = auto_retry_count if auto_retry_count is not None else self.auto_retry_count
    future = Future()
    request = _Request(headers, body, timeout, auto_retry_count, future, self.handle_response)
    self.__active_requests[request.request_id] = request
    request.run_request(self.__next_endpoint())
    if auto_retry_count and timeout:
      IOLoop.current().add_timeout(time() + timeout, self.handle_timeout, request)
    return future
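
  # Sketch of both invocation styles from a Tornado coroutine (``account.create`` is a
  # hypothetical worker method):
  #
  #   @coroutine
  #   def create_account(connection):
  #     result = yield connection.invoke('account.create', {'user_id': 'test'})
  #     #equivalent attribute-style invocation per the docstring above:
  #     result = yield connection.account.create({'user_id': 'test'})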

  def __update_addresses(self):
    #shuffle so that multiple connections don't all walk the endpoints in the same order
    ordered_connections = list(self.active_connections)
    shuffle(ordered_connections)
    self.ordered_connections = ordered_connections
    self.next_connection_index = 0

  def add_connection(self, address):
    '''Connect to the worker at ``address``. Worker invocations will be round robin load balanced between all connected workers.'''
    with self.__connection_lock:
      self.active_connections.add(address)
      self.__update_addresses()

  def remove_connection(self, address):
    '''Disconnect from the worker at ``address``. Worker invocations will be round robin load balanced between all connected workers.'''
    with self.__connection_lock:
      self.active_connections.remove(address)
      self.__update_addresses()

  def set_connections(self, addresses):
    '''A convenience method to set the connected addresses. A connection will be made to any new address included in the
    ``addresses`` enumerable and any currently connected address not included in ``addresses`` will be disconnected. If an
    address in ``addresses`` is already connected, it will not be affected.
    '''
    with self.__connection_lock:
      self.active_connections = set(addresses)
      self.__update_addresses()
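
  # Sketch of runtime endpoint management (the addresses are hypothetical):
  #
  #   connection.add_connection('http://10.0.0.3:8080')
  #   connection.remove_connection('http://10.0.0.1:8080')
  #   connection.set_connections(['http://10.0.0.1:8080', 'http://10.0.0.2:8080'])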

  def __next_endpoint(self):
    with self.__connection_lock:
      if self.next_connection_index >= len(self.ordered_connections):
        raise TotoException(-1, "No active connections")
      connection = self.ordered_connections[self.next_connection_index]
      #advance the round robin cursor, wrapping back to the first endpoint
      self.next_connection_index += 1
      if self.next_connection_index >= len(self.ordered_connections):
        self.next_connection_index = 0
      return connection

  def __len__(self):
    #the number of requests currently awaiting a response
    return len(self.__active_requests)

  @classmethod
  def instance(cls):
    '''Returns the default instance of ``HTTPWorkerConnection`` as configured by the options prefixed
    with ``worker_``, instantiating it if necessary. Import the ``workerconnection`` module within
    your ``TotoService`` and run it with ``--help`` to see all available options.
    '''
    if not hasattr(cls, '_instance'):
      cls._instance = cls(options.worker_address,
                          timeout=options.worker_timeout,
                          compression=options.worker_compression_module and __import__(options.worker_compression_module),
                          serialization=options.worker_serialization_module and __import__(options.worker_serialization_module),
                          serialization_mime=options.worker_serialization_mime,
                          auto_retry_count=options.worker_retry_count)
    return cls._instance
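
# Sketch of using the option-configured singleton; the flag values are hypothetical and the
# ``worker_`` options must be defined (e.g. by importing ``toto.workerconnection``):
#
#   python service.py --worker_address=http://10.0.0.1:8080 --worker_timeout=10.0 --worker_retry_count=2
#
#   connection = HTTPWorkerConnection.instance()
#   result = yield connection.invoke('account.create', {'user_id': 'test'})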