Workers¶
spawn_worker¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | def spawn_worker(self):
self.worker_age += 1
worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
self.app, self.timeout / 2.0,
self.cfg, self.log)
self.cfg.pre_fork(self, worker)
pid = os.fork()
if pid != 0:
self.WORKERS[pid] = worker
return pid
# Process Child
worker_pid = os.getpid()
try:
util._setproctitle("worker [%s]" % self.proc_name)
self.log.info("Booting worker with pid: %s", worker_pid)
if self.cfg.reuseport:
worker.sockets = create_sockets(self.cfg, self.log)
listeners_str = ",".join([str(l) for l in worker.sockets])
self.log.info("Listening at: %s (%s) using reuseport",
listeners_str,
worker_pid)
self.cfg.post_fork(self, worker)
worker.init_process()
sys.exit(0)
except SystemExit:
raise
except AppImportError as e:
self.log.debug("Exception while loading the application: \n%s",
traceback.format_exc())
print("%s" % e, file=sys.stderr)
sys.stderr.flush()
sys.exit(self.APP_LOAD_ERROR)
except:
self.log.exception("Exception in worker process:\n%s",
traceback.format_exc())
if not worker.booted:
sys.exit(self.WORKER_BOOT_ERROR)
sys.exit(-1)
finally:
self.log.info("Worker exiting (pid: %s)", worker_pid)
try:
worker.tmp.close()
self.cfg.worker_exit(self, worker)
except:
pass
|
首先, 实例化 worker_class
, 执行 pre_fork
这个钩子, 然后就是fork子进程, 在父进程中记录
worker进程的pid和worker实例, self.WORKERS[pid] = worker
.
在子进程中, 设置进程名记录logger等, 然后执行 post_fork
钩子, 最后这个子进程进入自己的处理函数,
worker.init_process
.
worker¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 | class Worker(object):
SIGNALS = [getattr(signal, "SIG%s" % x)
for x in "ABRT HUP QUIT INT TERM USR1 USR2 WINCH CHLD".split()]
PIPE = []
def __init__(self, age, ppid, sockets, app, timeout, cfg, log):
"""\
This is called pre-fork so it shouldn't do anything to the
current process. If there's a need to make process wide
changes you'll want to do that in ``self.init_process()``.
"""
self.age = age
self.ppid = ppid
self.sockets = sockets
self.app = app
self.timeout = timeout
self.cfg = cfg
self.booted = False
self.aborted = False
self.reloader = None
self.nr = 0
jitter = randint(0, cfg.max_requests_jitter)
self.max_requests = cfg.max_requests + jitter or MAXSIZE
self.alive = True
self.log = log
self.tmp = WorkerTmp(cfg)
def __str__(self):
return "<Worker %s>" % self.pid
@property
def pid(self):
return os.getpid()
def notify(self):
"""\
Your worker subclass must arrange to have this method called
once every ``self.timeout`` seconds. If you fail in accomplishing
this task, the master process will murder your workers.
"""
self.tmp.notify()
def run(self):
"""\
This is the mainloop of a worker process. You should override
this method in a subclass to provide the intended behaviour
for your particular evil schemes.
"""
raise NotImplementedError()
def init_process(self):
"""\
If you override this method in a subclass, the last statement
in the function should be to call this method with
super(MyWorkerClass, self).init_process() so that the ``run()``
loop is initiated.
"""
# start the reloader
if self.cfg.reload:
def changed(fname):
self.log.info("Worker reloading: %s modified", fname)
os.kill(self.pid, signal.SIGQUIT)
self.reloader = Reloader(callback=changed).start()
# set environment' variables
if self.cfg.env:
for k, v in self.cfg.env.items():
os.environ[k] = v
util.set_owner_process(self.cfg.uid, self.cfg.gid)
# Reseed the random number generator
util.seed()
# For waking ourselves up
self.PIPE = os.pipe()
for p in self.PIPE:
util.set_non_blocking(p)
util.close_on_exec(p)
# Prevent fd inheritance
[util.close_on_exec(s) for s in self.sockets]
util.close_on_exec(self.tmp.fileno())
self.log.close_on_exec()
self.init_signals()
self.wsgi = self.app.wsgi()
self.cfg.post_worker_init(self)
# Enter main run loop
self.booted = True
self.run()
def init_signals(self):
# reset signaling
[signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
signal.signal(signal.SIGWINCH, self.handle_winch)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGABRT, self.handle_abort)
# Don't let SIGTERM and SIGUSR1 disturb active requests
# by interrupting system calls
if hasattr(signal, 'siginterrupt'): # python >= 2.6
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGUSR1, False)
def handle_usr1(self, sig, frame):
self.log.reopen_files()
def handle_exit(self, sig, frame):
self.alive = False
def handle_quit(self, sig, frame):
self.alive = False
# worker_int callback
self.cfg.worker_int(self)
time.sleep(0.1)
sys.exit(0)
def handle_abort(self, sig, frame):
self.alive = False
self.cfg.worker_abort(self)
sys.exit(1)
def handle_error(self, req, client, addr, exc):
request_start = datetime.now()
addr = addr or ('', -1) # unix socket case
if isinstance(exc, (InvalidRequestLine, InvalidRequestMethod,
InvalidHTTPVersion, InvalidHeader, InvalidHeaderName,
LimitRequestLine, LimitRequestHeaders,
InvalidProxyLine, ForbiddenProxyRequest)):
status_int = 400
reason = "Bad Request"
if isinstance(exc, InvalidRequestLine):
mesg = "Invalid Request Line '%s'" % str(exc)
elif isinstance(exc, InvalidRequestMethod):
mesg = "Invalid Method '%s'" % str(exc)
elif isinstance(exc, InvalidHTTPVersion):
mesg = "Invalid HTTP Version '%s'" % str(exc)
elif isinstance(exc, (InvalidHeaderName, InvalidHeader,)):
mesg = "%s" % str(exc)
if not req and hasattr(exc, "req"):
req = exc.req # for access log
elif isinstance(exc, LimitRequestLine):
mesg = "%s" % str(exc)
elif isinstance(exc, LimitRequestHeaders):
mesg = "Error parsing headers: '%s'" % str(exc)
elif isinstance(exc, InvalidProxyLine):
mesg = "'%s'" % str(exc)
elif isinstance(exc, ForbiddenProxyRequest):
reason = "Forbidden"
mesg = "Request forbidden"
status_int = 403
msg = "Invalid request from ip={ip}: {error}"
self.log.debug(msg.format(ip=addr[0], error=str(exc)))
else:
self.log.exception("Error handling request")
status_int = 500
reason = "Internal Server Error"
mesg = ""
if req is not None:
request_time = datetime.now() - request_start
environ = default_environ(req, client, self.cfg)
environ['REMOTE_ADDR'] = addr[0]
environ['REMOTE_PORT'] = str(addr[1])
resp = Response(req, client, self.cfg)
resp.status = "%s %s" % (status_int, reason)
resp.response_length = len(mesg)
self.log.access(resp, req, environ, request_time)
try:
util.write_error(client, status_int, reason, mesg)
except:
self.log.debug("Failed to send error message.")
def handle_winch(self, sig, fname):
# Ignore SIGWINCH in worker. Fixes a crash on OpenBSD.
return
|
这个类中, 主要是初始化了一些信号
def init_signals(self):
# reset signaling
[signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
signal.signal(signal.SIGWINCH, self.handle_winch)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGABRT, self.handle_abort)
# Don't let SIGTERM and SIGUSR1 disturb active requests
# by interrupting system calls
if hasattr(signal, 'siginterrupt'): # python >= 2.6
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGUSR1, False)
主要注意的是, signal.SIGTERN
这个信号, 它的处理函数是 handle_exit
.
def handle_exit(self, sig, frame):
self.alive = False
只是将 worker 的状态标志为 self.alive = False
. 这就会使得worker能够在退出前处理未完成的请求.
在 init_process
的最后, 调用 self.run()
函数运行worker进程. self.run()
具体的实现需要看
worker的类型 self.worker_class
. 支持的类型定义在 gunicorn.workers.__init__.SUPPORTED_WORKERS
里.