diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-02-04 11:56:57 -0800 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-02-04 11:59:58 -0800 |
commit | 7219cd776a26e21e6ab4a3a49e1335d5de079a87 (patch) | |
tree | 54928856aed5bdecfcca91e8a93b169171b5b3f6 /tests | |
parent | 4f4b7de8d1697a8dd601dd4d2920e88659dd8abc (diff) | |
download | oauth2client-7219cd776a26e21e6ab4a3a49e1335d5de079a87.tar.gz |
Removing novel crypto in service_account.py.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_client.py | 16 | ||||
-rw-r--r-- | tests/test_service_account.py | 66 |
2 files changed, 50 insertions, 32 deletions
diff --git a/tests/test_client.py b/tests/test_client.py index 131e72e..708db4d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -628,15 +628,17 @@ class GoogleCredentialsTests(unittest2.TestCase): self.assertEqual(creds.__dict__, creds2.__dict__) def test_to_from_json_service_account(self): - self.maxDiff=None credentials_file = datafile( os.path.join('gcloud', _WELL_KNOWN_CREDENTIALS_FILE)) - creds = GoogleCredentials.from_stream(credentials_file) - - json = creds.to_json() - creds2 = GoogleCredentials.from_json(json) - - self.assertEqual(creds.__dict__, creds2.__dict__) + creds1 = GoogleCredentials.from_stream(credentials_file) + # Convert to and then back from json. + creds2 = GoogleCredentials.from_json(creds1.to_json()) + + creds1_vals = creds1.__dict__ + creds1_vals.pop('_signer') + creds2_vals = creds2.__dict__ + creds2_vals.pop('_signer') + self.assertEqual(creds1_vals, creds2_vals) def test_parse_expiry(self): dt = datetime.datetime(2016, 1, 1) diff --git a/tests/test_service_account.py b/tests/test_service_account.py index 09d6234..df48f31 100644 --- a/tests/test_service_account.py +++ b/tests/test_service_account.py @@ -91,13 +91,29 @@ class ServiceAccountCredentialsTests(unittest.TestCase): self.assertEqual('dummy_scope', new_credentials._scopes) @mock.patch('oauth2client.client._UTCNOW') - @mock.patch('rsa.pkcs1.sign', return_value=b'signed-value') - def test_access_token(self, sign_func, utcnow): + def test_access_token(self, utcnow): # Configure the patch. seconds = 11 NOW = datetime.datetime(1992, 12, 31, second=seconds) utcnow.return_value = NOW + # Create a custom credentials with a mock signer. + signer = mock.MagicMock() + signed_value = b'signed-content' + signer.sign = mock.MagicMock(name='sign', + return_value=signed_value) + signer_patch = mock.patch('oauth2client.crypt.Signer.from_string', + return_value=signer) + with signer_patch as signer_factory: + credentials = _ServiceAccountCredentials( + self.service_account_id, + self.service_account_email, + self.private_key_id, + self.private_key, + '', + ) + + # Begin testing. lifetime = 2 # number of seconds in which the token expires EXPIRY_TIME = datetime.datetime(1992, 12, 31, second=seconds + lifetime) @@ -120,51 +136,51 @@ class ServiceAccountCredentialsTests(unittest.TestCase): ]) # Get Access Token, First attempt. - self.assertEqual(self.credentials.access_token, None) - self.assertFalse(self.credentials.access_token_expired) - self.assertEqual(self.credentials.token_expiry, None) - token = self.credentials.get_access_token(http=http) - self.assertEqual(self.credentials.token_expiry, EXPIRY_TIME) + self.assertEqual(credentials.access_token, None) + self.assertFalse(credentials.access_token_expired) + self.assertEqual(credentials.token_expiry, None) + token = credentials.get_access_token(http=http) + self.assertEqual(credentials.token_expiry, EXPIRY_TIME) self.assertEqual(token1, token.access_token) self.assertEqual(lifetime, token.expires_in) self.assertEqual(token_response_first, - self.credentials.token_response) + credentials.token_response) # Two utcnow calls are expected: # - get_access_token() -> _do_refresh_request (setting expires in) # - get_access_token() -> _expires_in() expected_utcnow_calls = [mock.call()] * 2 self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) - # One rsa.pkcs1.sign expected: Actual refresh was needed. - self.assertEqual(len(sign_func.mock_calls), 1) + # One call to sign() expected: Actual refresh was needed. + self.assertEqual(len(signer.sign.mock_calls), 1) # Get Access Token, Second Attempt (not expired) - self.assertEqual(self.credentials.access_token, token1) - self.assertFalse(self.credentials.access_token_expired) - token = self.credentials.get_access_token(http=http) + self.assertEqual(credentials.access_token, token1) + self.assertFalse(credentials.access_token_expired) + token = credentials.get_access_token(http=http) # Make sure no refresh occurred since the token was not expired. self.assertEqual(token1, token.access_token) self.assertEqual(lifetime, token.expires_in) - self.assertEqual(token_response_first, self.credentials.token_response) + self.assertEqual(token_response_first, credentials.token_response) # Three more utcnow calls are expected: # - access_token_expired # - get_access_token() -> access_token_expired # - get_access_token -> _expires_in expected_utcnow_calls = [mock.call()] * (2 + 3) self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) - # No rsa.pkcs1.sign expected: the token was not expired. - self.assertEqual(len(sign_func.mock_calls), 1 + 0) + # No call to sign() expected: the token was not expired. + self.assertEqual(len(signer.sign.mock_calls), 1 + 0) # Get Access Token, Third Attempt (force expiration) - self.assertEqual(self.credentials.access_token, token1) - self.credentials.token_expiry = NOW # Manually force expiry. - self.assertTrue(self.credentials.access_token_expired) - token = self.credentials.get_access_token(http=http) + self.assertEqual(credentials.access_token, token1) + credentials.token_expiry = NOW # Manually force expiry. + self.assertTrue(credentials.access_token_expired) + token = credentials.get_access_token(http=http) # Make sure refresh occurred since the token was not expired. self.assertEqual(token2, token.access_token) self.assertEqual(lifetime, token.expires_in) - self.assertFalse(self.credentials.access_token_expired) + self.assertFalse(credentials.access_token_expired) self.assertEqual(token_response_second, - self.credentials.token_response) + credentials.token_response) # Five more utcnow calls are expected: # - access_token_expired # - get_access_token -> access_token_expired @@ -173,10 +189,10 @@ class ServiceAccountCredentialsTests(unittest.TestCase): # - access_token_expired expected_utcnow_calls = [mock.call()] * (2 + 3 + 5) self.assertEqual(expected_utcnow_calls, utcnow.mock_calls) - # One more rsa.pkcs1.sign expected: Actual refresh was needed. - self.assertEqual(len(sign_func.mock_calls), 1 + 0 + 1) + # One more call to sign() expected: Actual refresh was needed. + self.assertEqual(len(signer.sign.mock_calls), 1 + 0 + 1) - self.assertEqual(self.credentials.access_token, token2) + self.assertEqual(credentials.access_token, token2) if __name__ == '__main__': # pragma: NO COVER |