123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652 |
- # -*- coding: utf-8 -
- #
- # This file is part of gunicorn released under the MIT license.
- # See the NOTICE for more information.
- import errno
- import os
- import random
- import select
- import signal
- import sys
- import time
- import traceback
- from gunicorn.errors import HaltServer, AppImportError
- from gunicorn.pidfile import Pidfile
- from gunicorn import sock, systemd, util
- from gunicorn import __version__, SERVER_SOFTWARE
- class Arbiter(object):
- """
- Arbiter maintain the workers processes alive. It launches or
- kills them if needed. It also manages application reloading
- via SIGHUP/USR2.
- """
- # A flag indicating if a worker failed to
- # to boot. If a worker process exist with
- # this error code, the arbiter will terminate.
- WORKER_BOOT_ERROR = 3
- # A flag indicating if an application failed to be loaded
- APP_LOAD_ERROR = 4
- START_CTX = {}
- LISTENERS = []
- WORKERS = {}
- PIPE = []
- # I love dynamic languages
- SIG_QUEUE = []
- SIGNALS = [getattr(signal, "SIG%s" % x)
- for x in "HUP QUIT INT TERM TTIN TTOU USR1 USR2 WINCH".split()]
- SIG_NAMES = dict(
- (getattr(signal, name), name[3:].lower()) for name in dir(signal)
- if name[:3] == "SIG" and name[3] != "_"
- )
- def __init__(self, app):
- os.environ["SERVER_SOFTWARE"] = SERVER_SOFTWARE
- self._num_workers = None
- self._last_logged_active_worker_count = None
- self.log = None
- self.setup(app)
- self.pidfile = None
- self.systemd = False
- self.worker_age = 0
- self.reexec_pid = 0
- self.master_pid = 0
- self.master_name = "Master"
- cwd = util.getcwd()
- args = sys.argv[:]
- args.insert(0, sys.executable)
- # init start context
- self.START_CTX = {
- "args": args,
- "cwd": cwd,
- 0: sys.executable
- }
- def _get_num_workers(self):
- return self._num_workers
- def _set_num_workers(self, value):
- old_value = self._num_workers
- self._num_workers = value
- self.cfg.nworkers_changed(self, value, old_value)
- num_workers = property(_get_num_workers, _set_num_workers)
- def setup(self, app):
- self.app = app
- self.cfg = app.cfg
- if self.log is None:
- self.log = self.cfg.logger_class(app.cfg)
- # reopen files
- if 'GUNICORN_FD' in os.environ:
- self.log.reopen_files()
- self.worker_class = self.cfg.worker_class
- self.address = self.cfg.address
- self.num_workers = self.cfg.workers
- self.timeout = self.cfg.timeout
- self.proc_name = self.cfg.proc_name
- self.log.debug('Current configuration:\n{0}'.format(
- '\n'.join(
- ' {0}: {1}'.format(config, value.value)
- for config, value
- in sorted(self.cfg.settings.items(),
- key=lambda setting: setting[1]))))
- # set enviroment' variables
- if self.cfg.env:
- for k, v in self.cfg.env.items():
- os.environ[k] = v
- if self.cfg.preload_app:
- self.app.wsgi()
- def start(self):
- """\
- Initialize the arbiter. Start listening and set pidfile if needed.
- """
- self.log.info("Starting gunicorn %s", __version__)
- if 'GUNICORN_PID' in os.environ:
- self.master_pid = int(os.environ.get('GUNICORN_PID'))
- self.proc_name = self.proc_name + ".2"
- self.master_name = "Master.2"
- self.pid = os.getpid()
- if self.cfg.pidfile is not None:
- pidname = self.cfg.pidfile
- if self.master_pid != 0:
- pidname += ".2"
- self.pidfile = Pidfile(pidname)
- self.pidfile.create(self.pid)
- self.cfg.on_starting(self)
- self.init_signals()
- if not self.LISTENERS:
- fds = None
- listen_fds = systemd.listen_fds()
- if listen_fds:
- self.systemd = True
- fds = range(systemd.SD_LISTEN_FDS_START,
- systemd.SD_LISTEN_FDS_START + listen_fds)
- elif self.master_pid:
- fds = []
- for fd in os.environ.pop('GUNICORN_FD').split(','):
- fds.append(int(fd))
- self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
- listeners_str = ",".join([str(l) for l in self.LISTENERS])
- self.log.debug("Arbiter booted")
- self.log.info("Listening at: %s (%s)", listeners_str, self.pid)
- self.log.info("Using worker: %s", self.cfg.worker_class_str)
- systemd.sd_notify("READY=1\nSTATUS=Gunicorn arbiter booted", self.log)
- # check worker class requirements
- if hasattr(self.worker_class, "check_config"):
- self.worker_class.check_config(self.cfg, self.log)
- self.cfg.when_ready(self)
- def init_signals(self):
- """\
- Initialize master signal handling. Most of the signals
- are queued. Child signals only wake up the master.
- """
- # close old PIPE
- for p in self.PIPE:
- os.close(p)
- # initialize the pipe
- self.PIPE = pair = os.pipe()
- for p in pair:
- util.set_non_blocking(p)
- util.close_on_exec(p)
- self.log.close_on_exec()
- # initialize all signals
- for s in self.SIGNALS:
- signal.signal(s, self.signal)
- signal.signal(signal.SIGCHLD, self.handle_chld)
- def signal(self, sig, frame):
- if len(self.SIG_QUEUE) < 5:
- self.SIG_QUEUE.append(sig)
- self.wakeup()
- def run(self):
- "Main master loop."
- self.start()
- util._setproctitle("master [%s]" % self.proc_name)
- try:
- self.manage_workers()
- while True:
- self.maybe_promote_master()
- sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
- if sig is None:
- self.sleep()
- self.murder_workers()
- self.manage_workers()
- continue
- if sig not in self.SIG_NAMES:
- self.log.info("Ignoring unknown signal: %s", sig)
- continue
- signame = self.SIG_NAMES.get(sig)
- handler = getattr(self, "handle_%s" % signame, None)
- if not handler:
- self.log.error("Unhandled signal: %s", signame)
- continue
- self.log.info("Handling signal: %s", signame)
- handler()
- self.wakeup()
- except (StopIteration, KeyboardInterrupt):
- self.halt()
- except HaltServer as inst:
- self.halt(reason=inst.reason, exit_status=inst.exit_status)
- except SystemExit:
- raise
- except Exception:
- self.log.info("Unhandled exception in main loop",
- exc_info=True)
- self.stop(False)
- if self.pidfile is not None:
- self.pidfile.unlink()
- sys.exit(-1)
- def handle_chld(self, sig, frame):
- "SIGCHLD handling"
- self.reap_workers()
- self.wakeup()
- def handle_hup(self):
- """\
- HUP handling.
- - Reload configuration
- - Start the new worker processes with a new configuration
- - Gracefully shutdown the old worker processes
- """
- self.log.info("Hang up: %s", self.master_name)
- self.reload()
- def handle_term(self):
- "SIGTERM handling"
- raise StopIteration
- def handle_int(self):
- "SIGINT handling"
- self.stop(False)
- raise StopIteration
- def handle_quit(self):
- "SIGQUIT handling"
- self.stop(False)
- raise StopIteration
- def handle_ttin(self):
- """\
- SIGTTIN handling.
- Increases the number of workers by one.
- """
- self.num_workers += 1
- self.manage_workers()
- def handle_ttou(self):
- """\
- SIGTTOU handling.
- Decreases the number of workers by one.
- """
- if self.num_workers <= 1:
- return
- self.num_workers -= 1
- self.manage_workers()
- def handle_usr1(self):
- """\
- SIGUSR1 handling.
- Kill all workers by sending them a SIGUSR1
- """
- self.log.reopen_files()
- self.kill_workers(signal.SIGUSR1)
- def handle_usr2(self):
- """\
- SIGUSR2 handling.
- Creates a new arbiter/worker set as a fork of the current
- arbiter without affecting old workers. Use this to do live
- deployment with the ability to backout a change.
- """
- self.reexec()
- def handle_winch(self):
- """SIGWINCH handling"""
- if self.cfg.daemon:
- self.log.info("graceful stop of workers")
- self.num_workers = 0
- self.kill_workers(signal.SIGTERM)
- else:
- self.log.debug("SIGWINCH ignored. Not daemonized")
- def maybe_promote_master(self):
- if self.master_pid == 0:
- return
- if self.master_pid != os.getppid():
- self.log.info("Master has been promoted.")
- # reset master infos
- self.master_name = "Master"
- self.master_pid = 0
- self.proc_name = self.cfg.proc_name
- del os.environ['GUNICORN_PID']
- # rename the pidfile
- if self.pidfile is not None:
- self.pidfile.rename(self.cfg.pidfile)
- # reset proctitle
- util._setproctitle("master [%s]" % self.proc_name)
- def wakeup(self):
- """\
- Wake up the arbiter by writing to the PIPE
- """
- try:
- os.write(self.PIPE[1], b'.')
- except IOError as e:
- if e.errno not in [errno.EAGAIN, errno.EINTR]:
- raise
- def halt(self, reason=None, exit_status=0):
- """ halt arbiter """
- self.stop()
- self.log.info("Shutting down: %s", self.master_name)
- if reason is not None:
- self.log.info("Reason: %s", reason)
- if self.pidfile is not None:
- self.pidfile.unlink()
- self.cfg.on_exit(self)
- sys.exit(exit_status)
- def sleep(self):
- """\
- Sleep until PIPE is readable or we timeout.
- A readable PIPE means a signal occurred.
- """
- try:
- ready = select.select([self.PIPE[0]], [], [], 1.0)
- if not ready[0]:
- return
- while os.read(self.PIPE[0], 1):
- pass
- except (select.error, OSError) as e:
- # TODO: select.error is a subclass of OSError since Python 3.3.
- error_number = getattr(e, 'errno', e.args[0])
- if error_number not in [errno.EAGAIN, errno.EINTR]:
- raise
- except KeyboardInterrupt:
- sys.exit()
- def stop(self, graceful=True):
- """\
- Stop workers
- :attr graceful: boolean, If True (the default) workers will be
- killed gracefully (ie. trying to wait for the current connection)
- """
- unlink = (
- self.reexec_pid == self.master_pid == 0
- and not self.systemd
- and not self.cfg.reuse_port
- )
- sock.close_sockets(self.LISTENERS, unlink)
- self.LISTENERS = []
- sig = signal.SIGTERM
- if not graceful:
- sig = signal.SIGQUIT
- limit = time.time() + self.cfg.graceful_timeout
- # instruct the workers to exit
- self.kill_workers(sig)
- # wait until the graceful timeout
- while self.WORKERS and time.time() < limit:
- time.sleep(0.1)
- self.kill_workers(signal.SIGKILL)
- def reexec(self):
- """\
- Relaunch the master and workers.
- """
- if self.reexec_pid != 0:
- self.log.warning("USR2 signal ignored. Child exists.")
- return
- if self.master_pid != 0:
- self.log.warning("USR2 signal ignored. Parent exists.")
- return
- master_pid = os.getpid()
- self.reexec_pid = os.fork()
- if self.reexec_pid != 0:
- return
- self.cfg.pre_exec(self)
- environ = self.cfg.env_orig.copy()
- environ['GUNICORN_PID'] = str(master_pid)
- if self.systemd:
- environ['LISTEN_PID'] = str(os.getpid())
- environ['LISTEN_FDS'] = str(len(self.LISTENERS))
- else:
- environ['GUNICORN_FD'] = ','.join(
- str(l.fileno()) for l in self.LISTENERS)
- os.chdir(self.START_CTX['cwd'])
- # exec the process using the original environment
- os.execvpe(self.START_CTX[0], self.START_CTX['args'], environ)
- def reload(self):
- old_address = self.cfg.address
- # reset old environment
- for k in self.cfg.env:
- if k in self.cfg.env_orig:
- # reset the key to the value it had before
- # we launched gunicorn
- os.environ[k] = self.cfg.env_orig[k]
- else:
- # delete the value set by gunicorn
- try:
- del os.environ[k]
- except KeyError:
- pass
- # reload conf
- self.app.reload()
- self.setup(self.app)
- # reopen log files
- self.log.reopen_files()
- # do we need to change listener ?
- if old_address != self.cfg.address:
- # close all listeners
- for l in self.LISTENERS:
- l.close()
- # init new listeners
- self.LISTENERS = sock.create_sockets(self.cfg, self.log)
- listeners_str = ",".join([str(l) for l in self.LISTENERS])
- self.log.info("Listening at: %s", listeners_str)
- # do some actions on reload
- self.cfg.on_reload(self)
- # unlink pidfile
- if self.pidfile is not None:
- self.pidfile.unlink()
- # create new pidfile
- if self.cfg.pidfile is not None:
- self.pidfile = Pidfile(self.cfg.pidfile)
- self.pidfile.create(self.pid)
- # set new proc_name
- util._setproctitle("master [%s]" % self.proc_name)
- # spawn new workers
- for _ in range(self.cfg.workers):
- self.spawn_worker()
- # manage workers
- self.manage_workers()
- def murder_workers(self):
- """\
- Kill unused/idle workers
- """
- if not self.timeout:
- return
- workers = list(self.WORKERS.items())
- for (pid, worker) in workers:
- try:
- if time.time() - worker.tmp.last_update() <= self.timeout:
- continue
- except (OSError, ValueError):
- continue
- if not worker.aborted:
- self.log.critical("WORKER TIMEOUT (pid:%s)", pid)
- worker.aborted = True
- self.kill_worker(pid, signal.SIGABRT)
- else:
- self.kill_worker(pid, signal.SIGKILL)
- def reap_workers(self):
- """\
- Reap workers to avoid zombie processes
- """
- try:
- while True:
- wpid, status = os.waitpid(-1, os.WNOHANG)
- if not wpid:
- break
- if self.reexec_pid == wpid:
- self.reexec_pid = 0
- else:
- # A worker was terminated. If the termination reason was
- # that it could not boot, we'll shut it down to avoid
- # infinite start/stop cycles.
- exitcode = status >> 8
- if exitcode == self.WORKER_BOOT_ERROR:
- reason = "Worker failed to boot."
- raise HaltServer(reason, self.WORKER_BOOT_ERROR)
- if exitcode == self.APP_LOAD_ERROR:
- reason = "App failed to load."
- raise HaltServer(reason, self.APP_LOAD_ERROR)
- if os.WIFSIGNALED(status):
- self.log.warning(
- "Worker with pid %s was terminated due to signal %s",
- wpid,
- os.WTERMSIG(status)
- )
- worker = self.WORKERS.pop(wpid, None)
- if not worker:
- continue
- worker.tmp.close()
- self.cfg.child_exit(self, worker)
- except OSError as e:
- if e.errno != errno.ECHILD:
- raise
- def manage_workers(self):
- """\
- Maintain the number of workers by spawning or killing
- as required.
- """
- if len(self.WORKERS) < self.num_workers:
- self.spawn_workers()
- workers = self.WORKERS.items()
- workers = sorted(workers, key=lambda w: w[1].age)
- while len(workers) > self.num_workers:
- (pid, _) = workers.pop(0)
- self.kill_worker(pid, signal.SIGTERM)
- active_worker_count = len(workers)
- if self._last_logged_active_worker_count != active_worker_count:
- self._last_logged_active_worker_count = active_worker_count
- self.log.debug("{0} workers".format(active_worker_count),
- extra={"metric": "gunicorn.workers",
- "value": active_worker_count,
- "mtype": "gauge"})
- def spawn_worker(self):
- self.worker_age += 1
- worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
- self.app, self.timeout / 2.0,
- self.cfg, self.log)
- self.cfg.pre_fork(self, worker)
- pid = os.fork()
- if pid != 0:
- worker.pid = pid
- self.WORKERS[pid] = worker
- return pid
- # Do not inherit the temporary files of other workers
- for sibling in self.WORKERS.values():
- sibling.tmp.close()
- # Process Child
- worker.pid = os.getpid()
- try:
- util._setproctitle("worker [%s]" % self.proc_name)
- self.log.info("Booting worker with pid: %s", worker.pid)
- self.cfg.post_fork(self, worker)
- worker.init_process()
- sys.exit(0)
- except SystemExit:
- raise
- except AppImportError as e:
- self.log.debug("Exception while loading the application",
- exc_info=True)
- print("%s" % e, file=sys.stderr)
- sys.stderr.flush()
- sys.exit(self.APP_LOAD_ERROR)
- except Exception:
- self.log.exception("Exception in worker process")
- if not worker.booted:
- sys.exit(self.WORKER_BOOT_ERROR)
- sys.exit(-1)
- finally:
- self.log.info("Worker exiting (pid: %s)", worker.pid)
- try:
- worker.tmp.close()
- self.cfg.worker_exit(self, worker)
- except Exception:
- self.log.warning("Exception during worker exit:\n%s",
- traceback.format_exc())
- def spawn_workers(self):
- """\
- Spawn new workers as needed.
- This is where a worker process leaves the main loop
- of the master process.
- """
- for _ in range(self.num_workers - len(self.WORKERS)):
- self.spawn_worker()
- time.sleep(0.1 * random.random())
- def kill_workers(self, sig):
- """\
- Kill all workers with the signal `sig`
- :attr sig: `signal.SIG*` value
- """
- worker_pids = list(self.WORKERS.keys())
- for pid in worker_pids:
- self.kill_worker(pid, sig)
- def kill_worker(self, pid, sig):
- """\
- Kill a worker
- :attr pid: int, worker pid
- :attr sig: `signal.SIG*` value
- """
- try:
- os.kill(pid, sig)
- except OSError as e:
- if e.errno == errno.ESRCH:
- try:
- worker = self.WORKERS.pop(pid)
- worker.tmp.close()
- self.cfg.worker_exit(self, worker)
- return
- except (KeyError, OSError):
- return
- raise
|