Mercurial > cgi-bin > hgweb.cgi > tincan
view tincan.py @ 13:6de828de4409 draft
Fix an introspection error.
author | David Barts <n5jrn@me.com> |
---|---|
date | Thu, 16 May 2019 11:42:56 -0700 |
parents | 496d43d551d2 |
children | 9d0497dc19f8 |
line wrap: on
line source
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# As with Bottle, it's all in one big, ugly file. For now.

# I m p o r t s

import os
import sys
import ast
import binascii
from base64 import b16encode, b16decode
import functools
import importlib
import importlib.util  # explicit: "import importlib" alone need not load it
from inspect import isclass
import io
import py_compile
from stat import S_ISDIR, S_ISREG
from string import whitespace
import urllib.parse  # needed by _TinCanRoute.__call__ (was missing)

import bottle

# E x c e p t i o n s

class TinCanException(Exception):
    """
    The parent class of all exceptions we raise.
    """
    pass

class TemplateHeaderError(TinCanException):
    """
    Raised upon encountering a syntax error in the template headers.
    Carries the offending message and the 1-based header line number.
    """
    def __init__(self, message, line):
        super().__init__(message, line)
        self.message = message
        self.line = line

    def __str__(self):
        return "line {0}: {1}".format(self.line, self.message)

class ForwardException(TinCanException):
    """
    Raised to effect the flow control needed to do a forward (server-side
    redirect). It is ugly to do this, but other Python frameworks do and
    there seems to be no good alternative.
    """
    def __init__(self, target):
        super().__init__(target)
        self.target = target

class TinCanError(TinCanException):
    """
    General-purpose exception thrown by TinCan when things go wrong, often
    when attempting to launch webapps.
    """
    pass

# T e m p l a t e s
#
# Template (.pspx) files. These are standard templates for a supported
# template engine, but with an optional set of header lines that begin
# with '#'.

class TemplateFile(object):
    """
    Parse a template file into a header part and the body part. The header
    is always a leading set of lines, each starting with '#', that is of the
    same format regardless of the template body. The template body varies
    depending on the selected templating engine. The body part has
    each header line replaced by a blank line. This preserves the overall
    line numbering when processing the body. The added newlines are normally
    stripped out before the rendered page is sent back to the client.

    After construction, the .header and .body attributes hold the two
    parts as strings.
    """
    _END = "#end"
    _LEND = len(_END)
    _WS = set(whitespace)

    def __init__(self, raw, encoding='utf-8'):
        """
        Constructor. The first argument is either an open text-mode file
        object or a string naming a file to open (using the specified
        encoding). Anything else raises TypeError.
        """
        if isinstance(raw, io.TextIOBase):
            self._do_init(raw)
        elif isinstance(raw, str):
            with open(raw, "r", encoding=encoding) as fp:
                self._do_init(fp)
        else:
            raise TypeError("Expecting a string or Text I/O object.")

    def _do_init(self, fp):
        """
        Read every line from fp, feeding each to the current state
        handler (header first, then body).
        """
        self._hbuf = []
        self._bbuf = []
        self._state = self._header
        while True:
            line = fp.readline()
            if line == '':
                break
            self._state(line)
        self.header = ''.join(self._hbuf)
        self.body = ''.join(self._bbuf)

    def _header(self, line):
        """
        State handler for header lines. The first line not starting with
        '#' switches to the body state; so does an explicit '#end' line,
        which itself still counts as part of the header.
        """
        if not line.startswith('#'):
            self._state = self._body
            self._state(line)
            return
        if line.startswith(self._END) and (len(line) == self._LEND or line[self._LEND] in self._WS):
            self._state = self._body
        self._hbuf.append(line)
        # Keep the body's line numbering in step with the file's.
        self._bbuf.append("\n")

    def _body(self, line):
        """
        State handler for body lines; they are passed through unchanged.
        """
        self._bbuf.append(line)

class TemplateHeader(object):
    """
    Parses and represents a set of header lines. Each directive that takes
    a parameter becomes an attribute holding that parameter (or None if the
    directive was absent); each flag directive becomes a boolean attribute.
    """
    _NAMES = [ "errors", "forward", "methods", "python", "template" ]
    _FNAMES = [ "hidden" ]

    def __init__(self, string):
        """
        Constructor. Parses the header text passed. Raises
        TemplateHeaderError on any malformed, unknown, or duplicated
        directive.
        """
        # Initialize our state
        for i in self._NAMES:
            setattr(self, i, None)
        for i in self._FNAMES:
            setattr(self, i, False)
        # Parse the string
        count = 0
        nameset = set(self._NAMES + self._FNAMES)
        seen = set()
        lines = string.split("\n")
        if lines and lines[-1] == "":
            del lines[-1]
        for line in lines:
            # Get line
            count += 1
            if not line.startswith("#"):
                raise TemplateHeaderError("Does not start with '#'.", count)
            try:
                rna, rpa = line.split(maxsplit=1)
            except ValueError:
                rna = line.rstrip()
                rpa = None
            # Get name, ignoring remarks.
            name = rna[1:]
            if name == "rem":
                continue
            if name == "end":
                break
            if name not in nameset:
                raise TemplateHeaderError("Invalid directive: {0!r}".format(rna), count)
            if name in seen:
                raise TemplateHeaderError("Duplicate {0!r} directive.".format(rna), count)
            seen.add(name)
            # Flags
            if name in self._FNAMES:
                setattr(self, name, True)
                continue
            # Get parameter
            if rpa is None:
                raise TemplateHeaderError("Missing parameter.", count)
            param = rpa.strip()
            for i in [ "'", '"']:
                if param.startswith(i) and param.endswith(i):
                    # A quoted parameter is decoded as a Python string
                    # literal. Malformed quoting (e.g. a lone quote mark)
                    # must surface as a header error, not a stray
                    # SyntaxError or ValueError.
                    try:
                        param = ast.literal_eval(param)
                    except (SyntaxError, ValueError) as e:
                        raise TemplateHeaderError("Invalid quoted parameter: {0!r}".format(rpa.strip()), count) from e
                    # Something like 'a', 'b' evaluates to a tuple; every
                    # consumer of these attributes expects a string.
                    if not isinstance(param, str):
                        raise TemplateHeaderError("Invalid quoted parameter: {0!r}".format(rpa.strip()), count)
                    break
            # Update this object
            setattr(self, name, param)

# C h a m e l e o n
#
# Support for Chameleon templates (the kind TinCan uses by default).

class ChameleonTemplate(bottle.BaseTemplate):
    """
    Bottle template adapter for Chameleon page templates.
    """
    def prepare(self, **options):
        """
        Build the underlying Chameleon template, from the source string
        if there is one, else from the file name.
        """
        # Imported here so Chameleon is only required if actually used.
        from chameleon import PageTemplate, PageTemplateFile
        if self.source:
            self.tpl = PageTemplate(self.source, encoding=self.encoding,
                **options)
        else:
            self.tpl = PageTemplateFile(self.filename, encoding=self.encoding,
                search_path=self.lookup, **options)

    def render(self, *args, **kwargs):
        """
        Render the template. Positional arguments are dicts of template
        variables; they and the keyword arguments override the defaults.
        """
        for dictarg in args:
            kwargs.update(dictarg)
        _defaults = self.defaults.copy()
        _defaults.update(kwargs)
        return self.tpl.render(**_defaults)

chameleon_template = functools.partial(bottle.template, template_adapter=ChameleonTemplate)
chameleon_view = functools.partial(bottle.view, template_adapter=ChameleonTemplate)

# U t i l i t i e s

def _normpath(base, unsplit):
    """
    Split, normalize and ensure a possibly relative path is absolute. First
    argument is a list of directory names, defining a base. Second
    argument is a string, which may either be relative to that base, or
    absolute. Only '/' is supported as a separator.

    Returns a list of path components. Raises IndexError if there are
    more '..' components than there are directories to back out of.
    """
    scratch = unsplit.strip('/').split('/')
    if not unsplit.startswith('/'):
        scratch = base + scratch
    ret = []
    for i in scratch:
        if i == '.':
            continue
        if i == '..':
            ret.pop()  # may raise IndexError
            continue
        ret.append(i)
    return ret

def _mangle(string):
    """
    Turn a possibly troublesome identifier into a mangled one. Underscores
    and any characters not legal at their position in an identifier are
    replaced by an underscore followed by the hexadecimal encoding of
    their UTF-8 bytes.
    """
    first = True
    ret = []
    for ch in string:
        # Prefixing "x" tests whether ch is legal in a non-initial position.
        if ch == '_' or not (ch if first else "x" + ch).isidentifier():
            ret.append('_')
            ret.append(b16encode(ch.encode("utf-8")).decode("us-ascii"))
        else:
            ret.append(ch)
        first = False
    return ''.join(ret)

# The TinCan class. Simply a Bottle webapp that contains a forward method, so
# the code-behind can call request.app.forward().

class TinCan(bottle.Bottle):
    def forward(self, target):
        """
        Forward this request to the specified target route. The target
        may be absolute or relative to the current request's path. This
        never returns: it raises ForwardException on success, or
        TinCanError if the forward is not allowed or the target is invalid.
        """
        source = bottle.request.environ['PATH_INFO']
        base = source.strip('/').split('/')[:-1]
        if bottle.request.environ.get(_FTYPE, False):
            raise TinCanError("{0}: forward from error page".format(source))
        try:
            exc = ForwardException('/' + '/'.join(_normpath(base, target)))
        except IndexError as e:
            raise TinCanError("{0}: invalid forward to {1!r}".format(source, target)) from e
        raise exc

# C o d e   B e h i n d
#
# Represents the code-behind of one of our pages. This gets subclassed, of
# course.

class BasePage(object):
    """
    The parent class of both error and normal pages' code-behind.
    """
    # Non-private names that export() refuses to export. Subclasses
    # override this; the default here keeps export() usable on any subclass.
    _HIDDEN = set()

    def handle(self):
        """
        This is the entry point for the code-behind logic. It is intended
        to be overridden.
        """
        pass

    def export(self):
        """
        Export template variables. The default behavior is to export all
        non-hidden non-callables that don't start with an underscore,
        plus this object itself under the name 'page'.
        This method can be overridden if a different behavior is
        desired. It should always return a dict or dict-like object.
        """
        ret = { 'page': self }
        for name in dir(self):
            if name in self._HIDDEN or name.startswith('_'):
                continue
            value = getattr(self, name)
            if callable(value):
                continue
            ret[name] = value
        return ret

class Page(BasePage):
    """
    The code-behind for a normal page.
    """
    # Non-private things we refuse to export anyhow.
    _HIDDEN = set([ "request", "response" ])

    def __init__(self, req, resp):
        """
        Constructor. This is a lightweight operation.
        """
        self.request = req  # app context is request.app in Bottle
        self.response = resp

class ErrorPage(BasePage):
    """
    The code-behind for an error page.
    """
    _HIDDEN = set()

    def __init__(self, req, err):
        """
        Constructor. This is a lightweight operation.
        """
        self.request = req
        self.error = err

# R o u t e s
#
# Represents a route in TinCan. Our launcher creates these on-the-fly based
# on the files it finds.

_ERRMIN = 400
_ERRMAX = 599
_PEXTEN = ".py"
_TEXTEN = ".pspx"
_FLOOP = "tincan.forwards"
_FORIG = "tincan.origin"
_FTYPE = "tincan.iserror"

class _TinCanErrorRoute(object):
    """
    A route to an error page. These don't get routes created for them,
    and are only reached if an error routes them there. Unless you create
    custom code-behind, the only variables available to your template are
    request (bottle.Request), error (bottle.HTTPError), and page (the
    code-behind object itself).
    """
    def __init__(self, template, klass):
        self._template = template
        self._template.prepare()
        self._class = klass

    def __call__(self, e):
        # Mark the request so TinCan.forward() can refuse to forward
        # from inside an error page.
        bottle.request.environ[_FTYPE] = True
        obj = self._class(bottle.request, e)
        obj.handle()
        return self._template.render(obj.export()).lstrip('\n')

class _TinCanRoute(object):
    """
    A route created by the TinCan launcher.
    """
    def __init__(self, launcher, name, subdir):
        """
        Lightweight constructor. Arguments are the launcher creating us,
        the template's base name (sans extension), and the list of
        directory names leading to it from the webapp root.
        """
        self._fsroot = launcher.fsroot
        self._urlroot = launcher.urlroot
        self._name = name
        self._python = name + _PEXTEN
        self._fspath = os.path.join(launcher.fsroot, *subdir, name + _TEXTEN)
        self._urlpath = self._urljoin(launcher.urlroot, *subdir, name + _TEXTEN)
        self._origin = self._urlpath
        self._subdir = subdir
        self._seen = set()
        self._tclass = launcher.tclass
        self._app = launcher.app

    def launch(self):
        """
        Launch a single page. Raises TinCanError if the page (or anything
        it refers to) cannot be processed.
        """
        # Build master and header objects, process #forward directives
        oheader = None
        while True:
            try:
                self._template = TemplateFile(self._fspath)
            except IOError as e:
                if oheader is not None:
                    note = "{0}: invalid #forward: ".format(self._origin)
                else:
                    note = ""
                raise TinCanError("{0}{1!s}".format(note, e)) from e
            try:
                self._header = TemplateHeader(self._template.header)
            except TemplateHeaderError as e:
                raise TinCanError("{0}: {1!s}".format(self._fspath, e)) from e
            if oheader is None:
                oheader = self._header  # save original header
            elif (oheader.errors is None) != (self._header.errors is None):
                # Error pages may only forward to error pages, and normal
                # pages only to normal pages.
                raise TinCanError("{0}: invalid #forward".format(self._origin))
            if self._header.forward is None:
                break
            # print("forwarding from:", self._urlpath)  # debug
            self._redirect()
            # print("forwarded to:", self._urlpath)  # debug
        # If this is a #hidden page, we ignore it for now, since hidden pages
        # don't get routes made for them. (An #errors parameter may be an
        # empty string, so test for None, not truthiness.)
        if oheader.hidden and oheader.errors is None:
            return
        # Get the code-behind #python
        if self._header.python is not None:
            if not self._header.python.endswith(_PEXTEN):
                raise TinCanError("{0}: #python files must end in {1}".format(self._urlpath, _PEXTEN))
            self._python = self._header.python
        # Obtain a class object by importing and introspecting a module.
        self._getclass()
        # Build body object (#template)
        if self._header.template is not None:
            if not self._header.template.endswith(_TEXTEN):
                raise TinCanError("{0}: #template files must end in {1}".format(self._urlpath, _TEXTEN))
            # Report a bad or unreadable #template as a TinCanError so the
            # launcher logs it and moves on to the next page.
            try:
                tparts = self._splitpath(self._header.template)
                tpath = os.path.normpath(os.path.join(self._fsroot, *tparts))
                tfile = TemplateFile(tpath)
            except IndexError as e:
                raise TinCanError("{0}: invalid #template path".format(self._urlpath)) from e
            except IOError as e:
                raise TinCanError("{0}: invalid #template: {1!s}".format(self._urlpath, e)) from e
            self._body = self._tclass(source=tfile.body)
        else:
            self._body = self._tclass(source=self._template.body)
        self._body.prepare()
        # If this is an #errors page, register it as such.
        if oheader.errors is not None:
            self._mkerror(oheader.errors)
            return  # this implies #hidden
        # Get #methods for this route
        if self._header.methods is None:
            methods = [ 'GET' ]
        else:
            methods = [ i.upper() for i in self._header.methods.split() ]
            if not methods:
                raise TinCanError("{0}: no #methods specified".format(self._urlpath))
        # Register this thing with Bottle
        print("adding route:", self._origin, '('+','.join(methods)+')')  # debug
        self._app.route(self._origin, methods, self)

    def _splitpath(self, unsplit):
        """
        Resolve a path relative to this page's directory into a list of
        components. May raise IndexError; see _normpath.
        """
        return _normpath(self._subdir, unsplit)

    def _mkerror(self, rerrors):
        """
        Register this page as the handler for the error codes listed in
        rerrors (whitespace-separated). An empty list means all codes
        from _ERRMIN through _ERRMAX.
        """
        try:
            errors = [ int(i) for i in rerrors.split() ]
        except ValueError as e:
            raise TinCanError("{0}: bad #errors line".format(self._urlpath)) from e
        if not errors:
            errors = range(_ERRMIN, _ERRMAX+1)
        # Validate everything before registering anything, so a bad code
        # doesn't leave the page registered for only some of its codes.
        for error in errors:
            if error < _ERRMIN or error > _ERRMAX:
                raise TinCanError("{0}: bad #errors code".format(self._urlpath))
        # NOTE(review): this renders the page's own body, not the body of
        # any #template it names -- confirm that is the intent.
        route = _TinCanErrorRoute(self._tclass(source=self._template.body), self._class)
        for error in errors:
            self._app.error_handler[error] = route  # XXX

    def _gettime(self, path):
        """
        Return the modification time of a file, or 0 if it does not exist.
        """
        try:
            return os.stat(path).st_mtime
        except FileNotFoundError:
            return 0
        except OSError as e:
            raise TinCanError(str(e)) from e

    def _getclass(self):
        """
        Set self._class to the code-behind class for this page: the
        single subclass of Page (or ErrorPage) found in the code-behind
        file, or that base class itself if there is no such file.
        """
        try:
            pyparts = self._splitpath(self._python)
        except IndexError as e:
            raise TinCanError("{0}: invalid #python path".format(self._urlpath)) from e
        pypath = os.path.normpath(os.path.join(self._fsroot, *pyparts))
        # An #errors parameter may be an empty string (meaning all codes),
        # so test for None, not truthiness.
        klass = ErrorPage if self._header.errors is not None else Page
        # Give 'em a default code-behind if they don't furnish one
        pytime = self._gettime(pypath)
        if not pytime:
            self._class = klass
            return
        # Else load the code-behind from a .py file
        pycpath = pypath + 'c'
        pyctime = self._gettime(pycpath)
        try:
            if pyctime < pytime:
                py_compile.compile(pypath, cfile=pycpath, doraise=True)
        except py_compile.PyCompileError as e:
            raise TinCanError(str(e)) from e
        except Exception as e:
            raise TinCanError("{0}: {1!s}".format(pypath, e)) from e
        try:
            spec = importlib.util.spec_from_file_location(_mangle(self._name), pycpath)
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
        except Exception as e:
            raise TinCanError("{0}: error importing".format(pycpath)) from e
        self._class = None
        for i in dir(mod):
            v = getattr(mod, i)
            if isclass(v) and issubclass(v, klass) and v is not klass:
                # Two names bound to the same class are not "multiple"
                # classes, so compare against the candidate itself.
                if self._class is not None and self._class is not v:
                    raise TinCanError("{0}: contains multiple {1} classes".format(pypath, klass.__name__))
                self._class = v
        if self._class is None:
            raise TinCanError("{0}: contains no {1} classes".format(pypath, klass.__name__))

    def _redirect(self):
        """
        Process a #forward directive by re-pointing this route at the
        template it names. Raises TinCanError on a loop or bad target.
        """
        try:
            rlist = self._splitpath(self._header.forward)
            forw = '/' + '/'.join(rlist)
            if forw in self._seen:
                raise TinCanError("{0}: #forward loop".format(self._origin))
            self._seen.add(forw)
            rname = rlist.pop()
        except IndexError as e:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath)) from e
        name, ext = os.path.splitext(rname)
        if ext != _TEXTEN:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath))
        self._subdir = rlist
        self._python = name + _PEXTEN
        self._fspath = os.path.join(self._fsroot, *self._subdir, rname)
        self._urlpath = '/' + self._urljoin(*self._subdir, rname)

    def _urljoin(self, *args):
        """
        Join URL path components with '/'. A leading '/' component (the
        root) is treated as empty so the result has no doubled slash.
        """
        args = list(args)
        if args[0] == '/':
            args[0] = ''
        return '/'.join(args)

    def __call__(self):
        """
        This gets called by the framework AFTER the page is launched.
        Runs the code-behind and renders the page, or, if the code-behind
        requested a forward, hands the request to the target route.
        """
        target = None
        obj = self._class(bottle.request, bottle.response)
        try:
            obj.handle()
            return self._body.render(obj.export()).lstrip('\n')
        except ForwardException as fwd:
            target = fwd.target
        if target is None:
            raise TinCanError("{0}: unexpected null target".format(self._urlpath))
        # We get here if we are doing a server-side programmatic
        # forward.
        environ = bottle.request.environ
        if _FORIG not in environ:
            environ[_FORIG] = self._urlpath
        if _FLOOP not in environ:
            environ[_FLOOP] = set([self._urlpath])
        elif target in environ[_FLOOP]:
            raise TinCanError("{0}: forward loop detected".format(environ[_FORIG]))
        environ[_FLOOP].add(target)
        environ['bottle.raw_path'] = target
        environ['PATH_INFO'] = urllib.parse.quote(target)
        route, args = self._app.router.match(environ)
        environ['route.handle'] = environ['bottle.route'] = route
        environ['route.url_args'] = args
        return route.call(**args)

    def _mkdict(self, obj):
        """
        Return a dict of obj's non-callable attributes whose names do not
        start with an underscore.
        """
        ret = {}
        for name in dir(obj):
            if name.startswith('_'):
                continue
            value = getattr(obj, name)
            if not callable(value):
                ret[name] = value
        return ret

# L a u n c h e r

_WINF = "WEB-INF"
_BANNED = set([_WINF])

class _Launcher(object):
    """
    Helper class for launching webapps.
    """
    def __init__(self, fsroot, urlroot, tclass, logger):
        """
        Lightweight constructor. The real action happens in .launch() below.
        """
        self.fsroot = fsroot
        self.urlroot = urlroot
        self.tclass = tclass
        self.logger = logger
        self.app = None
        self.errors = 0
        self.debug = False

    def launch(self):
        """
        Does the actual work of launching something. XXX - modifies
        sys.path and never un-modifies it.
        """
        # Sanity checks
        if not self.urlroot.startswith("/"):
            raise TinCanError("urlroot must be absolute")
        if not os.path.isdir(self.fsroot):
            raise TinCanError("no such directory: {0!r}".format(self.fsroot))
        # Make WEB-INF, if needed
        winf = os.path.join(self.fsroot, _WINF)
        lib = os.path.join(winf, "lib")
        for i in [ winf, lib ]:
            if not os.path.isdir(i):
                os.mkdir(i)
        # Add our private lib directory to sys.path
        sys.path.insert(1, os.path.abspath(lib))
        # Do what we gotta do
        self.app = TinCan()
        self._launch([])
        return self

    def _launch(self, subdir):
        """
        Recursively walk a directory (given as a list of names below the
        webapp root), launching a route for every template file found.
        Pages that fail to launch are logged and counted, not fatal.
        """
        for entry in os.listdir(os.path.join(self.fsroot, *subdir)):
            if not subdir and entry in _BANNED:
                continue
            etype = os.stat(os.path.join(self.fsroot, *subdir, entry)).st_mode
            if S_ISREG(etype):
                ename, eext = os.path.splitext(entry)
                if eext != _TEXTEN:
                    continue  # only look at interesting files
                route = _TinCanRoute(self, ename, subdir)
                try:
                    route.launch()
                except TinCanError as e:
                    self.logger(str(e))
                    if self.debug:
                        # Log the whole chain of underlying causes.
                        while e.__cause__ is not None:
                            e = e.__cause__
                            self.logger("\t{0}: {1!s}".format(e.__class__.__name__, e))
                    self.errors += 1
            elif S_ISDIR(etype):
                self._launch(subdir + [entry])

def _logger(message):
    """
    Default logger: write the message, newline-terminated, to stderr.
    """
    sys.stderr.write(message)
    sys.stderr.write('\n')

def launch(fsroot=None, urlroot='/', tclass=ChameleonTemplate, logger=_logger):
    """
    Launch and return a TinCan webapp. Does not run the app; it is the
    caller's responsibility to call app.run()

    Returns a 2-tuple: the webapp, and the count of pages that failed
    to launch. If fsroot is None, the current directory is used.
    """
    if fsroot is None:
        fsroot = os.getcwd()
    launcher = _Launcher(fsroot, urlroot, tclass, logger)
    # launcher.debug = True
    launcher.launch()
    return launcher.app, launcher.errors

# XXX - We cannot implement a command-line launcher here; see the
# launcher script for why.