diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-01-05 00:02:20 -0800 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-01-05 10:04:19 -0800 |
commit | 30c342a437a137c82444364073f4e4afebeb891a (patch) | |
tree | f2322c313096e2db4185c7fa001458e93e33fd8a /tests/test_service_account.py | |
parent | 24214201c67e816eb97a4f74c07a86e6be8d611c (diff) | |
download | oauth2client-30c342a437a137c82444364073f4e4afebeb891a.tar.gz |
Factor out usage of utcnow() in client.
This is to enable better stubs in testing and eliminate
two sleep() statements in unit tests. (The philosophy
is "unit tests should be fast".)
Diffstat (limited to 'tests/test_service_account.py')
-rw-r--r-- | tests/test_service_account.py | 92 |
1 files changed, 74 insertions, 18 deletions
diff --git a/tests/test_service_account.py b/tests/test_service_account.py index 1ba0cae..09d6234 100644 --- a/tests/test_service_account.py +++ b/tests/test_service_account.py @@ -17,12 +17,14 @@ Unit tests for service account credentials implemented using RSA. """ +import datetime import json import os import rsa -import time import unittest +import mock + from .http_mock import HttpMockSequence from oauth2client.service_account import _ServiceAccountCredentials @@ -88,11 +90,28 @@ class ServiceAccountCredentialsTests(unittest.TestCase): _ServiceAccountCredentials)) self.assertEqual('dummy_scope', new_credentials._scopes) - def test_access_token(self): - S = 2 # number of seconds in which the token expires - token_response_first = {'access_token': 'first_token', 'expires_in': S} - token_response_second = {'access_token': 'second_token', - 'expires_in': S} + @mock.patch('oauth2client.client._UTCNOW') + @mock.patch('rsa.pkcs1.sign', return_value=b'signed-value') + def test_access_token(self, sign_func, utcnow): + # Configure the patch. + seconds = 11 + NOW = datetime.datetime(1992, 12, 31, second=seconds) + utcnow.return_value = NOW + + lifetime = 2 # number of seconds in which the token expires + EXPIRY_TIME = datetime.datetime(1992, 12, 31, + second=seconds + lifetime) + + token1 = u'first_token' + token_response_first = { + 'access_token': token1, + 'expires_in': lifetime, + } + token2 = u'second_token' + token_response_second = { + 'access_token': token2, + 'expires_in': lifetime, + } http = HttpMockSequence([ ({'status': '200'}, json.dumps(token_response_first).encode('utf-8')), @@ -100,27 +119,64 @@ class ServiceAccountCredentialsTests(unittest.TestCase): json.dumps(token_response_second).encode('utf-8')), ]) - token = self.credentials.get_access_token(http=http) - self.assertEqual('first_token', token.access_token) - self.assertEqual(S - 1, token.expires_in) + # Get Access Token, First attempt. + self.assertEqual(self.credentials.access_token, None) self.assertFalse(self.credentials.access_token_expired) - self.assertEqual(token_response_first, self.credentials.token_response) - + self.assertEqual(self.credentials.token_expiry, None) token = self.credentials.get_access_token(http=http) - self.assertEqual('first_token', token.access_token) - self.assertEqual(S - 1, token.expires_in) + self.assertEqual(self.credentials.token_expiry, EXPIRY_TIME) + self.assertEqual(token1, token.access_token) + self.assertEqual(lifetime, token.expires_in) + self.assertEqual(token_response_first, + self.credentials.token_response) + # Two utcnow calls are expected: + # - get_access_token() -> _do_refresh_request (setting expires in) + # - get_access_token() -> _expires_in() + expected_utcnow_calls = [mock.call()] * 2 + self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) + # One rsa.pkcs1.sign expected: Actual refresh was needed. + self.assertEqual(len(sign_func.mock_calls), 1) + + # Get Access Token, Second Attempt (not expired) + self.assertEqual(self.credentials.access_token, token1) self.assertFalse(self.credentials.access_token_expired) + token = self.credentials.get_access_token(http=http) + # Make sure no refresh occurred since the token was not expired. + self.assertEqual(token1, token.access_token) + self.assertEqual(lifetime, token.expires_in) self.assertEqual(token_response_first, self.credentials.token_response) - - time.sleep(S + 0.5) # some margin to avoid flakiness + # Three more utcnow calls are expected: + # - access_token_expired + # - get_access_token() -> access_token_expired + # - get_access_token -> _expires_in + expected_utcnow_calls = [mock.call()] * (2 + 3) + self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) + # No rsa.pkcs1.sign expected: the token was not expired. + self.assertEqual(len(sign_func.mock_calls), 1 + 0) + + # Get Access Token, Third Attempt (force expiration) + self.assertEqual(self.credentials.access_token, token1) + self.credentials.token_expiry = NOW # Manually force expiry. self.assertTrue(self.credentials.access_token_expired) - token = self.credentials.get_access_token(http=http) - self.assertEqual('second_token', token.access_token) - self.assertEqual(S - 1, token.expires_in) + # Make sure refresh occurred since the token was not expired. + self.assertEqual(token2, token.access_token) + self.assertEqual(lifetime, token.expires_in) self.assertFalse(self.credentials.access_token_expired) self.assertEqual(token_response_second, self.credentials.token_response) + # Five more utcnow calls are expected: + # - access_token_expired + # - get_access_token -> access_token_expired + # - get_access_token -> _do_refresh_request + # - get_access_token -> _expires_in + # - access_token_expired + expected_utcnow_calls = [mock.call()] * (2 + 3 + 5) + self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) + # One more rsa.pkcs1.sign expected: Actual refresh was needed. + self.assertEqual(len(sign_func.mock_calls), 1 + 0 + 1) + + self.assertEqual(self.credentials.access_token, token2) if __name__ == '__main__': # pragma: NO COVER |