'''``TotoService`` can be used to write general processes that take advantage of the process creation/management features
used by ``TotoServer`` and ``TotoWorker`` - the two built in subclasses of ``TotoService``. ``TotoService`` subclasses can be
run with the ``--start`` (or ``--stop``) and ``--processes`` options
to start the service as a daemon process or run multiple instances simultaneously.
To run a subclass of ``TotoService`` create a script like this::

    from toto.service import TotoService

    class MyServiceSubclass(TotoService):

        def main_loop(self):
            while 1:
                #run some job continuously

    MyServiceSubclass('conf_file.conf').run()
'''
import os
import tornado
import logging
from tornado.options import define, options
from multiprocessing import Process, cpu_count
from time import sleep
# Command line / conf file options controlling daemonisation and process count.
# ``start``, ``stop``, ``restart`` and ``nodaemon`` are boolean command line
# aliases that TotoService._load_options() folds into the ``daemon`` option.
define("daemon", metavar='start|stop|restart', help="Start, stop or restart this script as a daemon process. Use this setting in conf files, the shorter start, stop, restart aliases as command line arguments. Requires the multiprocessing module.")
define("processes", default=1, help="The number of daemon processes to run")
define("pidfile", default="toto.daemon.pid", help="The path to the pidfile for daemon processes will be named <path>.<num>.pid (toto.daemon.pid -> toto.daemon.0.pid)")
define("start", default=False, help="Alias for daemon=start for command line usage - overrides daemon setting.")
# The two help strings below previously both read "daemon=start".
define("stop", default=False, help="Alias for daemon=stop for command line usage - overrides daemon setting.")
define("restart", default=False, help="Alias for daemon=restart for command line usage - overrides daemon setting.")
define("nodaemon", default=False, help="Alias for daemon='' for command line usage - overrides daemon setting.")
define("debug", default=False, help="Set this to true to prevent Toto from nicely formatting generic errors. With debug=True, errors will print to the command line")
#convert options.pidfile to an absolute path, then insert ".i" before the last "." of the file name (or append it when the name has no ".")
def pid_path(i):
    '''Return the absolute path of the PID file for process identifier ``i``.

    The path is derived from ``options.pidfile``: ``i`` is inserted before the
    last ``.`` of the file name, or appended after a ``.`` when the file name
    contains none (``toto.daemon.pid`` -> ``toto.daemon.<i>.pid``).

    Child processes with PID files matching the paths returned by this
    function will be killed with SIGTERM when the server daemon process is
    stopped using the ``--stop`` or ``--daemon=stop`` arguments::

        proc = Process()
        proc.start()
        with open(pid_path(process_count() + 1), 'wb') as f:
            f.write(str(proc.pid))

    Note that ``i`` must be an integer for such child processes: only PID
    files with a numeric identifier are picked up when the daemon is stopped.
    '''
    directory, filename = os.path.split(os.path.abspath(options.pidfile))
    # rpartition leaves ``dot`` empty when the file name has no "." at all.
    stem, dot, extension = filename.rpartition('.')
    if dot:
        numbered = '%s.%s.%s' % (stem, i, extension)
    else:
        numbered = '%s.%s' % (filename, i)
    return os.path.join(directory, numbered)
def process_count():
    '''Return the number of service processes to run for the current options.

    A non-negative ``--processes=n`` value is returned unchanged; a negative
    value selects ``multiprocessing.cpu_count()`` instead.
    '''
    requested = options.processes
    if requested < 0:
        return cpu_count()
    return requested
class TotoService(object):
    '''Subclass ``TotoService`` to create a service that can be easily daemonised or
    run in multiple processes simultaneously.

    Override ``main_loop()`` with the work to perform and, optionally,
    ``prepare()`` and ``finish()``; then call ``run()``.
    '''

    def _load_options(self, conf_file=None, final=True, **kwargs):
        '''Populate the global tornado ``options`` object.

        ``kwargs`` are assigned first, then ``conf_file`` (if given) is parsed,
        then the command line. Finally the ``start`` / ``stop`` / ``restart`` /
        ``nodaemon`` aliases are folded into ``options.daemon``; the first alias
        that is set, in that order, wins.
        '''
        for name, value in kwargs.items():
            setattr(options, name, value)
        if conf_file:
            tornado.options.parse_config_file(conf_file, final=False)
        tornado.options.parse_command_line(final=final)
        aliases = (('start', 'start'), ('stop', 'stop'), ('restart', 'restart'), ('nodaemon', ''))
        for alias, daemon_value in aliases:
            if getattr(options, alias):
                options.daemon = daemon_value
                break

    def __init__(self, conf_file=None, **kwargs):
        '''Load options from ``kwargs``, ``conf_file`` and the command line.

        When ``options.log_file_prefix`` is already set, every handler is
        removed from the root logger before the options are parsed.
        NOTE(review): presumably this avoids duplicate log handlers once
        parsing configures logging again - confirm against tornado.
        '''
        if options.log_file_prefix:
            root_logger = logging.getLogger()
            # Iterate over a copy: removeHandler() mutates the handler list.
            for handler in list(root_logger.handlers):
                root_logger.removeHandler(handler)
        self._load_options(conf_file, **kwargs)

    @staticmethod
    def _pidfile_regex(prefix, suffix, id_pattern):
        '''Compile a regex matching exactly ``prefix + <id> + suffix``.

        ``prefix`` and ``suffix`` are literal text (they are escaped here);
        ``id_pattern`` is a regex fragment describing the accepted ids. The
        pattern is anchored at the end so that e.g. ``x.1.pid.bak`` is not
        mistaken for the PID file ``x.1.pid``.
        '''
        import re
        return re.compile(re.escape(prefix) + '(?:' + id_pattern + ')' + re.escape(suffix) + r'\Z')

    def _existing_pidfiles(self, id_pattern):
        '''List the PID files in the pidfile directory whose id matches ``id_pattern``.

        Returns full paths. Raises ``OSError`` if the directory cannot be listed.
        '''
        # A NUL byte cannot occur in a file system path, so it is a safe marker
        # for finding where pid_path() places the id.
        prefix, suffix = pid_path('\0').split('\0', 1)
        piddir = os.path.dirname(prefix)
        matcher = self._pidfile_regex(prefix, suffix, id_pattern)
        candidates = (os.path.join(piddir, fn) for fn in os.listdir(piddir))
        return [path for path in candidates if matcher.match(path)]

    def __run_service(self, pidfile=None):
        '''Spawn ``process_count()`` child processes running ``main_loop()`` and wait for them.

        ``prepare()`` is called before the children are created and ``finish()``
        after all of them have been joined. In daemon mode each child's pid is
        written to ``pid_path(1)`` .. ``pid_path(count)``. ``pidfile``, when
        given, is removed once everything has finished.
        '''
        def start_server_process(pidfile, service_id=0):
            # Runs in the child process; the child removes its own PID file
            # when main_loop() returns.
            self.service_id = service_id
            self.main_loop()
            if pidfile:
                os.remove(pidfile)

        count = process_count()
        pidfiles = [pid_path(i) for i in range(1, count + 1)] if options.daemon else []
        self.prepare()
        processes = []
        for i in range(count):
            child_pidfile = pidfiles[i] if pidfiles else None
            proc = Process(target=start_server_process, args=(child_pidfile, i))
            proc.daemon = True
            processes.append(proc)
            proc.start()
        print("Starting %s %s process%s." % (count, self.__class__.__name__, 'es' if count > 1 else ''))
        if options.daemon:
            for proc, path in zip(processes, pidfiles):
                with open(path, 'w') as f:
                    f.write(str(proc.pid))
        for proc in processes:
            proc.join()
        self.finish()
        if pidfile:
            os.remove(pidfile)

    def run(self):
        '''Start the service. Depending on the initialization options, this may run more than one
        service process.

        Without ``options.daemon`` the service runs in the foreground. With
        ``stop`` / ``restart`` the processes recorded in the numbered PID files
        are sent SIGTERM; with ``start`` / ``restart`` the service is started
        as a detached daemon unless PID files already exist. Any other
        ``daemon`` value prints an error message.
        '''
        if not options.daemon:
            self.__run_service()
            return

        import errno
        import signal

        master_pidfile = pid_path('master')

        if options.daemon in ('stop', 'restart'):
            existing_pidfiles = self._existing_pidfiles(r'\d+')
            try:
                with open(master_pidfile, 'rb') as f:
                    master_pid = int(f.read())
            except (IOError, OSError, ValueError):
                # Missing, unreadable or malformed master PID file.
                master_pid = 0
            for pidfile in existing_pidfiles:
                try:
                    with open(pidfile, 'r') as f:
                        pid = int(f.read())
                    try:
                        os.kill(pid, signal.SIGTERM)
                    except OSError as e:
                        # The process is already gone: nothing left to stop.
                        if e.errno != errno.ESRCH:
                            raise
                    print("Stopped %s %s" % (self.__class__.__name__, pid))
                    os.remove(pidfile)
                except (OSError, IOError) as e:
                    # The PID file vanished in the meantime.
                    if e.errno != errno.ENOENT:
                        raise
            if not existing_pidfiles and master_pid:
                try:
                    os.kill(master_pid, signal.SIGTERM)
                except OSError as e:
                    if e.errno != errno.ESRCH:
                        raise
                os.remove(master_pidfile)
                print('Force stopped %s %s' % (self.__class__.__name__, master_pid))
            else:
                # Wait until the master PID file has been removed.
                # NOTE(review): this never returns if the file is stale
                # (no process left to remove it) - confirm whether a
                # timeout is wanted.
                while os.path.exists(master_pidfile):
                    sleep(0.01)

        if options.daemon in ('start', 'restart'):
            # ``\w+`` also matches the "master" PID file, not just numbered ones.
            existing_pidfiles = self._existing_pidfiles(r'\w+')
            if existing_pidfiles:
                print("Not starting %s, pidfile%s exist%s at %s" % (
                    self.__class__.__name__,
                    's' if len(existing_pidfiles) > 1 else '',
                    's' if len(existing_pidfiles) == 1 else '',
                    ', '.join(existing_pidfiles)))
                return
            #fork and only continue on child process
            if not os.fork():
                #detach from controlling terminal
                os.setsid()
                #fork again and write pid to pidfile from parent, run server on child
                pid = os.fork()
                if pid:
                    with open(master_pidfile, 'w') as f:
                        f.write(str(pid))
                else:
                    self.__run_service(master_pidfile)

        if options.daemon not in ('start', 'stop', 'restart'):
            print("Invalid daemon option: " + options.daemon)

    def prepare(self):
        '''Override this method in a ``TotoService`` subclass and it will be called before any service processes
        are created. You can set instance variables here and they will be available in ``main_loop()`` but be
        careful that any retained objects are safe to access across processes'''
        pass

    def main_loop(self):
        '''Subclass ``TotoService`` and override ``main_loop()`` with your desired functionality.'''
        raise NotImplementedError()

    def finish(self):
        '''Override this method in a ``TotoService`` subclass and it will be called after all service processes
        have exited (after each ``main_loop()`` has returned).

        Note: This method will only be called once and only after all child processes have finished.'''
        pass