Updated requests package to v2.2.1

It was included, but never used. However, lots of code can benefit from it.
Bas Stottelaar
2014-04-05 14:57:59 +02:00
parent ea6e3335a3
commit 667f768749
122 changed files with 10284 additions and 16040 deletions


@@ -11,55 +11,29 @@ that are also useful for external consumption.
import cgi
import codecs
import collections
import io
import os
import platform
import re
import sys
import zlib
from netrc import netrc, NetrcParseError
import socket
import struct
from . import __version__
from . import certs
from .compat import parse_http_list as _parse_list_header
from .compat import quote, urlparse, basestring, bytes, str
from .compat import (quote, urlparse, bytes, str, OrderedDict, unquote, is_py2,
builtin_str, getproxies, proxy_bypass)
from .cookies import RequestsCookieJar, cookiejar_from_dict
from .structures import CaseInsensitiveDict
from .exceptions import MissingSchema, InvalidURL
_hush_pyflakes = (RequestsCookieJar,)
CERTIFI_BUNDLE_PATH = None
try:
# see if requests's own CA certificate bundle is installed
from . import certs
CERTIFI_BUNDLE_PATH = certs.where()
except ImportError:
pass
NETRC_FILES = ('.netrc', '_netrc')
# common paths for the OS's CA certificate bundle
POSSIBLE_CA_BUNDLE_PATHS = [
# Red Hat, CentOS, Fedora and friends (provided by the ca-certificates package):
'/etc/pki/tls/certs/ca-bundle.crt',
# Ubuntu, Debian, and friends (provided by the ca-certificates package):
'/etc/ssl/certs/ca-certificates.crt',
# FreeBSD (provided by the ca_root_nss package):
'/usr/local/share/certs/ca-root-nss.crt',
# openSUSE (provided by the ca-certificates package), the 'certs' directory is the
# preferred way but may not be supported by the SSL module, thus it has 'ca-bundle.pem'
# as a fallback (which is generated from pem files in the 'certs' directory):
'/etc/ssl/ca-bundle.pem',
]
def get_os_ca_bundle_path():
"""Try to pick an available CA certificate bundle provided by the OS."""
for path in POSSIBLE_CA_BUNDLE_PATHS:
if os.path.exists(path):
return path
return None
# if certifi is installed, use its CA bundle;
# otherwise, try and use the OS bundle
DEFAULT_CA_BUNDLE_PATH = CERTIFI_BUNDLE_PATH or get_os_ca_bundle_path()
DEFAULT_CA_BUNDLE_PATH = certs.where()
def dict_to_sequence(d):
@@ -71,20 +45,50 @@ def dict_to_sequence(d):
return d
def super_len(o):
if hasattr(o, '__len__'):
return len(o)
if hasattr(o, 'len'):
return o.len
if hasattr(o, 'fileno'):
try:
fileno = o.fileno()
except io.UnsupportedOperation:
pass
else:
return os.fstat(fileno).st_size
if hasattr(o, 'getvalue'):
# e.g. BytesIO, cStringIO.StringI
return len(o.getvalue())
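A quick illustration of the fallbacks super_len tries, in order (a sketch, assuming the vendored package imports as requests):

import io
from requests.utils import super_len

super_len(b'abc')               # __len__ -> 3
super_len(io.BytesIO(b'abcd'))  # fileno() raises UnsupportedOperation, so getvalue() -> 4
with open(__file__, 'rb') as f:
    super_len(f)                # fileno() + os.fstat -> size on disk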
def get_netrc_auth(url):
"""Returns the Requests tuple auth for a given url from netrc."""
try:
locations = (os.path.expanduser('~/{0}'.format(f)) for f in NETRC_FILES)
from netrc import netrc, NetrcParseError
netrc_path = None
for loc in locations:
if os.path.exists(loc) and not netrc_path:
for f in NETRC_FILES:
try:
loc = os.path.expanduser('~/{0}'.format(f))
except KeyError:
# os.path.expanduser can fail when $HOME is undefined and
# getpwuid fails. See http://bugs.python.org/issue20164 &
# https://github.com/kennethreitz/requests/issues/1846
return
if os.path.exists(loc):
netrc_path = loc
break
# Abort early if there isn't one.
if netrc_path is None:
return netrc_path
return
ri = urlparse(url)
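A hedged sketch of the lookup above; the machine entry and credentials are hypothetical:

# Hypothetical ~/.netrc (or _netrc on Windows) entry:
#   machine example.com
#   login alice
#   password s3cret
from requests.utils import get_netrc_auth
get_netrc_auth('https://example.com/path')  # ('alice', 's3cret') if the entry exists, else None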
@@ -111,31 +115,52 @@ def guess_filename(obj):
"""Tries to guess the filename of the given object."""
name = getattr(obj, 'name', None)
if name and name[0] != '<' and name[-1] != '>':
return name
return os.path.basename(name)
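With the new return value, callers get only the basename; a small illustrative check (Dummy is made up for the example):

import io
from requests.utils import guess_filename

class Dummy(object):
    name = '/tmp/report.csv'

guess_filename(Dummy())       # 'report.csv' (basename only, per this change)
guess_filename(io.BytesIO())  # None: no usable .name attribute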
def to_key_val_list(value):
def from_key_val_list(value):
"""Take an object and test to see if it can be represented as a
dictionary. Unless it can not be represented as such, return a list of
tuples, e.g.,:
dictionary. If it can be, return an
OrderedDict, e.g.,
>>> to_key_val_list([('key', 'val')])
[('key', 'val')]
>>> to_key_val_list('string')
ValueError: ...
>>> to_key_val_list({'key': 'val'})
[('key', 'val')]
::
>>> from_key_val_list([('key', 'val')])
OrderedDict([('key', 'val')])
>>> from_key_val_list('string')
ValueError: need more than 1 value to unpack
>>> from_key_val_list({'key': 'val'})
OrderedDict([('key', 'val')])
"""
if value is None:
return None
try:
dict(value)
except ValueError:
raise ValueError('Unable to encode lists with elements that are not '
'2-tuples.')
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
if isinstance(value, dict) or hasattr(value, 'items'):
return OrderedDict(value)
def to_key_val_list(value):
"""Take an object and test to see if it can be represented as a
dictionary. If it can be, return a list of tuples, e.g.,
::
>>> to_key_val_list([('key', 'val')])
[('key', 'val')]
>>> to_key_val_list({'key': 'val'})
[('key', 'val')]
>>> to_key_val_list('string')
ValueError: cannot encode objects that are not 2-tuples.
"""
if value is None:
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
if isinstance(value, collections.Mapping):
value = value.items()
return list(value)
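The split into from_key_val_list/to_key_val_list is easiest to see side by side; a minimal sketch:

from requests.utils import from_key_val_list, to_key_val_list

from_key_val_list([('key', 'val')])  # ordered mapping {'key': 'val'}
to_key_val_list({'key': 'val'})      # [('key', 'val')]
to_key_val_list('string')            # raises ValueError: not 2-tuples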
@@ -231,57 +256,6 @@ def unquote_header_value(value, is_filename=False):
return value
def header_expand(headers):
"""Returns an HTTP Header value string from a dictionary.
Example expansion::
{'text/x-dvi': {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}, 'text/x-c': {}}
# Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
(('text/x-dvi', {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}), ('text/x-c', {}))
# Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
"""
collector = []
if isinstance(headers, dict):
headers = list(headers.items())
elif isinstance(headers, basestring):
return headers
elif isinstance(headers, str):
# As discussed in https://github.com/kennethreitz/requests/issues/400
# latin-1 is the most conservative encoding used on the web. Anyone
# who needs more can encode to a byte-string before calling
return headers.encode("latin-1")
elif headers is None:
return headers
for i, (value, params) in enumerate(headers):
_params = []
for (p_k, p_v) in list(params.items()):
_params.append('%s=%s' % (p_k, p_v))
collector.append(value)
collector.append('; ')
if len(params):
collector.append('; '.join(_params))
if not len(headers) == i + 1:
collector.append(', ')
# Remove trailing separators.
if collector[-1] in (', ', '; '):
del collector[-1]
return ''.join(collector)
def dict_from_cookiejar(cj):
"""Returns a key/value dictionary from a CookieJar.
@@ -290,11 +264,8 @@ def dict_from_cookiejar(cj):
cookie_dict = {}
for _, cookies in list(cj._cookies.items()):
for _, cookies in list(cookies.items()):
for cookie in list(cookies.values()):
# print cookie
cookie_dict[cookie.name] = cookie.value
for cookie in cj:
cookie_dict[cookie.name] = cookie.value
return cookie_dict
@@ -307,8 +278,7 @@ def add_dict_to_cookiejar(cj, cookie_dict):
"""
cj2 = cookiejar_from_dict(cookie_dict)
for cookie in cj2:
cj.set_cookie(cookie)
cj.update(cj2)
return cj
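Both cookie helpers in one hedged example (cookie names and values are made up):

import requests
from requests.utils import add_dict_to_cookiejar, dict_from_cookiejar

jar = requests.cookies.cookiejar_from_dict({'session': 'abc123'})
dict_from_cookiejar(jar)                       # {'session': 'abc123'}
add_dict_to_cookiejar(jar, {'theme': 'dark'})
dict_from_cookiejar(jar)                       # now also contains 'theme': 'dark'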
@@ -319,8 +289,12 @@ def get_encodings_from_content(content):
"""
charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
return charset_re.findall(content)
return (charset_re.findall(content) +
pragma_re.findall(content) +
xml_re.findall(content))
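The two added patterns pick up <meta http-equiv> pragmas and XML declarations; a small sketch of the common case:

from requests.utils import get_encodings_from_content

html = '<html><head><meta charset="utf-8"></head><body>hi</body></html>'
get_encodings_from_content(html)  # ['utf-8']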
def get_encoding_from_headers(headers):
@@ -356,17 +330,19 @@ def stream_decode_response_unicode(iterator, r):
rv = decoder.decode(chunk)
if rv:
yield rv
rv = decoder.decode('', final=True)
rv = decoder.decode(b'', final=True)
if rv:
yield rv
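The '' -> b'' change matters because on Python 3 an incremental decoder flushes with a bytes argument; a standalone stdlib sketch:

import codecs

decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
parts = [decoder.decode(chunk) for chunk in (b'\xc3', b'\xa9')]  # 'é' split across chunks
parts.append(decoder.decode(b'', final=True))  # flush any buffered bytes
''.join(parts)  # 'é'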
def iter_slices(string, slice_length):
"""Iterate over slices of a string."""
pos = 0
while pos < len(string):
yield string[pos:pos+slice_length]
yield string[pos:pos + slice_length]
pos += slice_length
def get_unicode_from_response(r):
"""Returns the requested content back in unicode.
@@ -400,48 +376,6 @@ def get_unicode_from_response(r):
return r.content
def stream_decompress(iterator, mode='gzip'):
"""
Stream decodes an iterator over compressed data
:param iterator: An iterator over compressed data
:param mode: 'gzip' or 'deflate'
:return: An iterator over decompressed data
"""
if mode not in ['gzip', 'deflate']:
raise ValueError('stream_decompress mode must be gzip or deflate')
zlib_mode = 16 + zlib.MAX_WBITS if mode == 'gzip' else -zlib.MAX_WBITS
dec = zlib.decompressobj(zlib_mode)
try:
for chunk in iterator:
rv = dec.decompress(chunk)
if rv:
yield rv
except zlib.error:
# If there was an error decompressing, just return the raw chunk
yield chunk
# Continue to return the rest of the raw data
for chunk in iterator:
yield chunk
else:
# Make sure everything has been returned from the decompression object
buf = dec.decompress(bytes())
rv = buf + dec.flush()
if rv:
yield rv
def stream_untransfer(gen, resp):
if 'gzip' in resp.headers.get('content-encoding', ''):
gen = stream_decompress(gen, mode='gzip')
elif 'deflate' in resp.headers.get('content-encoding', ''):
gen = stream_decompress(gen, mode='deflate')
return gen
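The removed stream_decompress/stream_untransfer helpers are now handled by urllib3, but the zlib pattern they used is worth keeping on record; a minimal stdlib sketch:

import zlib

def gunzip_stream(chunks):
    # 16 + MAX_WBITS expects a gzip header; use -zlib.MAX_WBITS for raw deflate.
    dec = zlib.decompressobj(16 + zlib.MAX_WBITS)
    for chunk in chunks:
        out = dec.decompress(chunk)
        if out:
            yield out
    tail = dec.flush()  # drain whatever the decompressor buffered
    if tail:
        yield tail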
# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
@@ -452,21 +386,22 @@ def unquote_unreserved(uri):
"""Un-escape any percent-escape sequences in a URI that are unreserved
characters. This leaves all reserved, illegal and non-ASCII bytes encoded.
"""
try:
parts = uri.split('%')
for i in range(1, len(parts)):
h = parts[i][0:2]
if len(h) == 2 and h.isalnum():
parts = uri.split('%')
for i in range(1, len(parts)):
h = parts[i][0:2]
if len(h) == 2 and h.isalnum():
try:
c = chr(int(h, 16))
if c in UNRESERVED_SET:
parts[i] = c + parts[i][2:]
else:
parts[i] = '%' + parts[i]
except ValueError:
raise InvalidURL("Invalid percent-escape sequence: '%s'" % h)
if c in UNRESERVED_SET:
parts[i] = c + parts[i][2:]
else:
parts[i] = '%' + parts[i]
return ''.join(parts)
except ValueError:
return uri
else:
parts[i] = '%' + parts[i]
return ''.join(parts)
def requote_uri(uri):
@@ -481,35 +416,114 @@ def requote_uri(uri):
return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~")
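A hedged illustration of the unreserved/reserved split these two functions enforce:

from requests.utils import requote_uri, unquote_unreserved

unquote_unreserved('http://host/%7Efoo%2Fbar')  # 'http://host/~foo%2Fbar': %7E (~) is unreserved, %2F (/) stays encoded
requote_uri('http://host/a b')                  # 'http://host/a%20b'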
def get_environ_proxies():
def address_in_network(ip, net):
"""
This function allows you to check if an IP belongs to a network subnet
Example: returns True if ip = 192.168.1.1 and net = 192.168.1.0/24
returns False if ip = 192.168.1.1 and net = 192.168.100.0/24
"""
ipaddr = struct.unpack('=L', socket.inet_aton(ip))[0]
netaddr, bits = net.split('/')
netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[0]
network = struct.unpack('=L', socket.inet_aton(netaddr))[0] & netmask
return (ipaddr & netmask) == (network & netmask)
def dotted_netmask(mask):
"""
Converts mask from /xx format to xxx.xxx.xxx.xxx
Example: if mask is 24 function returns 255.255.255.0
"""
bits = 0xffffffff ^ (1 << 32 - mask) - 1
return socket.inet_ntoa(struct.pack('>I', bits))
def is_ipv4_address(string_ip):
try:
socket.inet_aton(string_ip)
except socket.error:
return False
return True
def is_valid_cidr(string_network):
"""Very simple check of the cidr format in no_proxy variable"""
if string_network.count('/') == 1:
try:
mask = int(string_network.split('/')[1])
except ValueError:
return False
if mask < 1 or mask > 32:
return False
try:
socket.inet_aton(string_network.split('/')[0])
except socket.error:
return False
else:
return False
return True
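The four new network helpers, exercised together (addresses are illustrative):

from requests.utils import (address_in_network, dotted_netmask,
                            is_ipv4_address, is_valid_cidr)

dotted_netmask(24)                                      # '255.255.255.0'
address_in_network('192.168.1.1', '192.168.1.0/24')     # True
address_in_network('192.168.1.1', '192.168.100.0/24')   # False
is_ipv4_address('10.0.0.300')                           # False: octet out of range
is_valid_cidr('192.168.100.0/24')                       # True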
def get_environ_proxies(url):
"""Return a dict of environment proxies."""
proxy_keys = [
'all',
'http',
'https',
'ftp',
'socks',
'no'
]
get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper())
proxies = [(key, get_proxy(key + '_proxy')) for key in proxy_keys]
return dict([(key, val) for (key, val) in proxies if val])
# First check whether no_proxy is defined. If it is, check that the URL
# we're getting isn't in the no_proxy list.
no_proxy = get_proxy('no_proxy')
netloc = urlparse(url).netloc
if no_proxy:
# We need to check whether we match here. We need to see if we match
# the end of the netloc, both with and without the port.
no_proxy = no_proxy.replace(' ', '').split(',')
ip = netloc.split(':')[0]
if is_ipv4_address(ip):
for proxy_ip in no_proxy:
if is_valid_cidr(proxy_ip):
if address_in_network(ip, proxy_ip):
return {}
else:
for host in no_proxy:
if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
# The URL does match something in no_proxy, so we don't want
# to apply the proxies on this URL.
return {}
# If the system proxy settings indicate that this URL should be bypassed,
# don't proxy.
# The proxy_bypass function is incredibly buggy on OS X in early versions
# of Python 2.6, so allow this call to fail. Only catch the specific
# exceptions we've seen, though: this call failing in other ways can reveal
# legitimate problems.
try:
bypass = proxy_bypass(netloc)
except (TypeError, socket.gaierror):
bypass = False
if bypass:
return {}
# If we get here, we either didn't have no_proxy set or we're not going
# anywhere that no_proxy applies to, and the system settings don't require
# bypassing the proxy for the current URL.
return getproxies()
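A hedged sketch of the new URL-aware behavior on a typical Unix environment (proxy host and no_proxy entries are made up; proxy_bypass can behave differently per platform):

import os
from requests.utils import get_environ_proxies

os.environ['http_proxy'] = 'http://proxy.local:3128'
os.environ['no_proxy'] = 'localhost,127.0.0.1'

get_environ_proxies('http://localhost/')                 # {} -> bypassed via no_proxy
get_environ_proxies('http://example.org/').get('http')   # 'http://proxy.local:3128'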
def default_user_agent():
def default_user_agent(name="python-requests"):
"""Return a string representing the default user agent."""
_implementation = platform.python_implementation()
if _implementation == 'CPython':
_implementation_version = platform.python_version()
elif _implementation == 'PyPy':
_implementation_version = '%s.%s.%s' % (
sys.pypy_version_info.major,
_implementation_version = '%s.%s.%s' % (sys.pypy_version_info.major,
sys.pypy_version_info.minor,
sys.pypy_version_info.micro
)
sys.pypy_version_info.micro)
if sys.pypy_version_info.releaselevel != 'final':
_implementation_version = ''.join([_implementation_version, sys.pypy_version_info.releaselevel])
elif _implementation == 'Jython':
@@ -519,11 +533,25 @@ def default_user_agent():
else:
_implementation_version = 'Unknown'
return " ".join([
'python-requests/%s' % __version__,
'%s/%s' % (_implementation, _implementation_version),
'%s/%s' % (platform.system(), platform.release()),
])
try:
p_system = platform.system()
p_release = platform.release()
except IOError:
p_system = 'Unknown'
p_release = 'Unknown'
return " ".join(['%s/%s' % (name, __version__),
'%s/%s' % (_implementation, _implementation_version),
'%s/%s' % (p_system, p_release)])
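The new name parameter lets downstream projects brand the agent string; illustrative output, exact values vary by interpreter and OS:

from requests.utils import default_user_agent

default_user_agent()         # e.g. 'python-requests/2.2.1 CPython/2.7.6 Linux/3.13.0-24-generic'
default_user_agent('myapp')  # e.g. 'myapp/2.2.1 CPython/2.7.6 Linux/3.13.0-24-generic'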
def default_headers():
return CaseInsensitiveDict({
'User-Agent': default_user_agent(),
'Accept-Encoding': ', '.join(('gzip', 'deflate', 'compress')),
'Accept': '*/*'
})
def parse_header_links(value):
"""Return a dict of parsed link headers proxies.
@@ -531,7 +559,7 @@ def parse_header_links(value):
i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"
"""
links = []
replace_chars = " '\""
@@ -548,12 +576,88 @@ def parse_header_links(value):
for param in params.split(";"):
try:
key,value = param.split("=")
key, value = param.split("=")
except ValueError:
break
link[key.strip(replace_chars)] = value.strip(replace_chars)
links.append(link)
return links
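A small sketch of the Link-header parsing above (the header value is made up):

from requests.utils import parse_header_links

value = '<http://example.com/front.jpeg>; rel=front; type="image/jpeg"'
parse_header_links(value)
# [{'url': 'http://example.com/front.jpeg', 'rel': 'front', 'type': 'image/jpeg'}]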
# Null bytes; no need to recreate these on each call to guess_json_utf
_null = '\x00'.encode('ascii') # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3
def guess_json_utf(data):
# JSON always starts with two ASCII characters, so detection is as
# easy as counting the nulls and from their location and count
# determine the encoding. Also detect a BOM, if present.
sample = data[:4]
if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
return 'utf-32' # BOM included
if sample[:3] == codecs.BOM_UTF8:
return 'utf-8-sig' # BOM included, MS style (discouraged)
if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
return 'utf-16' # BOM included
nullcount = sample.count(_null)
if nullcount == 0:
return 'utf-8'
if nullcount == 2:
if sample[::2] == _null2: # 1st and 3rd are null
return 'utf-16-be'
if sample[1::2] == _null2: # 2nd and 4th are null
return 'utf-16-le'
# Did not detect 2 valid UTF-16 ascii-range characters
if nullcount == 3:
if sample[:3] == _null3:
return 'utf-32-be'
if sample[1:] == _null3:
return 'utf-32-le'
# Did not detect a valid UTF-32 ascii-range character
return None
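The null-counting heuristic in action; no BOMs needed for these cases:

from requests.utils import guess_json_utf

guess_json_utf('{"k": 1}'.encode('utf-8'))      # 'utf-8' (no null bytes in the first four)
guess_json_utf('{"k": 1}'.encode('utf-16-le'))  # 'utf-16-le' (nulls in positions 2 and 4)
guess_json_utf('{"k": 1}'.encode('utf-32-be'))  # 'utf-32-be' (three leading nulls)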
def except_on_missing_scheme(url):
"""Given a URL, raise a MissingSchema exception if the scheme is missing.
"""
scheme, netloc, path, params, query, fragment = urlparse(url)
if not scheme:
raise MissingSchema('Proxy URLs must have explicit schemes.')
def get_auth_from_url(url):
"""Given a url with authentication components, extract them into a tuple of
username and password."""
parsed = urlparse(url)
try:
auth = (unquote(parsed.username), unquote(parsed.password))
except (AttributeError, TypeError):
auth = ('', '')
return auth
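A hedged example of the credential extraction (user and password are made up):

from requests.utils import get_auth_from_url

get_auth_from_url('https://alice:s3cret@example.com/')  # ('alice', 's3cret')
get_auth_from_url('https://example.com/')               # ('', ''): no userinfo present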
def to_native_string(string, encoding='ascii'):
"""
Given a string object, regardless of type, returns a representation of that
string in the native string type, encoding and decoding where necessary.
This assumes ASCII unless told otherwise.
"""
out = None
if isinstance(string, builtin_str):
out = string
else:
if is_py2:
out = string.encode(encoding)
else:
out = string.decode(encoding)
return out
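A minimal sketch of to_native_string, assuming this version's module layout (later releases moved it to requests._internal_utils):

from requests.utils import to_native_string

# On Python 3 the native string type is str, so bytes get decoded;
# on Python 2 it is bytes, so text gets encoded.
to_native_string(b'token')  # 'token' on Python 3
to_native_string('token')   # unchanged: already the native type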