author | Danny Hermes <daniel.j.hermes@gmail.com> | 2016-02-05 04:52:32 -0800 |
---|---|---|
committer | Nathaniel Manista <nathaniel@google.com> | 2016-02-05 15:27:06 +0000 |
commit | dcd20c9375308979e45ae280ec102a28d2ca60d1 (patch) | |
tree | 6854820216602468a5cc8baf1ff1afdb057c768a /tests | |
parent | d3391bc91deec3f5d8addc5ca21f3174e2818d79 (diff) | |
Removing SignedJwtAssertionCredentials.
This completes the consolidation of the two service
account credentials implementations.
In the process, also adding test coverage for some untested
code paths within the crypto helpers.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test__pure_python_crypt.py | 5 | ||||
-rw-r--r-- | tests/test__pycrypto_crypt.py | 14 | ||||
-rw-r--r-- | tests/test_crypt.py | 41 | ||||
-rw-r--r-- | tests/test_jwt.py | 109 | ||||
-rw-r--r-- | tests/test_service_account.py | 17 |
5 files changed, 100 insertions, 86 deletions
diff --git a/tests/test__pure_python_crypt.py b/tests/test__pure_python_crypt.py index da18fbf..c20a25c 100644 --- a/tests/test__pure_python_crypt.py +++ b/tests/test__pure_python_crypt.py @@ -174,6 +174,11 @@ class TestRsaSigner(unittest2.TestCase): with self.assertRaises(ValueError): RsaSigner.from_string(key_bytes) + def test_from_string_bogus_key(self): + key_bytes = 'bogus-key' + with self.assertRaises(ValueError): + RsaSigner.from_string(key_bytes) + if __name__ == '__main__': # pragma: NO COVER unittest2.main() diff --git a/tests/test__pycrypto_crypt.py b/tests/test__pycrypto_crypt.py index 1323ee2..d871e7c 100644 --- a/tests/test__pycrypto_crypt.py +++ b/tests/test__pycrypto_crypt.py @@ -14,13 +14,13 @@ """Unit tests for oauth2client._pycrypto_crypt.""" import os -import unittest +import unittest2 from oauth2client.crypt import PyCryptoSigner from oauth2client.crypt import PyCryptoVerifier -class TestPyCryptoVerifier(unittest.TestCase): +class TestPyCryptoVerifier(unittest2.TestCase): PUBLIC_CERT_FILENAME = os.path.join(os.path.dirname(__file__), 'data', 'public_cert.pem') @@ -63,5 +63,13 @@ class TestPyCryptoVerifier(unittest.TestCase): self.assertTrue(isinstance(verifier, PyCryptoVerifier)) +class TestPyCryptoSigner(unittest2.TestCase): + + def test_from_string_bad_key(self): + key_bytes = 'definitely-not-pem-format' + with self.assertRaises(NotImplementedError): + PyCryptoSigner.from_string(key_bytes) + + if __name__ == '__main__': # pragma: NO COVER - unittest.main() + unittest2.main() diff --git a/tests/test_crypt.py b/tests/test_crypt.py index aec703d..5b54532 100644 --- a/tests/test_crypt.py +++ b/tests/test_crypt.py 
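Review bot: `TestPyCryptoSigner.test_from_string_bad_key` in tests/test__pycrypto_crypt.py expects `NotImplementedError` for input that is simply not a key, and nothing in the test says why, while the sibling `RsaSigner` test added in tests/test__pure_python_crypt.py expects `ValueError` for `'bogus-key'`. Presumably `PyCryptoSigner.from_string` reports any non-PEM input as an unsupported PKCS12 key; a comment such as `# Non-PEM input is treated as PKCS12, which PyCrypto cannot load.` would record that. This test also appears to stand in for the p12-with-PyCrypto case that is deleted from tests/test_jwt.py later in this commit, so a second case that passes real PKCS12 bytes from the test data directory would keep that path covered explicitly.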
@@ -20,15 +20,17 @@ import mock from oauth2client import _helpers from oauth2client.client import HAS_OPENSSL -from oauth2client.client import SignedJwtAssertionCredentials from oauth2client import crypt +from oauth2client.service_account import ServiceAccountCredentials + + +def data_filename(filename): + return os.path.join(os.path.dirname(__file__), 'data', filename) def datafile(filename): - f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb') - data = f.read() - f.close() - return data + with open(data_filename(filename), 'rb') as file_obj: + return file_obj.read() class Test__bad_pkcs12_key_as_pem(unittest.TestCase): @@ -39,23 +41,23 @@ class Test__bad_pkcs12_key_as_pem(unittest.TestCase): class Test_pkcs12_key_as_pem(unittest.TestCase): - def _make_signed_jwt_creds(self, private_key_file='privatekey.p12', - private_key=None): - private_key = private_key or datafile(private_key_file) - return SignedJwtAssertionCredentials( + def _make_svc_account_creds(self, private_key_file='privatekey.p12'): + filename = data_filename(private_key_file) + credentials = ServiceAccountCredentials.from_p12_keyfile( 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + filename, + scopes='read+write') + credentials._kwargs['sub'] ='joe@example.org' + return credentials def _succeeds_helper(self, password=None): self.assertEqual(True, HAS_OPENSSL) - credentials = self._make_signed_jwt_creds() + credentials = self._make_svc_account_creds() if password is None: - password = credentials.private_key_password - pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key, - password) + password = credentials._private_key_password + pem_contents = crypt.pkcs12_key_as_pem( + credentials._private_key_pkcs12, password) pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem') pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem) alternate_pem = datafile('pem_from_pkcs12_alternate.pem') 
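Review bot: Dropping the `private_key` argument from the helper (now `_make_svc_account_creds`) leaves no way to feed malformed bytes through it, and the next hunk deletes `test_with_nonsense_key` without a replacement, so as far as this diff shows the malformed-PKCS12 path of `crypt.pkcs12_key_as_pem` no longer has a negative test in this file. The function takes the key bytes and password directly, so the case can be kept without building credentials: `with self.assertRaises(crypto.Error): crypt.pkcs12_key_as_pem(b'NOT_A_KEY', 'notasecret')`. Separately, `credentials._kwargs['sub'] ='joe@example.org'` writes a private attribute that `_succeeds_helper` never reads and is missing a space after `=`; it can likely be removed.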
@@ -68,13 +70,6 @@ class Test_pkcs12_key_as_pem(unittest.TestCase): password = u'notasecret' self._succeeds_helper(password) - def test_with_nonsense_key(self): - from OpenSSL import crypto - credentials = self._make_signed_jwt_creds(private_key=b'NOT_A_KEY') - self.assertRaises(crypto.Error, crypt.pkcs12_key_as_pem, - credentials.private_key, - credentials.private_key_password) - class Test__verify_signature(unittest.TestCase): diff --git a/tests/test_jwt.py b/tests/test_jwt.py index f4a3bdb..38d28c7 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py 
@@ -21,39 +21,48 @@ import unittest2 from .http_mock import HttpMockSequence from oauth2client.client import Credentials -from oauth2client.client import SignedJwtAssertionCredentials from oauth2client.client import VerifyJwtTokenError from oauth2client.client import verify_id_token from oauth2client.client import HAS_OPENSSL from oauth2client.client import HAS_CRYPTO from oauth2client import crypt from oauth2client.file import Storage +from oauth2client.service_account import _PASSWORD_DEFAULT +from oauth2client.service_account import ServiceAccountCredentials __author__ = 'jcgregorio@google.com (Joe Gregorio)' +_FORMATS_TO_CONSTRUCTOR_ARGS = { + 'p12': 'private_key_pkcs12', + 'pem': 'private_key_pkcs8_pem', +} + + +def data_filename(filename): + return os.path.join(os.path.dirname(__file__), 'data', filename) + + def datafile(filename): - f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb') - data = f.read() - f.close() - return data + with open(data_filename(filename), 'rb') as file_obj: + return file_obj.read() class CryptTests(unittest2.TestCase): def setUp(self): - self.format = 'p12' + self.format_ = 'p12' self.signer = crypt.OpenSSLSigner self.verifier = crypt.OpenSSLVerifier def test_sign_and_verify(self): - self._check_sign_and_verify('privatekey.%s' % self.format) + self._check_sign_and_verify('privatekey.' + self.format_) def test_sign_and_verify_from_converted_pkcs12(self): # Tests that following instructions to convert from PKCS12 to # PEM works. - if self.format == 'pem': + if self.format_ == 'pem': self._check_sign_and_verify('pem_from_pkcs12.pem') def _check_sign_and_verify(self, private_key_file): 
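Review bot: `_FORMATS_TO_CONSTRUCTOR_ARGS` is added at module level, but none of the hunks shown for tests/test_jwt.py reference it; `_make_credentials` further down branches on `self.format_` with `if`/`elif` and sets the attributes by hand. Unless it is used outside the visible hunks, either remove the constant or drive the helper from it, e.g. `setattr(credentials, '_' + _FORMATS_TO_CONSTRUCTOR_ARGS[self.format_], private_key)`, so the mapping and the helper cannot drift apart. `data_filename` and `datafile` are also added here with the same bodies as in tests/test_crypt.py in this commit; a shared test helper module would avoid the duplication.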
@@ -85,7 +94,7 @@ class CryptTests(unittest2.TestCase): self.assertTrue(expected_error in str(exc_manager.exception)) def _create_signed_jwt(self): - private_key = datafile('privatekey.%s' % self.format) + private_key = datafile('privatekey.' + self.format_) signer = self.signer.from_string(private_key) audience = 'some_audience_address@testing.gserviceaccount.com' now = int(time.time()) @@ -132,7 +141,7 @@ class CryptTests(unittest2.TestCase): http=http) def test_verify_id_token_bad_tokens(self): - private_key = datafile('privatekey.%s' % self.format) + private_key = datafile('privatekey.' + self.format_) # Wrong number of segments self._check_jwt_failure('foo', 'Wrong number of segments') @@ -198,7 +207,7 @@ class CryptTests(unittest2.TestCase): class PEMCryptTestsPyCrypto(CryptTests): def setUp(self): - self.format = 'pem' + self.format_ = 'pem' self.signer = crypt.PyCryptoSigner self.verifier = crypt.PyCryptoVerifier @@ -206,7 +215,7 @@ class PEMCryptTestsPyCrypto(CryptTests): class PEMCryptTestsOpenSSL(CryptTests): def setUp(self): - self.format = 'pem' + self.format_ = 'pem' self.signer = crypt.OpenSSLSigner self.verifier = crypt.OpenSSLVerifier 
@@ -214,16 +223,27 @@ class PEMCryptTestsOpenSSL(CryptTests): class SignedJwtAssertionCredentialsTests(unittest2.TestCase): def setUp(self): - self.format = 'p12' + self.format_ = 'p12' crypt.Signer = crypt.OpenSSLSigner - def test_credentials_good(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', + def _make_credentials(self): + private_key = datafile('privatekey.' + self.format_) + signer = crypt.Signer.from_string(private_key) + credentials = ServiceAccountCredentials( + 'some_account@example.com', signer, + scopes='read+write', sub='joe@example.org') + if self.format_ == 'pem': + credentials._private_key_pkcs8_pem = private_key + elif self.format_ == 'p12': + credentials._private_key_pkcs12 = private_key + credentials._private_key_password = _PASSWORD_DEFAULT + else: # pragma: NO COVER + raise ValueError('Unexpected format.') + return credentials + + def test_credentials_good(self): + credentials = self._make_credentials() http = HttpMockSequence([ ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'), ({'status': '200'}, 'echo_request_headers'), 
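Review bot: `_make_credentials` constructs the object from a signer and then assigns `_private_key_pkcs8_pem`, `_private_key_pkcs12` and `_private_key_password` from outside, with no comment explaining why the private fields have to be patched in. Presumably these are the fields `to_json()` serializes so that `new_from_json` can rebuild the signer; a short docstring along the lines of `"""Build credentials from a signer and set the serialized key fields the JSON round trip relies on."""` would make that dependency visible to the next maintainer. The enclosing test class is still named `SignedJwtAssertionCredentialsTests` (see the hunk header) although every test now exercises `ServiceAccountCredentials`; renaming it would keep failure reports from pointing at a class that no longer exists.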
@@ -233,18 +253,14 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase): self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) def test_credentials_to_from_json(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + credentials = self._make_credentials() json = credentials.to_json() restored = Credentials.new_from_json(json) - self.assertEqual(credentials.private_key, restored.private_key) - self.assertEqual(credentials.private_key_password, - restored.private_key_password) - self.assertEqual(credentials.kwargs, restored.kwargs) + self.assertEqual(credentials._private_key_pkcs12, + restored._private_key_pkcs12) + self.assertEqual(credentials._private_key_password, + restored._private_key_password) + self.assertEqual(credentials._kwargs, restored._kwargs) def _credentials_refresh(self, credentials): http = HttpMockSequence([ @@ -258,24 +274,12 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase): return content def test_credentials_refresh_without_storage(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') - + credentials = self._make_credentials() content = self._credentials_refresh(credentials) - self.assertEqual(b'Bearer 3/3w', content[b'Authorization']) def test_credentials_refresh_with_storage(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') + credentials = self._make_credentials() filehandle, filename = tempfile.mkstemp() os.close(filehandle) 
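Review bot: `test_credentials_to_from_json` now compares only `_private_key_pkcs12` and `_private_key_password`, but the two PEM subclasses inherit this test and their branch of `_make_credentials` sets only `_private_key_pkcs8_pem`. For `format_ == 'pem'` the two key assertions therefore compare fields the helper never set (presumably `None` on both sides), so the round trip of the PEM private key through `to_json()` and `new_from_json` is not actually checked. Adding `self.assertEqual(credentials._private_key_pkcs8_pem, restored._private_key_pkcs8_pem)` alongside the existing assertions covers both formats.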
@@ -293,7 +297,7 @@ class PEMSignedJwtAssertionCredentialsOpenSSLTests( SignedJwtAssertionCredentialsTests): def setUp(self): - self.format = 'pem' + self.format_ = 'pem' crypt.Signer = crypt.OpenSSLSigner @@ -301,25 +305,10 @@ class PEMSignedJwtAssertionCredentialsPyCryptoTests( SignedJwtAssertionCredentialsTests): def setUp(self): - self.format = 'pem' + self.format_ = 'pem' crypt.Signer = crypt.PyCryptoSigner -class PKCSSignedJwtAssertionCredentialsPyCryptoTests(unittest2.TestCase): - - def test_for_failure(self): - crypt.Signer = crypt.PyCryptoSigner - private_key = datafile('privatekey.p12') - credentials = SignedJwtAssertionCredentials( - 'some_account@example.com', - private_key, - scope='read+write', - sub='joe@example.org') - - self.assertRaises(NotImplementedError, - credentials._generate_assertion) - - class TestHasOpenSSLFlag(unittest2.TestCase): def test_true(self): diff --git a/tests/test_service_account.py b/tests/test_service_account.py index e7c9e0a..3c91d19 100644 --- a/tests/test_service_account.py +++ b/tests/test_service_account.py @@ -57,6 +57,23 @@ class ServiceAccountCredentialsTests(unittest2.TestCase): client_id=self.client_id, ) + def test__to_json_override(self): + signer = object() + creds = ServiceAccountCredentials('name@email.com', + signer) + self.assertEqual(creds._signer, signer) + # Serialize over-ridden data (unrelated to ``creds``). + to_serialize = {'unrelated': 'data'} + serialized_str = creds._to_json([], to_serialize.copy()) + serialized_data = json.loads(serialized_str) + expected_serialized = { + '_class': 'ServiceAccountCredentials', + '_module': 'oauth2client.service_account', + 'token_expiry': None, + } + expected_serialized.update(to_serialize) + self.assertEqual(serialized_data, expected_serialized) + def test_sign_blob(self): private_key_id, signature = self.credentials.sign_blob('Google') self.assertEqual(self.private_key_id, private_key_id) |
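Review bot: In `test__to_json_override` (tests/test_service_account.py), `self.assertEqual(creds._signer, signer)` is checking a bare `object()` sentinel, where identity is what is meant; `self.assertIs(creds._signer, signer)` states that directly. The exact-dict comparison at the end is the valuable part of the test, since it shows that `_to_json` with an explicit `to_serialize` emits none of the instance's own fields; a comment such as `# Only the override data plus class metadata may appear; no signer or key fields.` would make that intent explicit. `json.loads` is used here and its import lies outside the hunk.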