aboutsummaryrefslogtreecommitdiff
path: root/tests/test_service_account.py
diff options
context:
space:
mode:
authorDanny Hermes <daniel.j.hermes@gmail.com>2016-08-10 16:53:46 -0700
committerDanny Hermes <daniel.j.hermes@gmail.com>2016-08-11 08:43:51 -0700
commit1a0c4dbf92eddda508164a8578a62147590521ed (patch)
tree471923facc270209d3db2b182d83cc5918d91f0a /tests/test_service_account.py
parentb7f3eca135994d4541a51cd88583562ab2e81069 (diff)
downloadoauth2client-1a0c4dbf92eddda508164a8578a62147590521ed.tar.gz
Use transport.request in tests.
In the process - "spring clean" the modules that were touched - use HttpMock when HttpMockSequence not needed - add some verifications on new HttpMock's
Diffstat (limited to 'tests/test_service_account.py')
-rw-r--r--tests/test_service_account.py100
1 files changed, 80 insertions, 20 deletions
diff --git a/tests/test_service_account.py b/tests/test_service_account.py
index 7dc8ad0..d6b2f07 100644
--- a/tests/test_service_account.py
+++ b/tests/test_service_account.py
@@ -407,13 +407,15 @@ class JWTAccessCredentialsTests(unittest.TestCase):
time.return_value = T1
token_info = self.jwt.get_access_token()
+ certs = {'key': datafile('public_cert.pem')}
payload = crypt.verify_signed_jwt_with_certs(
- token_info.access_token,
- {'key': datafile('public_cert.pem')}, audience=self.url)
+ token_info.access_token, certs, audience=self.url)
+ self.assertEqual(len(payload), 5)
self.assertEqual(payload['iss'], self.service_account_email)
self.assertEqual(payload['sub'], self.service_account_email)
self.assertEqual(payload['iat'], T1)
self.assertEqual(payload['exp'], T1_EXPIRY)
+ self.assertEqual(payload['aud'], self.url)
self.assertEqual(token_info.expires_in, T1_EXPIRY - T1)
# Verify that we vend the same token after 100 seconds
@@ -444,19 +446,20 @@ class JWTAccessCredentialsTests(unittest.TestCase):
utcnow.return_value = T1_DATE
time.return_value = T1
- token_info = self.jwt.get_access_token(
- additional_claims={'aud': 'https://test2.url.com',
- 'sub': 'dummy2@google.com'
- })
+ audience = 'https://test2.url.com'
+ subject = 'dummy2@google.com'
+ claims = {'aud': audience, 'sub': subject}
+ token_info = self.jwt.get_access_token(additional_claims=claims)
+ certs = {'key': datafile('public_cert.pem')}
payload = crypt.verify_signed_jwt_with_certs(
- token_info.access_token,
- {'key': datafile('public_cert.pem')},
- audience='https://test2.url.com')
+ token_info.access_token, certs, audience=audience)
expires_in = token_info.expires_in
+ self.assertEqual(len(payload), 5)
self.assertEqual(payload['iss'], self.service_account_email)
- self.assertEqual(payload['sub'], 'dummy2@google.com')
+ self.assertEqual(payload['sub'], subject)
self.assertEqual(payload['iat'], T1)
self.assertEqual(payload['exp'], T1_EXPIRY)
+ self.assertEqual(payload['aud'], audience)
self.assertEqual(expires_in, T1_EXPIRY - T1)
def test_revoke(self):
@@ -502,13 +505,15 @@ class JWTAccessCredentialsTests(unittest.TestCase):
self.assertIsNone(info['body'])
self.assertEqual(len(info['headers']), 1)
bearer, token = info['headers'][b'Authorization'].split()
+ self.assertEqual(bearer, b'Bearer')
payload = crypt.verify_signed_jwt_with_certs(
token, certs, audience=self.url)
+ self.assertEqual(len(payload), 5)
self.assertEqual(payload['iss'], self.service_account_email)
self.assertEqual(payload['sub'], self.service_account_email)
self.assertEqual(payload['iat'], T1)
self.assertEqual(payload['exp'], T1_EXPIRY)
- self.assertEqual(bearer, b'Bearer')
+ self.assertEqual(payload['aud'], self.url)
@mock.patch('oauth2client.client._UTCNOW')
@mock.patch('time.time')
@@ -538,53 +543,108 @@ class JWTAccessCredentialsTests(unittest.TestCase):
self.assertIsNone(info['body'])
self.assertEqual(len(info['headers']), 1)
bearer, token = info['headers'][b'Authorization'].split()
+ self.assertEqual(bearer, b'Bearer')
certs = {'key': datafile('public_cert.pem')}
payload = crypt.verify_signed_jwt_with_certs(
token, certs, audience=self.url)
+ self.assertEqual(len(payload), 5)
self.assertEqual(payload['iss'], self.service_account_email)
self.assertEqual(payload['sub'], self.service_account_email)
self.assertEqual(payload['iat'], T1)
self.assertEqual(payload['exp'], T1_EXPIRY)
- self.assertEqual(bearer, b'Bearer')
+ self.assertEqual(payload['aud'], self.url)
@mock.patch('oauth2client.client._UTCNOW')
def test_authorize_stale_token(self, utcnow):
utcnow.return_value = T1_DATE
# Create an initial token
- h = http_mock.HttpMockSequence([
+ http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b''),
({'status': http_client.OK}, b''),
])
- self.jwt.authorize(h)
- h.request(self.url)
+ self.jwt.authorize(http)
+ transport.request(http, self.url)
token_1 = self.jwt.access_token
# Expire the token
utcnow.return_value = T3_DATE
- h.request(self.url)
+ transport.request(http, self.url)
token_2 = self.jwt.access_token
self.assertEquals(self.jwt.token_expiry, T3_EXPIRY_DATE)
self.assertNotEqual(token_1, token_2)
+ # Verify mocks.
+ certs = {'key': datafile('public_cert.pem')}
+ self.assertEqual(len(http.requests), 2)
+ issued_at_vals = (T1, T3)
+ exp_vals = (T1_EXPIRY, T3_EXPIRY)
+ for info, issued_at, exp_val in zip(http.requests, issued_at_vals,
+ exp_vals):
+ self.assertEqual(info['uri'], self.url)
+ self.assertEqual(info['method'], 'GET')
+ self.assertIsNone(info['body'])
+ self.assertEqual(len(info['headers']), 1)
+ bearer, token = info['headers'][b'Authorization'].split()
+ self.assertEqual(bearer, b'Bearer')
+ # To parse the token, skip the time check, since this
+ # test intentionally has stale tokens.
+ with mock.patch('oauth2client.crypt._verify_time_range',
+ return_value=True):
+ payload = crypt.verify_signed_jwt_with_certs(
+ token, certs, audience=self.url)
+ self.assertEqual(len(payload), 5)
+ self.assertEqual(payload['iss'], self.service_account_email)
+ self.assertEqual(payload['sub'], self.service_account_email)
+ self.assertEqual(payload['iat'], issued_at)
+ self.assertEqual(payload['exp'], exp_val)
+ self.assertEqual(payload['aud'], self.url)
+
@mock.patch('oauth2client.client._UTCNOW')
def test_authorize_401(self, utcnow):
utcnow.return_value = T1_DATE
- h = http_mock.HttpMockSequence([
+ http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b''),
({'status': http_client.UNAUTHORIZED}, b''),
({'status': http_client.OK}, b''),
])
- self.jwt.authorize(h)
- h.request(self.url)
+ self.jwt.authorize(http)
+ transport.request(http, self.url)
token_1 = self.jwt.access_token
utcnow.return_value = T2_DATE
- self.assertEquals(h.request(self.url)[0].status, 200)
+ response, _ = transport.request(http, self.url)
+ self.assertEquals(response.status, http_client.OK)
token_2 = self.jwt.access_token
# Check the 401 forced a new token
self.assertNotEqual(token_1, token_2)
+ # Verify mocks.
+ certs = {'key': datafile('public_cert.pem')}
+ self.assertEqual(len(http.requests), 3)
+ issued_at_vals = (T1, T1, T2)
+ exp_vals = (T1_EXPIRY, T1_EXPIRY, T2_EXPIRY)
+ for info, issued_at, exp_val in zip(http.requests, issued_at_vals,
+ exp_vals):
+ self.assertEqual(info['uri'], self.url)
+ self.assertEqual(info['method'], 'GET')
+ self.assertIsNone(info['body'])
+ self.assertEqual(len(info['headers']), 1)
+ bearer, token = info['headers'][b'Authorization'].split()
+ self.assertEqual(bearer, b'Bearer')
+ # To parse the token, skip the time check, since this
+ # test intentionally has stale tokens.
+ with mock.patch('oauth2client.crypt._verify_time_range',
+ return_value=True):
+ payload = crypt.verify_signed_jwt_with_certs(
+ token, certs, audience=self.url)
+ self.assertEqual(len(payload), 5)
+ self.assertEqual(payload['iss'], self.service_account_email)
+ self.assertEqual(payload['sub'], self.service_account_email)
+ self.assertEqual(payload['iat'], issued_at)
+ self.assertEqual(payload['exp'], exp_val)
+ self.assertEqual(payload['aud'], self.url)
+
@mock.patch('oauth2client.client._UTCNOW')
def test_refresh(self, utcnow):
utcnow.return_value = T1_DATE