view tincan.py @ 17:8186de188daf draft

Improve the run-time error handling in code-behinds.
author David Barts <n5jrn@me.com>
date Mon, 20 May 2019 07:34:46 -0700
parents 448fc3d534f8
children e88ab99914cf
line wrap: on
line source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# As with Bottle, it's all in one big, ugly file. For now.

# I m p o r t s

import ast
import binascii
import functools
import importlib
import importlib.util
import io
import os
import py_compile
import sys
import traceback
import urllib
import urllib.parse
from base64 import b16encode, b16decode
from inspect import isclass
from stat import S_ISDIR, S_ISREG
from string import whitespace

import bottle

# E x c e p t i o n s

class TinCanException(Exception):
    """
    Common base class of the TinCan-specific exceptions defined in this
    file.
    """

class TemplateHeaderError(TinCanException):
    """
    Signals a syntax error found in the template headers. The error text
    is kept in .message and the number of the offending header line in
    .line; str() combines the two.
    """
    def __init__(self, message, line):
        self.message, self.line = message, line
        super().__init__(message, line)

    def __str__(self):
        return "line {num}: {text}".format(num=self.line, text=self.message)

class ForwardException(TinCanException):
    """
    Control-flow exception used to carry out a forward (server-side
    redirect). The destination is stored in .target. Using an exception
    for this is ugly, but other Python frameworks do the same and there
    seems to be no good alternative.
    """
    def __init__(self, target):
        # BaseException already records the constructor arguments in
        # .args, so spelling out the parent call changes nothing visible.
        super().__init__(target)
        self.target = target

class TinCanError(TinCanException):
    """
    General-purpose error raised by TinCan when something goes wrong,
    most often while attempting to launch a webapp.
    """

# T e m p l a t e s
#
# Template (.pspx) files. These are standard templates for a supported
# template engine, but with an optional set of header lines that begin
# with '#'.

class TemplateFile(object):
    """
    Split a template file into its header and body. The header is the
    leading run of lines that start with '#'; its format does not depend
    on the templating engine used for the body. A '#end' line closes the
    header early, so later '#' lines belong to the body. In the body,
    every header line is replaced by a bare newline so that line numbers
    still match the original file; those extra newlines are normally
    stripped before the rendered page is sent to the client.

    The constructor accepts either an open text stream or the name of a
    file to open with the given encoding. Results are left in the
    .header and .body string attributes.
    """
    _END = "#end"
    _LEND = len(_END)
    _WS = set(whitespace)

    def __init__(self, raw, encoding='utf-8'):
        if isinstance(raw, io.TextIOBase):
            self._do_init(raw)
            return
        if not isinstance(raw, str):
            raise TypeError("Expecting a string or Text I/O object.")
        with open(raw, "r", encoding=encoding) as stream:
            self._do_init(stream)

    def _do_init(self, fp):
        # A tiny state machine: self._state is the handler for the next
        # line, starting in "header" mode.
        self._hbuf, self._bbuf = [], []
        self._state = self._header
        for text in iter(fp.readline, ''):
            self._state(text)
        self.header = ''.join(self._hbuf)
        self.body = ''.join(self._bbuf)

    def _header(self, line):
        if line.startswith('#'):
            self._hbuf.append(line)
            self._bbuf.append("\n")  # placeholder keeps line numbers stable
            # '#end' counts only as a whole word: it must be followed by
            # whitespace or by the end of the line.
            follower = line[self._LEND:self._LEND + 1]
            if line.startswith(self._END) and (not follower or follower in self._WS):
                self._state = self._body
        else:
            # First non-header line: it and everything after it is body.
            self._state = self._body
            self._body(line)

    def _body(self, line):
        self._bbuf.append(line)

class TemplateHeader(object):
    """
    Parses and represents a set of header lines.

    Each directive named in _NAMES becomes an attribute holding its
    parameter string, or None if the directive is absent. Each flag named
    in _FNAMES becomes an attribute that is True if the flag is present
    and False otherwise. '#rem' lines are ignored and '#end' stops the
    parse. A parameter wrapped in matching single or double quotes is
    read as a Python string literal.

    Raises TemplateHeaderError (with the 1-based header line number) for
    lines that do not start with '#', unknown or duplicate directives,
    missing parameters, and malformed quoted parameters.
    """
    _NAMES = [ "errors", "forward", "methods", "python", "template" ]
    _FNAMES = [ "hidden" ]

    def __init__(self, string):
        # Initialize our state
        for i in self._NAMES:
            setattr(self, i, None)
        for i in self._FNAMES:
            setattr(self, i, False)
        # Parse the string
        nameset = set(self._NAMES + self._FNAMES)
        seen = set()
        lines = string.split("\n")
        # A trailing newline yields one empty final element; drop it.
        if lines and lines[-1] == "":
            del lines[-1]
        for count, line in enumerate(lines, start=1):
            if not line.startswith("#"):
                raise TemplateHeaderError("Does not start with '#'.", count)
            # Split into "#name" and the (optional) raw parameter.
            try:
                rna, rpa = line.split(maxsplit=1)
            except ValueError:
                rna = line.rstrip()
                rpa = None
            # Get name, ignoring remarks.
            name = rna[1:]
            if name == "rem":
                continue
            if name == "end":
                break
            if name not in nameset:
                raise TemplateHeaderError("Invalid directive: {0!r}".format(rna), count)
            if name in seen:
                raise TemplateHeaderError("Duplicate {0!r} directive.".format(rna), count)
            seen.add(name)
            # Flags
            if name in self._FNAMES:
                setattr(self, name, True)
                continue
            # Get parameter
            if rpa is None:
                raise TemplateHeaderError("Missing parameter.", count)
            param = rpa.strip()
            if param[:1] in ("'", '"') and param.endswith(param[0]):
                param = self._unquote(param, count)
            # Update this object
            setattr(self, name, param)

    @staticmethod
    def _unquote(param, count):
        """
        Evaluate a quoted parameter as a Python string literal, reporting
        anything that is not a well-formed string as a header error
        rather than letting a SyntaxError or ValueError escape.
        """
        try:
            value = ast.literal_eval(param)
        except (SyntaxError, ValueError) as e:
            raise TemplateHeaderError("Invalid quoted parameter: {0!r}".format(param), count) from e
        # Something like "'a', 'b'" starts and ends with a quote but
        # evaluates to a tuple; parameters must be strings.
        if not isinstance(value, str):
            raise TemplateHeaderError("Invalid quoted parameter: {0!r}".format(param), count)
        return value

# C h a m e l e o n
#
# Support for Chameleon templates (the kind TinCan uses by default).

class ChameleonTemplate(bottle.BaseTemplate):
    """
    Bottle template adapter that renders through Chameleon page
    templates.
    """
    def prepare(self, **options):
        """
        Build the underlying Chameleon template: from the in-memory
        source when there is one, otherwise from the template file.
        """
        from chameleon import PageTemplate, PageTemplateFile
        if not self.source:
            self.tpl = PageTemplateFile(self.filename, encoding=self.encoding,
                search_path=self.lookup, **options)
            return
        self.tpl = PageTemplate(self.source, encoding=self.encoding,
            **options)

    def render(self, *args, **kwargs):
        """
        Render the template. Positional arguments are mappings merged
        into the keyword arguments, which in turn override the defaults.
        """
        for mapping in args:
            kwargs.update(mapping)
        variables = self.defaults.copy()
        variables.update(kwargs)
        return self.tpl.render(**variables)

# Chameleon flavors of bottle.template and bottle.view: the same callables
# with template_adapter pre-bound to ChameleonTemplate.
chameleon_template = functools.partial(bottle.template, template_adapter=ChameleonTemplate)
chameleon_view = functools.partial(bottle.view, template_adapter=ChameleonTemplate)

# U t i l i t i e s

def _normpath(base, unsplit):
    """
    Split, normalize and ensure a possibly relative path is absolute. First
    argument is a list of directory names, defining a base. Second
    argument is a string, which may either be relative to that base, or
    absolute. Only '/' is supported as a separator.
    """
    scratch = unsplit.strip('/').split('/')
    if not unsplit.startswith('/'):
        scratch = base + scratch
    ret = []
    for i in scratch:
        if i == '.':
            continue
        if i == '..':
            ret.pop()  # may raise IndexError
            continue
        ret.append(i)
    return ret

def _mangle(string):
    """
    Turn a possibly troublesome identifier into a mangled one.
    """
    first = True
    ret = []
    for ch in string:
        if ch == '_' or not (ch if first else "x" + ch).isidentifier():
            ret.append('_')
            ret.append(b16encode(ch.encode("utf-8")).decode("us-ascii"))
        else:
            ret.append(ch)
        first = False
    return ''.join(ret)

# The TinCan class. Simply a Bottle webapp that contains a forward method, so
# the code-behind can call request.app.forward().

class TinCan(bottle.Bottle):
    """
    A Bottle application with one addition: forward(), which code-behind
    logic reaches through request.app.
    """
    def forward(self, target):
        """
        Forward this request to the specified target route. The target
        may be absolute or relative to the directory of the current
        request path. This method never returns normally: it raises
        ForwardException to unwind to the route handler, or TinCanError
        if called from an error page or given a target that climbs above
        the root.
        """
        environ = bottle.request.environ
        source = environ['PATH_INFO']
        if environ.get(_FTYPE, False):
            raise TinCanError("{0}: forward from error page".format(source))
        # Directory part of the current path, as a list of components.
        here = source.strip('/').split('/')[:-1]
        try:
            resolved = _normpath(here, target)
        except IndexError as e:
            raise TinCanError("{0}: invalid forward to {1!r}".format(source, target)) from e
        raise ForwardException('/' + '/'.join(resolved))

# C o d e   B e h i n d
#
# Represents the code-behind of one of our pages. This gets subclassed, of
# course.

class BasePage(object):
    """
    The parent class of both error and normal pages' code-behind.
    """
    # Names of public attributes that export() must not hand to the
    # template. Subclasses override this; the empty default keeps export()
    # working on classes that derive from BasePage directly.
    _HIDDEN = frozenset()

    def handle(self):
        """
        This is the entry point for the code-behind logic. It is intended
        to be overridden.
        """
        pass

    def export(self):
        """
        Export template variables. The default behavior is to export all
        non-hidden non-callables that don't start with an underscore,
        plus the page object itself under the name 'page'.
        This method can be overridden if a different behavior is
        desired. It should always return a dict or dict-like object.
        """
        ret = { 'page': self }
        for name in dir(self):
            if name in self._HIDDEN or name.startswith('_'):
                continue
            value = getattr(self, name)
            if callable(value):
                continue
            ret[name] = value
        return ret

class Page(BasePage):
    """
    Code-behind for an ordinary (non-error) page.
    """
    # Public attributes that are nevertheless never exported.
    _HIDDEN = {"request", "response"}

    def __init__(self, req, resp):
        """
        Store the request and response objects; nothing else happens
        here, so construction is cheap.
        """
        # In Bottle the app context is reachable as request.app.
        self.request, self.response = req, resp

class ErrorPage(BasePage):
    """
    Code-behind for an error page.
    """
    # Nothing extra is hidden: both request and error get exported.
    _HIDDEN = set()

    def __init__(self, req, err):
        """
        Store the request and the error being reported; nothing else
        happens here, so construction is cheap.
        """
        self.request, self.error = req, err

# R o u t e s
#
# Represents a route in TinCan. Our launcher creates these on-the-fly based
# on the files it finds.

_ERRMIN = 400  # lowest status code accepted on an #errors line
_ERRMAX = 599  # highest status code accepted on an #errors line
_PEXTEN = ".py"  # extension required of code-behind files
_TEXTEN = ".pspx"  # extension required of template files
_FLOOP = "tincan.forwards"  # environ key: set of paths seen during forwards
_FORIG = "tincan.origin"  # environ key: path of the route that first forwarded
_FTYPE = "tincan.iserror"  # environ key: set to True when an error page runs

class _TinCanErrorRoute(object):
    """
    The handler behind an error page. No URL route is created for one of
    these; it is reached only when an error sends the request there.
    Unless you create custom code-behind, only two variables are
    available to your template: request (bottle.Request) and error
    (bottle.HTTPError).
    """
    def __init__(self, template, klass):
        template.prepare()
        self._template = template
        self._class = klass

    def __call__(self, e):
        """
        Run the code-behind for error e and return the rendered page with
        leading newlines removed. A bottle.HTTPResponse raised along the
        way is returned as the result; any other exception is printed and
        re-raised as a 500 bottle.HTTPError.
        """
        # Mark the request as being inside an error page.
        bottle.request.environ[_FTYPE] = True
        try:
            page = self._class(bottle.request, e)
            page.handle()
            rendered = self._template.render(page.export())
            return rendered.lstrip('\n')
        except bottle.HTTPResponse as response:
            return response
        except Exception as failure:
            traceback.print_exc()
            raise bottle.HTTPError(status=500, exception=failure)

class _TinCanRoute(object):
    """
    A route created by the TinCan launcher.
    """
    def __init__(self, launcher, name, subdir):
        self._fsroot = launcher.fsroot
        self._urlroot = launcher.urlroot
        self._name = name
        self._python = name + _PEXTEN
        self._fspath = os.path.join(launcher.fsroot, *subdir, name + _TEXTEN)
        self._urlpath = self._urljoin(launcher.urlroot, *subdir, name + _TEXTEN)
        self._origin = self._urlpath
        self._subdir = subdir
        self._seen = set()
        self._tclass = launcher.tclass
        self._app = launcher.app

    def launch(self):
        """
        Launch a single page.
        """
        # Build master and header objects, process #forward directives
        oheader = None
        while True:
            try:
                self._template = TemplateFile(self._fspath)
            except IOError as e:
                if oheader is not None:
                    note = "{0}: invalid #forward: ".format(self._origin)
                else:
                    note = ""
                raise TinCanError("{0}{1!s}".format(note, e)) from e
            try:
                self._header = TemplateHeader(self._template.header)
            except TemplateHeaderError as e:
                raise TinCanError("{0}: {1!s}".format(self._fspath, e)) from e
            if oheader is None:
                oheader = self._header  # save original header
            elif (oheader.errors is None) != (self._header.errors is None):
                raise TinCanError("{0}: invalid #forward".format(self._origin))
            if self._header.forward is None:
                break
            # print("forwarding from:", self._urlpath)  # debug
            self._redirect()
            # print("forwarded to:", self._urlpath)  # debug
        # If this is a #hidden page, we ignore it for now, since hidden pages
        # don't get routes made for them.
        if oheader.hidden and not oheader.errors:
            return
        # Get the code-behind #python
        if self._header.python is None:
            self._python_specified = False
        else:
            if not self._header.python.endswith(_PEXTEN):
                raise TinCanError("{0}: #python files must end in {1}".format(self._urlpath, _PEXTEN))
            self._python = self._header.python
            self._python_specified = True
        # Obtain a class object by importing and introspecting a module.
        self._getclass()
        # Build body object (#template)
        if self._header.template is not None:
            if not self._header.template.endswith(_TEXTEN):
                raise TinCanError("{0}: #template files must end in {1}".format(self._urlpath, _TEXTEN))
            try:
                tpath = os.path.normpath(os.path.join(self._fsroot, *self._splitpath(self._header.template)))
                tfile = TemplateFile(tpath)
            except OSError as e:
                raise TinCanError("{0}: invalid #template: {1!s}".format(self._urlpath, e)) from e
            except IndexError as e:
                raise TinCanError("{0}: invalid #template".format(self._urlpath)) from e
            self._body = self._tclass(source=tfile.body)
        else:
            self._body = self._tclass(source=self._template.body)
        self._body.prepare()
        # If this is an #errors page, register it as such.
        if oheader.errors is not None:
            self._mkerror(oheader.errors)
            return  # this implies #hidden
        # Get #methods for this route
        if self._header.methods is None:
            methods = [ 'GET' ]
        else:
            methods = [ i.upper() for i in self._header.methods.split() ]
            if not methods:
                raise TinCanError("{0}: no #methods specified".format(self._urlpath))
        # Register this thing with Bottle
        print("adding route:", self._origin, '('+','.join(methods)+')')  # debug
        self._app.route(self._origin, methods, self)

    def _splitpath(self, unsplit):
        return _normpath(self._subdir, unsplit)

    def _mkerror(self, rerrors):
        try:
            errors = [ int(i) for i in rerrors.split() ]
        except ValueError as e:
            raise TinCanError("{0}: bad #errors line".format(self._urlpath)) from e
        if not errors:
            errors = range(_ERRMIN, _ERRMAX+1)
        route = _TinCanErrorRoute(self._tclass(source=self._template.body), self._class)
        for error in errors:
            if error < _ERRMIN or error > _ERRMAX:
                raise TinCanError("{0}: bad #errors code".format(self._urlpath))
            self._app.error_handler[error] = route  # XXX

    def _gettime(self, path):
        try:
            return os.stat(path).st_mtime
        except FileNotFoundError:
            return 0
        except OSError as e:
            raise TinCanError(str(e)) from e

    def _getclass(self):
        try:
            pypath = os.path.normpath(os.path.join(self._fsroot, *self._splitpath(self._python)))
        except IndexError as e:
            raise TinCanError("{0}: invalid #python".format(self._urlpath)) from e
        klass = ErrorPage if self._header.errors else Page
        # Give 'em a default code-behind if they don't furnish one
        pytime = self._gettime(pypath)
        if not pytime:
            if self._python_specified:
                raise TinCanError("{0}: #python file not found".format(self._urlpath))
            self._class = klass
            return
        # Else load the code-behind from a .py file
        pycpath = pypath + 'c'
        pyctime = self._gettime(pycpath)
        try:
            if pyctime < pytime:
                py_compile.compile(pypath, cfile=pycpath, doraise=True)
        except py_compile.PyCompileError as e:
            raise TinCanError(str(e)) from e
        except Exception as e:
            raise TinCanError("{0}: {1!s}".format(pypath, e)) from e
        try:
            spec = importlib.util.spec_from_file_location(_mangle(self._name), pycpath)
            mod =  importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
        except Exception as e:
            raise TinCanError("{0}: error importing: {1!s}".format(pycpath, e)) from e
        # Locate a suitable class
        self._class = None
        for i in dir(mod):
            v = getattr(mod, i)
            if isclass(v) and issubclass(v, klass) and v is not klass:
                if self._class is not None:
                    raise TinCanError("{0}: contains multiple {1} classes".format(pypath, klass.__name__))
                self._class = v
        if self._class is None:
            raise TinCanError("{0}: contains no {1} classes".format(pypath, klass.__name__))

    def _redirect(self):
        try:
            rlist = self._splitpath(self._header.forward)
            forw = '/' + '/'.join(rlist)
            if forw in self._seen:
                raise TinCanError("{0}: #forward loop".format(self._origin))
            self._seen.add(forw)
            rname = rlist.pop()
        except IndexError as e:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath)) from e
        name, ext = os.path.splitext(rname)
        if ext != _TEXTEN:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath))
        self._subdir = rlist
        self._python = name + _PEXTEN
        self._fspath = os.path.join(self._fsroot, *self._subdir, rname)
        self._urlpath = '/' + self._urljoin(*self._subdir, rname)

    def _urljoin(self, *args):
        args = list(args)
        if args[0] == '/':
            args[0] = ''
        return '/'.join(args)

    def __call__(self):
        """
        This gets called by the framework AFTER the page is launched.
        """
        target = None
        try:
            obj = self._class(bottle.request, bottle.response)
            obj.handle()
            return self._body.render(obj.export()).lstrip('\n')
        except ForwardException as fwd:
            target = fwd.target
        except bottle.HTTPResponse as e:
            return e
        except Exception as e:
            traceback.print_exc()
            raise bottle.HTTPError(status=500, exception=e)
        if target is None:
            message = "{0}: unexpected null target".format(self._urlpath)
            sys.stderr.write(message + '\n')
            raise bottle.HTTPError(status=500, exception=TinCanError(message))
        # We get here if we are doing a server-side programmatic
        # forward.
        environ = bottle.request.environ
        if _FORIG not in environ:
            environ[_FORIG] = self._urlpath
        if _FLOOP not in environ:
            environ[_FLOOP] = set([self._urlpath])
        elif target in environ[_FLOOP]:
            message = "{0}: forward loop detected".format(environ[_FORIG])
            sys.stderr.write(message + '\n')
            raise bottle.HTTPError(status=500, exception=TinCanError(message))
        environ[_FLOOP].add(target)
        environ['bottle.raw_path'] = target
        environ['PATH_INFO'] = urllib.parse.quote(target)
        route, args = self._app.router.match(environ)
        environ['route.handle'] = environ['bottle.route'] = route
        environ['route.url_args'] = args
        return route.call(**args)

    def _mkdict(self, obj):
        ret = {}
        for name in dir(obj):
            if name.startswith('_'):
                continue
            value = getattr(obj, name)
            if not callable(value):
                ret[name] = value
        return ret

# L a u n c h e r

_WINF = "WEB-INF"  # directory under fsroot that holds the private lib directory
_BANNED = set([_WINF])  # top-level entries the launcher never scans

class _Launcher(object):
    """
    Helper class for launching webapps.
    """
    def __init__(self, fsroot, urlroot, tclass, logger):
        """
        Lightweight constructor. The real action happens in .launch() below.
        """
        self.fsroot = fsroot
        self.urlroot = urlroot
        self.tclass = tclass
        self.logger = logger
        self.app = None
        self.errors = 0
        self.debug = False

    def launch(self):
        """
        Does the actual work of launching something. XXX - modifies sys.path
        and never un-modifies it.
        """
        # Sanity checks
        if not self.urlroot.startswith("/"):
            raise TinCanError("urlroot must be absolute")
        if not os.path.isdir(self.fsroot):
            raise TinCanError("no such directory: {0!r}".format(self.fsroot))
        # Make WEB-INF, if needed
        winf = os.path.join(self.fsroot, _WINF)
        lib = os.path.join(winf, "lib")
        for i in [ winf, lib ]:
            if not os.path.isdir(i):
                os.mkdir(i)
        # Add our private lib directory to sys.path
        sys.path.insert(1, os.path.abspath(lib))
        # Do what we gotta do
        self.app = TinCan()
        self._launch([])
        return self

    def _launch(self, subdir):
        for entry in os.listdir(os.path.join(self.fsroot, *subdir)):
            if not subdir and entry in _BANNED:
                continue
            etype = os.stat(os.path.join(self.fsroot, *subdir, entry)).st_mode
            if S_ISREG(etype):
                ename, eext = os.path.splitext(entry)
                if eext != _TEXTEN:
                    continue  # only look at interesting files
                route = _TinCanRoute(self, ename, subdir)
                try:
                    route.launch()
                except TinCanError as e:
                    self.logger(str(e))
                    if self.debug:
                        while e.__cause__ != None:
                            e = e.__cause__
                            self.logger("\t{0}: {1!s}".format(e.__class__.__name__, e))
                    self.errors += 1
            elif S_ISDIR(etype):
                self._launch(subdir + [entry])

def _logger(message):
    sys.stderr.write(message)
    sys.stderr.write('\n')

def launch(fsroot=None, urlroot='/', tclass=ChameleonTemplate, logger=_logger):
    """
    Launch a TinCan webapp from the pages found under fsroot (the current
    working directory if omitted). Returns a two-element tuple: the
    webapp, and the number of errors counted while launching it. Does not
    run the app; it is the caller's responsibility to call app.run()
    """
    root = os.getcwd() if fsroot is None else fsroot
    # Set .debug = True on the launcher before calling .launch() to have
    # the chain of underlying causes logged with each error.
    launched = _Launcher(root, urlroot, tclass, logger).launch()
    return launched.app, launched.errors

# XXX - We cannot implement a command-line launcher here; see the
# launcher script for why.