diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-08-10 16:53:46 -0700 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-08-11 08:43:51 -0700 |
commit | 1a0c4dbf92eddda508164a8578a62147590521ed (patch) | |
tree | 471923facc270209d3db2b182d83cc5918d91f0a /tests/test_service_account.py | |
parent | b7f3eca135994d4541a51cd88583562ab2e81069 (diff) | |
download | oauth2client-1a0c4dbf92eddda508164a8578a62147590521ed.tar.gz |
Use transport.request in tests.
In the process
- "spring clean" the modules that were touched
- use HttpMock when HttpMockSequence not needed
- add some verifications on new HttpMock's
Diffstat (limited to 'tests/test_service_account.py')
-rw-r--r-- | tests/test_service_account.py | 100 |
1 files changed, 80 insertions, 20 deletions
diff --git a/tests/test_service_account.py b/tests/test_service_account.py index 7dc8ad0..d6b2f07 100644 --- a/tests/test_service_account.py +++ b/tests/test_service_account.py @@ -407,13 +407,15 @@ class JWTAccessCredentialsTests(unittest.TestCase): time.return_value = T1 token_info = self.jwt.get_access_token() + certs = {'key': datafile('public_cert.pem')} payload = crypt.verify_signed_jwt_with_certs( - token_info.access_token, - {'key': datafile('public_cert.pem')}, audience=self.url) + token_info.access_token, certs, audience=self.url) + self.assertEqual(len(payload), 5) self.assertEqual(payload['iss'], self.service_account_email) self.assertEqual(payload['sub'], self.service_account_email) self.assertEqual(payload['iat'], T1) self.assertEqual(payload['exp'], T1_EXPIRY) + self.assertEqual(payload['aud'], self.url) self.assertEqual(token_info.expires_in, T1_EXPIRY - T1) # Verify that we vend the same token after 100 seconds @@ -444,19 +446,20 @@ class JWTAccessCredentialsTests(unittest.TestCase): utcnow.return_value = T1_DATE time.return_value = T1 - token_info = self.jwt.get_access_token( - additional_claims={'aud': 'https://test2.url.com', - 'sub': 'dummy2@google.com' - }) + audience = 'https://test2.url.com' + subject = 'dummy2@google.com' + claims = {'aud': audience, 'sub': subject} + token_info = self.jwt.get_access_token(additional_claims=claims) + certs = {'key': datafile('public_cert.pem')} payload = crypt.verify_signed_jwt_with_certs( - token_info.access_token, - {'key': datafile('public_cert.pem')}, - audience='https://test2.url.com') + token_info.access_token, certs, audience=audience) expires_in = token_info.expires_in + self.assertEqual(len(payload), 5) self.assertEqual(payload['iss'], self.service_account_email) - self.assertEqual(payload['sub'], 'dummy2@google.com') + self.assertEqual(payload['sub'], subject) self.assertEqual(payload['iat'], T1) self.assertEqual(payload['exp'], T1_EXPIRY) + self.assertEqual(payload['aud'], audience) self.assertEqual(expires_in, T1_EXPIRY - T1) def test_revoke(self): @@ -502,13 +505,15 @@ class JWTAccessCredentialsTests(unittest.TestCase): self.assertIsNone(info['body']) self.assertEqual(len(info['headers']), 1) bearer, token = info['headers'][b'Authorization'].split() + self.assertEqual(bearer, b'Bearer') payload = crypt.verify_signed_jwt_with_certs( token, certs, audience=self.url) + self.assertEqual(len(payload), 5) self.assertEqual(payload['iss'], self.service_account_email) self.assertEqual(payload['sub'], self.service_account_email) self.assertEqual(payload['iat'], T1) self.assertEqual(payload['exp'], T1_EXPIRY) - self.assertEqual(bearer, b'Bearer') + self.assertEqual(payload['aud'], self.url) @mock.patch('oauth2client.client._UTCNOW') @mock.patch('time.time') @@ -538,53 +543,108 @@ class JWTAccessCredentialsTests(unittest.TestCase): self.assertIsNone(info['body']) self.assertEqual(len(info['headers']), 1) bearer, token = info['headers'][b'Authorization'].split() + self.assertEqual(bearer, b'Bearer') certs = {'key': datafile('public_cert.pem')} payload = crypt.verify_signed_jwt_with_certs( token, certs, audience=self.url) + self.assertEqual(len(payload), 5) self.assertEqual(payload['iss'], self.service_account_email) self.assertEqual(payload['sub'], self.service_account_email) self.assertEqual(payload['iat'], T1) self.assertEqual(payload['exp'], T1_EXPIRY) - self.assertEqual(bearer, b'Bearer') + self.assertEqual(payload['aud'], self.url) @mock.patch('oauth2client.client._UTCNOW') def test_authorize_stale_token(self, utcnow): utcnow.return_value = T1_DATE # Create an initial token - h = http_mock.HttpMockSequence([ + http = http_mock.HttpMockSequence([ ({'status': http_client.OK}, b''), ({'status': http_client.OK}, b''), ]) - self.jwt.authorize(h) - h.request(self.url) + self.jwt.authorize(http) + transport.request(http, self.url) token_1 = self.jwt.access_token # Expire the token utcnow.return_value = T3_DATE - h.request(self.url) + transport.request(http, self.url) token_2 = self.jwt.access_token self.assertEquals(self.jwt.token_expiry, T3_EXPIRY_DATE) self.assertNotEqual(token_1, token_2) + # Verify mocks. + certs = {'key': datafile('public_cert.pem')} + self.assertEqual(len(http.requests), 2) + issued_at_vals = (T1, T3) + exp_vals = (T1_EXPIRY, T3_EXPIRY) + for info, issued_at, exp_val in zip(http.requests, issued_at_vals, + exp_vals): + self.assertEqual(info['uri'], self.url) + self.assertEqual(info['method'], 'GET') + self.assertIsNone(info['body']) + self.assertEqual(len(info['headers']), 1) + bearer, token = info['headers'][b'Authorization'].split() + self.assertEqual(bearer, b'Bearer') + # To parse the token, skip the time check, since this + # test intentionally has stale tokens. + with mock.patch('oauth2client.crypt._verify_time_range', + return_value=True): + payload = crypt.verify_signed_jwt_with_certs( + token, certs, audience=self.url) + self.assertEqual(len(payload), 5) + self.assertEqual(payload['iss'], self.service_account_email) + self.assertEqual(payload['sub'], self.service_account_email) + self.assertEqual(payload['iat'], issued_at) + self.assertEqual(payload['exp'], exp_val) + self.assertEqual(payload['aud'], self.url) + @mock.patch('oauth2client.client._UTCNOW') def test_authorize_401(self, utcnow): utcnow.return_value = T1_DATE - h = http_mock.HttpMockSequence([ + http = http_mock.HttpMockSequence([ ({'status': http_client.OK}, b''), ({'status': http_client.UNAUTHORIZED}, b''), ({'status': http_client.OK}, b''), ]) - self.jwt.authorize(h) - h.request(self.url) + self.jwt.authorize(http) + transport.request(http, self.url) token_1 = self.jwt.access_token utcnow.return_value = T2_DATE - self.assertEquals(h.request(self.url)[0].status, 200) + response, _ = transport.request(http, self.url) + self.assertEquals(response.status, http_client.OK) token_2 = self.jwt.access_token # Check the 401 forced a new token self.assertNotEqual(token_1, token_2) + # Verify mocks. + certs = {'key': datafile('public_cert.pem')} + self.assertEqual(len(http.requests), 3) + issued_at_vals = (T1, T1, T2) + exp_vals = (T1_EXPIRY, T1_EXPIRY, T2_EXPIRY) + for info, issued_at, exp_val in zip(http.requests, issued_at_vals, + exp_vals): + self.assertEqual(info['uri'], self.url) + self.assertEqual(info['method'], 'GET') + self.assertIsNone(info['body']) + self.assertEqual(len(info['headers']), 1) + bearer, token = info['headers'][b'Authorization'].split() + self.assertEqual(bearer, b'Bearer') + # To parse the token, skip the time check, since this + # test intentionally has stale tokens. + with mock.patch('oauth2client.crypt._verify_time_range', + return_value=True): + payload = crypt.verify_signed_jwt_with_certs( + token, certs, audience=self.url) + self.assertEqual(len(payload), 5) + self.assertEqual(payload['iss'], self.service_account_email) + self.assertEqual(payload['sub'], self.service_account_email) + self.assertEqual(payload['iat'], issued_at) + self.assertEqual(payload['exp'], exp_val) + self.assertEqual(payload['aud'], self.url) + @mock.patch('oauth2client.client._UTCNOW') def test_refresh(self, utcnow): utcnow.return_value = T1_DATE |