aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorDanny Hermes <daniel.j.hermes@gmail.com>2016-02-04 11:56:57 -0800
committerDanny Hermes <daniel.j.hermes@gmail.com>2016-02-04 11:59:58 -0800
commit7219cd776a26e21e6ab4a3a49e1335d5de079a87 (patch)
tree54928856aed5bdecfcca91e8a93b169171b5b3f6 /tests
parent4f4b7de8d1697a8dd601dd4d2920e88659dd8abc (diff)
downloadoauth2client-7219cd776a26e21e6ab4a3a49e1335d5de079a87.tar.gz
Removing novel crypto in service_account.py.
Diffstat (limited to 'tests')
-rw-r--r--tests/test_client.py16
-rw-r--r--tests/test_service_account.py66
2 files changed, 50 insertions, 32 deletions
diff --git a/tests/test_client.py b/tests/test_client.py
index 131e72e..708db4d 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -628,15 +628,17 @@ class GoogleCredentialsTests(unittest2.TestCase):
self.assertEqual(creds.__dict__, creds2.__dict__)
def test_to_from_json_service_account(self):
- self.maxDiff=None
credentials_file = datafile(
os.path.join('gcloud', _WELL_KNOWN_CREDENTIALS_FILE))
- creds = GoogleCredentials.from_stream(credentials_file)
-
- json = creds.to_json()
- creds2 = GoogleCredentials.from_json(json)
-
- self.assertEqual(creds.__dict__, creds2.__dict__)
+ creds1 = GoogleCredentials.from_stream(credentials_file)
+ # Convert to and then back from json.
+ creds2 = GoogleCredentials.from_json(creds1.to_json())
+
+ creds1_vals = creds1.__dict__
+ creds1_vals.pop('_signer')
+ creds2_vals = creds2.__dict__
+ creds2_vals.pop('_signer')
+ self.assertEqual(creds1_vals, creds2_vals)
def test_parse_expiry(self):
dt = datetime.datetime(2016, 1, 1)
diff --git a/tests/test_service_account.py b/tests/test_service_account.py
index 09d6234..df48f31 100644
--- a/tests/test_service_account.py
+++ b/tests/test_service_account.py
@@ -91,13 +91,29 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
self.assertEqual('dummy_scope', new_credentials._scopes)
@mock.patch('oauth2client.client._UTCNOW')
- @mock.patch('rsa.pkcs1.sign', return_value=b'signed-value')
- def test_access_token(self, sign_func, utcnow):
+ def test_access_token(self, utcnow):
# Configure the patch.
seconds = 11
NOW = datetime.datetime(1992, 12, 31, second=seconds)
utcnow.return_value = NOW
+ # Create a custom credentials with a mock signer.
+ signer = mock.MagicMock()
+ signed_value = b'signed-content'
+ signer.sign = mock.MagicMock(name='sign',
+ return_value=signed_value)
+ signer_patch = mock.patch('oauth2client.crypt.Signer.from_string',
+ return_value=signer)
+ with signer_patch as signer_factory:
+ credentials = _ServiceAccountCredentials(
+ self.service_account_id,
+ self.service_account_email,
+ self.private_key_id,
+ self.private_key,
+ '',
+ )
+
+ # Begin testing.
lifetime = 2 # number of seconds in which the token expires
EXPIRY_TIME = datetime.datetime(1992, 12, 31,
second=seconds + lifetime)
@@ -120,51 +136,51 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
])
# Get Access Token, First attempt.
- self.assertEqual(self.credentials.access_token, None)
- self.assertFalse(self.credentials.access_token_expired)
- self.assertEqual(self.credentials.token_expiry, None)
- token = self.credentials.get_access_token(http=http)
- self.assertEqual(self.credentials.token_expiry, EXPIRY_TIME)
+ self.assertEqual(credentials.access_token, None)
+ self.assertFalse(credentials.access_token_expired)
+ self.assertEqual(credentials.token_expiry, None)
+ token = credentials.get_access_token(http=http)
+ self.assertEqual(credentials.token_expiry, EXPIRY_TIME)
self.assertEqual(token1, token.access_token)
self.assertEqual(lifetime, token.expires_in)
self.assertEqual(token_response_first,
- self.credentials.token_response)
+ credentials.token_response)
# Two utcnow calls are expected:
# - get_access_token() -> _do_refresh_request (setting expires in)
# - get_access_token() -> _expires_in()
expected_utcnow_calls = [mock.call()] * 2
self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
- # One rsa.pkcs1.sign expected: Actual refresh was needed.
- self.assertEqual(len(sign_func.mock_calls), 1)
+ # One call to sign() expected: Actual refresh was needed.
+ self.assertEqual(len(signer.sign.mock_calls), 1)
# Get Access Token, Second Attempt (not expired)
- self.assertEqual(self.credentials.access_token, token1)
- self.assertFalse(self.credentials.access_token_expired)
- token = self.credentials.get_access_token(http=http)
+ self.assertEqual(credentials.access_token, token1)
+ self.assertFalse(credentials.access_token_expired)
+ token = credentials.get_access_token(http=http)
# Make sure no refresh occurred since the token was not expired.
self.assertEqual(token1, token.access_token)
self.assertEqual(lifetime, token.expires_in)
- self.assertEqual(token_response_first, self.credentials.token_response)
+ self.assertEqual(token_response_first, credentials.token_response)
# Three more utcnow calls are expected:
# - access_token_expired
# - get_access_token() -> access_token_expired
# - get_access_token -> _expires_in
expected_utcnow_calls = [mock.call()] * (2 + 3)
self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
- # No rsa.pkcs1.sign expected: the token was not expired.
- self.assertEqual(len(sign_func.mock_calls), 1 + 0)
+ # No call to sign() expected: the token was not expired.
+ self.assertEqual(len(signer.sign.mock_calls), 1 + 0)
# Get Access Token, Third Attempt (force expiration)
- self.assertEqual(self.credentials.access_token, token1)
- self.credentials.token_expiry = NOW # Manually force expiry.
- self.assertTrue(self.credentials.access_token_expired)
- token = self.credentials.get_access_token(http=http)
+ self.assertEqual(credentials.access_token, token1)
+ credentials.token_expiry = NOW # Manually force expiry.
+ self.assertTrue(credentials.access_token_expired)
+ token = credentials.get_access_token(http=http)
# Make sure refresh occurred since the token was not expired.
self.assertEqual(token2, token.access_token)
self.assertEqual(lifetime, token.expires_in)
- self.assertFalse(self.credentials.access_token_expired)
+ self.assertFalse(credentials.access_token_expired)
self.assertEqual(token_response_second,
- self.credentials.token_response)
+ credentials.token_response)
# Five more utcnow calls are expected:
# - access_token_expired
# - get_access_token -> access_token_expired
@@ -173,10 +189,10 @@ class ServiceAccountCredentialsTests(unittest.TestCase):
# - access_token_expired
expected_utcnow_calls = [mock.call()] * (2 + 3 + 5)
self.assertEqual(expected_utcnow_calls, utcnow.mock_calls)
- # One more rsa.pkcs1.sign expected: Actual refresh was needed.
- self.assertEqual(len(sign_func.mock_calls), 1 + 0 + 1)
+ # One more call to sign() expected: Actual refresh was needed.
+ self.assertEqual(len(signer.sign.mock_calls), 1 + 0 + 1)
- self.assertEqual(self.credentials.access_token, token2)
+ self.assertEqual(credentials.access_token, token2)
if __name__ == '__main__': # pragma: NO COVER