aboutsummaryrefslogtreecommitdiff
path: root/tests/test_service_account.py
diff options
context:
space:
mode:
authorDanny Hermes <daniel.j.hermes@gmail.com>2016-01-05 00:02:20 -0800
committerDanny Hermes <daniel.j.hermes@gmail.com>2016-01-05 10:04:19 -0800
commit30c342a437a137c82444364073f4e4afebeb891a (patch)
treef2322c313096e2db4185c7fa001458e93e33fd8a /tests/test_service_account.py
parent24214201c67e816eb97a4f74c07a86e6be8d611c (diff)
downloadoauth2client-30c342a437a137c82444364073f4e4afebeb891a.tar.gz
Factor out usage of utcnow() in client.
This is to enable better stubs in testing and eliminate two sleep() statements in unit tests. (The philosophy is "unit tests should be fast".)
Diffstat (limited to 'tests/test_service_account.py')
-rw-r--r--tests/test_service_account.py92
1 files changed, 74 insertions, 18 deletions
diff --git a/tests/test_service_account.py b/tests/test_service_account.py
index 1ba0cae..09d6234 100644
--- a/tests/test_service_account.py
+++ b/tests/test_service_account.py
@@ -17,12 +17,14 @@
Unit tests for service account credentials implemented using RSA.
"""
+import datetime
import json
import os
import rsa
-import time
import unittest
+import mock
+
from .http_mock import HttpMockSequence
from oauth2client.service_account import _ServiceAccountCredentials
@@ -88,11 +90,28 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
_ServiceAccountCredentials))
self.assertEqual('dummy_scope', new_credentials._scopes)
- def test_access_token(self):
- S = 2 # number of seconds in which the token expires
- token_response_first = {'access_token': 'first_token', 'expires_in': S}
- token_response_second = {'access_token': 'second_token',
- 'expires_in': S}
+ @mock.patch('oauth2client.client._UTCNOW')
+ @mock.patch('rsa.pkcs1.sign', return_value=b'signed-value')
+ def test_access_token(self, sign_func, utcnow):
+ # Configure the patch.
+ seconds = 11
+ NOW = datetime.datetime(1992, 12, 31, second=seconds)
+ utcnow.return_value = NOW
+
+ lifetime = 2 # number of seconds in which the token expires
+ EXPIRY_TIME = datetime.datetime(1992, 12, 31,
+ second=seconds + lifetime)
+
+ token1 = u'first_token'
+ token_response_first = {
+ 'access_token': token1,
+ 'expires_in': lifetime,
+ }
+ token2 = u'second_token'
+ token_response_second = {
+ 'access_token': token2,
+ 'expires_in': lifetime,
+ }
http = HttpMockSequence([
({'status': '200'},
json.dumps(token_response_first).encode('utf-8')),
@@ -100,27 +119,64 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
json.dumps(token_response_second).encode('utf-8')),
])
- token = self.credentials.get_access_token(http=http)
- self.assertEqual('first_token', token.access_token)
- self.assertEqual(S - 1, token.expires_in)
+ # Get Access Token, First attempt.
+ self.assertEqual(self.credentials.access_token, None)
self.assertFalse(self.credentials.access_token_expired)
- self.assertEqual(token_response_first, self.credentials.token_response)
-
+ self.assertEqual(self.credentials.token_expiry, None)
token = self.credentials.get_access_token(http=http)
- self.assertEqual('first_token', token.access_token)
- self.assertEqual(S - 1, token.expires_in)
+ self.assertEqual(self.credentials.token_expiry, EXPIRY_TIME)
+ self.assertEqual(token1, token.access_token)
+ self.assertEqual(lifetime, token.expires_in)
+ self.assertEqual(token_response_first,
+ self.credentials.token_response)
+ # Two utcnow calls are expected:
+ # - get_access_token() -> _do_refresh_request (setting expires in)
+ # - get_access_token() -> _expires_in()
+ expected_utcnow_calls = [mock.call()] * 2
+ self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
+ # One rsa.pkcs1.sign expected: Actual refresh was needed.
+ self.assertEqual(len(sign_func.mock_calls), 1)
+
+ # Get Access Token, Second Attempt (not expired)
+ self.assertEqual(self.credentials.access_token, token1)
self.assertFalse(self.credentials.access_token_expired)
+ token = self.credentials.get_access_token(http=http)
+ # Make sure no refresh occurred since the token was not expired.
+ self.assertEqual(token1, token.access_token)
+ self.assertEqual(lifetime, token.expires_in)
self.assertEqual(token_response_first, self.credentials.token_response)
-
- time.sleep(S + 0.5) # some margin to avoid flakiness
+ # Three more utcnow calls are expected:
+ # - access_token_expired
+ # - get_access_token() -> access_token_expired
+ # - get_access_token -> _expires_in
+ expected_utcnow_calls = [mock.call()] * (2 + 3)
+ self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
+ # No rsa.pkcs1.sign expected: the token was not expired.
+ self.assertEqual(len(sign_func.mock_calls), 1 + 0)
+
+ # Get Access Token, Third Attempt (force expiration)
+ self.assertEqual(self.credentials.access_token, token1)
+ self.credentials.token_expiry = NOW # Manually force expiry.
self.assertTrue(self.credentials.access_token_expired)
-
token = self.credentials.get_access_token(http=http)
- self.assertEqual('second_token', token.access_token)
- self.assertEqual(S - 1, token.expires_in)
+ # Make sure refresh occurred since the token was not expired.
+ self.assertEqual(token2, token.access_token)
+ self.assertEqual(lifetime, token.expires_in)
self.assertFalse(self.credentials.access_token_expired)
self.assertEqual(token_response_second,
self.credentials.token_response)
+ # Five more utcnow calls are expected:
+ # - access_token_expired
+ # - get_access_token -> access_token_expired
+ # - get_access_token -> _do_refresh_request
+ # - get_access_token -> _expires_in
+ # - access_token_expired
+ expected_utcnow_calls = [mock.call()] * (2 + 3 + 5)
+ self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
+ # One more rsa.pkcs1.sign expected: Actual refresh was needed.
+ self.assertEqual(len(sign_func.mock_calls), 1 + 0 + 1)
+
+ self.assertEqual(self.credentials.access_token, token2)
if __name__ == '__main__': # pragma: NO COVER