Mercurial > cgi-bin > hgweb.cgi > curlyq
view workspace.py @ 3:091c03f1b2e8
Getting it working...
author | David Barts <n5jrn@me.com> |
---|---|
date | Thu, 26 Dec 2019 19:54:45 -0800 |
parents | 8884b0bf779d |
children | 7a83e82e65a6 |
line wrap: on
line source
#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Classes that implement a workspace for curly-quoting a text, and views # into the same. # I m p o r t s import os, sys import io import codecs # V a r i a b l e s # C l a s s e s # Our workspace class. This is enough like a string that it can be # accessed via subscripts and ranges, and enough like a TextIOBase object # that it can be written to much like a stream. (However, a Workspace is # neither a string nor a TextIOBase object.) # # The advantage of using UTF-16 (as we do here) is that all quotation # marks of interest are represented in a single 16-bit value, so changing # straight quotes to curly ones can be accomplished most easily. # # It was a deliberate design decision to return empty strings when reading # out-of-range indices but to throw exceptions when attempting to write # them, because both decisions made coding easier in other modules. class Workspace(object): """ A workspace for text-processing; a mutable hybrid of a string and an in-memory file. """ # The most efficient 16-bit one on this platform encoding = "UTF-16" + sys.byteorder[0].upper() + "E" codec = codecs.lookup(encoding) # Errors should never happen; UTF-16 can represent all Unicode characters errors = 'strict' def __init__(self, initial_data=None): """ Constructor. """ if initial_data is not None: data = initial_data.encode(self.encoding, self.errors) self._fp = io.BytesIO(data) else: self._fp = io.BytesIO() def close(self): """ Causes our buffer to be discarded and this workspace to become unusable. """ self._fp.close() def flush(self): """ Does nothing, but allowed. """ pass def seek(self, offset, whence=io.SEEK_SET): """ Seeks to an absolute position. """ return self._fp.seek(offset, whence) def tell(self): """ Returns current position. """ return self._fp.tell() def read(self, nchars=None): """ Read characters. XXX - might return replacement chars from surrogate fragments. """ if nchars is not None and nchars >= 0: nchars *= 2 return self._fp.read(nchars).decode(self.encoding, "replace") def write(self, string): """ Write characters. """ self._fp.write(string.encode(self.encoding, self.errors)) def truncate(self, size=None): """ Truncate. XXX - can create a runt surrogate pair """ if size is None: self._fp.truncate(None) else: self._fp.truncate(2 * size) def clear(self): """ Clear this object's contents. """ self.truncate(0) self.seek(0, os.SEEK_SET) def __len__(self): """ Length in characters. """ return len(self._fp.getbuffer()) // 2 def _mapped(self, index): if index < 0 or index >= len(self): raise IndexError("index {0} out of range".format(index)) i2 = index * 2 return slice(i2, i2 + 2) def __getitem__(self, key): """ Direct access to a single character or range of characters. We do not support negative indices. Return value is based on what's most useful for curling quotes. XXX - might return replacement chars from surrogate fragments. """ if isinstance(key, int): try: key = self._mapped(key) except IndexError: return "" elif isinstance(key, slice): if key.step is not None: raise ValueError("__getitem__ does not support steps in slices") length = len(self) start = 0 if key.start is None else key.start stop = length if key.stop is None else key.stop start = max(0, min(length - 1, start)) stop = max(0, min(length, stop)) if stop <= start: return "" key = slice(start * 2, stop * 2) else: raise TypeError("__getitem__ only supports integers and slices") return self.codec.decode(self._fp.getbuffer()[key], "replace")[0] def __setitem__(self, key, value): """ Direct access to a single character. We do not support negative indices or replacing more than a single character at a time. XXX - only works on characters in the BMP. """ if not isinstance(key, int): raise TypeError("__setitem__ only supports integers") if not value: return encoded = value[0].encode(self.encoding, self.errors) if len(encoded) != 2: raise ValueError("{0!r} not in BMP".format(value[0])) self._fp.getbuffer()[self._mapped(key)] = encoded def __del__(self): """ Equivalent to .close(). """ self.close() def getvalue(self): """ Gets the string represented by this workspace. """ return self.codec.decode(self._fp.getbuffer(), self.errors)[0] def __enter__(self): """ Context manager. """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ Context manager: close on exit. """ self.close() return False class Bounds(object): """ A set of index bounds. """ def __init__(self, start, stop): if start > stop or start < 0 or stop < 0: raise ValueError("invalid bounds") self.start = int(start) self.stop = int(stop) @classmethod def from_object(cls, obj): if isinstance(obj, slice): return self(slice.start, slice.stop) return self(obj[0], obj[1]) def __lt__(self, other): return self.start < other.start def __le__(self, other): return self.start <= other.start def __eq__(self, other): return self.start == other.start def __ne__(self, other): return self.start != other.start def __gt__(self, other): return self.start > other.start def __ge__(self, other): return self.start >= other.start def __contains__(self, scalar): return self.start <= scalar < self.stop def __repr__(self): return "{0}({1!r}, {2!r})".format(self.__class__.__name__, self.start, self.stop) class Mapping(object): """ Represents a mapping of a single view segment into an indexable object. """ def __init__(self, bounds, offset): if not isinstance(bounds, Bounds): raise TypeError("bounds must be a Bounds object") if not isinstance(offset, int): raise TypeError("offset must be an int") self.bounds = bounds self.offset = offset self.delta = self.offset - self.bounds.start def __repr__(self): return "{0}({1!r}, {2!r})".format(self.__class__.__name__, self.bounds, self.offset) class SegmentedView(object): """ Implements a view on a subscriptable object. The view is composed of zero or more segments of the source object. Has the same idiosyncratic behavior for out-of-bounds indices that Workspace has (and for the same reason). Mutating this object causes the parent object to also be mutated. """ def __init__(self, indexable, bounds): self.indexable = indexable self._mmap = [ Mapping(Bounds(0, 0), 0) ] pos = 0 for r in sorted(bounds): if pos is not None and r.start <= pos and r.stop > pos: # merge ranges self._mmap[-1].bounds.stop = r.stop pos = r.stop continue opos = pos pos += r.stop - r.start self._mmap.append(Mapping(Bounds(opos, pos), r.start)) self._length = pos def _mapped(self, index): mmap_index = self._binsch(index) if mmap_index is None: raise IndexError("index {0} out of range".format(index)) return index + self._mmap[mmap_index].delta def _binsch(self, index): a = 0 z = len(self._mmap) - 1 while a <= z: m = (a + z) // 2 if index in self._mmap[m].bounds: return m if index < self._mmap[m].bounds.start: z = m - 1 else: assert index >= self._mmap[m].bounds.stop a = m + 1 return None def __setitem__(self, key, value): """ Direct access to replace a single character. """ if not isinstance(key, int): raise TypeError("__setitem__ only supports integers") self.indexable[self._mapped(key)] = value def __getitem__(self, key): """ Direct access to a single character or range of characters. """ # Trivial cases if isinstance(key, int): return self._get1(key) if not isinstance(key, slice): raise TypeError("expecting int or slice") if key.step is not None: raise ValueError("__getitem__ does not support steps in slices") # Loop up the starting segment. mi = self._binsch(key.start) if mi is None: return "" m = self._mmap[mi] # Horray! There's only one segment, so we can optimize. if key.stop <= m.bounds.stop: start = key.start + m.delta stop = key.stop + m.delta return self.indexable[start:stop] # The most involved (multi-segment) case. with io.StringIO() as buf: for m in self._mmap[mi:]: if m.bounds.start >= key.stop: break start = max(key.start, m.bounds.start) + m.delta stop = min(key.stop, m.bounds.stop) + m.delta buf.write(self.indexable[start:stop]) return buf.getvalue() def __len__(self): return self._length def _get1(self, index): try: return self.indexable[self._mapped(index)] except IndexError: return "" def getvalue(self): return self[0:len(self)]