diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-04-07 12:02:31 -0700 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-04-07 12:02:31 -0700 |
commit | ef33f70abb43da50793f169ad7efdd4c72df4966 (patch) | |
tree | 511f5541a42cb0fb6a45eeebd8f7a9dd42389b04 /tests | |
parent | a018723a7efa705c0f0ef6da55ebc23143c02238 (diff) | |
parent | b26f77851967d23913364fae8b6db71c4bf0adc8 (diff) | |
download | oauth2client-ef33f70abb43da50793f169ad7efdd4c72df4966.tar.gz |
Merge pull request #486 from dhermes/cover-client-v7
Getting client.py to 100% coverage.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_client.py | 325 | ||||
-rw-r--r-- | tests/test_jwt.py | 16 |
2 files changed, 326 insertions, 15 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 8f51d57..6e0d997 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -50,6 +50,7 @@ from oauth2client.client import AssertionCredentials from oauth2client.client import AUTHORIZED_USER from oauth2client.client import Credentials from oauth2client.client import DEFAULT_ENV_NAME +from oauth2client.client import DeviceFlowInfo from oauth2client.client import Error from oauth2client.client import ApplicationDefaultCredentialsError from oauth2client.client import FlowExchangeError @@ -80,6 +81,8 @@ from oauth2client.client import credentials_from_code from oauth2client.client import flow_from_clientsecrets from oauth2client.client import save_to_well_known_file from oauth2client.clientsecrets import _loadfile +from oauth2client.clientsecrets import InvalidClientSecretsError +from oauth2client.clientsecrets import TYPE_WEB from oauth2client.service_account import ServiceAccountCredentials from oauth2client._helpers import _to_bytes @@ -1695,6 +1698,153 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0]) self.assertEqual('online', q['access_type'][0]) + @mock.patch('oauth2client.client.logger') + def test_step1_get_authorize_url_redirect_override(self, logger): + flow = OAuth2WebServerFlow('client_id+1', scope='foo', + redirect_uri=OOB_CALLBACK_URN) + alt_redirect = 'foo:bar' + self.assertEqual(flow.redirect_uri, OOB_CALLBACK_URN) + result = flow.step1_get_authorize_url(redirect_uri=alt_redirect) + # Make sure the redirect value was updated. + self.assertEqual(flow.redirect_uri, alt_redirect) + query_params = { + 'client_id': flow.client_id, + 'redirect_uri': alt_redirect, + 'scope': flow.scope, + 'access_type': 'offline', + 'response_type': 'code', + } + expected = _update_query_params(flow.auth_uri, query_params) + assertUrisEqual(self, expected, result) + # Check stubs. + self.assertEqual(logger.warning.call_count, 1) + + def test_step1_get_authorize_url_without_redirect(self): + flow = OAuth2WebServerFlow('client_id+1', scope='foo', + redirect_uri=None) + with self.assertRaises(ValueError): + flow.step1_get_authorize_url(redirect_uri=None) + + def test_step1_get_authorize_url_without_login_hint(self): + login_hint = 'There are wascally wabbits nearby' + flow = OAuth2WebServerFlow('client_id+1', scope='foo', + redirect_uri=OOB_CALLBACK_URN, + login_hint=login_hint) + result = flow.step1_get_authorize_url() + query_params = { + 'client_id': flow.client_id, + 'login_hint': login_hint, + 'redirect_uri': OOB_CALLBACK_URN, + 'scope': flow.scope, + 'access_type': 'offline', + 'response_type': 'code', + } + expected = _update_query_params(flow.auth_uri, query_params) + assertUrisEqual(self, expected, result) + + def test_step1_get_device_and_user_codes_wo_device_uri(self): + flow = OAuth2WebServerFlow('CID', scope='foo', device_uri=None) + with self.assertRaises(ValueError): + flow.step1_get_device_and_user_codes() + + def _step1_get_device_and_user_codes_helper( + self, extra_headers=None, user_agent=None, default_http=False, + content=None): + flow = OAuth2WebServerFlow('CID', scope='foo', + user_agent=user_agent) + device_code = 'bfc06756-062e-430f-9f0f-460ca44724e5' + user_code = '5faf2780-fc83-11e5-9bc2-00c2c63e5792' + ver_url = 'http://foo.bar' + if content is None: + content = json.dumps({ + 'device_code': device_code, + 'user_code': user_code, + 'verification_url': ver_url, + }) + http = HttpMockSequence([ + ({'status': http_client.OK}, content), + ]) + if default_http: + with mock.patch('httplib2.Http', return_value=http): + result = flow.step1_get_device_and_user_codes() + else: + result = flow.step1_get_device_and_user_codes(http=http) + + expected = DeviceFlowInfo(device_code, user_code, + None, ver_url, None) + self.assertEqual(result, expected) + self.assertEqual(len(http.requests), 1) + self.assertEqual(http.requests[0]['uri'], client.GOOGLE_DEVICE_URI) + body = http.requests[0]['body'] + self.assertEqual(urllib.parse.parse_qs(body), + {'client_id': [flow.client_id], + 'scope': [flow.scope]}) + headers = {'content-type': 'application/x-www-form-urlencoded'} + if extra_headers is not None: + headers.update(extra_headers) + self.assertEqual(http.requests[0]['headers'], headers) + + def test_step1_get_device_and_user_codes(self): + self._step1_get_device_and_user_codes_helper() + + def test_step1_get_device_and_user_codes_w_user_agent(self): + user_agent = 'spiderman' + extra_headers = {'user-agent': user_agent} + self._step1_get_device_and_user_codes_helper( + user_agent=user_agent, extra_headers=extra_headers) + + def test_step1_get_device_and_user_codes_w_default_http(self): + self._step1_get_device_and_user_codes_helper(default_http=True) + + def test_step1_get_device_and_user_codes_bad_payload(self): + non_json_content = b'{' + with self.assertRaises(client.OAuth2DeviceCodeError): + self._step1_get_device_and_user_codes_helper( + content=non_json_content) + + def _step1_get_device_and_user_codes_fail_helper(self, status, + content, error_msg): + flow = OAuth2WebServerFlow('CID', scope='foo') + http = HttpMockSequence([ + ({'status': status}, content), + ]) + with self.assertRaises(client.OAuth2DeviceCodeError) as exc_manager: + flow.step1_get_device_and_user_codes(http=http) + + self.assertEqual(exc_manager.exception.args, (error_msg,)) + + def test_step1_get_device_and_user_codes_non_json_failure(self): + status = http_client.BAD_REQUEST + content = 'Nope not JSON.' + error_msg = 'Invalid response %s.' % (status,) + self._step1_get_device_and_user_codes_fail_helper(status, content, + error_msg) + + def test_step1_get_device_and_user_codes_basic_failure(self): + status = http_client.INTERNAL_SERVER_ERROR + content = b'{}' + error_msg = 'Invalid response %s.' % (status,) + self._step1_get_device_and_user_codes_fail_helper(status, content, + error_msg) + + def test_step1_get_device_and_user_codes_failure_w_json_error(self): + status = http_client.BAD_GATEWAY + base_error = 'ZOMG user codes failure.' + content = json.dumps({'error': base_error}) + error_msg = 'Invalid response %s. Error: %s' % (status, base_error) + self._step1_get_device_and_user_codes_fail_helper(status, content, + error_msg) + + def test_step2_exchange_no_input(self): + flow = OAuth2WebServerFlow('client_id+1', scope='foo') + with self.assertRaises(ValueError): + flow.step2_exchange() + + def test_step2_exchange_code_and_device_flow(self): + flow = OAuth2WebServerFlow('client_id+1', scope='foo') + with self.assertRaises(ValueError): + flow.step2_exchange(code='code', device_flow_info='dfi') + def test_scope_is_required(self): self.assertRaises(TypeError, OAuth2WebServerFlow, 'client_id+1') @@ -1704,7 +1854,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): ]) with self.assertRaises(FlowExchangeError): - credentials = self.flow.step2_exchange('some random code', + credentials = self.flow.step2_exchange(code='some random code', http=http) def test_urlencoded_exchange_failure(self): @@ -1714,7 +1864,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): with self.assertRaisesRegexp(FlowExchangeError, 'invalid_request'): - credentials = self.flow.step2_exchange('some random code', + credentials = self.flow.step2_exchange(code='some random code', http=http) def test_exchange_failure_with_json_error(self): @@ -1731,23 +1881,32 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): http = HttpMockSequence([({'status': '400'}, payload)]) with self.assertRaises(FlowExchangeError): - credentials = self.flow.step2_exchange('some random code', + credentials = self.flow.step2_exchange(code='some random code', http=http) - def test_exchange_success(self): + def _exchange_success_test_helper(self, code=None, device_flow_info=None): payload = (b'{' b' "access_token":"SlAV32hkKG",' b' "expires_in":3600,' b' "refresh_token":"8xLOxBtZp8"' b'}') http = HttpMockSequence([({'status': '200'}, payload)]) - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange( + code=code, device_flow_info=device_flow_info, http=http) self.assertEqual('SlAV32hkKG', credentials.access_token) self.assertNotEqual(None, credentials.token_expiry) self.assertEqual('8xLOxBtZp8', credentials.refresh_token) self.assertEqual('dummy_revoke_uri', credentials.revoke_uri) self.assertEqual(set(['foo']), credentials.scopes) + def test_exchange_success(self): + self._exchange_success_test_helper(code='some random code') + + def test_exchange_success_with_device_flow_info(self): + device_flow_info = DeviceFlowInfo('some random code', None, + None, None, None) + self._exchange_success_test_helper(device_flow_info=device_flow_info) + def test_exchange_success_binary_code(self): binary_code = b'some random code' access_token = 'SlAV32hkKG' @@ -1761,7 +1920,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): ' "refresh_token":"' + refresh_token + '"' '}') http = HttpMockSequence([({'status': '200'}, _to_bytes(payload))]) - credentials = self.flow.step2_exchange(binary_code, http=http) + credentials = self.flow.step2_exchange(code=binary_code, http=http) self.assertEqual(access_token, credentials.access_token) self.assertIsNotNone(credentials.token_expiry) self.assertEqual(refresh_token, credentials.refresh_token) @@ -1788,7 +1947,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): b'}') http = HttpMockSequence([({'status': '200'}, payload)]) - credentials = self.flow.step2_exchange(not_a_dict, http=http) + credentials = self.flow.step2_exchange(code=not_a_dict, http=http) self.assertEqual('SlAV32hkKG', credentials.access_token) self.assertNotEqual(None, credentials.token_expiry) self.assertEqual('8xLOxBtZp8', credentials.refresh_token) @@ -1812,7 +1971,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): ({'status': '200'}, b'access_token=SlAV32hkKG'), ]) - credentials = flow.step2_exchange('some random code', http=http) + credentials = flow.step2_exchange(code='some random code', http=http) self.assertEqual('SlAV32hkKG', credentials.access_token) test_request = http.requests[0] @@ -1826,7 +1985,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): ({'status': '200'}, b'access_token=SlAV32hkKG&expires_in=3600'), ]) - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange(code='some random code', + http=http) self.assertEqual('SlAV32hkKG', credentials.access_token) self.assertNotEqual(None, credentials.token_expiry) @@ -1837,7 +1997,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): ({'status': '200'}, b'access_token=SlAV32hkKG&expires=3600'), ]) - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange(code='some random code', + http=http) self.assertNotEqual(None, credentials.token_expiry) def test_exchange_no_expires_in(self): @@ -1847,7 +2008,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): b'}') http = HttpMockSequence([({'status': '200'}, payload)]) - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange(code='some random code', + http=http) self.assertEqual(None, credentials.token_expiry) def test_urlencoded_exchange_no_expires_in(self): @@ -1857,7 +2019,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): ({'status': '200'}, b'access_token=SlAV32hkKG'), ]) - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange(code='some random code', + http=http) self.assertEqual(None, credentials.token_expiry) def test_exchange_fails_if_no_code(self): @@ -1870,7 +2033,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): code = {'error': 'thou shall not pass'} with self.assertRaisesRegexp(FlowExchangeError, 'shall not pass'): - credentials = self.flow.step2_exchange(code, http=http) + credentials = self.flow.step2_exchange(code=code, http=http) def test_exchange_id_token_fail(self): payload = (b'{' @@ -1881,7 +2044,7 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): http = HttpMockSequence([({'status': '200'}, payload)]) self.assertRaises(VerifyJwtTokenError, self.flow.step2_exchange, - 'some random code', http=http) + code='some random code', http=http) def test_exchange_id_token(self): body = {'foo': 'bar'} @@ -1896,7 +2059,8 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): b' "id_token": "' + jwt + b'"' b'}') http = HttpMockSequence([({'status': '200'}, payload)]) - credentials = self.flow.step2_exchange('some random code', http=http) + credentials = self.flow.step2_exchange(code='some random code', + http=http) self.assertEqual(credentials.id_token, body) @@ -1910,6 +2074,82 @@ class FlowFromCachedClientsecrets(unittest2.TestCase): 'some_secrets', '', redirect_uri='oob', cache=cache_mock) self.assertEqual('foo_client_secret', flow.client_secret) + @mock.patch('oauth2client.clientsecrets.loadfile') + def _flow_from_clientsecrets_success_helper(self, loadfile_mock, + device_uri=None, + revoke_uri=None): + client_type = TYPE_WEB + client_info = { + 'auth_uri': 'auth_uri', + 'token_uri': 'token_uri', + 'client_id': 'client_id', + 'client_secret': 'client_secret', + } + if revoke_uri is not None: + client_info['revoke_uri'] = revoke_uri + loadfile_mock.return_value = client_type, client_info + filename = object() + scope = ['baz'] + cache = object() + + if device_uri is not None: + result = flow_from_clientsecrets(filename, scope, cache=cache, + device_uri=device_uri) + self.assertEqual(result.device_uri, device_uri) + else: + result = flow_from_clientsecrets(filename, scope, cache=cache) + + self.assertIsInstance(result, OAuth2WebServerFlow) + loadfile_mock.assert_called_once_with(filename, cache=cache) + + def test_flow_from_clientsecrets_success(self): + self._flow_from_clientsecrets_success_helper() + + def test_flow_from_clientsecrets_success_w_device_uri(self): + device_uri = 'http://device.uri' + self._flow_from_clientsecrets_success_helper(device_uri=device_uri) + + def test_flow_from_clientsecrets_success_w_revoke_uri(self): + revoke_uri = 'http://revoke.uri' + self._flow_from_clientsecrets_success_helper(revoke_uri=revoke_uri) + + @mock.patch('oauth2client.clientsecrets.loadfile', + side_effect=InvalidClientSecretsError) + def test_flow_from_clientsecrets_invalid(self, loadfile_mock): + filename = object() + cache = object() + with self.assertRaises(InvalidClientSecretsError): + flow_from_clientsecrets(filename, None, cache=cache, + message=None) + loadfile_mock.assert_called_once_with(filename, cache=cache) + + @mock.patch('oauth2client.clientsecrets.loadfile', + side_effect=InvalidClientSecretsError) + @mock.patch('sys.exit') + def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit, + loadfile_mock): + filename = object() + cache = object() + message = 'hi mom' + + flow_from_clientsecrets(filename, None, cache=cache, message=message) + sys_exit.assert_called_once_with(message) + loadfile_mock.assert_called_once_with(filename, cache=cache) + + @mock.patch('oauth2client.clientsecrets.loadfile') + def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock): + client_type = 'UNKNOWN' + loadfile_mock.return_value = client_type, None + filename = object() + cache = object() + + err_msg = 'This OAuth 2.0 flow is unsupported: %r' % (client_type,) + with self.assertRaisesRegexp(client.UnknownClientSecretsFlowError, + err_msg): + flow_from_clientsecrets(filename, None, cache=cache) + + loadfile_mock.assert_called_once_with(filename, cache=cache) + class CredentialsFromCodeTests(unittest2.TestCase): @@ -2061,5 +2301,60 @@ class Test__require_crypto_or_die(unittest2.TestCase): client._require_crypto_or_die() +class TestDeviceFlowInfo(unittest2.TestCase): + + DEVICE_CODE = 'e80ff179-fd65-416c-9dbf-56a23e5d23e4' + USER_CODE = '4bbd8b82-fc73-11e5-adf3-00c2c63e5792' + VER_URL = 'http://foo.bar' + + def test_FromResponse(self): + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + 'verification_url': self.VER_URL, + } + result = DeviceFlowInfo.FromResponse(response) + expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE, + None, self.VER_URL, None) + self.assertEqual(result, expected_result) + + def test_FromResponse_fallback_to_uri(self): + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + 'verification_uri': self.VER_URL, + } + result = DeviceFlowInfo.FromResponse(response) + expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE, + None, self.VER_URL, None) + self.assertEqual(result, expected_result) + + def test_FromResponse_missing_url(self): + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + } + with self.assertRaises(client.OAuth2DeviceCodeError): + DeviceFlowInfo.FromResponse(response) + + @mock.patch('oauth2client.client._NOW') + def test_FromResponse_with_expires_in(self, dt_now): + expires_in = 23 + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + 'verification_url': self.VER_URL, + 'expires_in': expires_in, + } + now = datetime.datetime(1999, 1, 1, 12, 30, 27) + expire = datetime.datetime(1999, 1, 1, 12, 30, 27 + expires_in) + dt_now.return_value = now + + result = DeviceFlowInfo.FromResponse(response) + expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE, + None, self.VER_URL, expire) + self.assertEqual(result, expected_result) + + if __name__ == '__main__': # pragma: NO COVER unittest2.main() diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 38d28c7..93010ca 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -17,6 +17,8 @@ import os import tempfile import time + +import mock import unittest2 from .http_mock import HttpMockSequence @@ -129,6 +131,20 @@ class CryptTests(unittest2.TestCase): self.assertEqual('billy bob', contents['user']) self.assertEqual('data', contents['metadata']['meta']) + def test_verify_id_token_with_certs_uri_default_http(self): + jwt = self._create_signed_jwt() + + http = HttpMockSequence([ + ({'status': '200'}, datafile('certs.json')), + ]) + + with mock.patch('oauth2client.client._cached_http', new=http): + contents = verify_id_token( + jwt, 'some_audience_address@testing.gserviceaccount.com') + + self.assertEqual('billy bob', contents['user']) + self.assertEqual('data', contents['metadata']['meta']) + def test_verify_id_token_with_certs_uri_fails(self): jwt = self._create_signed_jwt() |