From ef7a80865f253852f7c7b01f04bc02b695ead67b Mon Sep 17 00:00:00 2001
From: Thibaut Horel
Date: Sun, 29 Sep 2013 17:48:14 -0400
Subject: Giving up on Python 2.5 support

---
 requests/utils.py | 518 ++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 344 insertions(+), 174 deletions(-)

(limited to 'requests/utils.py')

diff --git a/requests/utils.py b/requests/utils.py
index 95dea4b..3ec6131 100644
--- a/requests/utils.py
+++ b/requests/utils.py
@@ -11,21 +11,137 @@ that are also useful for external consumption.
 
 import cgi
 import codecs
-import cookielib
+import collections
 import os
-import random
+import platform
 import re
-import zlib
-import urllib
+import sys
+from netrc import netrc, NetrcParseError
 
-from urllib2 import parse_http_list as _parse_list_header
+from . import __version__
+from . import certs
+from .compat import parse_http_list as _parse_list_header
+from .compat import (quote, urlparse, bytes, str, OrderedDict, urlunparse,
+                     is_py2, is_py3, builtin_str, getproxies, proxy_bypass)
+from .cookies import RequestsCookieJar, cookiejar_from_dict
+from .structures import CaseInsensitiveDict
+from .exceptions import MissingSchema, InvalidURL
+
+_hush_pyflakes = (RequestsCookieJar,)
+
+NETRC_FILES = ('.netrc', '_netrc')
+
+DEFAULT_CA_BUNDLE_PATH = certs.where()
+
+
+def dict_to_sequence(d):
+    """Returns the items of a dict-like object; other values pass through."""
+
+    if hasattr(d, 'items'):
+        d = d.items()
+
+    return d
+
+
+def super_len(o):
+    if hasattr(o, '__len__'):
+        return len(o)
+    if hasattr(o, 'len'):
+        return o.len
+    if hasattr(o, 'fileno'):
+        return os.fstat(o.fileno()).st_size
+
+
+def get_netrc_auth(url):
+    """Returns the Requests tuple auth for a given url from netrc."""
+
+    try:
+        locations = (os.path.expanduser('~/{0}'.format(f)) for f in NETRC_FILES)
+        netrc_path = None
+
+        for loc in locations:
+            if os.path.exists(loc) and not netrc_path:
+                netrc_path = loc
+
+        # Abort early if there isn't one.
+        if netrc_path is None:
+            return netrc_path
+
+        ri = urlparse(url)
+
+        # Strip port numbers from netloc
+        host = ri.netloc.split(':')[0]
+
+        try:
+            _netrc = netrc(netrc_path).authenticators(host)
+            if _netrc:
+                # Return with login / password
+                login_i = (0 if _netrc[0] else 1)
+                return (_netrc[login_i], _netrc[2])
+        except (NetrcParseError, IOError):
+            # If there was a parsing error or a permissions issue reading the file,
+            # we'll just skip netrc auth
+            pass
+
+    # AppEngine hackiness.
+    except (ImportError, AttributeError):
+        pass
 
 
 def guess_filename(obj):
     """Tries to guess the filename of the given object."""
     name = getattr(obj, 'name', None)
     if name and name[0] != '<' and name[-1] != '>':
-        return name
+        return os.path.basename(name)
+
+
+def from_key_val_list(value):
+    """Take an object and test to see if it can be represented as a
+    dictionary. If it can be, return an OrderedDict, e.g.,
+
+    ::
+
+        >>> from_key_val_list([('key', 'val')])
+        OrderedDict([('key', 'val')])
+        >>> from_key_val_list('string')
+        ValueError: need more than 1 value to unpack
+        >>> from_key_val_list({'key': 'val'})
+        OrderedDict([('key', 'val')])
+    """
+    if value is None:
+        return None
+
+    if isinstance(value, (str, bytes, bool, int)):
+        raise ValueError('cannot encode objects that are not 2-tuples')
+
+    return OrderedDict(value)
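Example (editor's illustration, not part of the diff): from_key_val_list above
and its counterpart to_key_val_list below are symmetric normalizers. A minimal
sketch of the intended round trip, assuming this patch is applied; the sample
pairs are made up:

    from collections import OrderedDict
    from requests.utils import from_key_val_list, to_key_val_list

    pairs = [('Accept', '*/*'), ('User-Agent', 'demo')]

    # Insertion order is preserved, which matters for headers.
    d = from_key_val_list(pairs)
    assert d == OrderedDict(pairs)

    # The inverse direction: mapping back to a list of 2-tuples.
    assert to_key_val_list(d) == pairs

    # Scalars are rejected rather than silently mis-parsed.
    try:
        from_key_val_list('string')
    except ValueError:
        pass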
+
+
+def to_key_val_list(value):
+    """Take an object and test to see if it can be represented as a
+    dictionary. If it can be, return a list of tuples, e.g.,
+
+    ::
+
+        >>> to_key_val_list([('key', 'val')])
+        [('key', 'val')]
+        >>> to_key_val_list({'key': 'val'})
+        [('key', 'val')]
+        >>> to_key_val_list('string')
+        ValueError: cannot encode objects that are not 2-tuples.
+    """
+    if value is None:
+        return None
+
+    if isinstance(value, (str, bytes, bool, int)):
+        raise ValueError('cannot encode objects that are not 2-tuples')
+
+    if isinstance(value, collections.Mapping):
+        value = value.items()
+
+    return list(value)
 
 
 # From mitsuhiko/werkzeug (used with permission).
 def parse_list_header(value):
@@ -117,68 +233,6 @@ def unquote_header_value(value, is_filename=False):
     return value
 
 
-def header_expand(headers):
-    """Returns an HTTP Header value string from a dictionary.
-
-    Example expansion::
-
-        {'text/x-dvi': {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}, 'text/x-c': {}}
-        # Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
-
-        (('text/x-dvi', {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}), ('text/x-c', {}))
-        # Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
-    """
-
-    collector = []
-
-    if isinstance(headers, dict):
-        headers = headers.items()
-
-    elif isinstance(headers, basestring):
-        return headers
-
-    for i, (value, params) in enumerate(headers):
-
-        _params = []
-
-        for (p_k, p_v) in params.items():
-
-            _params.append('%s=%s' % (p_k, p_v))
-
-        collector.append(value)
-        collector.append('; ')
-
-        if len(params):
-
-            collector.append('; '.join(_params))
-
-        if not len(headers) == i+1:
-            collector.append(', ')
-
-
-    # Remove trailing separators.
-    if collector[-1] in (', ', '; '):
-        del collector[-1]
-
-    return ''.join(collector)
-
-
-
-def randombytes(n):
-    """Return n random bytes."""
-    # Use /dev/urandom if it is available. Fall back to random module
-    # if not. It might be worthwhile to extend this function to use
-    # other platform-specific mechanisms for getting random bytes.
-    if os.path.exists("/dev/urandom"):
-        f = open("/dev/urandom")
-        s = f.read(n)
-        f.close()
-        return s
-    else:
-        L = [chr(random.randrange(0, 256)) for i in range(n)]
-        return "".join(L)
-
-
 def dict_from_cookiejar(cj):
     """Returns a key/value dictionary from a CookieJar.
 
@@ -187,33 +241,12 @@ def dict_from_cookiejar(cj):
 
     cookie_dict = {}
 
-    for _, cookies in cj._cookies.items():
-        for _, cookies in cookies.items():
-            for cookie in cookies.values():
-                # print cookie
-                cookie_dict[cookie.name] = cookie.value
+    for cookie in cj:
+        cookie_dict[cookie.name] = cookie.value
 
     return cookie_dict
 
 
-def cookiejar_from_dict(cookie_dict):
-    """Returns a CookieJar from a key/value dictionary.
-
-    :param cookie_dict: Dict of key/values to insert into CookieJar.
-    """
-
-    # return cookiejar if one was passed in
-    if isinstance(cookie_dict, cookielib.CookieJar):
-        return cookie_dict
-
-    # create cookiejar
-    cj = cookielib.CookieJar()
-
-    cj = add_dict_to_cookiejar(cj, cookie_dict)
-
-    return cj
-
-
 def add_dict_to_cookiejar(cj, cookie_dict):
     """Returns a CookieJar from a key/value dictionary.
 
@@ -221,31 +254,8 @@ def add_dict_to_cookiejar(cj, cookie_dict):
 
     :param cj: CookieJar to insert cookies into.
     :param cookie_dict: Dict of key/values to insert into CookieJar.
     """
 
-    for k, v in cookie_dict.items():
-
-        cookie = cookielib.Cookie(
-            version=0,
-            name=k,
-            value=v,
-            port=None,
-            port_specified=False,
-            domain='',
-            domain_specified=False,
-            domain_initial_dot=False,
-            path='/',
-            path_specified=True,
-            secure=False,
-            expires=None,
-            discard=True,
-            comment=None,
-            comment_url=None,
-            rest={'HttpOnly': None},
-            rfc2109=False
-        )
-
-        # add cookie to cookiejar
-        cj.set_cookie(cookie)
-
+    cj2 = cookiejar_from_dict(cookie_dict)
+    cj.update(cj2)
     return cj
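Example (editor's illustration, not part of the diff): with cookiejar_from_dict
moved to requests.cookies, the helper above reduces to building a jar and
merging it in. A usage sketch, assuming this patch is applied; cookie names and
values are made up:

    from requests.cookies import cookiejar_from_dict
    from requests.utils import add_dict_to_cookiejar, dict_from_cookiejar

    jar = cookiejar_from_dict({'session': 'abc123'})
    jar = add_dict_to_cookiejar(jar, {'csrftoken': 'xyz'})

    # dict_from_cookiejar flattens the jar back into plain key/value pairs.
    assert dict_from_cookiejar(jar) == {'session': 'abc123', 'csrftoken': 'xyz'}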
""" - for k, v in cookie_dict.items(): - - cookie = cookielib.Cookie( - version=0, - name=k, - value=v, - port=None, - port_specified=False, - domain='', - domain_specified=False, - domain_initial_dot=False, - path='/', - path_specified=True, - secure=False, - expires=None, - discard=True, - comment=None, - comment_url=None, - rest={'HttpOnly': None}, - rfc2109=False - ) - - # add cookie to cookiejar - cj.set_cookie(cookie) - + cj2 = cookiejar_from_dict(cookie_dict) + cj.update(cj2) return cj @@ -256,8 +266,12 @@ def get_encodings_from_content(content): """ charset_re = re.compile(r']', flags=re.I) + pragma_re = re.compile(r']', flags=re.I) + xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]') - return charset_re.findall(content) + return (charset_re.findall(content) + + pragma_re.findall(content) + + xml_re.findall(content)) def get_encoding_from_headers(headers): @@ -280,23 +294,6 @@ def get_encoding_from_headers(headers): return 'ISO-8859-1' -def unicode_from_html(content): - """Attempts to decode an HTML string into unicode. - If unsuccessful, the original content is returned. - """ - - encodings = get_encodings_from_content(content) - - for encoding in encodings: - - try: - return unicode(content, encoding) - except (UnicodeError, TypeError): - pass - - return content - - def stream_decode_response_unicode(iterator, r): """Stream decodes a iterator.""" @@ -310,11 +307,19 @@ def stream_decode_response_unicode(iterator, r): rv = decoder.decode(chunk) if rv: yield rv - rv = decoder.decode('', final=True) + rv = decoder.decode(b'', final=True) if rv: yield rv +def iter_slices(string, slice_length): + """Iterate over slices of a string.""" + pos = 0 + while pos < len(string): + yield string[pos:pos + slice_length] + pos += slice_length + + def get_unicode_from_response(r): """Returns the requested content back in unicode. @@ -337,65 +342,230 @@ def get_unicode_from_response(r): if encoding: try: - return unicode(r.content, encoding) + return str(r.content, encoding) except UnicodeError: tried_encodings.append(encoding) # Fall back: try: - return unicode(r.content, encoding, errors='replace') + return str(r.content, encoding, errors='replace') except TypeError: return r.content -def decode_gzip(content): - """Return gzip-decoded string. +# The unreserved URI characters (RFC 3986) +UNRESERVED_SET = frozenset( + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + + "0123456789-._~") - :param content: bytestring to gzip-decode. + +def unquote_unreserved(uri): + """Un-escape any percent-escape sequences in a URI that are unreserved + characters. This leaves all reserved, illegal and non-ASCII bytes encoded. + """ + parts = uri.split('%') + for i in range(1, len(parts)): + h = parts[i][0:2] + if len(h) == 2 and h.isalnum(): + try: + c = chr(int(h, 16)) + except ValueError: + raise InvalidURL("Invalid percent-escape sequence: '%s'" % h) + + if c in UNRESERVED_SET: + parts[i] = c + parts[i][2:] + else: + parts[i] = '%' + parts[i] + else: + parts[i] = '%' + parts[i] + return ''.join(parts) + + +def requote_uri(uri): + """Re-quote the given URI. + + This function passes the given URI through an unquote/quote cycle to + ensure that it is fully and consistently quoted. 
""" + # Unquote only the unreserved characters + # Then quote only illegal characters (do not quote reserved, unreserved, + # or '%') + return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~") + + +def get_environ_proxies(url): + """Return a dict of environment proxies.""" + + get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper()) + + # First check whether no_proxy is defined. If it is, check that the URL + # we're getting isn't in the no_proxy list. + no_proxy = get_proxy('no_proxy') + netloc = urlparse(url).netloc + + if no_proxy: + # We need to check whether we match here. We need to see if we match + # the end of the netloc, both with and without the port. + no_proxy = no_proxy.replace(' ', '').split(',') + + for host in no_proxy: + if netloc.endswith(host) or netloc.split(':')[0].endswith(host): + # The URL does match something in no_proxy, so we don't want + # to apply the proxies on this URL. + return {} + + # If the system proxy settings indicate that this URL should be bypassed, + # don't proxy. + if proxy_bypass(netloc): + return {} + + # If we get here, we either didn't have no_proxy set or we're not going + # anywhere that no_proxy applies to, and the system settings don't require + # bypassing the proxy for the current URL. + return getproxies() + + +def default_user_agent(): + """Return a string representing the default user agent.""" + _implementation = platform.python_implementation() + + if _implementation == 'CPython': + _implementation_version = platform.python_version() + elif _implementation == 'PyPy': + _implementation_version = '%s.%s.%s' % (sys.pypy_version_info.major, + sys.pypy_version_info.minor, + sys.pypy_version_info.micro) + if sys.pypy_version_info.releaselevel != 'final': + _implementation_version = ''.join([_implementation_version, sys.pypy_version_info.releaselevel]) + elif _implementation == 'Jython': + _implementation_version = platform.python_version() # Complete Guess + elif _implementation == 'IronPython': + _implementation_version = platform.python_version() # Complete Guess + else: + _implementation_version = 'Unknown' + + try: + p_system = platform.system() + p_release = platform.release() + except IOError: + p_system = 'Unknown' + p_release = 'Unknown' + + return " ".join(['python-requests/%s' % __version__, + '%s/%s' % (_implementation, _implementation_version), + '%s/%s' % (p_system, p_release)]) + + +def default_headers(): + return CaseInsensitiveDict({ + 'User-Agent': default_user_agent(), + 'Accept-Encoding': ', '.join(('gzip', 'deflate', 'compress')), + 'Accept': '*/*' + }) + - return zlib.decompress(content, 16 + zlib.MAX_WBITS) +def parse_header_links(value): + """Return a dict of parsed link headers proxies. + i.e. 
+def parse_header_links(value):
+    """Return a list of parsed link headers.
+
+    i.e. Link: <http://.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"
 
-def stream_decompress(iterator, mode='gzip'):
     """
-    Stream decodes an iterator over compressed data
-
-    :param iterator: An iterator over compressed data
-    :param mode: 'gzip' or 'deflate'
-    :return: An iterator over decompressed data
+
+    links = []
+
+    replace_chars = " '\""
+
+    for val in value.split(","):
+        try:
+            url, params = val.split(";", 1)
+        except ValueError:
+            url, params = val, ''
+
+        link = {}
+
+        link["url"] = url.strip("<> '\"")
+
+        for param in params.split(";"):
+            try:
+                key, value = param.split("=")
+            except ValueError:
+                break
+
+            link[key.strip(replace_chars)] = value.strip(replace_chars)
+
+        links.append(link)
+
+    return links
+
+
+# Null bytes; no need to recreate these on each call to guess_json_utf
+_null = '\x00'.encode('ascii')  # encoding to ASCII for Python 3
+_null2 = _null * 2
+_null3 = _null * 3
+
+
+def guess_json_utf(data):
+    # JSON always starts with two ASCII characters, so detection is as
+    # easy as counting the nulls and from their location and count
+    # determine the encoding. Also detect a BOM, if present.
+    sample = data[:4]
+    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
+        return 'utf-32'     # BOM included
+    if sample[:3] == codecs.BOM_UTF8:
+        return 'utf-8-sig'  # BOM included, MS style (discouraged)
+    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
+        return 'utf-16'     # BOM included
+    nullcount = sample.count(_null)
+    if nullcount == 0:
+        return 'utf-8'
+    if nullcount == 2:
+        if sample[::2] == _null2:   # 1st and 3rd are null
+            return 'utf-16-be'
+        if sample[1::2] == _null2:  # 2nd and 4th are null
+            return 'utf-16-le'
+        # Did not detect 2 valid UTF-16 ascii-range characters
+    if nullcount == 3:
+        if sample[:3] == _null3:
+            return 'utf-32-be'
+        if sample[1:] == _null3:
+            return 'utf-32-le'
+        # Did not detect a valid UTF-32 ascii-range character
+    return None
+
+
+def except_on_missing_scheme(url):
+    """Given a URL, raise a MissingSchema exception if the scheme is missing.
     """
+    scheme, netloc, path, params, query, fragment = urlparse(url)
 
-    if mode not in ['gzip', 'deflate']:
-        raise ValueError('stream_decompress mode must be gzip or deflate')
+    if not scheme:
+        raise MissingSchema('Proxy URLs must have explicit schemes.')
 
-    zlib_mode = 16 + zlib.MAX_WBITS if mode == 'gzip' else -zlib.MAX_WBITS
-    dec = zlib.decompressobj(zlib_mode)
-    try:
-        for chunk in iterator:
-            rv = dec.decompress(chunk)
-            if rv:
-                yield rv
-    except zlib.error:
-        # If there was an error decompressing, just return the raw chunk
-        yield chunk
-        # Continue to return the rest of the raw data
-        for chunk in iterator:
-            yield chunk
-    else:
-        # Make sure everything has been returned from the decompression object
-        buf = dec.decompress('')
-        rv = buf + dec.flush()
-        if rv:
-            yield rv
 
+def get_auth_from_url(url):
+    """Given a url with authentication components, extract them into a tuple of
+    username,password."""
+    if url:
+        parsed = urlparse(url)
+        return (parsed.username, parsed.password)
+    else:
+        return ('', '')
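Example (editor's illustration, not part of the diff): guess_json_utf leans on
the fact that a JSON document opens with two ASCII characters, so the pattern
of NUL bytes in the first four bytes pins down the UTF flavour. A round-trip
sketch, assuming this patch is applied:

    import json
    from requests.utils import guess_json_utf

    doc = {'ok': True}
    for enc in ('utf-8', 'utf-16-le', 'utf-16-be', 'utf-32-le', 'utf-32-be'):
        data = json.dumps(doc).encode(enc)
        assert guess_json_utf(data) == enc
        assert json.loads(data.decode(enc)) == doc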
""" - parts = path.split("/") - parts = (urllib.quote(urllib.unquote(part), safe="") for part in parts) - return "/".join(parts) + out = None + + if isinstance(string, builtin_str): + out = string + else: + if is_py2: + out = string.encode(encoding) + else: + out = string.decode(encoding) + + return out -- cgit v1.2.3-70-g09d2