aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorKen Payson <kpayson@google.com>2016-04-22 10:10:28 -0700
committerKen Payson <kpayson@google.com>2016-05-16 10:30:08 -0700
commit82cde04917da5c2db31241e4bb334ed5e0ba66fe (patch)
tree1eeb811e6419a88a889b90569d294356ecc25dfb /tests
parente99a4ac3c5d3c2944680ea180b71a0d144d6128b (diff)
downloadoauth2client-82cde04917da5c2db31241e4bb334ed5e0ba66fe.tar.gz
Added JWTAccessCredentials.
Newer Google APIs can accept JWTs signed using ServiceAccountCredentials for authentication. (See https://jwt.io/). The new behavior for GoogleCredentials.get_application_default() will attempt to use a signed JWT if ServiceAccountCredentials are available and no scope is specified. Upon specifying a scope, OAuth2 authentication will be used.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_client.py14
-rw-r--r--tests/test_service_account.py228
2 files changed, 242 insertions, 0 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 6e0d997..7e39877 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -840,6 +840,20 @@ class GoogleCredentialsTests(unittest2.TestCase):
creds2_vals.pop('_signer')
self.assertEqual(creds1_vals, creds2_vals)
+ def test_to_from_json_service_account_scoped(self):
+ credentials_file = datafile(
+ os.path.join('gcloud', _WELL_KNOWN_CREDENTIALS_FILE))
+ creds1 = GoogleCredentials.from_stream(credentials_file)
+ creds1 = creds1.create_scoped(['dummy_scope'])
+ # Convert to and then back from json.
+ creds2 = GoogleCredentials.from_json(creds1.to_json())
+
+ creds1_vals = creds1.__dict__
+ creds1_vals.pop('_signer')
+ creds2_vals = creds2.__dict__
+ creds2_vals.pop('_signer')
+ self.assertEqual(creds1_vals, creds2_vals)
+
def test_parse_expiry(self):
dt = datetime.datetime(2016, 1, 1)
parsed_expiry = client._parse_expiry(dt)
diff --git a/tests/test_service_account.py b/tests/test_service_account.py
index 3d9e7db..9697bc5 100644
--- a/tests/test_service_account.py
+++ b/tests/test_service_account.py
@@ -23,11 +23,13 @@ import os
import rsa
import tempfile
+import httplib2
import mock
import unittest2
from .http_mock import HttpMockSequence
from oauth2client import crypt
+from oauth2client.service_account import _JWTAccessCredentials
from oauth2client.service_account import ServiceAccountCredentials
from oauth2client.service_account import SERVICE_ACCOUNT
@@ -354,6 +356,232 @@ class ServiceAccountCredentialsTests(unittest2.TestCase):
self.assertEqual(credentials.access_token, token2)
+TOKEN_LIFE = _JWTAccessCredentials._MAX_TOKEN_LIFETIME_SECS
+T1 = 42
+T1_DATE = datetime.datetime(1970, 1, 1, second=T1)
+T1_EXPIRY = T1 + TOKEN_LIFE
+T1_EXPIRY_DATE = T1_DATE + datetime.timedelta(seconds=TOKEN_LIFE)
+
+T2 = T1 + 100
+T2_DATE = T1_DATE + datetime.timedelta(seconds=100)
+T2_EXPIRY = T2 + TOKEN_LIFE
+T2_EXPIRY_DATE = T2_DATE + datetime.timedelta(seconds=TOKEN_LIFE)
+
+T3 = T1 + TOKEN_LIFE + 1
+T3_DATE = T1_DATE + datetime.timedelta(seconds=TOKEN_LIFE + 1)
+T3_EXPIRY = T3 + TOKEN_LIFE
+T3_EXPIRY_DATE = T3_DATE + datetime.timedelta(seconds=TOKEN_LIFE)
+
+
+class JWTAccessCredentialsTests(unittest2.TestCase):
+
+ def setUp(self):
+ self.client_id = '123'
+ self.service_account_email = 'dummy@google.com'
+ self.private_key_id = 'ABCDEF'
+ self.private_key = datafile('pem_from_pkcs12.pem')
+ self.signer = crypt.Signer.from_string(self.private_key)
+ self.url = 'https://test.url.com'
+ self.jwt = _JWTAccessCredentials(self.service_account_email,
+ self.signer,
+ private_key_id=self.private_key_id,
+ client_id=self.client_id,
+ additional_claims={'aud': self.url})
+
+ @mock.patch('oauth2client.service_account._UTCNOW')
+ @mock.patch('oauth2client.client._UTCNOW')
+ @mock.patch('time.time')
+ def test_get_access_token_no_claims(self, time, client_utcnow, utcnow):
+ utcnow.return_value = T1_DATE
+ client_utcnow.return_value = T1_DATE
+ time.return_value = T1
+
+ token_info = self.jwt.get_access_token()
+ payload = crypt.verify_signed_jwt_with_certs(
+ token_info.access_token,
+ {'key': datafile('public_cert.pem')}, audience=self.url)
+ self.assertEqual(payload['iss'], self.service_account_email)
+ self.assertEqual(payload['sub'], self.service_account_email)
+ self.assertEqual(payload['iat'], T1)
+ self.assertEqual(payload['exp'], T1_EXPIRY)
+ self.assertEqual(token_info.expires_in, T1_EXPIRY - T1)
+
+ # Verify that we vend the same token after 100 seconds
+ utcnow.return_value = T2_DATE
+ client_utcnow.return_value = T2_DATE
+ token_info = self.jwt.get_access_token()
+ payload = crypt.verify_signed_jwt_with_certs(
+ token_info.access_token,
+ {'key': datafile('public_cert.pem')}, audience=self.url)
+ self.assertEqual(payload['iat'], T1)
+ self.assertEqual(payload['exp'], T1_EXPIRY)
+ self.assertEqual(token_info.expires_in, T1_EXPIRY - T2)
+
+ # Verify that we vend a new token after _MAX_TOKEN_LIFETIME_SECS
+ utcnow.return_value = T3_DATE
+ client_utcnow.return_value = T3_DATE
+ time.return_value = T3
+ token_info = self.jwt.get_access_token()
+ payload = crypt.verify_signed_jwt_with_certs(
+ token_info.access_token,
+ {'key': datafile('public_cert.pem')}, audience=self.url)
+ expires_in = token_info.expires_in
+ self.assertEqual(payload['iat'], T3)
+ self.assertEqual(payload['exp'], T3_EXPIRY)
+ self.assertEqual(expires_in, T3_EXPIRY - T3)
+
+ @mock.patch('oauth2client.service_account._UTCNOW')
+ @mock.patch('time.time')
+ def test_get_access_token_additional_claims(self, time, utcnow):
+ utcnow.return_value = T1_DATE
+ time.return_value = T1
+
+ token_info = self.jwt.get_access_token(additional_claims=
+ {'aud': 'https://test2.url.com',
+ 'sub': 'dummy2@google.com'
+ })
+ payload = crypt.verify_signed_jwt_with_certs(
+ token_info.access_token,
+ {'key' : datafile('public_cert.pem')},
+ audience='https://test2.url.com')
+ expires_in = token_info.expires_in
+ self.assertEqual(payload['iss'], self.service_account_email)
+ self.assertEqual(payload['sub'], 'dummy2@google.com')
+ self.assertEqual(payload['iat'], T1)
+ self.assertEqual(payload['exp'], T1_EXPIRY)
+ self.assertEqual(expires_in, T1_EXPIRY - T1)
+
+ def test_revoke(self):
+ self.jwt.revoke(None)
+
+ def test_create_scoped_required(self):
+ self.assertTrue(self.jwt.create_scoped_required())
+
+ def test_create_scoped(self):
+ self.jwt._private_key_pkcs12 = ''
+ self.jwt._private_key_password = ''
+
+ new_credentials = self.jwt.create_scoped('dummy_scope')
+ self.assertNotEqual(self.jwt, new_credentials)
+ self.assertIsInstance(new_credentials, ServiceAccountCredentials)
+ self.assertEqual('dummy_scope', new_credentials._scopes)
+
+ @mock.patch('oauth2client.service_account._UTCNOW')
+ @mock.patch('oauth2client.client._UTCNOW')
+ @mock.patch('time.time')
+ def test_authorize_success(self, time, client_utcnow, utcnow):
+ utcnow.return_value = T1_DATE
+ client_utcnow.return_value = T1_DATE
+ time.return_value = T1
+
+ def mock_request(uri, method='GET', body=None, headers=None,
+ redirections=0, connection_type=None):
+ self.assertEqual(uri, self.url)
+ bearer, token = headers[b'Authorization'].split()
+ payload = crypt.verify_signed_jwt_with_certs(
+ token,
+ {'key': datafile('public_cert.pem')},
+ audience=self.url)
+ self.assertEqual(payload['iss'], self.service_account_email)
+ self.assertEqual(payload['sub'], self.service_account_email)
+ self.assertEqual(payload['iat'], T1)
+ self.assertEqual(payload['exp'], T1_EXPIRY)
+ self.assertEqual(uri, self.url)
+ self.assertEqual(bearer, b'Bearer')
+ return (httplib2.Response({'status': '200'}), b'')
+
+ h = httplib2.Http()
+ h.request = mock_request
+ self.jwt.authorize(h)
+ h.request(self.url)
+
+ # Ensure we use the cached token
+ utcnow.return_value = T2_DATE
+ client_utcnow.return_value = T2_DATE
+ h.request(self.url)
+
+ @mock.patch('oauth2client.service_account._UTCNOW')
+ @mock.patch('oauth2client.client._UTCNOW')
+ @mock.patch('time.time')
+ def test_authorize_no_aud(self, time, client_utcnow, utcnow):
+ utcnow.return_value = T1_DATE
+ client_utcnow.return_value = T1_DATE
+ time.return_value = T1
+
+ jwt = _JWTAccessCredentials(self.service_account_email,
+ self.signer,
+ private_key_id=self.private_key_id,
+ client_id=self.client_id)
+
+ def mock_request(uri, method='GET', body=None, headers=None,
+ redirections=0, connection_type=None):
+ self.assertEqual(uri, self.url)
+ bearer, token = headers[b'Authorization'].split()
+ payload = crypt.verify_signed_jwt_with_certs(
+ token,
+ {'key': datafile('public_cert.pem')},
+ audience=self.url)
+ self.assertEqual(payload['iss'], self.service_account_email)
+ self.assertEqual(payload['sub'], self.service_account_email)
+ self.assertEqual(payload['iat'], T1)
+ self.assertEqual(payload['exp'], T1_EXPIRY)
+ self.assertEqual(uri, self.url)
+ self.assertEqual(bearer, b'Bearer')
+ return (httplib2.Response({'status': '200'}), b'')
+
+ h = httplib2.Http()
+ h.request = mock_request
+ jwt.authorize(h)
+ h.request(self.url)
+
+ # Ensure we do not cache the token
+ self.assertIsNone(jwt.access_token)
+
+ @mock.patch('oauth2client.service_account._UTCNOW')
+ def test_authorize_stale_token(self, utcnow):
+ utcnow.return_value = T1_DATE
+ # Create an initial token
+ h = HttpMockSequence([({'status': '200'}, b''),
+ ({'status': '200'}, b'')])
+ self.jwt.authorize(h)
+ h.request(self.url)
+ token_1 = self.jwt.access_token
+
+ # Expire the token
+ utcnow.return_value = T3_DATE
+ h.request(self.url)
+ token_2 = self.jwt.access_token
+ self.assertEquals(self.jwt.token_expiry, T3_EXPIRY_DATE)
+ self.assertNotEqual(token_1, token_2)
+
+ @mock.patch('oauth2client.service_account._UTCNOW')
+ def test_authorize_401(self, utcnow):
+ utcnow.return_value = T1_DATE
+
+ h = HttpMockSequence([
+ ({'status': '200'}, b''),
+ ({'status': '401'}, b''),
+ ({'status': '200'}, b'')])
+ self.jwt.authorize(h)
+ h.request(self.url)
+ token_1 = self.jwt.access_token
+
+ utcnow.return_value = T2_DATE
+ self.assertEquals(h.request(self.url)[0].status, 200)
+ token_2 = self.jwt.access_token
+ # Check the 401 forced a new token
+ self.assertNotEqual(token_1, token_2)
+
+ @mock.patch('oauth2client.service_account._UTCNOW')
+ def test_refresh(self, utcnow):
+ utcnow.return_value = T1_DATE
+ token_1 = self.jwt.access_token
+
+ utcnow.return_value = T2_DATE
+ self.jwt.refresh(None)
+ token_2 = self.jwt.access_token
+ self.assertEquals(self.jwt.token_expiry, T2_EXPIRY_DATE)
+ self.assertNotEqual(token_1, token_2)
if __name__ == '__main__': # pragma: NO COVER
unittest2.main()