Mercurial > cgi-bin > hgweb.cgi > tincan
view tincan.py @ 2:ca6f8ca38cf2 draft
Another backup commit.
author | David Barts <n5jrn@me.com> |
---|---|
date | Sun, 12 May 2019 22:51:34 -0700 |
parents | 94b36e721500 |
children | c6902cded64d |
line wrap: on
line source
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# As with Bottle, it's all in one big, ugly file. For now.

# I m p o r t s

import os, sys
import ast
import binascii
from base64 import b16encode, b16decode
import functools
import importlib
import io
import py_compile
from stat import S_ISDIR, S_ISREG

import bottle

# E x c e p t i o n s

class TinCanException(Exception):
    """
    The parent class of all exceptions we raise.
    """
    pass

class TemplateHeaderException(TinCanException):
    """
    Raised upon encountering a syntax error in the template headers.

    Attributes:
        message: human-readable description of the problem.
        line: 1-based number of the offending header line.
    """
    def __init__(self, message, line):
        super().__init__(message, line)
        self.message = message
        self.line = line

    def __str__(self):
        return "Line {0}: {1}".format(self.line, self.message)

class ForwardException(TinCanException):
    """
    Raised to effect the flow control needed to do a forward (server-side
    redirect). It is ugly to do this, but other Python frameworks do and
    there seems to be no good alternative.

    Attributes:
        target: the path being forwarded to.
    """
    def __init__(self, target):
        # Pass the target up so str(exc) and exc.args are meaningful.
        super().__init__(target)
        self.target = target

class TinCanError(TinCanException):
    """
    General-purpose exception thrown by TinCan when things go wrong, often
    when attempting to launch webapps.
    """
    pass

# T e m p l a t e s
#
# Template (.pspx) files. These are standard templates for a supported
# template engine, but with an optional set of header lines that begin
# with '#'.

class TemplateFile(object):
    """
    Parse a template file into a header part and the body part. The header
    is always a leading set of lines, each starting with '#', that is of the
    same format regardless of the template body. The template body varies
    depending on the selected templating engine. The body part has
    each header line replaced by a blank line. This preserves the overall
    line numbering when processing the body. The added newlines are normally
    stripped out before the rendered page is sent back to the client.

    Attributes:
        header: the header lines, concatenated (newlines included).
        body: the body text, with one blank line per header line prepended.
    """
    def __init__(self, raw, encoding='utf-8'):
        """
        raw is either an open text-mode file object or the name of a file
        to open (using the specified encoding). Anything else raises
        TypeError.
        """
        if isinstance(raw, io.TextIOBase):
            self._do_init(raw)
        elif isinstance(raw, str):
            with open(raw, "r", encoding=encoding) as fp:
                self._do_init(fp)
        else:
            raise TypeError("Expecting a string or Text I/O object.")

    def _do_init(self, fp):
        # A two-state machine: self._state is the handler for the next
        # line, and flips from _header to _body at the first line that
        # does not start with '#'.
        self._hbuf = []
        self._bbuf = []
        self._state = self._header
        for line in fp:
            self._state(line)
        self.header = ''.join(self._hbuf)
        self.body = ''.join(self._bbuf)

    def _header(self, line):
        if not line.startswith('#'):
            # First body line: switch state and let the body handler
            # consume this same line.
            self._state = self._body
            self._state(line)
            return
        self._hbuf.append(line)
        self._bbuf.append("\n")  # placeholder keeps body line numbers right

    def _body(self, line):
        self._bbuf.append(line)

class TemplateHeader(object):
    """
    Parses and represents a set of header lines.

    Each directive in _NAMES becomes an attribute holding its parameter
    string (None if the directive is absent). Each directive in _FNAMES is
    a flag that takes no parameter and becomes a boolean attribute.
    Lines starting with '#rem' are remarks and are ignored.
    """
    _NAMES = [ "errors", "forward", "methods", "python", "template" ]
    _FNAMES = [ "hidden" ]
    # Directives from _NAMES whose parameter may be omitted. A bare #errors
    # yields an empty string, which the route code treats as "all errors".
    _OPTIONAL = [ "errors" ]

    def __init__(self, string):
        """
        Parse string, which is the header text of a template (zero or more
        newline-separated lines, each starting with '#'). Raises
        TemplateHeaderException on any syntax error.
        """
        # Initialize our state
        for i in self._NAMES:
            setattr(self, i, None)
        for i in self._FNAMES:
            setattr(self, i, False)
        # Parse the string
        nameset = set(self._NAMES + self._FNAMES)
        seen = set()
        lines = string.split("\n")
        if lines and lines[-1] == "":
            del lines[-1]
        for count, line in enumerate(lines, start=1):
            # Get line
            if not line.startswith("#"):
                raise TemplateHeaderException("Does not start with '#'.", count)
            # The parameter is optional at this point; whether its absence
            # is an error depends on the directive, so we decide below.
            parts = line.split(maxsplit=1)
            rna = parts[0]
            rpa = parts[1] if len(parts) > 1 else None
            # Get name, ignoring remarks.
            name = rna[1:]
            if name == "rem":
                continue
            if name not in nameset:
                raise TemplateHeaderException("Invalid directive: {0!r}".format(rna), count)
            if name in seen:
                raise TemplateHeaderException("Duplicate {0!r} directive.".format(rna), count)
            seen.add(name)
            # Flags
            if name in self._FNAMES:
                setattr(self, name, True)
                continue
            # Get parameter
            if rpa is None:
                if name not in self._OPTIONAL:
                    raise TemplateHeaderException("Missing parameter.", count)
                rpa = ""
            param = rpa.strip()
            # A parameter wrapped in matching quotes is a Python string
            # literal; evaluate it so escapes and padding can be expressed.
            if len(param) >= 2 and param[0] in ("'", '"') and param[-1] == param[0]:
                try:
                    param = ast.literal_eval(param)
                except (ValueError, SyntaxError) as e:
                    raise TemplateHeaderException("Invalid quoted parameter.", count) from e
                # Something like "a", "b" parses as a tuple, not a string.
                if not isinstance(param, str):
                    raise TemplateHeaderException("Invalid quoted parameter.", count)
            # Update this object
            setattr(self, name, param)

# C h a m e l e o n
#
# Support for Chameleon templates (the kind TinCan uses by default).

class ChameleonTemplate(bottle.BaseTemplate):
    """
    Bottle template adapter for Chameleon page templates.
    """
    def prepare(self, **options):
        """
        Build the underlying Chameleon template, from the in-memory source
        if there is one, else from the template file.
        """
        # Imported here so Chameleon is only needed if this adapter is used.
        from chameleon import PageTemplate, PageTemplateFile
        if self.source:
            self.tpl = PageTemplate(self.source, encoding=self.encoding,
                **options)
        else:
            self.tpl = PageTemplateFile(self.filename, encoding=self.encoding,
                search_path=self.lookup, **options)

    def render(self, *args, **kwargs):
        """
        Render the template. Positional arguments are dicts of template
        variables; they and the keyword arguments override the defaults.
        """
        for dictarg in args:
            kwargs.update(dictarg)
        _defaults = self.defaults.copy()
        _defaults.update(kwargs)
        return self.tpl.render(**_defaults)

chameleon_template = functools.partial(bottle.template, template_adapter=ChameleonTemplate)
chameleon_view = functools.partial(bottle.view, template_adapter=ChameleonTemplate)

# U t i l i t i e s

def _normpath(base, unsplit):
    """
    Split, normalize and ensure a possibly relative path is absolute. First
    argument is a list of directory names, defining a base. Second
    argument is a string, which may either be relative to that base, or
    absolute. Only '/' is supported as a separator.

    Returns the normalized path as a list of components. Raises IndexError
    if '..' components would climb above the root.
    """
    scratch = unsplit.strip('/').split('/')
    if not unsplit.startswith('/'):
        scratch = base + scratch
    ret = []
    for i in scratch:
        if i == '.':
            continue
        if i == '..':
            ret.pop()  # may raise IndexError
            continue
        ret.append(i)
    return ret

def _mangle(string):
    """
    Turn a possibly troublesome identifier into a mangled one.

    Every character that is not valid at its position in a Python
    identifier, and every underscore, is replaced by '_' followed by the
    upper-case hex of its UTF-8 encoding. Escaping the underscore itself
    keeps the mapping unambiguous.
    """
    first = True
    ret = []
    for ch in string:
        # Digits and the like are valid only after the first character, so
        # non-initial characters are tested with a dummy leading letter.
        if ch == '_' or not (ch if first else "x" + ch).isidentifier():
            ret.append('_')
            ret.append(b16encode(ch.encode("utf-8")).decode("us-ascii"))
        else:
            ret.append(ch)
        first = False
    return ''.join(ret)

# The TinCan class. Simply a Bottle webapp that contains a forward method, so
# the code-behind can call request.app.forward().

class TinCan(bottle.Bottle):
    def forward(self, target):
        """
        Forward this request to the specified target route.

        The target may be absolute, or relative to the directory of the
        current request path. This method never returns: it raises
        ForwardException to unwind to the route handler, or TinCanError if
        the target climbs above the root.
        """
        source = bottle.request.environ['PATH_INFO']
        base = source.strip('/').split('/')[:-1]
        try:
            exc = ForwardException('/' + '/'.join(_normpath(base, target)))
        except IndexError as e:
            raise TinCanError("{0}: invalid forward to {1!r}".format(source, target)) from e
        raise exc

# C o d e B e h i n d
#
# Represents the code-behind of one of our pages. This gets subclassed, of
# course.

class Page(object):
    # Non-private things we refuse to export anyhow.
    __HIDDEN = set([ "request", "response" ])

    def __init__(self, req, resp):
        """
        Constructor. This is a lightweight operation.
        """
        self.request = req  # app context is request.app in Bottle
        self.response = resp

    def handle(self):
        """
        This is the entry point for the code-behind logic. It is intended
        to be overridden.
        """
        pass

    def export(self):
        """
        Export template variables. The default behavior is to export all
        non-hidden non-callables that don't start with an underscore,
        plus an export named page that contains this object itself.
        This method can be overridden if a different behavior is
        desired. It should always return a dict or dict-like object.
        """
        ret = { "page": self }  # feature: will be clobbered if self.page exists
        for name in dir(self):
            if name in self.__HIDDEN or name.startswith('_'):
                continue
            value = getattr(self, name)
            if callable(value):
                continue
            ret[name] = value
        return ret

# R o u t e s
#
# Represents a route in TinCan. Our launcher creates these on-the-fly based
# on the files it finds.
_ERRMIN = 400
_ERRMAX = 599
_PEXTEN = ".py"
_TEXTEN = ".pspx"
_FLOOP = "tincan.forwards"
_FORIG = "tincan.origin"

class TinCanErrorRoute(object):
    """
    A route to an error page. These never have code-behind, don't get
    routes created for them, and are only reached if an error routes them
    there. Error templates only have two variables available: e (the
    HTTPError object associated with the error) and request.
    """
    def __init__(self, template):
        self._template = template
        self._template.prepare()

    def __call__(self, e):
        # Leading newlines are the placeholders left by the header lines.
        return self._template.render(e=e, request=bottle.request).lstrip('\n')

class TinCanRoute(object):
    """
    A route created by the TinCan launcher.
    """
    def __init__(self, launcher, name, subdir):
        """
        Lightweight constructor. launcher is the _Launcher creating us,
        name is the template file name without its extension, and subdir
        is the list of directory names (relative to the webapp root) that
        the template lives in.
        """
        self._fsroot = launcher.fsroot
        self._urlroot = launcher.urlroot
        self._name = name
        self._python = name + _PEXTEN
        # The original assigned an undefined CONTENT here (a NameError);
        # the attribute is kept, as None, in case anything expects it.
        self._content = None
        self._fspath = os.path.join(launcher.fsroot, *subdir, name + _TEXTEN)
        self._urlpath = self._urljoin(launcher.urlroot, *subdir, name + _TEXTEN)
        self._origin = self._urlpath
        self._subdir = subdir
        self._seen = set()
        self._tclass = launcher.tclass
        self._app = launcher.app

    def launch(self):
        """
        Launch a single page: parse its header, follow any #forward
        directives, and register either an error handler or a normal
        route with the Bottle app. Raises TinCanError if the page cannot
        be launched.
        """
        # Build master and header objects, process #forward directives
        hidden = None
        while True:
            self._template = TemplateFile(self._fspath)
            self._header = TemplateHeader(self._template.header)
            # Only the page we started at decides hiddenness; pages we are
            # forwarded to do not.
            if hidden is None:
                hidden = self._header.hidden
            if self._header.forward is None:
                break
            self._redirect()
        # If this is a hidden page, we ignore it for now, since hidden pages
        # don't get routes made for them.
        if hidden:
            return
        # If this is an error page, register it as such.
        if self._header.errors is not None:
            self._launch_errors()
            return  # this implies #hidden
        # Get methods for this route
        if self._header.methods is None:
            methods = [ 'GET' ]
        else:
            methods = [ i.upper() for i in self._header.methods.split() ]
        # Process other header entries
        if self._header.python is not None:
            if not self._header.python.endswith(_PEXTEN):
                raise TinCanError("{0}: #python files must end in {1}".format(
                    self._urlpath, _PEXTEN))
            self._python = self._header.python
        # Obtain a class object by importing and introspecting a module.
        self._class = self._load_class()
        # Build body object (Chameleon template)
        self._body = self._tclass(source=self._template_body())
        self._body.prepare()
        # Register this thing with Bottle
        print("adding route:", self._origin)  # debug
        self._app.route(self._origin, methods, self)

    def _template_body(self):
        # Return the template text to render: that of the #template file
        # if the header names one, else that of this page itself.
        if self._header.template is None:
            return self._template.body
        tpath = os.path.join(self._fsroot, *self._splitpath(self._header.template))
        return TemplateFile(tpath).body

    def _launch_errors(self):
        # Register this page as the handler for the codes on its #errors
        # line; an empty list means every code from _ERRMIN to _ERRMAX.
        try:
            errors = [ int(i) for i in self._header.errors.split() ]
        except ValueError as e:
            raise TinCanError("{0}: bad #errors line".format(self._urlpath)) from e
        if not errors:
            errors = range(_ERRMIN, _ERRMAX+1)
        route = TinCanErrorRoute(self._tclass(source=self._template_body()))
        for error in errors:
            if error < _ERRMIN or error > _ERRMAX:
                raise TinCanError("{0}: bad #errors code".format(self._urlpath))
            self._app.error(code=error, callback=route)

    def _load_class(self):
        # Compile (if stale) and import the code-behind module, then
        # return the single Page subclass it defines.
        # "import importlib" alone does not guarantee the util submodule.
        import importlib.util

        pypath = os.path.normpath(os.path.join(self._fsroot, *self._subdir, self._python))
        pycpath = pypath + 'c'
        if not os.path.isfile(pypath):
            # An explicitly requested code-behind file must exist.
            if self._header.python is not None:
                raise TinCanError("{0}: no such file".format(pypath))
            # NOTE(review): a template with no code-behind file falls back
            # to the do-nothing Page base class instead of failing with a
            # raw OSError -- confirm this is the intended default.
            return Page
        try:
            pyctime = os.stat(pycpath).st_mtime
        except OSError:
            pyctime = 0  # no compiled file yet; force a compile
        if pyctime < os.stat(pypath).st_mtime:
            try:
                py_compile.compile(pypath, cfile=pycpath)
            except Exception as e:
                raise TinCanError("{0}: error compiling".format(pypath)) from e
        try:
            spec = importlib.util.spec_from_file_location(_mangle(self._name), pycpath)
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
        except Exception as e:
            raise TinCanError("{0}: error importing".format(pycpath)) from e
        found = None
        for i in dir(mod):
            v = getattr(mod, i)
            # Skip non-classes (issubclass would raise TypeError on them)
            # and Page itself, which the module typically imports in
            # order to subclass it.
            if not isinstance(v, type) or v is Page or not issubclass(v, Page):
                continue
            if found is not None:
                raise TinCanError("{0}: contains multiple Page classes".format(pypath))
            found = v
        if found is None:
            raise TinCanError("{0}: contains no Page classes".format(pypath))
        return found

    def _splitpath(self, unsplit):
        return _normpath(self._subdir, unsplit)

    def _redirect(self):
        # Retarget this route at the template named by the current
        # #forward directive. Raises TinCanError on loops and bad targets.
        if self._header.forward in self._seen:
            raise TinCanError("{0}: #forward loop".format(self._origin))
        self._seen.add(self._header.forward)
        try:
            rlist = self._splitpath(self._header.forward)
            rname = rlist.pop()
        except IndexError as e:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath)) from e
        name, ext = os.path.splitext(rname)
        if ext != _TEXTEN:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath))
        self._subdir = rlist
        self._python = name + _PEXTEN
        self._fspath = os.path.join(self._fsroot, *self._subdir, rname)
        # Built the same way as in the constructor, URL root included.
        self._urlpath = self._urljoin(self._urlroot, *self._subdir, rname)

    def _urljoin(self, *args):
        # Join path components with '/'. A trailing slash on the first
        # component (the URL root, possibly just '/') is dropped so the
        # result never contains a doubled slash at the seam.
        args = list(args)
        args[0] = args[0].rstrip('/')
        return '/'.join(args)

    def __call__(self):
        """
        This gets called by the framework AFTER the page is launched.
        Runs the code-behind and renders the template, or, if the
        code-behind requested a forward, dispatches to the target route.
        """
        target = None
        obj = self._class(bottle.request, bottle.response)
        try:
            obj.handle()
            return self._body.render(obj.export()).lstrip('\n')
        except ForwardException as fwd:
            target = fwd.target
        if target is None:
            raise TinCanError("Unexpected null target!")
        # We get here if we are doing a server-side programmatic
        # forward.
        import urllib.parse  # local: not among the file's top-level imports

        environ = bottle.request.environ
        if _FORIG not in environ:
            environ[_FORIG] = self._urlpath
        # Everything visited during this request lives in the environ, so
        # the check spans a whole chain of forwards, first hop included.
        if _FLOOP not in environ:
            environ[_FLOOP] = set([self._urlpath])
        if target in environ[_FLOOP]:
            raise TinCanError("{0}: forward loop detected".format(environ[_FORIG]))
        environ[_FLOOP].add(target)
        environ['bottle.raw_path'] = target
        environ['PATH_INFO'] = urllib.parse.quote(target)
        route, args = self._app.router.match(environ)
        environ['route.handle'] = environ['bottle.route'] = route
        environ['route.url_args'] = args
        return route.call(**args)

    def _mkdict(self, obj):
        # Collect the public, non-callable attributes of obj into a dict.
        ret = {}
        for name in dir(obj):
            if name.startswith('_'):
                continue
            value = getattr(obj, name)
            if not callable(value):
                ret[name] = value
        return ret

# L a u n c h e r

_WINF = "WEB-INF"
_BANNED = set([_WINF])

class _Launcher(object):
    """
    Helper class for launching webapps.
    """
    def __init__(self, fsroot, urlroot, tclass):
        """
        Lightweight constructor. The real action happens in .launch() below.
        """
        self.fsroot = fsroot
        self.urlroot = urlroot
        self.tclass = tclass
        self.app = None

    def launch(self):
        """
        Does the actual work of launching something. Returns the new
        TinCan app. Raises TinCanError if the arguments are unusable.
        XXX - modifies sys.path and never un-modifies it.
        """
        # Sanity checks
        if not self.urlroot.startswith("/"):
            raise TinCanError("urlroot must be absolute")
        if not os.path.isdir(self.fsroot):
            raise TinCanError("no such directory: {0!r}".format(self.fsroot))
        # Make WEB-INF, if needed
        winf = os.path.join(self.fsroot, _WINF)
        lib = os.path.join(winf, "lib")
        os.makedirs(lib, exist_ok=True)  # creates WEB-INF too
        # Add our private lib directory to sys.path (once only, so that
        # launching repeatedly does not pile up duplicate entries)
        libpath = os.path.abspath(lib)
        if libpath not in sys.path:
            sys.path.insert(1, libpath)
        # Do what we gotta do
        self.app = TinCan()
        self._launch([])
        return self.app

    def _launch(self, subdir):
        # Recursively walk the directory named by subdir (a list of names
        # relative to fsroot), launching a route per template file found.
        for entry in os.listdir(os.path.join(self.fsroot, *subdir)):
            # WEB-INF is only special at the top level.
            if not subdir and entry in _BANNED:
                continue
            etype = os.stat(os.path.join(self.fsroot, *subdir, entry)).st_mode
            if S_ISREG(etype):
                ename, eext = os.path.splitext(entry)
                if eext != _TEXTEN:
                    continue  # only look at interesting files
                route = TinCanRoute(self, ename, subdir)
                route.launch()
            elif S_ISDIR(etype):
                self._launch(subdir + [entry])

def launch(fsroot=None, urlroot='/', tclass=ChameleonTemplate):
    """
    Launch and return a TinCan webapp. Does not run the app; it is the
    caller's responsibility to call app.run()

    fsroot is the directory holding the webapp (default: the current
    directory), urlroot is the absolute URL path to mount it at, and
    tclass is the Bottle template adapter class used to render pages.
    """
    if fsroot is None:
        fsroot = os.getcwd()
    return _Launcher(fsroot, urlroot, tclass).launch()