blob: edec132975d95602c261c56fe59183204cc70bdf [file] [log] [blame]
import codecs
import re
import types
import sys
from constants import EOF, spaceCharacters, asciiLetters, asciiUppercase
from constants import encodings, ReparseException
import utils
#Non-unicode versions of constants for use in the pre-parser
spaceCharactersBytes = frozenset([str(item) for item in spaceCharacters])
asciiLettersBytes = frozenset([str(item) for item in asciiLetters])
asciiUppercaseBytes = frozenset([str(item) for item in asciiUppercase])
spacesAngleBrackets = spaceCharactersBytes | frozenset([">", "<"])
invalid_unicode_re = re.compile(u"[\u0001-\u0008\u000B\u000E-\u001F\u007F-\u009F\uD800-\uDFFF\uFDD0-\uFDEF\uFFFE\uFFFF\U0001FFFE\U0001FFFF\U0002FFFE\U0002FFFF\U0003FFFE\U0003FFFF\U0004FFFE\U0004FFFF\U0005FFFE\U0005FFFF\U0006FFFE\U0006FFFF\U0007FFFE\U0007FFFF\U0008FFFE\U0008FFFF\U0009FFFE\U0009FFFF\U000AFFFE\U000AFFFF\U000BFFFE\U000BFFFF\U000CFFFE\U000CFFFF\U000DFFFE\U000DFFFF\U000EFFFE\U000EFFFF\U000FFFFE\U000FFFFF\U0010FFFE\U0010FFFF]")
non_bmp_invalid_codepoints = set([0x1FFFE, 0x1FFFF, 0x2FFFE, 0x2FFFF, 0x3FFFE,
0x3FFFF, 0x4FFFE, 0x4FFFF, 0x5FFFE, 0x5FFFF,
0x6FFFE, 0x6FFFF, 0x7FFFE, 0x7FFFF, 0x8FFFE,
0x8FFFF, 0x9FFFE, 0x9FFFF, 0xAFFFE, 0xAFFFF,
0xBFFFE, 0xBFFFF, 0xCFFFE, 0xCFFFF, 0xDFFFE,
0xDFFFF, 0xEFFFE, 0xEFFFF, 0xFFFFE, 0xFFFFF,
0x10FFFE, 0x10FFFF])
ascii_punctuation_re = re.compile(ur"[\u0009-\u000D\u0020-\u002F\u003A-\u0040\u005B-\u0060\u007B-\u007E]")
# Cache for charsUntil()
charsUntilRegEx = {}
class BufferedStream:
"""Buffering for streams that do not have buffering of their own
The buffer is implemented as a list of chunks on the assumption that
joining many strings will be slow since it is O(n**2)
"""
def __init__(self, stream):
self.stream = stream
self.buffer = []
self.position = [-1,0] #chunk number, offset
def tell(self):
pos = 0
for chunk in self.buffer[:self.position[0]]:
pos += len(chunk)
pos += self.position[1]
return pos
def seek(self, pos):
assert pos < self._bufferedBytes()
offset = pos
i = 0
while len(self.buffer[i]) < offset:
offset -= pos
i += 1
self.position = [i, offset]
def read(self, bytes):
if not self.buffer:
return self._readStream(bytes)
elif (self.position[0] == len(self.buffer) and
self.position[1] == len(self.buffer[-1])):
return self._readStream(bytes)
else:
return self._readFromBuffer(bytes)
def _bufferedBytes(self):
return sum([len(item) for item in self.buffer])
def _readStream(self, bytes):
data = self.stream.read(bytes)
self.buffer.append(data)
self.position[0] += 1
self.position[1] = len(data)
return data
def _readFromBuffer(self, bytes):
remainingBytes = bytes
rv = []
bufferIndex = self.position[0]
bufferOffset = self.position[1]
while bufferIndex < len(self.buffer) and remainingBytes != 0:
assert remainingBytes > 0
bufferedData = self.buffer[bufferIndex]
if remainingBytes <= len(bufferedData) - bufferOffset:
bytesToRead = remainingBytes
self.position = [bufferIndex, bufferOffset + bytesToRead]
else:
bytesToRead = len(bufferedData) - bufferOffset
self.position = [bufferIndex, len(bufferedData)]
bufferIndex += 1
data = rv.append(bufferedData[bufferOffset:
bufferOffset + bytesToRead])
remainingBytes -= bytesToRead
bufferOffset = 0
if remainingBytes:
rv.append(self._readStream(remainingBytes))
return "".join(rv)
class HTMLInputStream:
"""Provides a unicode stream of characters to the HTMLTokenizer.
This class takes care of character encoding and removing or replacing
incorrect byte-sequences and also provides column and line tracking.
"""
_defaultChunkSize = 10240
def __init__(self, source, encoding=None, parseMeta=True, chardet=True):
"""Initialises the HTMLInputStream.
HTMLInputStream(source, [encoding]) -> Normalized stream from source
for use by html5lib.
source can be either a file-object, local filename or a string.
The optional encoding parameter must be a string that indicates
the encoding. If specified, that encoding will be used,
regardless of any BOM or later declaration (such as in a meta
element)
parseMeta - Look for a <meta> element containing encoding information
"""
#Craziness
if len(u"\U0010FFFF") == 1:
self.reportCharacterErrors = self.characterErrorsUCS4
self.replaceCharactersRegexp = re.compile(u"[\uD800-\uDFFF]")
else:
self.reportCharacterErrors = self.characterErrorsUCS2
self.replaceCharactersRegexp = re.compile(u"([\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF])")
# List of where new lines occur
self.newLines = [0]
self.charEncoding = (codecName(encoding), "certain")
# Raw Stream - for unicode objects this will encode to utf-8 and set
# self.charEncoding as appropriate
self.rawStream = self.openStream(source)
# Encoding Information
#Number of bytes to use when looking for a meta element with
#encoding information
self.numBytesMeta = 512
#Number of bytes to use when using detecting encoding using chardet
self.numBytesChardet = 100
#Encoding to use if no other information can be found
self.defaultEncoding = "windows-1252"
#Detect encoding iff no explicit "transport level" encoding is supplied
if (self.charEncoding[0] is None):
self.charEncoding = self.detectEncoding(parseMeta, chardet)
self.reset()
def reset(self):
self.dataStream = codecs.getreader(self.charEncoding[0])(self.rawStream,
'replace')
self.chunk = u""
self.chunkSize = 0
self.chunkOffset = 0
self.errors = []
# number of (complete) lines in previous chunks
self.prevNumLines = 0
# number of columns in the last line of the previous chunk
self.prevNumCols = 0
#Deal with CR LF and surrogates split over chunk boundaries
self._bufferedCharacter = None
def openStream(self, source):
"""Produces a file object from source.
source can be either a file object, local filename or a string.
"""
# Already a file object
if hasattr(source, 'read'):
stream = source
else:
# Otherwise treat source as a string and convert to a file object
if isinstance(source, unicode):
source = source.encode('utf-8')
self.charEncoding = ("utf-8", "certain")
try:
from io import BytesIO
except:
# 2to3 converts this line to: from io import StringIO
from cStringIO import StringIO as BytesIO
stream = BytesIO(source)
if (not(hasattr(stream, "tell") and hasattr(stream, "seek")) or
stream is sys.stdin):
stream = BufferedStream(stream)
return stream
def detectEncoding(self, parseMeta=True, chardet=True):
#First look for a BOM
#This will also read past the BOM if present
encoding = self.detectBOM()
confidence = "certain"
#If there is no BOM need to look for meta elements with encoding
#information
if encoding is None and parseMeta:
encoding = self.detectEncodingMeta()
confidence = "tentative"
#Guess with chardet, if avaliable
if encoding is None and chardet:
confidence = "tentative"
try:
from chardet.universaldetector import UniversalDetector
buffers = []
detector = UniversalDetector()
while not detector.done:
buffer = self.rawStream.read(self.numBytesChardet)
if not buffer:
break
buffers.append(buffer)
detector.feed(buffer)
detector.close()
encoding = detector.result['encoding']
self.rawStream.seek(0)
except ImportError:
pass
# If all else fails use the default encoding
if encoding is None:
confidence="tentative"
encoding = self.defaultEncoding
#Substitute for equivalent encodings:
encodingSub = {"iso-8859-1":"windows-1252"}
if encoding.lower() in encodingSub:
encoding = encodingSub[encoding.lower()]
return encoding, confidence
def changeEncoding(self, newEncoding):
newEncoding = codecName(newEncoding)
if newEncoding in ("utf-16", "utf-16-be", "utf-16-le"):
newEncoding = "utf-8"
if newEncoding is None:
return
elif newEncoding == self.charEncoding[0]:
self.charEncoding = (self.charEncoding[0], "certain")
else:
self.rawStream.seek(0)
self.reset()
self.charEncoding = (newEncoding, "certain")
raise ReparseException, "Encoding changed from %s to %s"%(self.charEncoding[0], newEncoding)
def detectBOM(self):
"""Attempts to detect at BOM at the start of the stream. If
an encoding can be determined from the BOM return the name of the
encoding otherwise return None"""
bomDict = {
codecs.BOM_UTF8: 'utf-8',
codecs.BOM_UTF16_LE: 'utf-16-le', codecs.BOM_UTF16_BE: 'utf-16-be',
codecs.BOM_UTF32_LE: 'utf-32-le', codecs.BOM_UTF32_BE: 'utf-32-be'
}
# Go to beginning of file and read in 4 bytes
string = self.rawStream.read(4)
# Try detecting the BOM using bytes from the string
encoding = bomDict.get(string[:3]) # UTF-8
seek = 3
if not encoding:
# Need to detect UTF-32 before UTF-16
encoding = bomDict.get(string) # UTF-32
seek = 4
if not encoding:
encoding = bomDict.get(string[:2]) # UTF-16
seek = 2
# Set the read position past the BOM if one was found, otherwise
# set it to the start of the stream
self.rawStream.seek(encoding and seek or 0)
return encoding
def detectEncodingMeta(self):
"""Report the encoding declared by the meta element
"""
buffer = self.rawStream.read(self.numBytesMeta)
parser = EncodingParser(buffer)
self.rawStream.seek(0)
encoding = parser.getEncoding()
if encoding in ("utf-16", "utf-16-be", "utf-16-le"):
encoding = "utf-8"
return encoding
def _position(self, offset):
chunk = self.chunk
nLines = chunk.count(u'\n', 0, offset)
positionLine = self.prevNumLines + nLines
lastLinePos = chunk.rfind(u'\n', 0, offset)
if lastLinePos == -1:
positionColumn = self.prevNumCols + offset
else:
positionColumn = offset - (lastLinePos + 1)
return (positionLine, positionColumn)
def position(self):
"""Returns (line, col) of the current position in the stream."""
line, col = self._position(self.chunkOffset)
return (line+1, col)
def char(self):
""" Read one character from the stream or queue if available. Return
EOF when EOF is reached.
"""
# Read a new chunk from the input stream if necessary
if self.chunkOffset >= self.chunkSize:
if not self.readChunk():
return EOF
chunkOffset = self.chunkOffset
char = self.chunk[chunkOffset]
self.chunkOffset = chunkOffset + 1
return char
def readChunk(self, chunkSize=None):
if chunkSize is None:
chunkSize = self._defaultChunkSize
self.prevNumLines, self.prevNumCols = self._position(self.chunkSize)
self.chunk = u""
self.chunkSize = 0
self.chunkOffset = 0
data = self.dataStream.read(chunkSize)
#Deal with CR LF and surrogates broken across chunks
if self._bufferedCharacter:
data = self._bufferedCharacter + data
self._bufferedCharacter = None
elif not data:
# We have no more data, bye-bye stream
return False
if len(data) > 1:
lastv = ord(data[-1])
if lastv == 0x0D or 0xD800 <= lastv <= 0xDBFF:
self._bufferedCharacter = data[-1]
data = data[:-1]
self.reportCharacterErrors(data)
# Replace invalid characters
# Note U+0000 is dealt with in the tokenizer
data = self.replaceCharactersRegexp.sub(u"\ufffd", data)
data = data.replace(u"\r\n", u"\n")
data = data.replace(u"\r", u"\n")
self.chunk = data
self.chunkSize = len(data)
return True
def characterErrorsUCS4(self, data):
for i in xrange(len(invalid_unicode_re.findall(data))):
self.errors.append("invalid-codepoint")
def characterErrorsUCS2(self, data):
#Someone picked the wrong compile option
#You lose
skip = False
import sys
for match in invalid_unicode_re.finditer(data):
if skip:
continue
codepoint = ord(match.group())
pos = match.start()
#Pretty sure there should be endianness issues here
if utils.isSurrogatePair(data[pos:pos+2]):
#We have a surrogate pair!
char_val = utils.surrogatePairToCodepoint(data[pos:pos+2])
if char_val in non_bmp_invalid_codepoints:
self.errors.append("invalid-codepoint")
skip = True
elif (codepoint >= 0xD800 and codepoint <= 0xDFFF and
pos == len(data) - 1):
self.errors.append("invalid-codepoint")
else:
skip = False
self.errors.append("invalid-codepoint")
def charsUntil(self, characters, opposite = False):
""" Returns a string of characters from the stream up to but not
including any character in 'characters' or EOF. 'characters' must be
a container that supports the 'in' method and iteration over its
characters.
"""
# Use a cache of regexps to find the required characters
try:
chars = charsUntilRegEx[(characters, opposite)]
except KeyError:
if __debug__:
for c in characters:
assert(ord(c) < 128)
regex = u"".join([u"\\x%02x" % ord(c) for c in characters])
if not opposite:
regex = u"^%s" % regex
chars = charsUntilRegEx[(characters, opposite)] = re.compile(u"[%s]+" % regex)
rv = []
while True:
# Find the longest matching prefix
m = chars.match(self.chunk, self.chunkOffset)
if m is None:
# If nothing matched, and it wasn't because we ran out of chunk,
# then stop
if self.chunkOffset != self.chunkSize:
break
else:
end = m.end()
# If not the whole chunk matched, return everything
# up to the part that didn't match
if end != self.chunkSize:
rv.append(self.chunk[self.chunkOffset:end])
self.chunkOffset = end
break
# If the whole remainder of the chunk matched,
# use it all and read the next chunk
rv.append(self.chunk[self.chunkOffset:])
if not self.readChunk():
# Reached EOF
break
r = u"".join(rv)
return r
def unget(self, char):
# Only one character is allowed to be ungotten at once - it must
# be consumed again before any further call to unget
if char is not None:
if self.chunkOffset == 0:
# unget is called quite rarely, so it's a good idea to do
# more work here if it saves a bit of work in the frequently
# called char and charsUntil.
# So, just prepend the ungotten character onto the current
# chunk:
self.chunk = char + self.chunk
self.chunkSize += 1
else:
self.chunkOffset -= 1
assert self.chunk[self.chunkOffset] == char
class EncodingBytes(str):
"""String-like object with an associated position and various extra methods
If the position is ever greater than the string length then an exception is
raised"""
def __new__(self, value):
return str.__new__(self, value.lower())
def __init__(self, value):
self._position=-1
def __iter__(self):
return self
def next(self):
p = self._position = self._position + 1
if p >= len(self):
raise StopIteration
elif p < 0:
raise TypeError
return self[p]
def previous(self):
p = self._position
if p >= len(self):
raise StopIteration
elif p < 0:
raise TypeError
self._position = p = p - 1
return self[p]
def setPosition(self, position):
if self._position >= len(self):
raise StopIteration
self._position = position
def getPosition(self):
if self._position >= len(self):
raise StopIteration
if self._position >= 0:
return self._position
else:
return None
position = property(getPosition, setPosition)
def getCurrentByte(self):
return self[self.position]
currentByte = property(getCurrentByte)
def skip(self, chars=spaceCharactersBytes):
"""Skip past a list of characters"""
p = self.position # use property for the error-checking
while p < len(self):
c = self[p]
if c not in chars:
self._position = p
return c
p += 1
self._position = p
return None
def skipUntil(self, chars):
p = self.position
while p < len(self):
c = self[p]
if c in chars:
self._position = p
return c
p += 1
self._position = p
return None
def matchBytes(self, bytes):
"""Look for a sequence of bytes at the start of a string. If the bytes
are found return True and advance the position to the byte after the
match. Otherwise return False and leave the position alone"""
p = self.position
data = self[p:p+len(bytes)]
rv = data.startswith(bytes)
if rv:
self.position += len(bytes)
return rv
def jumpTo(self, bytes):
"""Look for the next sequence of bytes matching a given sequence. If
a match is found advance the position to the last byte of the match"""
newPosition = self[self.position:].find(bytes)
if newPosition > -1:
# XXX: This is ugly, but I can't see a nicer way to fix this.
if self._position == -1:
self._position = 0
self._position += (newPosition + len(bytes)-1)
return True
else:
raise StopIteration
class EncodingParser(object):
"""Mini parser for detecting character encoding from meta elements"""
def __init__(self, data):
"""string - the data to work on for encoding detection"""
self.data = EncodingBytes(data)
self.encoding = None
def getEncoding(self):
methodDispatch = (
("<!--",self.handleComment),
("<meta",self.handleMeta),
("</",self.handlePossibleEndTag),
("<!",self.handleOther),
("<?",self.handleOther),
("<",self.handlePossibleStartTag))
for byte in self.data:
keepParsing = True
for key, method in methodDispatch:
if self.data.matchBytes(key):
try:
keepParsing = method()
break
except StopIteration:
keepParsing=False
break
if not keepParsing:
break
return self.encoding
def handleComment(self):
"""Skip over comments"""
return self.data.jumpTo("-->")
def handleMeta(self):
if self.data.currentByte not in spaceCharactersBytes:
#if we have <meta not followed by a space so just keep going
return True
#We have a valid meta element we want to search for attributes
while True:
#Try to find the next attribute after the current position
attr = self.getAttribute()
if attr is None:
return True
else:
if attr[0] == "charset":
tentativeEncoding = attr[1]
codec = codecName(tentativeEncoding)
if codec is not None:
self.encoding = codec
return False
elif attr[0] == "content":
contentParser = ContentAttrParser(EncodingBytes(attr[1]))
tentativeEncoding = contentParser.parse()
codec = codecName(tentativeEncoding)
if codec is not None:
self.encoding = codec
return False
def handlePossibleStartTag(self):
return self.handlePossibleTag(False)
def handlePossibleEndTag(self):
self.data.next()
return self.handlePossibleTag(True)
def handlePossibleTag(self, endTag):
data = self.data
if data.currentByte not in asciiLettersBytes:
#If the next byte is not an ascii letter either ignore this
#fragment (possible start tag case) or treat it according to
#handleOther
if endTag:
data.previous()
self.handleOther()
return True
c = data.skipUntil(spacesAngleBrackets)
if c == "<":
#return to the first step in the overall "two step" algorithm
#reprocessing the < byte
data.previous()
else:
#Read all attributes
attr = self.getAttribute()
while attr is not None:
attr = self.getAttribute()
return True
def handleOther(self):
return self.data.jumpTo(">")
def getAttribute(self):
"""Return a name,value pair for the next attribute in the stream,
if one is found, or None"""
data = self.data
# Step 1 (skip chars)
c = data.skip(spaceCharactersBytes | frozenset("/"))
# Step 2
if c in (">", None):
return None
# Step 3
attrName = []
attrValue = []
#Step 4 attribute name
while True:
if c == "=" and attrName:
break
elif c in spaceCharactersBytes:
#Step 6!
c = data.skip()
c = data.next()
break
elif c in ("/", ">"):
return "".join(attrName), ""
elif c in asciiUppercaseBytes:
attrName.append(c.lower())
elif c == None:
return None
else:
attrName.append(c)
#Step 5
c = data.next()
#Step 7
if c != "=":
data.previous()
return "".join(attrName), ""
#Step 8
data.next()
#Step 9
c = data.skip()
#Step 10
if c in ("'", '"'):
#10.1
quoteChar = c
while True:
#10.2
c = data.next()
#10.3
if c == quoteChar:
data.next()
return "".join(attrName), "".join(attrValue)
#10.4
elif c in asciiUppercaseBytes:
attrValue.append(c.lower())
#10.5
else:
attrValue.append(c)
elif c == ">":
return "".join(attrName), ""
elif c in asciiUppercaseBytes:
attrValue.append(c.lower())
elif c is None:
return None
else:
attrValue.append(c)
# Step 11
while True:
c = data.next()
if c in spacesAngleBrackets:
return "".join(attrName), "".join(attrValue)
elif c in asciiUppercaseBytes:
attrValue.append(c.lower())
elif c is None:
return None
else:
attrValue.append(c)
class ContentAttrParser(object):
def __init__(self, data):
self.data = data
def parse(self):
try:
#Check if the attr name is charset
#otherwise return
self.data.jumpTo("charset")
self.data.position += 1
self.data.skip()
if not self.data.currentByte == "=":
#If there is no = sign keep looking for attrs
return None
self.data.position += 1
self.data.skip()
#Look for an encoding between matching quote marks
if self.data.currentByte in ('"', "'"):
quoteMark = self.data.currentByte
self.data.position += 1
oldPosition = self.data.position
if self.data.jumpTo(quoteMark):
return self.data[oldPosition:self.data.position]
else:
return None
else:
#Unquoted value
oldPosition = self.data.position
try:
self.data.skipUntil(spaceCharactersBytes)
return self.data[oldPosition:self.data.position]
except StopIteration:
#Return the whole remaining value
return self.data[oldPosition:]
except StopIteration:
return None
def codecName(encoding):
"""Return the python codec name corresponding to an encoding or None if the
string doesn't correspond to a valid encoding."""
if (encoding is not None and type(encoding) in types.StringTypes):
canonicalName = ascii_punctuation_re.sub("", encoding).lower()
return encodings.get(canonicalName, None)
else:
return None