diff options
author | Ken Payson <kpayson@google.com> | 2016-04-22 10:10:28 -0700 |
---|---|---|
committer | Ken Payson <kpayson@google.com> | 2016-05-16 10:30:08 -0700 |
commit | 82cde04917da5c2db31241e4bb334ed5e0ba66fe (patch) | |
tree | 1eeb811e6419a88a889b90569d294356ecc25dfb /tests | |
parent | e99a4ac3c5d3c2944680ea180b71a0d144d6128b (diff) | |
download | oauth2client-82cde04917da5c2db31241e4bb334ed5e0ba66fe.tar.gz |
Added JWTAccessCredentials.
Newer Google APIs can accept JWTs signed using ServiceAccountCredentials
for authentication. (See https://jwt.io/). The new behavior for
GoogleCredentials.get_application_default() will attempt to use a signed JWT
if ServiceAccountCredentials are available and no scope is specified.
Upon specifying a scope, OAuth2 authentication will be used.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_client.py | 14 | ||||
-rw-r--r-- | tests/test_service_account.py | 228 |
2 files changed, 242 insertions, 0 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 6e0d997..7e39877 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -840,6 +840,20 @@ class GoogleCredentialsTests(unittest2.TestCase): creds2_vals.pop('_signer') self.assertEqual(creds1_vals, creds2_vals) + def test_to_from_json_service_account_scoped(self): + credentials_file = datafile( + os.path.join('gcloud', _WELL_KNOWN_CREDENTIALS_FILE)) + creds1 = GoogleCredentials.from_stream(credentials_file) + creds1 = creds1.create_scoped(['dummy_scope']) + # Convert to and then back from json. + creds2 = GoogleCredentials.from_json(creds1.to_json()) + + creds1_vals = creds1.__dict__ + creds1_vals.pop('_signer') + creds2_vals = creds2.__dict__ + creds2_vals.pop('_signer') + self.assertEqual(creds1_vals, creds2_vals) + def test_parse_expiry(self): dt = datetime.datetime(2016, 1, 1) parsed_expiry = client._parse_expiry(dt) diff --git a/tests/test_service_account.py b/tests/test_service_account.py index 3d9e7db..9697bc5 100644 --- a/tests/test_service_account.py +++ b/tests/test_service_account.py @@ -23,11 +23,13 @@ import os import rsa import tempfile +import httplib2 import mock import unittest2 from .http_mock import HttpMockSequence from oauth2client import crypt +from oauth2client.service_account import _JWTAccessCredentials from oauth2client.service_account import ServiceAccountCredentials from oauth2client.service_account import SERVICE_ACCOUNT @@ -354,6 +356,232 @@ class ServiceAccountCredentialsTests(unittest2.TestCase): self.assertEqual(credentials.access_token, token2) +TOKEN_LIFE = _JWTAccessCredentials._MAX_TOKEN_LIFETIME_SECS +T1 = 42 +T1_DATE = datetime.datetime(1970, 1, 1, second=T1) +T1_EXPIRY = T1 + TOKEN_LIFE +T1_EXPIRY_DATE = T1_DATE + datetime.timedelta(seconds=TOKEN_LIFE) + +T2 = T1 + 100 +T2_DATE = T1_DATE + datetime.timedelta(seconds=100) +T2_EXPIRY = T2 + TOKEN_LIFE +T2_EXPIRY_DATE = T2_DATE + datetime.timedelta(seconds=TOKEN_LIFE) + +T3 = T1 + TOKEN_LIFE + 1 +T3_DATE = T1_DATE + datetime.timedelta(seconds=TOKEN_LIFE + 1) +T3_EXPIRY = T3 + TOKEN_LIFE +T3_EXPIRY_DATE = T3_DATE + datetime.timedelta(seconds=TOKEN_LIFE) + + +class JWTAccessCredentialsTests(unittest2.TestCase): + + def setUp(self): + self.client_id = '123' + self.service_account_email = 'dummy@google.com' + self.private_key_id = 'ABCDEF' + self.private_key = datafile('pem_from_pkcs12.pem') + self.signer = crypt.Signer.from_string(self.private_key) + self.url = 'https://test.url.com' + self.jwt = _JWTAccessCredentials(self.service_account_email, + self.signer, + private_key_id=self.private_key_id, + client_id=self.client_id, + additional_claims={'aud': self.url}) + + @mock.patch('oauth2client.service_account._UTCNOW') + @mock.patch('oauth2client.client._UTCNOW') + @mock.patch('time.time') + def test_get_access_token_no_claims(self, time, client_utcnow, utcnow): + utcnow.return_value = T1_DATE + client_utcnow.return_value = T1_DATE + time.return_value = T1 + + token_info = self.jwt.get_access_token() + payload = crypt.verify_signed_jwt_with_certs( + token_info.access_token, + {'key': datafile('public_cert.pem')}, audience=self.url) + self.assertEqual(payload['iss'], self.service_account_email) + self.assertEqual(payload['sub'], self.service_account_email) + self.assertEqual(payload['iat'], T1) + self.assertEqual(payload['exp'], T1_EXPIRY) + self.assertEqual(token_info.expires_in, T1_EXPIRY - T1) + + # Verify that we vend the same token after 100 seconds + utcnow.return_value = T2_DATE + client_utcnow.return_value = T2_DATE + token_info = self.jwt.get_access_token() + payload = crypt.verify_signed_jwt_with_certs( + token_info.access_token, + {'key': datafile('public_cert.pem')}, audience=self.url) + self.assertEqual(payload['iat'], T1) + self.assertEqual(payload['exp'], T1_EXPIRY) + self.assertEqual(token_info.expires_in, T1_EXPIRY - T2) + + # Verify that we vend a new token after _MAX_TOKEN_LIFETIME_SECS + utcnow.return_value = T3_DATE + client_utcnow.return_value = T3_DATE + time.return_value = T3 + token_info = self.jwt.get_access_token() + payload = crypt.verify_signed_jwt_with_certs( + token_info.access_token, + {'key': datafile('public_cert.pem')}, audience=self.url) + expires_in = token_info.expires_in + self.assertEqual(payload['iat'], T3) + self.assertEqual(payload['exp'], T3_EXPIRY) + self.assertEqual(expires_in, T3_EXPIRY - T3) + + @mock.patch('oauth2client.service_account._UTCNOW') + @mock.patch('time.time') + def test_get_access_token_additional_claims(self, time, utcnow): + utcnow.return_value = T1_DATE + time.return_value = T1 + + token_info = self.jwt.get_access_token(additional_claims= + {'aud': 'https://test2.url.com', + 'sub': 'dummy2@google.com' + }) + payload = crypt.verify_signed_jwt_with_certs( + token_info.access_token, + {'key' : datafile('public_cert.pem')}, + audience='https://test2.url.com') + expires_in = token_info.expires_in + self.assertEqual(payload['iss'], self.service_account_email) + self.assertEqual(payload['sub'], 'dummy2@google.com') + self.assertEqual(payload['iat'], T1) + self.assertEqual(payload['exp'], T1_EXPIRY) + self.assertEqual(expires_in, T1_EXPIRY - T1) + + def test_revoke(self): + self.jwt.revoke(None) + + def test_create_scoped_required(self): + self.assertTrue(self.jwt.create_scoped_required()) + + def test_create_scoped(self): + self.jwt._private_key_pkcs12 = '' + self.jwt._private_key_password = '' + + new_credentials = self.jwt.create_scoped('dummy_scope') + self.assertNotEqual(self.jwt, new_credentials) + self.assertIsInstance(new_credentials, ServiceAccountCredentials) + self.assertEqual('dummy_scope', new_credentials._scopes) + + @mock.patch('oauth2client.service_account._UTCNOW') + @mock.patch('oauth2client.client._UTCNOW') + @mock.patch('time.time') + def test_authorize_success(self, time, client_utcnow, utcnow): + utcnow.return_value = T1_DATE + client_utcnow.return_value = T1_DATE + time.return_value = T1 + + def mock_request(uri, method='GET', body=None, headers=None, + redirections=0, connection_type=None): + self.assertEqual(uri, self.url) + bearer, token = headers[b'Authorization'].split() + payload = crypt.verify_signed_jwt_with_certs( + token, + {'key': datafile('public_cert.pem')}, + audience=self.url) + self.assertEqual(payload['iss'], self.service_account_email) + self.assertEqual(payload['sub'], self.service_account_email) + self.assertEqual(payload['iat'], T1) + self.assertEqual(payload['exp'], T1_EXPIRY) + self.assertEqual(uri, self.url) + self.assertEqual(bearer, b'Bearer') + return (httplib2.Response({'status': '200'}), b'') + + h = httplib2.Http() + h.request = mock_request + self.jwt.authorize(h) + h.request(self.url) + + # Ensure we use the cached token + utcnow.return_value = T2_DATE + client_utcnow.return_value = T2_DATE + h.request(self.url) + + @mock.patch('oauth2client.service_account._UTCNOW') + @mock.patch('oauth2client.client._UTCNOW') + @mock.patch('time.time') + def test_authorize_no_aud(self, time, client_utcnow, utcnow): + utcnow.return_value = T1_DATE + client_utcnow.return_value = T1_DATE + time.return_value = T1 + + jwt = _JWTAccessCredentials(self.service_account_email, + self.signer, + private_key_id=self.private_key_id, + client_id=self.client_id) + + def mock_request(uri, method='GET', body=None, headers=None, + redirections=0, connection_type=None): + self.assertEqual(uri, self.url) + bearer, token = headers[b'Authorization'].split() + payload = crypt.verify_signed_jwt_with_certs( + token, + {'key': datafile('public_cert.pem')}, + audience=self.url) + self.assertEqual(payload['iss'], self.service_account_email) + self.assertEqual(payload['sub'], self.service_account_email) + self.assertEqual(payload['iat'], T1) + self.assertEqual(payload['exp'], T1_EXPIRY) + self.assertEqual(uri, self.url) + self.assertEqual(bearer, b'Bearer') + return (httplib2.Response({'status': '200'}), b'') + + h = httplib2.Http() + h.request = mock_request + jwt.authorize(h) + h.request(self.url) + + # Ensure we do not cache the token + self.assertIsNone(jwt.access_token) + + @mock.patch('oauth2client.service_account._UTCNOW') + def test_authorize_stale_token(self, utcnow): + utcnow.return_value = T1_DATE + # Create an initial token + h = HttpMockSequence([({'status': '200'}, b''), + ({'status': '200'}, b'')]) + self.jwt.authorize(h) + h.request(self.url) + token_1 = self.jwt.access_token + + # Expire the token + utcnow.return_value = T3_DATE + h.request(self.url) + token_2 = self.jwt.access_token + self.assertEquals(self.jwt.token_expiry, T3_EXPIRY_DATE) + self.assertNotEqual(token_1, token_2) + + @mock.patch('oauth2client.service_account._UTCNOW') + def test_authorize_401(self, utcnow): + utcnow.return_value = T1_DATE + + h = HttpMockSequence([ + ({'status': '200'}, b''), + ({'status': '401'}, b''), + ({'status': '200'}, b'')]) + self.jwt.authorize(h) + h.request(self.url) + token_1 = self.jwt.access_token + + utcnow.return_value = T2_DATE + self.assertEquals(h.request(self.url)[0].status, 200) + token_2 = self.jwt.access_token + # Check the 401 forced a new token + self.assertNotEqual(token_1, token_2) + + @mock.patch('oauth2client.service_account._UTCNOW') + def test_refresh(self, utcnow): + utcnow.return_value = T1_DATE + token_1 = self.jwt.access_token + + utcnow.return_value = T2_DATE + self.jwt.refresh(None) + token_2 = self.jwt.access_token + self.assertEquals(self.jwt.token_expiry, T2_EXPIRY_DATE) + self.assertNotEqual(token_1, token_2) if __name__ == '__main__': # pragma: NO COVER unittest2.main() |