Toto 1.0.10 documentation

toto.worker

Contents

Source code for toto.worker

'''The Toto worker and worker connection classes are designed to help build RPC systems,
allowing you to pass CPU-intensive work to other processes or machines. Workers
were originally designed for use with the Toto server, making it easy to perform
long-running tasks without affecting the server's responsiveness, but they have been
designed to be used independently and have no direct ties to the web server
architecture.

``TotoWorkers`` and ``WorkerConnections`` use ZMQ for messaging and require the ``pyzmq`` module.

The ``TotoWorkerService`` has a built-in message router that will round-robin balance incoming
messages. The router can be disabled through configuration (set ``worker_bind_address`` to an
empty string) if only one worker process is needed.
Alternatively, the router can be configured to run without any worker processes, allowing multiple
machines to share a common router.

Most of the time you'll only need this script to start your server::

  from toto.worker import TotoWorkerService
  
  TotoWorkerService('settings.conf').run()

Methods, startup functions and databases can all be configured with the conf file.

Run your startup script with --help to see all available options.
'''

import os
import zmq
from zmq.devices.basedevice import ProcessDevice
import tornado
from tornado.options import options
import logging
import zlib
import cPickle as pickle
import sys
import time
from threading import Thread
from multiprocessing import Process, cpu_count
from toto.service import TotoService, process_count, pid_path
from toto.dbconnection import configured_connection
from exceptions import *
from toto.options import safe_define

# Command line / config file options used by the worker service.
# NOTE(review): safe_define comes from toto.options; presumably it skips options
# that were already defined elsewhere (e.g. by the Toto server) - confirm there.
safe_define("method_module", default='methods', help="The root module to use for method lookup")
safe_define("remote_event_receivers", type=str, help="A comma separated list of remote event addresses that this event manager should connect to. e.g.: 'tcp://192.168.1.2:8889'", multiple=True)
safe_define("event_init_module", default=None, type=str, help="If defined, this module's 'invoke' function will be called with the EventManager instance after the main event handler is registered (e.g.: myevents.setup)")
safe_define("startup_function", default=None, type=str, help="An optional function to run on startup - e.g. module.function. The function will be called for each worker process after it is configured and before it starts listening for tasks with the named parameters worker and db_connection.")
# The load balancer binds a ROUTER socket here (see TotoWorkerService.prepare).
safe_define("worker_bind_address", default="tcp://*:55555", help="The service will bind to this address with a zmq ROUTER socket and listen for incoming tasks. Tasks will be load balanced to all workers. If this is set to an empty string, workers will connect directly to worker_socket_address.")
safe_define("worker_socket_address", default="ipc:///tmp/workerservice.sock", help="The load balancer will use this address to coordinate tasks between local workers")
safe_define("control_socket_address", default="ipc:///tmp/workercontrol.sock", help="Workers will subscribe to messages on this socket and listen for control commands. If this is an empty string, the command option will have no effect")
safe_define("command", type=str, metavar='status|shutdown', help="Specify a command to send to running workers on the control socket")
safe_define("compression_module", type=str, help="The module to use for compressing and decompressing messages. The module must have 'decompress' and 'compress' methods. If not specified, no compression will be used. You can also set worker.compress and worker.decompress in your startup method for increased flexibility")
safe_define("serialization_module", type=str, help="The module to use for serializing and deserializing messages. The module must have 'dumps' and 'loads' methods. If not specified, cPickle will be used. You can also set worker.dumps and worker.loads in your startup method for increased flexibility")

def _import_dotted(name):
    '''Import the module named by the dotted path ``name`` and return that module.

    ``__import__('a.b')`` returns the top-level package ``a``, not ``a.b``, so the
    submodule itself is fetched from ``sys.modules`` after importing it.
    '''
    __import__(name)
    return sys.modules[name]


def _purge_modules(names):
    '''Remove every module in ``names``, and each of its submodules, from ``sys.modules``.

    Only exact names and ``name.``-prefixed submodules are removed, so an
    unrelated module such as ``methods_extra`` survives purging ``methods``.
    '''
    for name in names:
        prefix = name + '.'
        # Snapshot the matches first; sys.modules must not change while iterated.
        stale = [loaded for loaded in sys.modules if loaded == name or loaded.startswith(prefix)]
        for loaded in stale:
            del sys.modules[loaded]


class TotoWorkerService(TotoService):
    '''Instances can be configured in three ways:

    1. (Most common) Pass the path to a config file as the first parameter to the constructor.
    2. Pass config parameters as command line arguments to the initialization script.
    3. Pass keyword arguments to the constructor.

    Precedence is as follows:

    Keyword args, config file, command line
    '''

    def __init__(self, conf_file=None, **kwargs):
        module_options = {'method_module', 'event_init_module'}
        function_options = {'startup_function'}
        # First pass: load only the options that name modules and import those
        # modules, so any options they define exist before the full parse below.
        original_argv = sys.argv
        # Keep argv[0] (the program name): command line parsers such as tornado's
        # parse_command_line skip the first element, so dropping it would make
        # them ignore the first real option.
        sys.argv = original_argv[:1] + [arg for arg in original_argv[1:]
                                        if arg.strip('-').split('=')[0] in module_options]
        try:
            self._load_options(conf_file, **{key: kwargs[key] for key in kwargs if key in module_options})
            modules = {getattr(options, name) for name in module_options if getattr(options, name)}
            for module in modules:
                __import__(module)
            function_modules = {getattr(options, name).rsplit('.', 1)[0]
                                for name in function_options if getattr(options, name)}
            for module in function_modules:
                __import__(module)
        finally:
            # Always restore argv, even if one of the imports above failed.
            sys.argv = original_argv
        # NOTE(review): an earlier comment here mentioned clearing root logger
        # handlers to prevent duplicate logging when a log file is configured;
        # presumably TotoService.__init__ does that - confirm.
        super(TotoWorkerService, self).__init__(conf_file, **kwargs)
        # Forget the first-pass imports so they are fully re-imported under the
        # final options.
        _purge_modules(modules | function_modules)
        # Prevent the re-imported modules from re-defining options.
        define, tornado.options.define = tornado.options.define, lambda *args, **kwargs: None
        try:
            self.__event_init = _import_dotted(options.event_init_module) if options.event_init_module else None
            self.__method_module = _import_dotted(options.method_module) if options.method_module else None
        finally:
            tornado.options.define = define

    def prepare(self):
        '''Start the ROUTER/DEALER load balancer (unless disabled) and print a startup banner.'''
        self.balancer = None
        if options.worker_bind_address:
            # Tasks arrive on a ROUTER socket at worker_bind_address and are
            # forwarded through a DEALER socket at worker_socket_address.
            self.balancer = ProcessDevice(zmq.QUEUE, zmq.ROUTER, zmq.DEALER)
            self.balancer.daemon = True
            self.balancer.bind_in(options.worker_bind_address)
            self.balancer.bind_out(options.worker_socket_address)
            self.balancer.setsockopt_in(zmq.IDENTITY, 'ROUTER')
            self.balancer.setsockopt_out(zmq.IDENTITY, 'DEALER')
            self.balancer.start()
            if options.daemon:
                # Record the balancer's process id so the daemon can be managed.
                with open(pid_path(0), 'wb') as f:
                    f.write(str(self.balancer.launcher.pid))
        count = options.processes if options.processes >= 0 else cpu_count()
        if count == 0:
            print('Starting load balancer. Listening on "%s". Routing to "%s"' % (options.worker_bind_address, options.worker_socket_address))
        else:
            if options.worker_bind_address:
                endpoint = 'Listening on "%s"' % options.worker_bind_address
            else:
                endpoint = 'Connecting to "%s"' % options.worker_socket_address
            print("Starting %s worker process%s. %s." % (count, 'es' if count > 1 else '', endpoint))

    @staticmethod
    def _remote_event_addresses():
        '''Return the configured remote event receiver addresses as a flat list.

        ``remote_event_receivers`` is defined with ``multiple=True``, so tornado
        yields a list; a plain comma separated string is accepted as well.
        Surrounding whitespace and empty entries are dropped.
        '''
        receivers = options.remote_event_receivers
        if not isinstance(receivers, (list, tuple)):
            receivers = [receivers]
        return [address.strip()
                for entry in receivers
                for address in entry.split(',')
                if address.strip()]

    def main_loop(self):
        '''Configure one worker process (events, serialization, startup hook) and run it.'''
        db_connection = configured_connection()
        if options.remote_event_receivers:
            from toto.events import EventManager
            event_manager = EventManager.instance()
            # NOTE(review): remote_instances is not defined in this module; it is
            # presumably defined elsewhere (e.g. toto.service) - confirm.
            if options.remote_instances:
                for address in self._remote_event_addresses():
                    event_manager.register_server(address)
            init_module = self.__event_init
            if init_module:
                init_module.invoke(event_manager)
        serialization = _import_dotted(options.serialization_module) if options.serialization_module else pickle
        compression = _import_dotted(options.compression_module) if options.compression_module else None
        worker = TotoWorker(self.__method_module, options.worker_socket_address, db_connection, compression, serialization)
        if options.startup_function:
            # Split only on the last dot so "package.module.function" resolves
            # to the function inside package.module.
            module_name, function_name = options.startup_function.rsplit('.', 1)
            startup = getattr(_import_dotted(module_name), function_name)
            startup(worker=worker, db_connection=db_connection)
        worker.start()

    def send_worker_command(self, command):
        '''Publish ``command`` to running workers on the control socket, if one is configured.'''
        if not options.control_socket_address:
            return
        context = zmq.Context()
        socket = context.socket(zmq.PUB)
        try:
            socket.bind(options.control_socket_address)
            # PUB sockets drop messages that have no connected subscribers;
            # presumably this pause lets workers' SUB sockets connect first.
            time.sleep(1)
            socket.send_string('command %s' % command)
        finally:
            # Release the socket and context; term() waits for sockets to close.
            socket.close()
            context.term()
        print("Sent command: %s" % command)

    def run(self):
        '''Send ``options.command`` to running workers if given; otherwise start the service.'''
        if options.command:
            self.send_worker_command(options.command)
            return None
        return super(TotoWorkerService, self).run()
class TotoWorker():
    '''The worker is responsible for processing all RPC calls. A single instance
    serves every request received by ``start()``; ``TotoWorkerService`` creates
    one per worker process.

    You can set the module to use for method delegation via the ``method_module``
    parameter. Methods are modules that contain an invoke function::

      def invoke(handler, parameters)

    The worker instance will be passed as the first parameter to the invoke
    function and provides access to the server's database connection. Request
    parameters will be passed as the second argument to the invoke function.

    Any value returned from a method invocation will be sent to the caller,
    closing the message->response cycle. If you only need to let the caller
    know that the task has begun, you should decorate your ``invoke`` function
    with ``@toto.invocation.asynchronous`` to send a response before processing
    begins.
    '''

    def __init__(self, method_module, socket_address, db_connection, compression=None, serialization=None):
        '''Create a worker.

        :param method_module: root object that task method names are resolved against.
        :param socket_address: ZMQ address the REP socket connects to.
        :param db_connection: database connection (may be falsy); its ``db`` is exposed as ``self.db``.
        :param compression: optional module providing ``compress``/``decompress``.
        :param serialization: optional module providing ``dumps``/``loads``; pickle is used otherwise.
        '''
        self.context = zmq.Context()
        self.socket_address = socket_address
        self.method_module = method_module
        self.db_connection = db_connection
        self.db = db_connection and db_connection.db or None
        self.status = 'Initialized'
        self.running = False
        self.compress = compression and compression.compress or (lambda x: x)
        self.decompress = compression and compression.decompress or (lambda x: x)
        # SECURITY: pickle.loads on data received over the network can execute
        # arbitrary code; only expose worker addresses to trusted peers or
        # configure a safe serialization_module.
        self.loads = serialization and serialization.loads or pickle.loads
        self.dumps = serialization and serialization.dumps or pickle.dumps

    def error_info(self, e):
        '''Log ``e`` and return a dict describing it, suitable for an error reply.

        Exceptions that are not ``TotoException`` are wrapped as ``ERROR_SERVER``.
        With ``options.debug`` set, the current traceback is logged as well.
        '''
        if not isinstance(e, TotoException):
            e = TotoException(ERROR_SERVER, str(e))
        if options.debug:
            from traceback import format_exc
            logging.error('%s\n%s\n' % (e, format_exc()))
        else:
            logging.error(str(e))
        return e.__dict__

    def log_status(self):
        '''Log this process id and the worker's current status.'''
        logging.info('Pid: %s status: %s' % (os.getpid(), self.status))

    @staticmethod
    def _context_terminated(error):
        '''Return True if ``error`` is ZMQ's ETERM, raised once ``context.term()`` has been called.'''
        return isinstance(error, zmq.ZMQError) and error.errno == zmq.ETERM

    def __monitor_control(self, address=None):
        '''Start a daemon thread that handles ``status``/``shutdown`` control commands.

        :param address: control socket address; defaults to ``options.control_socket_address``
            read at call time. An empty address disables the monitor.
        '''
        if address is None:
            # Read here rather than as a default argument: a default would be
            # evaluated at import time, before options are parsed.
            address = options.control_socket_address
        if not address:
            return

        def monitor():
            socket = self.context.socket(zmq.SUB)
            socket.setsockopt(zmq.SUBSCRIBE, 'command')
            socket.connect(address)
            while self.running:
                try:
                    command = socket.recv().split(' ', 1)[1]
                    logging.info("Received command: %s" % command)
                    if command == 'shutdown':
                        self.running = False
                        # context.term() blocks until every socket in the
                        # context is closed, including this one.
                        socket.close()
                        self.context.term()
                        return
                    elif command == 'status':
                        self.log_status()
                except Exception as e:
                    self.error_info(e)

        thread = Thread(target=monitor)
        thread.daemon = True
        thread.start()

    def start(self):
        '''Serve tasks until a ``shutdown`` command terminates the context.

        Each task is a multipart message ``[message_id, payload]``; the payload is
        decompressed and deserialized into a dict with ``method`` (a dotted name
        resolved against ``method_module``) and ``parameters``.
        '''
        self.running = True
        self.__monitor_control()
        socket = self.context.socket(zmq.REP)
        socket.connect(self.socket_address)
        pending_reply = False
        try:
            while self.running:
                try:
                    self.status = 'Listening'
                    message = socket.recv_multipart()
                    pending_reply = True
                    message_id = message[0]
                    data = self.loads(self.decompress(message[1]))
                    logging.info('Received Task %s: %s' % (message_id, data['method']))
                    method = self.method_module
                    for attribute in data['method'].split('.'):
                        method = getattr(method, attribute)
                    if hasattr(method.invoke, 'asynchronous'):
                        # Acknowledge immediately; the caller does not wait for a result.
                        socket.send_multipart((message_id,))
                        pending_reply = False
                        self.status = 'Working'
                        method.invoke(self, data['parameters'])
                    else:
                        self.status = 'Working'
                        response = method.invoke(self, data['parameters'])
                        socket.send_multipart((message_id, self.compress(self.dumps(response))))
                        pending_reply = False
                except Exception as e:
                    if self._context_terminated(e):
                        # Shutdown: the control monitor terminated the context.
                        break
                    # Log every failure, including ones raised after an async ack.
                    error = self.error_info(e)
                    if pending_reply:
                        # Clear first so a stale reply is never re-sent.
                        pending_reply = False
                        try:
                            socket.send_multipart((message_id, self.compress(self.dumps({'error': error}))))
                        except zmq.ZMQError as send_error:
                            if self._context_terminated(send_error):
                                break
                            raise
        finally:
            # Closing the socket lets a pending context.term() complete.
            socket.close()
        self.status = 'Finished'
        self.log_status()

Contents