aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorDanny Hermes <daniel.j.hermes@gmail.com>2016-02-22 19:12:15 -0800
committerDanny Hermes <daniel.j.hermes@gmail.com>2016-02-22 19:12:15 -0800
commit498d0b6871bdb8eb2c9471f051c8d5efab4324de (patch)
treeb40be0ec6e487387782ebcb7bd792780ec1c5840 /tests
parent9f89019808eddbc2d371205617a7b61f258798c5 (diff)
parentce0d71a497c1db7c4d5de36565df57eaec017eae (diff)
downloadoauth2client-498d0b6871bdb8eb2c9471f051c8d5efab4324de.tar.gz
Merge pull request #421 from dhermes/sign-blob-all-svc-accounts
Adding common sign_blob() service account types.
Diffstat (limited to 'tests')
-rw-r--r--tests/contrib/test_appengine.py60
-rw-r--r--tests/contrib/test_gce.py90
-rw-r--r--tests/test_client.py5
3 files changed, 151 insertions, 4 deletions
diff --git a/tests/contrib/test_appengine.py b/tests/contrib/test_appengine.py
index 4e82429..438548b 100644
--- a/tests/contrib/test_appengine.py
+++ b/tests/contrib/test_appengine.py
@@ -116,14 +116,29 @@ class TestAppAssertionCredentials(unittest.TestCase):
class AppIdentityStubImpl(apiproxy_stub.APIProxyStub):
- def __init__(self):
+ def __init__(self, key_name=None, sig_bytes=None,
+ svc_acct=None):
super(TestAppAssertionCredentials.AppIdentityStubImpl,
self).__init__('app_identity_service')
+ self._key_name = key_name
+ self._sig_bytes = sig_bytes
+ self._sign_calls = []
+ self._svc_acct = svc_acct
+ self._get_acct_name_calls = 0
def _Dynamic_GetAccessToken(self, request, response):
response.set_access_token('a_token_123')
response.set_expiration_time(time.time() + 1800)
+ def _Dynamic_SignForApp(self, request, response):
+ response.set_key_name(self._key_name)
+ response.set_signature_bytes(self._sig_bytes)
+ self._sign_calls.append(request.bytes_to_sign())
+
+ def _Dynamic_GetServiceAccountName(self, request, response):
+ response.set_service_account_name(self._svc_acct)
+ self._get_acct_name_calls += 1
+
class ErroringAppIdentityStubImpl(apiproxy_stub.APIProxyStub):
def __init__(self):
@@ -210,6 +225,49 @@ class TestAppAssertionCredentials(unittest.TestCase):
self.assertTrue(isinstance(new_credentials, AppAssertionCredentials))
self.assertEqual('dummy_scope', new_credentials.scope)
+ def test_sign_blob(self):
+ key_name = b'1234567890'
+ sig_bytes = b'himom'
+ app_identity_stub = self.AppIdentityStubImpl(
+ key_name=key_name, sig_bytes=sig_bytes)
+ apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
+ apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service',
+ app_identity_stub)
+ credentials = AppAssertionCredentials([])
+ to_sign = b'blob'
+ self.assertEqual(app_identity_stub._sign_calls, [])
+ result = credentials.sign_blob(to_sign)
+ self.assertEqual(result, (key_name, sig_bytes))
+ self.assertEqual(app_identity_stub._sign_calls, [to_sign])
+
+ def test_service_account_email(self):
+ acct_name = 'new-value@appspot.gserviceaccount.com'
+ app_identity_stub = self.AppIdentityStubImpl(svc_acct=acct_name)
+ apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
+ apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service',
+ app_identity_stub)
+
+ credentials = AppAssertionCredentials([])
+ self.assertIsNone(credentials._service_account_email)
+ self.assertEqual(app_identity_stub._get_acct_name_calls, 0)
+ self.assertEqual(credentials.service_account_email, acct_name)
+ self.assertIsNotNone(credentials._service_account_email)
+ self.assertEqual(app_identity_stub._get_acct_name_calls, 1)
+
+ def test_service_account_email_already_set(self):
+ acct_name = 'existing@appspot.gserviceaccount.com'
+ credentials = AppAssertionCredentials([])
+ credentials._service_account_email = acct_name
+
+ app_identity_stub = self.AppIdentityStubImpl(svc_acct=acct_name)
+ apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
+ apiproxy_stub_map.apiproxy.RegisterStub('app_identity_service',
+ app_identity_stub)
+
+ self.assertEqual(app_identity_stub._get_acct_name_calls, 0)
+ self.assertEqual(credentials.service_account_email, acct_name)
+ self.assertEqual(app_identity_stub._get_acct_name_calls, 0)
+
def test_get_access_token(self):
app_identity_stub = self.AppIdentityStubImpl()
apiproxy_stub_map.apiproxy = apiproxy_stub_map.APIProxyStubMap()
diff --git a/tests/contrib/test_gce.py b/tests/contrib/test_gce.py
index 3c8f33c..48da976 100644
--- a/tests/contrib/test_gce.py
+++ b/tests/contrib/test_gce.py
@@ -17,14 +17,17 @@
import json
from six.moves import http_client
from six.moves import urllib
-import unittest
+import unittest2
import mock
+import httplib2
from oauth2client._helpers import _to_bytes
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import Credentials
from oauth2client.client import save_to_well_known_file
+from oauth2client.contrib.gce import _DEFAULT_EMAIL_METADATA
+from oauth2client.contrib.gce import _get_service_account_email
from oauth2client.contrib.gce import _SCOPES_WARNING
from oauth2client.contrib.gce import AppAssertionCredentials
@@ -32,7 +35,7 @@ from oauth2client.contrib.gce import AppAssertionCredentials
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
-class AppAssertionCredentialsTests(unittest.TestCase):
+class AppAssertionCredentialsTests(unittest2.TestCase):
def test_constructor(self):
credentials = AppAssertionCredentials(foo='bar')
@@ -150,6 +153,49 @@ class AppAssertionCredentialsTests(unittest.TestCase):
self.assertEqual('dummy_scope', new_credentials.scope)
warn_mock.assert_called_once_with(_SCOPES_WARNING)
+ def test_sign_blob_not_implemented(self):
+ credentials = AppAssertionCredentials([])
+ with self.assertRaises(NotImplementedError):
+ credentials.sign_blob(b'blob')
+
+ @mock.patch('oauth2client.contrib.gce._get_service_account_email',
+ return_value=(None, 'retrieved@email.com'))
+ def test_service_account_email(self, get_email):
+ credentials = AppAssertionCredentials([])
+ self.assertIsNone(credentials._service_account_email)
+ self.assertEqual(credentials.service_account_email,
+ get_email.return_value[1])
+ self.assertIsNotNone(credentials._service_account_email)
+ get_email.assert_called_once_with()
+
+ @mock.patch('oauth2client.contrib.gce._get_service_account_email')
+ def test_service_account_email_already_set(self, get_email):
+ credentials = AppAssertionCredentials([])
+ acct_name = 'existing@email.com'
+ credentials._service_account_email = acct_name
+ self.assertEqual(credentials.service_account_email, acct_name)
+ get_email.assert_not_called()
+
+ @mock.patch('oauth2client.contrib.gce._get_service_account_email')
+ def test_service_account_email_failure(self, get_email):
+ # Set-up the mock.
+ bad_response = httplib2.Response({'status': http_client.NOT_FOUND})
+ content = b'bad-bytes-nothing-here'
+ get_email.return_value = (bad_response, content)
+ # Test the failure.
+ credentials = AppAssertionCredentials([])
+ self.assertIsNone(credentials._service_account_email)
+ with self.assertRaises(AttributeError) as exc_manager:
+ getattr(credentials, 'service_account_email')
+
+ error_msg = ('Failed to retrieve the email from the '
+ 'Google Compute Engine metadata service')
+ self.assertEqual(
+ exc_manager.exception.args,
+ (error_msg, bad_response, content))
+ self.assertIsNone(credentials._service_account_email)
+ get_email.assert_called_once_with()
+
def test_get_access_token(self):
http = mock.MagicMock()
http.request = mock.MagicMock(
@@ -178,5 +224,43 @@ class AppAssertionCredentialsTests(unittest.TestCase):
os.path.isdir = ORIGINAL_ISDIR
+class Test__get_service_account_email(unittest2.TestCase):
+
+ def test_success(self):
+ http_request = mock.MagicMock()
+ acct_name = b'1234567890@developer.gserviceaccount.com'
+ http_request.return_value = (
+ httplib2.Response({'status': http_client.OK}), acct_name)
+ result = _get_service_account_email(http_request)
+ self.assertEqual(result, (None, acct_name.decode('utf-8')))
+ http_request.assert_called_once_with(
+ _DEFAULT_EMAIL_METADATA,
+ headers={'Metadata-Flavor': 'Google'})
+
+ @mock.patch.object(httplib2.Http, 'request')
+ def test_success_default_http(self, http_request):
+ # Don't make _from_bytes() work too hard.
+ acct_name = u'1234567890@developer.gserviceaccount.com'
+ http_request.return_value = (
+ httplib2.Response({'status': http_client.OK}), acct_name)
+ result = _get_service_account_email()
+ self.assertEqual(result, (None, acct_name))
+ http_request.assert_called_once_with(
+ _DEFAULT_EMAIL_METADATA,
+ headers={'Metadata-Flavor': 'Google'})
+
+ def test_failure(self):
+ http_request = mock.MagicMock()
+ response = httplib2.Response({'status': http_client.NOT_FOUND})
+ content = b'Not found'
+ http_request.return_value = (response, content)
+ result = _get_service_account_email(http_request)
+
+ self.assertEqual(result, (response, content))
+ http_request.assert_called_once_with(
+ _DEFAULT_EMAIL_METADATA,
+ headers={'Metadata-Flavor': 'Google'})
+
+
if __name__ == '__main__': # pragma: NO COVER
- unittest.main()
+ unittest2.main()
diff --git a/tests/test_client.py b/tests/test_client.py
index 018727c..03b30fb 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -1064,6 +1064,11 @@ class TestAssertionCredentials(unittest2.TestCase):
self, '400', revoke_raise=True,
valid_bool_value=False, token_attr='access_token')
+ def test_sign_blob_abstract(self):
+ credentials = AssertionCredentials(None)
+ with self.assertRaises(NotImplementedError):
+ credentials.sign_blob(b'blob')
+
class UpdateQueryParamsTest(unittest2.TestCase):
def test_update_query_params_no_params(self):