view workspace.py @ 2:8884b0bf779d

Improve the efficiency.
author David Barts <n5jrn@me.com>
date Thu, 26 Dec 2019 13:18:53 -0800
parents 173e86601dbc
children 091c03f1b2e8
line wrap: on
line source

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# A class that implements a workspace for curly-quoting a text. This is enough
# like a string that it can be accessed via subscripts and ranges, and enough
# like a TextIOBase object that it can be written to much like a stream.
# (However, a Workspace is neither a string nor a TextIOBase object.)
#
# The advantage of using UTF-16 (as we do here) is that all quotation marks
# of interest are represented in a single 16-bit value, so changing straight
# quotes to curly ones can be accomplished most easily.
#
# It was a deliberate design decision to return empty strings when reading
# out-of-range indices but to throw exceptions when attempting to write
# them, because both decisions made coding easier in other modules.

# I m p o r t s

import os, sys
import io
import codecs

# V a r i a b l e s

# C l a s s e s

class Workspace(object):
    # The most efficient 16-bit one on this platform
    encoding = "UTF-16" + sys.byteorder[0].upper() + "E"
    codec = codecs.lookup(encoding)
    # Errors should never happen; UTF-16 can represent all Unicode characters
    errors = 'strict'

    def __init__(self, initial_data=None):
        """
        Constructor.
        """
        self._length = 0
        if initial_data is not None:
            data = initial_data.encode(self.encoding, self.errors)
            self._fp = io.BytesIO(data)
        else:
            self._fp = io.BytesIO()

    def close(self):
        """
        Causes our buffer to be discarded and this workspace to become
        unusable.
        """
        self._fp.close()

    def flush(self):
        """
        Does nothing, but allowed.
        """
        pass

    def seek(self, offset, whence=io.SEEK_SET):
        """
        Seeks to an absolute position.
        """
        return self._fp.seek(offset, whence)

    def tell(self):
        """
        Returns current position.
        """
        return self._fp.tell()

    def read(self, nchars=None):
        """
        Read characters.
        XXX - might return replacement chars from surrogate fragments.
        """
        if nchars is not None and nchars >= 0:
            nchars *= 2
        return self._fp.read(nchars).decode(self.encoding, "replace")

    def write(self, string):
        """
        Write characters.
        """
        self._fp.write(string.encode(self.encoding, self.errors))

    def __len__(self):
        """
        Length in characters.
        """
        return len(self._fp.getbuffer()) // 2

    def _mapped(self, index):
        if index < 0 or index >= len(self):
            raise IndexError("index {0} out of range".format(index))
        i2 = index * 2
        return slice(i2, i2 + 2)

    def __getitem__(self, key):
        """
        Direct access to a single character or range of characters. We do
        not support negative indices. Return value is based on what's most
        useful for curling quotes.
        XXX - might return replacement chars from surrogate fragments.
        """
        if isinstance(key, int):
            try:
                key = self._mapped(key)
            except IndexError:
                return ""
        elif isinstance(key, slice):
            if key.step is not None:
                raise ValueError("__getitem__ does not support steps in slices")
            length = len(self)
            start = 0 if key.start is None else key.start
            stop = length if key.stop is None else key.stop
            start = max(0, min(length - 1, start))
            stop = max(0, min(length, stop))
            if stop <= start:
                return ""
            key = slice(start * 2, stop * 2)
        else:
            raise TypeError("__getitem__ only supports integers and slices")
        return self.codec.decode(self._fp.getbuffer()[key], "replace")[0]

    def __setitem__(self, key, value):
        """
        Direct access to a single character. We do not support negative
        indices or replacing more than a single character at a time.
        XXX - only works on characters in the BMP.
        """
        if not isinstance(key, int):
            raise TypeError("__setitem__ only supports integers")
        if not value:
            return
        encoded = value[0].encode(self.encoding, self.errors)
        if len(encoded) != 2:
            raise ValueError("{0!r} not in BMP".format(value[0]))
        self._fp.getbuffer()[self._mapped(key)] = encoded

    def __del__(self):
        """
        Equivalent to .close().
        """
        self.close()

    def getvalue(self):
        """
        Gets the string represented by this workspace.
        """
        return self.codec.decode(self._fp.getbuffer(), self.errors)[0]

    def __enter__(self):
        """
        Context manager.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context manager: close on exit.
        """
        self.close()
        return False

class Bounds(object):
    def __init__(self, start, stop):
        if start > stop or start < 0 or stop < 0:
            raise ValueError("invalid bounds")
        self.start = int(start)
        self.stop = int(stop)

    @classmethod
    def from_object(cls, obj):
        if isinstance(obj, slice):
            return self(slice.start, slice.stop)
        return self(obj[0], obj[1])

    def __lt__(self, other):
        return self.start < other.start

    def __le__(self, other):
        return self.start <= other.start

    def __eq__(self, other):
        return self.start == other.start

    def __ne__(self, other):
        return self.start != other.start

    def __gt__(self, other):
        return self.start > other.start

    def __ge__(self, other):
        return self.start >= other.start

    def __contains__(self, scalar):
        return self.start <= scalar < self.stop

    def __repr__(self):
        return "{0}({1!r}, {2!r})".format(self.__class__.__name__, self.start, self.stop)

class Mapping(object):
    def __init__(self, bounds, offset):
        if not isinstance(bounds, Bounds):
            raise TypeError("bounds must be a Bounds object")
        if not isinstance(offset, int):
            raise TypeError("offset must be an int")
        self.bounds = bounds
        self.offset = offset
        self.delta = self.offset - self.bounds.start

    def __repr__(self):
        return "{0}({1!r}, {2!r})".format(self.__class__.__name__, self.bounds, self.offset)

class SegmentedView(object):
    """
    Implements a view on a subscriptable object. The view is composed of
    zero or more segments of the source object. Has the same idiosyncratic
    behavior for out-of-bounds indices that Workspace has (and for the
    same reason).
    """
    def __init__(self, indexable, bounds):
        self.indexable = indexable
        self._mmap = [ Mapping(Bounds(0, 0), 0) ]
        pos = 0
        for r in sorted(bounds):
            if pos is not None and r.start <= pos and r.stop > pos:
                # merge ranges
                self._mmap[-1].bounds.stop = r.stop
                pos = r.stop
                continue
            opos = pos
            pos += r.stop - r.start
            self._mmap.append(Mapping(Bounds(opos, pos), r.start))
        self._length = pos

    def _mapped(self, index):
        mmap_index = self._binsch(index)
        if mmap_index is None:
            raise IndexError("index {0} out of range".format(index))
        return index + self._mmap[mmap_index].delta

    def _binsch(self, index):
        a = 0
        z = len(self._mmap) - 1
        while a <= z:
            m = (a + z) // 2
            if index in self._mmap[m].bounds:
                return m
            if index < self._mmap[m].bounds.start:
                z = m - 1
            else:
                assert index >= self._mmap[m].bounds.stop
                a = m + 1
        return None

    def __setitem__(self, key, value):
        if not isinstance(key, int):
            raise TypeError("__setitem__ only supports integers")
        self.indexable[self._mapped(key)] = value

    # XXX - this is sorta brute-forced and could be more efficient
    def __getitem__(self, key):
        # Trivial cases
        if isinstance(key, int):
            return self._get1(key)
        if not isinstance(key, slice):
            raise TypeError("expecting int or slice")
        if key.step is not None:
            raise ValueError("__getitem__ does not support steps in slices")

        # Loop up the starting segment.
        mi = self._binsch(key.start)
        if mi is None:
            return ""
        m = self._mmap[mi]

        # Horray! There's only one segment, so we can optimize.
        if key.stop <= m.bounds.stop:
            start = key.start + m.delta
            stop = key.stop + m.delta
            return self.indexable[start:stop]

        # The most involved (multi-segment) case.
        with io.StringIO() as buf:
            for m in self._mmap[mi:]:
                if m.bounds.start >= key.stop:
                    break
                start = max(key.start, m.bounds.start) + m.delta
                stop = min(key.stop, m.bounds.stop) + m.delta
                buf.write(self.indexable[start:stop])
            return buf.getvalue()

    def __len__(self):
        return self._length

    def _get1(self, index):
        try:
            return self.indexable[self._mapped(index)]
        except IndexError:
            return ""

    def getvalue(self):
        return self[0:len(self)]