view tincan.py @ 1:94b36e721500 draft

Another check in to back stuff up.
author David Barts <n5jrn@me.com>
date Sun, 12 May 2019 19:19:40 -0700
parents e726fafcffac
children ca6f8ca38cf2
line wrap: on
line source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# As with Bottle, it's all in one big, ugly file. For now.

# I m p o r t s

import os, sys
import ast
import binascii
from base64 import b16encode, b16decode
import importlib, py_compile
import io

import bottle

# C l a s s e s

# Exceptions

class TinCanException(Exception):
    """
    The parent class of all exceptions we raise.
    """
    pass

class TemplateHeaderException(TinCanException):
    """
    Raised upon encountering a syntax error in the template headers.
    """
    def __init__(self, message, line):
        super().__init__(message, line)
        self.message = message
        self.line = line

    def __str__(self):
        return "Line {0}: {1}".format(self.line, self.message)

class ForwardException(TinCanException):
    """
    Raised to effect the flow control needed to do a forward (server-side
    redirect). It is ugly to do this, but other Python frameworks do and
    there seems to be no good alternative.
    """
    def __init__(self, target):
        self.target = target

class TinCanError(TinCanException):
    """
    General-purpose exception thrown by TinCan when things go wrong, often
    when attempting to launch webapps.
    """
    pass

# Template (.pspx) files. These are standard templates for a supported
# template engine, but with an optional set of header lines that begin
# with '#'.

class TemplateFile(object):
    """
    Parse a template file into a header part and the body part. The header
    is always a leading set of lines, each starting with '#', that is of the
    same format regardless of the template body. The template body varies
    depending on the selected templating engine. The body part has
    each header line replaced by a blank line. This preserves the overall
    line numbering when processing the body. The added newlines are normally
    stripped out before the rendered page is sent back to the client.
    """
    def __init__(self, raw, encoding='utf-8'):
        if isinstance(raw, io.TextIOBase):
            self._do_init(raw)
        elif isinstance(raw, str):
            with open(raw, "r", encoding=encoding) as fp:
                self._do_init(fp)
        else:
            raise TypeError("Expecting a string or Text I/O object.")

    def _do_init(self, fp):
        self._hbuf = []
        self._bbuf = []
        self._state = self._header
        while True:
            line = fp.readline()
            if line == '':
                break
            self._state(line)
        self.header = ''.join(self._hbuf)
        self.body = ''.join(self._bbuf)

    def _header(self, line):
        if not line.startswith('#'):
            self._state = self._body
            self._state(line)
            return
        self._hbuf.append(line)
        self._bbuf.append("\n")

    def _body(self, line):
        self._bbuf.append(line)

class TemplateHeader(object):
    """
    Parses and represents a set of header lines.
    """
    _NAMES = [ "error", "forward", "methods", "python", "template" ]
    _FNAMES = [ "hidden" ]

    def __init__(self, string):
        # Initialize our state
        for i in self._NAMES:
            setattr(self, i, None)
        for i in self._FNAMES:
            setattr(self, i, False)
        # Parse the string
        count = 0
        nameset = set(self._NAMES + self._FNAMES)
        seen = set()
        lines = string.split("\n")
        if lines and lines[-1] == "":
            del lines[-1]
        for line in lines:
            # Get line
            count += 1
            if not line.startswith("#"):
                raise TemplateHeaderException("Does not start with '#'.", count)
            try:
                rna, rpa = line.split(maxsplit=1)
            except ValueError:
                raise TemplateHeaderException("Missing parameter.", count)
            # Get name, ignoring remarks.
            name = rna[1:]
            if name == "rem":
                continue
            if name not in nameset:
                raise TemplateHeaderException("Invalid directive: {0!r}".format(rna), count)
            if name in seen:
                raise TemplateHeaderException("Duplicate {0!r} directive.".format(rna), count)
            seen.add(name)
            # Flags
            if name in self._FLAGS:
                setattr(self, name, True)
                continue
            # Get parameter
            param = rpa.strip()
            for i in [ "'", '"']:
                if param.startswith(i) and param.endswith(i):
                    param = ast.literal_eval(param)
                    break
            # Update this object
            setattr(self, name, param)

# Support for Chameleon templates (the kind TinCan uses by default).

class ChameleonTemplate(bottle.BaseTemplate):
    def prepare(self, **options):
        from chameleon import PageTemplate, PageTemplateFile
        if self.source:
            self.tpl = chameleon.PageTemplate(self.source,
                encoding=self.encoding, **options)
        else:
            self.tpl = chameleon.PageTemplateFile(self.filename,
                encoding=self.encoding, search_path=self.lookup, **options)

    def render(self, *args, **kwargs):
        for dictarg in args:
            kwargs.update(dictarg)
        _defaults = self.defaults.copy()
        _defaults.update(kwargs)
        return self.tpl.render(**_defaults)

chameleon_template = functools.partial(template, template_adapter=ChameleonTemplate)
chameleon_view = functools.partial(view, template_adapter=ChameleonTemplate)

# Utility functions, used in various places.

def _normpath(base, unsplit):
    """
    Split, normalize and ensure a possibly relative path is absolute. First
    argument is a list of directory names, defining a base. Second
    argument is a string, which may either be relative to that base, or
    absolute. Only '/' is supported as a separator.
    """
    scratch = unsplit.strip('/').split('/')
    if not unsplit.startswith('/'):
        scratch = base + scratch
    ret = []
    for i in scratch:
        if i == '.':
            continue
        if i == '..':
            ret.pop()  # may raise IndexError
            continue
        ret.append(i)
    return ret

def _mangle(string):
    """
    Turn a possibly troublesome identifier into a mangled one.
    """
    first = True
    ret = []
    for ch in string:
        if ch == '_' or not (ch if first else "x" + ch).isidentifier():
            ret.append('_')
            ret.append(b16encode(ch.encode("utf-8")).decode("us-ascii"))
        else:
            ret.append(ch)
        first = False
    return ''.join(ret)

# The TinCan class. Simply a Bottle webapp that contains a forward method, so
# the code-behind can call request.app.forward().

class TinCan(bottle.Bottle):
    def forward(self, target):
        """
        Forward this request to the specified target route.
        """
        source = bottle.request.environ['PATH_INFO']
        base = source.strip('/').split('/')[:-1]
        try:
            exc = ForwardException('/' + '/'.join(_normpath(base, target)))
        except IndexError as e:
            raise TinCanError("{0}: invalid forward to {1!r}".format(source, target)) from e
        raise exc

# Represents the code-behind of one of our pages. This gets subclassed, of
# course.

class Page(object):
    # Non-private things we refuse to export anyhow.
    __HIDDEN = set([ "request", "response" ])

    def __init__(self, req, resp):
        """
        Constructor. This is a lightweight operation.
        """
        self.request = req  # app context is request.app in Bottle
        self.response = resp

    def handle(self):
        """
        This is the entry point for the code-behind logic. It is intended
        to be overridden.
        """
        pass

    def export(self):
        """
        Export template variables. The default behavior is to export all
        non-hidden non-callables that don't start with an underscore,
        plus a an export named page that contains this object itself.
        This method can be overridden if a different behavior is
        desired. It should always return a dict or dict-like object.
        """
        ret = { "page": self }  # feature: will be clobbered if self.page exists
        for name in dir(self):
            if name in self.__HIDDEN or name.startswith('_'):
                continue
            value = getattr(self, name)
            if callable(value):
                continue
            ret[name] = value

# Represents a route in TinCan. Our launcher creates these on-the-fly based
# on the files it finds.

EXTENSION = ".pspx"
CONTENT = "text/html"
_WINF = "WEB-INF"
_BANNED = set([_WINF])
_CODING = "utf-8"
_CLASS = "Page"
_FLOOP = "tincan.forwards"
_FORIG = "tincan.origin"

class TinCanErrorRoute(object):
    """
    A route to an error page. These never have code-behind, don't get
    routes created for them, and are only reached if an error routes them
    there. Error templates only have two variables available: e (the
    HTTPError object associated with the error) and request.
    """
    def __init__(self, template):
        self._template = template
        self._template.prepare()

    def __call__(self, e):
        return self._template.render(e=e, request=bottle.request).lstrip('\n')

class TinCanRoute(object):
    """
    A route created by the TinCan launcher.
    """
    def __init__(self, launcher, name, subdir):
        self._plib = launcher.plib
        self._fsroot = launcher.fsroot
        self._urlroot = launcher.urlroot
        self._name = name
        self._python = name + ".py"
        self._content = CONTENT
        self._fspath = os.path.join(launcher.fsroot, *subdir, name + EXTENSION)
        self._urlpath = self._urljoin(launcher.urlroot, *subdir, name + EXTENSION)
        self._origin = self._urlpath
        self._subdir = subdir
        self._seen = set()
        self._class = None
        self._tclass = launcher.tclass
        self._app = launcher.app

    def launch(self, config):
        """
        Launch a single page.
        """
        # Build master and header objects, process #forward directives
        hidden = None
        while True:
            self._template = TemplateFile(self._fspath)
            self._header = TemplateHeader(self._template.header)
            if hidden is None:
                hidden = self._header.hidden
            if self._header.forward is None:
                break
            self._redirect()
        # If this is a hidden page, we ignore it for now, since hidden pages
        # don't get routes made for them.
        if hidden:
            return
        # If this is an error page, register it as such.
        if self._header.error is not None:
            try:
                errors = [ int(i) for i in self._header.error.split() ]
            except ValueError as e:
                raise TinCanError("{0}: bad #error line".format(self._urlpath)) from e
            if not errors:
                errors = range(400, 600)
            route = TinCanErrorRoute(self._tclass(source=self._template.body))
            for error in errors:
                self._app.error(code=error, callback=route)
            return  # this implies #hidden
        # Get methods for this route
        if self._header.methods is None:
            methods = [ 'GET' ]
        else:
            methods = [ i.upper() for i in self._header.methods.split() ]
        # Process other header entries
        if self._header.python is not None:
            if not self._header.python.endswith('.py'):
                raise TinCanError("{0}: #python files must end in .py", self._urlpath)
            self._python = self._header.python
        # Obtain a class object by importing and introspecting a module.
        pypath = os.path.normpath(os.path.join(self._fsroot, *self._subdir, self._python))
        pycpath = pypath + 'c'
        try:
            pyctime = os.stat(pycpath).st_mtime
        except OSError:
            pyctime = 0
        if pyctime < os.stat(pypath).st_mtime:
            try:
                py_compile.compile(pypath, cfile=pycpath)
            except Exception as e:
                raise TinCanError("{0}: error compiling".format(pypath)) from e
        try:
            self._mangled = self._manage_module()
            spec = importlib.util.spec_from_file_location(_mangle(self._name), cpath)
            mod =  importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
        except Exception as e:
            raise TinCanError("{0}: error importing".format(pycpath)) from e
        self._class = None
        for i in dir(mod):
            v = getattr(mod, i)
            if issubclass(v, Page):
                if self._class is not None:
                    raise TinCanError("{0}: contains multiple Page classes", pypath)
                self._class = v
        # Build body object (Chameleon template)
        if self._header.template is not None:
            tpath = os.path.join(self._fsroot, *self._splitpath(self._header.template))
            tfile = TemplateFile(tpath)
            self._body = self._tclass(source=tfile.body)
        else:
            self._body = self._tclass(source=self._template.body)
        self._body.prepare()
        # Register this thing with Bottle
        print("adding route:", self._origin)  # debug
        self._app.route(self._origin, methods, self)

    def _splitpath(self, unsplit):
        return _normpath(self._subdir, unsplit)

    def _redirect(self):
        if self._header.forward in self._seen:
            raise TinCanError("{0}: #forward loop".format(self._origin))
        self._seen.add(self._header.forward)
        try:
            rlist = self._splitpath(self._header.forward)
            rname = rlist.pop()
        except IndexError as e:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath)) from e
        name, ext = os.path.splitext(rname)[1]
        if ext != EXTENSION:
            raise TinCanError("{0}: invalid #forward".format(self._urlpath))
        self._subdir = rlist
        self._python = name + ".py"
        self._fspath = os.path.join(self._fsroot, *self._subdir, rname)
        self._urlpath = self._urljoin(*self._subdir, rname)

    def _urljoin(self, *args):
        args = list(args)
        if args[0] == '/':
            args[0] = ''
        return '/'.join(args)

    def __call__(self):
        """
        This gets called by the framework AFTER the page is launched.
        """
        target = None
        try:
            obj = self._class(bottle.request, bottle.response)
            obj.handle()
            return self._body.render(obj.export())
        except ForwardException as fwd:
            target = fwd.target
        if target is None:
            raise TinCanError("Unexpected null target!")
        # We get here if we are doing a server-side programmatic
        # forward.
        environ = bottle.request.environ
        if _FLOOP not in environ:
            environ[_FLOOP] = set()
        if _FORIG not in environ:
            environ[_FORIG] = self._urlpath
        elif target in environ[_FLOOP]:
            TinCanError("{0}: forward loop detected".format(environ[_FORIG]))
        environ[_FLOOP].add(target)
        environ['bottle.raw_path'] = target
        environ['PATH_INFO'] = urllib.parse.quote(target)
        route, args = self._app.router.match(environ)
        environ['route.handle'] = environ['bottle.route'] = route
        environ['route.url_args'] = args
        return route.call(**args)

    def _mkdict(self, obj):
        ret = {}
        for name in dir(obj):
            if name.startswith('_'):
                continue
            value = getattr(obj, name)
            if not callable(value):
                ret[name] = value
        return ret