aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorDanny Hermes <daniel.j.hermes@gmail.com>2016-02-05 04:52:32 -0800
committerNathaniel Manista <nathaniel@google.com>2016-02-05 15:27:06 +0000
commitdcd20c9375308979e45ae280ec102a28d2ca60d1 (patch)
tree6854820216602468a5cc8baf1ff1afdb057c768a /tests
parentd3391bc91deec3f5d8addc5ca21f3174e2818d79 (diff)
downloadoauth2client-dcd20c9375308979e45ae280ec102a28d2ca60d1.tar.gz
Removing SignedJwtAssertionCredentials.
This completes the consolidation of the two service account credentials implementations. In the process, also adding test coverage for some untested code paths within the crypto helpers.
Diffstat (limited to 'tests')
-rw-r--r--tests/test__pure_python_crypt.py5
-rw-r--r--tests/test__pycrypto_crypt.py14
-rw-r--r--tests/test_crypt.py41
-rw-r--r--tests/test_jwt.py109
-rw-r--r--tests/test_service_account.py17
5 files changed, 100 insertions, 86 deletions
diff --git a/tests/test__pure_python_crypt.py b/tests/test__pure_python_crypt.py
index da18fbf..c20a25c 100644
--- a/tests/test__pure_python_crypt.py
+++ b/tests/test__pure_python_crypt.py
@@ -174,6 +174,11 @@ class TestRsaSigner(unittest2.TestCase):
with self.assertRaises(ValueError):
RsaSigner.from_string(key_bytes)
+ def test_from_string_bogus_key(self):
+ key_bytes = 'bogus-key'
+ with self.assertRaises(ValueError):
+ RsaSigner.from_string(key_bytes)
+
if __name__ == '__main__': # pragma: NO COVER
unittest2.main()
diff --git a/tests/test__pycrypto_crypt.py b/tests/test__pycrypto_crypt.py
index 1323ee2..d871e7c 100644
--- a/tests/test__pycrypto_crypt.py
+++ b/tests/test__pycrypto_crypt.py
@@ -14,13 +14,13 @@
"""Unit tests for oauth2client._pycrypto_crypt."""
import os
-import unittest
+import unittest2
from oauth2client.crypt import PyCryptoSigner
from oauth2client.crypt import PyCryptoVerifier
-class TestPyCryptoVerifier(unittest.TestCase):
+class TestPyCryptoVerifier(unittest2.TestCase):
PUBLIC_CERT_FILENAME = os.path.join(os.path.dirname(__file__),
'data', 'public_cert.pem')
@@ -63,5 +63,13 @@ class TestPyCryptoVerifier(unittest.TestCase):
self.assertTrue(isinstance(verifier, PyCryptoVerifier))
+class TestPyCryptoSigner(unittest2.TestCase):
+
+ def test_from_string_bad_key(self):
+ key_bytes = 'definitely-not-pem-format'
+ with self.assertRaises(NotImplementedError):
+ PyCryptoSigner.from_string(key_bytes)
+
+
if __name__ == '__main__': # pragma: NO COVER
- unittest.main()
+ unittest2.main()
diff --git a/tests/test_crypt.py b/tests/test_crypt.py
index aec703d..5b54532 100644
--- a/tests/test_crypt.py
+++ b/tests/test_crypt.py
@@ -20,15 +20,17 @@ import mock
from oauth2client import _helpers
from oauth2client.client import HAS_OPENSSL
-from oauth2client.client import SignedJwtAssertionCredentials
from oauth2client import crypt
+from oauth2client.service_account import ServiceAccountCredentials
+
+
+def data_filename(filename):
+ return os.path.join(os.path.dirname(__file__), 'data', filename)
def datafile(filename):
- f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb')
- data = f.read()
- f.close()
- return data
+ with open(data_filename(filename), 'rb') as file_obj:
+ return file_obj.read()
class Test__bad_pkcs12_key_as_pem(unittest.TestCase):
@@ -39,23 +41,23 @@ class Test__bad_pkcs12_key_as_pem(unittest.TestCase):
class Test_pkcs12_key_as_pem(unittest.TestCase):
- def _make_signed_jwt_creds(self, private_key_file='privatekey.p12',
- private_key=None):
- private_key = private_key or datafile(private_key_file)
- return SignedJwtAssertionCredentials(
+ def _make_svc_account_creds(self, private_key_file='privatekey.p12'):
+ filename = data_filename(private_key_file)
+ credentials = ServiceAccountCredentials.from_p12_keyfile(
'some_account@example.com',
- private_key,
- scope='read+write',
- sub='joe@example.org')
+ filename,
+ scopes='read+write')
+ credentials._kwargs['sub'] ='joe@example.org'
+ return credentials
def _succeeds_helper(self, password=None):
self.assertEqual(True, HAS_OPENSSL)
- credentials = self._make_signed_jwt_creds()
+ credentials = self._make_svc_account_creds()
if password is None:
- password = credentials.private_key_password
- pem_contents = crypt.pkcs12_key_as_pem(credentials.private_key,
- password)
+ password = credentials._private_key_password
+ pem_contents = crypt.pkcs12_key_as_pem(
+ credentials._private_key_pkcs12, password)
pkcs12_key_as_pem = datafile('pem_from_pkcs12.pem')
pkcs12_key_as_pem = _helpers._parse_pem_key(pkcs12_key_as_pem)
alternate_pem = datafile('pem_from_pkcs12_alternate.pem')
@@ -68,13 +70,6 @@ class Test_pkcs12_key_as_pem(unittest.TestCase):
password = u'notasecret'
self._succeeds_helper(password)
- def test_with_nonsense_key(self):
- from OpenSSL import crypto
- credentials = self._make_signed_jwt_creds(private_key=b'NOT_A_KEY')
- self.assertRaises(crypto.Error, crypt.pkcs12_key_as_pem,
- credentials.private_key,
- credentials.private_key_password)
-
class Test__verify_signature(unittest.TestCase):
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index f4a3bdb..38d28c7 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -21,39 +21,48 @@ import unittest2
from .http_mock import HttpMockSequence
from oauth2client.client import Credentials
-from oauth2client.client import SignedJwtAssertionCredentials
from oauth2client.client import VerifyJwtTokenError
from oauth2client.client import verify_id_token
from oauth2client.client import HAS_OPENSSL
from oauth2client.client import HAS_CRYPTO
from oauth2client import crypt
from oauth2client.file import Storage
+from oauth2client.service_account import _PASSWORD_DEFAULT
+from oauth2client.service_account import ServiceAccountCredentials
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
+_FORMATS_TO_CONSTRUCTOR_ARGS = {
+ 'p12': 'private_key_pkcs12',
+ 'pem': 'private_key_pkcs8_pem',
+}
+
+
+def data_filename(filename):
+ return os.path.join(os.path.dirname(__file__), 'data', filename)
+
+
def datafile(filename):
- f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb')
- data = f.read()
- f.close()
- return data
+ with open(data_filename(filename), 'rb') as file_obj:
+ return file_obj.read()
class CryptTests(unittest2.TestCase):
def setUp(self):
- self.format = 'p12'
+ self.format_ = 'p12'
self.signer = crypt.OpenSSLSigner
self.verifier = crypt.OpenSSLVerifier
def test_sign_and_verify(self):
- self._check_sign_and_verify('privatekey.%s' % self.format)
+ self._check_sign_and_verify('privatekey.' + self.format_)
def test_sign_and_verify_from_converted_pkcs12(self):
# Tests that following instructions to convert from PKCS12 to
# PEM works.
- if self.format == 'pem':
+ if self.format_ == 'pem':
self._check_sign_and_verify('pem_from_pkcs12.pem')
def _check_sign_and_verify(self, private_key_file):
@@ -85,7 +94,7 @@ class CryptTests(unittest2.TestCase):
self.assertTrue(expected_error in str(exc_manager.exception))
def _create_signed_jwt(self):
- private_key = datafile('privatekey.%s' % self.format)
+ private_key = datafile('privatekey.' + self.format_)
signer = self.signer.from_string(private_key)
audience = 'some_audience_address@testing.gserviceaccount.com'
now = int(time.time())
@@ -132,7 +141,7 @@ class CryptTests(unittest2.TestCase):
http=http)
def test_verify_id_token_bad_tokens(self):
- private_key = datafile('privatekey.%s' % self.format)
+ private_key = datafile('privatekey.' + self.format_)
# Wrong number of segments
self._check_jwt_failure('foo', 'Wrong number of segments')
@@ -198,7 +207,7 @@ class CryptTests(unittest2.TestCase):
class PEMCryptTestsPyCrypto(CryptTests):
def setUp(self):
- self.format = 'pem'
+ self.format_ = 'pem'
self.signer = crypt.PyCryptoSigner
self.verifier = crypt.PyCryptoVerifier
@@ -206,7 +215,7 @@ class PEMCryptTestsPyCrypto(CryptTests):
class PEMCryptTestsOpenSSL(CryptTests):
def setUp(self):
- self.format = 'pem'
+ self.format_ = 'pem'
self.signer = crypt.OpenSSLSigner
self.verifier = crypt.OpenSSLVerifier
@@ -214,16 +223,27 @@ class PEMCryptTestsOpenSSL(CryptTests):
class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
def setUp(self):
- self.format = 'p12'
+ self.format_ = 'p12'
crypt.Signer = crypt.OpenSSLSigner
- def test_credentials_good(self):
- private_key = datafile('privatekey.%s' % self.format)
- credentials = SignedJwtAssertionCredentials(
- 'some_account@example.com',
- private_key,
- scope='read+write',
+ def _make_credentials(self):
+ private_key = datafile('privatekey.' + self.format_)
+ signer = crypt.Signer.from_string(private_key)
+ credentials = ServiceAccountCredentials(
+ 'some_account@example.com', signer,
+ scopes='read+write',
sub='joe@example.org')
+ if self.format_ == 'pem':
+ credentials._private_key_pkcs8_pem = private_key
+ elif self.format_ == 'p12':
+ credentials._private_key_pkcs12 = private_key
+ credentials._private_key_password = _PASSWORD_DEFAULT
+ else: # pragma: NO COVER
+ raise ValueError('Unexpected format.')
+ return credentials
+
+ def test_credentials_good(self):
+ credentials = self._make_credentials()
http = HttpMockSequence([
({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'),
({'status': '200'}, 'echo_request_headers'),
@@ -233,18 +253,14 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
self.assertEqual(b'Bearer 1/3w', content[b'Authorization'])
def test_credentials_to_from_json(self):
- private_key = datafile('privatekey.%s' % self.format)
- credentials = SignedJwtAssertionCredentials(
- 'some_account@example.com',
- private_key,
- scope='read+write',
- sub='joe@example.org')
+ credentials = self._make_credentials()
json = credentials.to_json()
restored = Credentials.new_from_json(json)
- self.assertEqual(credentials.private_key, restored.private_key)
- self.assertEqual(credentials.private_key_password,
- restored.private_key_password)
- self.assertEqual(credentials.kwargs, restored.kwargs)
+ self.assertEqual(credentials._private_key_pkcs12,
+ restored._private_key_pkcs12)
+ self.assertEqual(credentials._private_key_password,
+ restored._private_key_password)
+ self.assertEqual(credentials._kwargs, restored._kwargs)
def _credentials_refresh(self, credentials):
http = HttpMockSequence([
@@ -258,24 +274,12 @@ class SignedJwtAssertionCredentialsTests(unittest2.TestCase):
return content
def test_credentials_refresh_without_storage(self):
- private_key = datafile('privatekey.%s' % self.format)
- credentials = SignedJwtAssertionCredentials(
- 'some_account@example.com',
- private_key,
- scope='read+write',
- sub='joe@example.org')
-
+ credentials = self._make_credentials()
content = self._credentials_refresh(credentials)
-
self.assertEqual(b'Bearer 3/3w', content[b'Authorization'])
def test_credentials_refresh_with_storage(self):
- private_key = datafile('privatekey.%s' % self.format)
- credentials = SignedJwtAssertionCredentials(
- 'some_account@example.com',
- private_key,
- scope='read+write',
- sub='joe@example.org')
+ credentials = self._make_credentials()
filehandle, filename = tempfile.mkstemp()
os.close(filehandle)
@@ -293,7 +297,7 @@ class PEMSignedJwtAssertionCredentialsOpenSSLTests(
SignedJwtAssertionCredentialsTests):
def setUp(self):
- self.format = 'pem'
+ self.format_ = 'pem'
crypt.Signer = crypt.OpenSSLSigner
@@ -301,25 +305,10 @@ class PEMSignedJwtAssertionCredentialsPyCryptoTests(
SignedJwtAssertionCredentialsTests):
def setUp(self):
- self.format = 'pem'
+ self.format_ = 'pem'
crypt.Signer = crypt.PyCryptoSigner
-class PKCSSignedJwtAssertionCredentialsPyCryptoTests(unittest2.TestCase):
-
- def test_for_failure(self):
- crypt.Signer = crypt.PyCryptoSigner
- private_key = datafile('privatekey.p12')
- credentials = SignedJwtAssertionCredentials(
- 'some_account@example.com',
- private_key,
- scope='read+write',
- sub='joe@example.org')
-
- self.assertRaises(NotImplementedError,
- credentials._generate_assertion)
-
-
class TestHasOpenSSLFlag(unittest2.TestCase):
def test_true(self):
diff --git a/tests/test_service_account.py b/tests/test_service_account.py
index e7c9e0a..3c91d19 100644
--- a/tests/test_service_account.py
+++ b/tests/test_service_account.py
@@ -57,6 +57,23 @@ class ServiceAccountCredentialsTests(unittest2.TestCase):
client_id=self.client_id,
)
+ def test__to_json_override(self):
+ signer = object()
+ creds = ServiceAccountCredentials('name@email.com',
+ signer)
+ self.assertEqual(creds._signer, signer)
+ # Serialize over-ridden data (unrelated to ``creds``).
+ to_serialize = {'unrelated': 'data'}
+ serialized_str = creds._to_json([], to_serialize.copy())
+ serialized_data = json.loads(serialized_str)
+ expected_serialized = {
+ '_class': 'ServiceAccountCredentials',
+ '_module': 'oauth2client.service_account',
+ 'token_expiry': None,
+ }
+ expected_serialized.update(to_serialize)
+ self.assertEqual(serialized_data, expected_serialized)
+
def test_sign_blob(self):
private_key_id, signature = self.credentials.sign_blob('Google')
self.assertEqual(self.private_key_id, private_key_id)