diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-04-06 21:36:29 -0700 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-04-06 21:36:29 -0700 |
commit | fffede8cbb9398ee8556de463ebce914e6790801 (patch) | |
tree | b86e7e8845ce45527947a449ca19460b5e93a025 /tests | |
parent | 3ff581077bd54b91410a72204bf9a1bead55f539 (diff) | |
download | oauth2client-fffede8cbb9398ee8556de463ebce914e6790801.tar.gz |
Adding test coverage for client.py.
- `DeviceFlowInfo` (and using datetime now patch).
- Corner cases for `step1_get_authorize_url` and
`step2_exchange`
- Failure cases for `flow_from_clientsecrets`
- `verify_id_token` when the cached HTTP is used (i.e.
the default argument)
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_client.py | 148 | ||||
-rw-r--r-- | tests/test_jwt.py | 16 |
2 files changed, 164 insertions, 0 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 8f51d57..7c5c9dc 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -50,6 +50,7 @@ from oauth2client.client import AssertionCredentials from oauth2client.client import AUTHORIZED_USER from oauth2client.client import Credentials from oauth2client.client import DEFAULT_ENV_NAME +from oauth2client.client import DeviceFlowInfo from oauth2client.client import Error from oauth2client.client import ApplicationDefaultCredentialsError from oauth2client.client import FlowExchangeError @@ -80,6 +81,7 @@ from oauth2client.client import credentials_from_code from oauth2client.client import flow_from_clientsecrets from oauth2client.client import save_to_well_known_file from oauth2client.clientsecrets import _loadfile +from oauth2client.clientsecrets import InvalidClientSecretsError from oauth2client.service_account import ServiceAccountCredentials from oauth2client._helpers import _to_bytes @@ -1695,6 +1697,60 @@ class OAuth2WebServerFlowTest(unittest2.TestCase): self.assertEqual(OOB_CALLBACK_URN, q['redirect_uri'][0]) self.assertEqual('online', q['access_type'][0]) + @mock.patch('oauth2client.client.logger') + def test_step1_get_authorize_url_redirect_override(self, logger): + flow = OAuth2WebServerFlow('client_id+1', scope='foo', + redirect_uri=OOB_CALLBACK_URN) + alt_redirect = 'foo:bar' + self.assertEqual(flow.redirect_uri, OOB_CALLBACK_URN) + result = flow.step1_get_authorize_url(redirect_uri=alt_redirect) + # Make sure the redirect value was updated. + self.assertEqual(flow.redirect_uri, alt_redirect) + query_params = { + 'client_id': flow.client_id, + 'redirect_uri': alt_redirect, + 'scope': flow.scope, + 'access_type': 'offline', + 'response_type': 'code', + } + expected = _update_query_params(flow.auth_uri, query_params) + assertUrisEqual(self, expected, result) + # Check stubs. + self.assertEqual(logger.warning.call_count, 1) + + def test_step1_get_authorize_url_without_redirect(self): + flow = OAuth2WebServerFlow('client_id+1', scope='foo', + redirect_uri=None) + with self.assertRaises(ValueError): + flow.step1_get_authorize_url(redirect_uri=None) + + def test_step1_get_authorize_url_without_login_hint(self): + login_hint = 'There are wascally wabbits nearby' + flow = OAuth2WebServerFlow('client_id+1', scope='foo', + redirect_uri=OOB_CALLBACK_URN, + login_hint=login_hint) + result = flow.step1_get_authorize_url() + query_params = { + 'client_id': flow.client_id, + 'login_hint': login_hint, + 'redirect_uri': OOB_CALLBACK_URN, + 'scope': flow.scope, + 'access_type': 'offline', + 'response_type': 'code', + } + expected = _update_query_params(flow.auth_uri, query_params) + assertUrisEqual(self, expected, result) + + def test_step2_exchange_no_input(self): + flow = OAuth2WebServerFlow('client_id+1', scope='foo') + with self.assertRaises(ValueError): + flow.step2_exchange() + + def test_step2_exchange_code_and_device_flow(self): + flow = OAuth2WebServerFlow('client_id+1', scope='foo') + with self.assertRaises(ValueError): + flow.step2_exchange(code='code', device_flow_info='dfi') + def test_scope_is_required(self): self.assertRaises(TypeError, OAuth2WebServerFlow, 'client_id+1') @@ -1910,6 +1966,43 @@ class FlowFromCachedClientsecrets(unittest2.TestCase): 'some_secrets', '', redirect_uri='oob', cache=cache_mock) self.assertEqual('foo_client_secret', flow.client_secret) + @mock.patch('oauth2client.clientsecrets.loadfile', + side_effect=InvalidClientSecretsError) + def test_flow_from_clientsecrets_invalid(self, loadfile_mock): + filename = object() + cache = object() + with self.assertRaises(InvalidClientSecretsError): + flow_from_clientsecrets(filename, None, cache=cache, + message=None) + loadfile_mock.assert_called_once_with(filename, cache=cache) + + @mock.patch('oauth2client.clientsecrets.loadfile', + side_effect=InvalidClientSecretsError) + @mock.patch('sys.exit') + def test_flow_from_clientsecrets_invalid_w_msg(self, sys_exit, + loadfile_mock): + filename = object() + cache = object() + message = 'hi mom' + + flow_from_clientsecrets(filename, None, cache=cache, message=message) + sys_exit.assert_called_once_with(message) + loadfile_mock.assert_called_once_with(filename, cache=cache) + + @mock.patch('oauth2client.clientsecrets.loadfile') + def test_flow_from_clientsecrets_unknown_flow(self, loadfile_mock): + client_type = 'UNKNOWN' + loadfile_mock.return_value = client_type, None + filename = object() + cache = object() + + err_msg = 'This OAuth 2.0 flow is unsupported: %r' % (client_type,) + with self.assertRaisesRegexp(client.UnknownClientSecretsFlowError, + err_msg): + flow_from_clientsecrets(filename, None, cache=cache) + + loadfile_mock.assert_called_once_with(filename, cache=cache) + class CredentialsFromCodeTests(unittest2.TestCase): @@ -2061,5 +2154,60 @@ class Test__require_crypto_or_die(unittest2.TestCase): client._require_crypto_or_die() +class TestDeviceFlowInfo(unittest2.TestCase): + + DEVICE_CODE = 'e80ff179-fd65-416c-9dbf-56a23e5d23e4' + USER_CODE = '4bbd8b82-fc73-11e5-adf3-00c2c63e5792' + VER_URL = 'http://foo.bar' + + def test_FromResponse(self): + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + 'verification_url': self.VER_URL, + } + result = DeviceFlowInfo.FromResponse(response) + expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE, + None, self.VER_URL, None) + self.assertEqual(result, expected_result) + + def test_FromResponse_fallback_to_uri(self): + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + 'verification_uri': self.VER_URL, + } + result = DeviceFlowInfo.FromResponse(response) + expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE, + None, self.VER_URL, None) + self.assertEqual(result, expected_result) + + def test_FromResponse_missing_url(self): + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + } + with self.assertRaises(client.OAuth2DeviceCodeError): + DeviceFlowInfo.FromResponse(response) + + @mock.patch('oauth2client.client._NOW') + def test_FromResponse_with_expires_in(self, dt_now): + expires_in = 23 + response = { + 'device_code': self.DEVICE_CODE, + 'user_code': self.USER_CODE, + 'verification_url': self.VER_URL, + 'expires_in': expires_in, + } + now = datetime.datetime(1999, 1, 1, 12, 30, 27) + expire = datetime.datetime(1999, 1, 1, 12, 30, 27 + expires_in) + dt_now.return_value = now + + result = DeviceFlowInfo.FromResponse(response) + expected_result = DeviceFlowInfo(self.DEVICE_CODE, self.USER_CODE, + None, self.VER_URL, expire) + self.assertEqual(result, expected_result) + + if __name__ == '__main__': # pragma: NO COVER unittest2.main() diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 38d28c7..93010ca 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -17,6 +17,8 @@ import os import tempfile import time + +import mock import unittest2 from .http_mock import HttpMockSequence @@ -129,6 +131,20 @@ class CryptTests(unittest2.TestCase): self.assertEqual('billy bob', contents['user']) self.assertEqual('data', contents['metadata']['meta']) + def test_verify_id_token_with_certs_uri_default_http(self): + jwt = self._create_signed_jwt() + + http = HttpMockSequence([ + ({'status': '200'}, datafile('certs.json')), + ]) + + with mock.patch('oauth2client.client._cached_http', new=http): + contents = verify_id_token( + jwt, 'some_audience_address@testing.gserviceaccount.com') + + self.assertEqual('billy bob', contents['user']) + self.assertEqual('data', contents['metadata']['meta']) + def test_verify_id_token_with_certs_uri_fails(self): jwt = self._create_signed_jwt() |