diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2015-08-19 22:03:50 -0700 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2015-08-21 08:04:14 -0700 |
commit | 34c1ff543dd16edf58dcf5b336076cf27de3721a (patch) | |
tree | c4bfda94143239939a56cec584a5c707b87bcced /tests/test_jwt.py | |
parent | 043e066e54d00bd3f8c5a90d2cb83292f30f8ede (diff) | |
download | oauth2client-34c1ff543dd16edf58dcf5b336076cf27de3721a.tar.gz |
Raw pep8ify changes.
Simply ran
pep8ify -w oauth2client/
pep8ify -w tests/
Diffstat (limited to 'tests/test_jwt.py')
-rw-r--r-- | tests/test_jwt.py | 343 |
1 files changed, 171 insertions, 172 deletions
diff --git a/tests/test_jwt.py b/tests/test_jwt.py index 5dd594f..a56f24e 100644 --- a/tests/test_jwt.py +++ b/tests/test_jwt.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - """Oauth2client tests Unit tests for oauth2client. @@ -42,295 +41,295 @@ from oauth2client.file import Storage def datafile(filename): - f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb') - data = f.read() - f.close() - return data + f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'rb') + data = f.read() + f.close() + return data class CryptTests(unittest.TestCase): - def setUp(self): - self.format = 'p12' - self.signer = crypt.OpenSSLSigner - self.verifier = crypt.OpenSSLVerifier + def setUp(self): + self.format = 'p12' + self.signer = crypt.OpenSSLSigner + self.verifier = crypt.OpenSSLVerifier - def test_sign_and_verify(self): - self._check_sign_and_verify('privatekey.%s' % self.format) + def test_sign_and_verify(self): + self._check_sign_and_verify('privatekey.%s' % self.format) - def test_sign_and_verify_from_converted_pkcs12(self): - # Tests that following instructions to convert from PKCS12 to PEM works. - if self.format == 'pem': - self._check_sign_and_verify('pem_from_pkcs12.pem') + def test_sign_and_verify_from_converted_pkcs12(self): + # Tests that following instructions to convert from PKCS12 to PEM works. + if self.format == 'pem': + self._check_sign_and_verify('pem_from_pkcs12.pem') - def _check_sign_and_verify(self, private_key_file): - private_key = datafile(private_key_file) - public_key = datafile('publickey.pem') + def _check_sign_and_verify(self, private_key_file): + private_key = datafile(private_key_file) + public_key = datafile('publickey.pem') - # We pass in a non-bytes password to make sure all branches - # are traversed in tests. - signer = self.signer.from_string(private_key, + # We pass in a non-bytes password to make sure all branches + # are traversed in tests. + signer = self.signer.from_string(private_key, password=u'notasecret') - signature = signer.sign('foo') + signature = signer.sign('foo') - verifier = self.verifier.from_string(public_key, True) - self.assertTrue(verifier.verify(b'foo', signature)) + verifier = self.verifier.from_string(public_key, True) + self.assertTrue(verifier.verify(b'foo', signature)) - self.assertFalse(verifier.verify(b'bar', signature)) - self.assertFalse(verifier.verify(b'foo', b'bad signagure')) - self.assertFalse(verifier.verify(b'foo', u'bad signagure')) + self.assertFalse(verifier.verify(b'bar', signature)) + self.assertFalse(verifier.verify(b'foo', b'bad signagure')) + self.assertFalse(verifier.verify(b'foo', u'bad signagure')) - def _check_jwt_failure(self, jwt, expected_error): - public_key = datafile('publickey.pem') - certs = {'foo': public_key} - audience = ('https://www.googleapis.com/auth/id?client_id=' + def _check_jwt_failure(self, jwt, expected_error): + public_key = datafile('publickey.pem') + certs = {'foo': public_key} + audience = ('https://www.googleapis.com/auth/id?client_id=' 'external_public_key@testing.gserviceaccount.com') - try: - crypt.verify_signed_jwt_with_certs(jwt, certs, audience) - self.fail() - except crypt.AppIdentityError as e: - self.assertTrue(expected_error in str(e)) - - def _create_signed_jwt(self): - private_key = datafile('privatekey.%s' % self.format) - signer = self.signer.from_string(private_key) - audience = 'some_audience_address@testing.gserviceaccount.com' - now = int(time.time()) - - return crypt.make_signed_jwt(signer, { + try: + crypt.verify_signed_jwt_with_certs(jwt, certs, audience) + self.fail() + except crypt.AppIdentityError as e: + self.assertTrue(expected_error in str(e)) + + def _create_signed_jwt(self): + private_key = datafile('privatekey.%s' % self.format) + signer = self.signer.from_string(private_key) + audience = 'some_audience_address@testing.gserviceaccount.com' + now = int(time.time()) + + return crypt.make_signed_jwt(signer, { 'aud': audience, 'iat': now, 'exp': now + 300, 'user': 'billy bob', 'metadata': {'meta': 'data'}, - }) + }) - def test_verify_id_token(self): - jwt = self._create_signed_jwt() - public_key = datafile('publickey.pem') - certs = {'foo': public_key} - audience = 'some_audience_address@testing.gserviceaccount.com' - contents = crypt.verify_signed_jwt_with_certs(jwt, certs, audience) - self.assertEqual('billy bob', contents['user']) - self.assertEqual('data', contents['metadata']['meta']) + def test_verify_id_token(self): + jwt = self._create_signed_jwt() + public_key = datafile('publickey.pem') + certs = {'foo': public_key} + audience = 'some_audience_address@testing.gserviceaccount.com' + contents = crypt.verify_signed_jwt_with_certs(jwt, certs, audience) + self.assertEqual('billy bob', contents['user']) + self.assertEqual('data', contents['metadata']['meta']) - def test_verify_id_token_with_certs_uri(self): - jwt = self._create_signed_jwt() + def test_verify_id_token_with_certs_uri(self): + jwt = self._create_signed_jwt() - http = HttpMockSequence([ + http = HttpMockSequence([ ({'status': '200'}, datafile('certs.json')), - ]) + ]) - contents = verify_id_token( + contents = verify_id_token( jwt, 'some_audience_address@testing.gserviceaccount.com', http=http) - self.assertEqual('billy bob', contents['user']) - self.assertEqual('data', contents['metadata']['meta']) + self.assertEqual('billy bob', contents['user']) + self.assertEqual('data', contents['metadata']['meta']) - def test_verify_id_token_with_certs_uri_fails(self): - jwt = self._create_signed_jwt() + def test_verify_id_token_with_certs_uri_fails(self): + jwt = self._create_signed_jwt() - http = HttpMockSequence([ + http = HttpMockSequence([ ({'status': '404'}, datafile('certs.json')), - ]) + ]) - self.assertRaises(VerifyJwtTokenError, verify_id_token, jwt, + self.assertRaises(VerifyJwtTokenError, verify_id_token, jwt, 'some_audience_address@testing.gserviceaccount.com', http=http) - def test_verify_id_token_bad_tokens(self): - private_key = datafile('privatekey.%s' % self.format) + def test_verify_id_token_bad_tokens(self): + private_key = datafile('privatekey.%s' % self.format) - # Wrong number of segments - self._check_jwt_failure('foo', 'Wrong number of segments') + # Wrong number of segments + self._check_jwt_failure('foo', 'Wrong number of segments') - # Not json - self._check_jwt_failure('foo.bar.baz', 'Can\'t parse token') + # Not json + self._check_jwt_failure('foo.bar.baz', 'Can\'t parse token') - # Bad signature - jwt = b'.'.join([b'foo', crypt._urlsafe_b64encode('{"a":"b"}'), b'baz']) - self._check_jwt_failure(jwt, 'Invalid token signature') + # Bad signature + jwt = b'.'.join([b'foo', crypt._urlsafe_b64encode('{"a":"b"}'), b'baz']) + self._check_jwt_failure(jwt, 'Invalid token signature') - # No expiration - signer = self.signer.from_string(private_key) - audience = ('https:#www.googleapis.com/auth/id?client_id=' + # No expiration + signer = self.signer.from_string(private_key) + audience = ('https:#www.googleapis.com/auth/id?client_id=' 'external_public_key@testing.gserviceaccount.com') - jwt = crypt.make_signed_jwt(signer, { + jwt = crypt.make_signed_jwt(signer, { 'aud': audience, 'iat': time.time(), - }) - self._check_jwt_failure(jwt, 'No exp field in token') + }) + self._check_jwt_failure(jwt, 'No exp field in token') - # No issued at - jwt = crypt.make_signed_jwt(signer, { + # No issued at + jwt = crypt.make_signed_jwt(signer, { 'aud': 'audience', 'exp': time.time() + 400, - }) - self._check_jwt_failure(jwt, 'No iat field in token') + }) + self._check_jwt_failure(jwt, 'No iat field in token') - # Too early - jwt = crypt.make_signed_jwt(signer, { + # Too early + jwt = crypt.make_signed_jwt(signer, { 'aud': 'audience', 'iat': time.time() + 301, 'exp': time.time() + 400, - }) - self._check_jwt_failure(jwt, 'Token used too early') + }) + self._check_jwt_failure(jwt, 'Token used too early') - # Too late - jwt = crypt.make_signed_jwt(signer, { + # Too late + jwt = crypt.make_signed_jwt(signer, { 'aud': 'audience', 'iat': time.time() - 500, 'exp': time.time() - 301, - }) - self._check_jwt_failure(jwt, 'Token used too late') + }) + self._check_jwt_failure(jwt, 'Token used too late') - # Wrong target - jwt = crypt.make_signed_jwt(signer, { + # Wrong target + jwt = crypt.make_signed_jwt(signer, { 'aud': 'somebody else', 'iat': time.time(), 'exp': time.time() + 300, - }) - self._check_jwt_failure(jwt, 'Wrong recipient') + }) + self._check_jwt_failure(jwt, 'Wrong recipient') - def test_from_string_non_509_cert(self): - # Use a private key instead of a certificate to test the other branch - # of from_string(). - public_key = datafile('privatekey.pem') - verifier = self.verifier.from_string(public_key, is_x509_cert=False) - self.assertTrue(isinstance(verifier, self.verifier)) + def test_from_string_non_509_cert(self): + # Use a private key instead of a certificate to test the other branch + # of from_string(). + public_key = datafile('privatekey.pem') + verifier = self.verifier.from_string(public_key, is_x509_cert=False) + self.assertTrue(isinstance(verifier, self.verifier)) class PEMCryptTestsPyCrypto(CryptTests): - def setUp(self): - self.format = 'pem' - self.signer = crypt.PyCryptoSigner - self.verifier = crypt.PyCryptoVerifier + def setUp(self): + self.format = 'pem' + self.signer = crypt.PyCryptoSigner + self.verifier = crypt.PyCryptoVerifier class PEMCryptTestsOpenSSL(CryptTests): - def setUp(self): - self.format = 'pem' - self.signer = crypt.OpenSSLSigner - self.verifier = crypt.OpenSSLVerifier + def setUp(self): + self.format = 'pem' + self.signer = crypt.OpenSSLSigner + self.verifier = crypt.OpenSSLVerifier class SignedJwtAssertionCredentialsTests(unittest.TestCase): - def setUp(self): - self.format = 'p12' - crypt.Signer = crypt.OpenSSLSigner + def setUp(self): + self.format = 'p12' + crypt.Signer = crypt.OpenSSLSigner - def test_credentials_good(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( + def test_credentials_good(self): + private_key = datafile('privatekey.%s' % self.format) + credentials = SignedJwtAssertionCredentials( 'some_account@example.com', private_key, scope='read+write', sub='joe@example.org') - http = HttpMockSequence([ + http = HttpMockSequence([ ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'), ({'status': '200'}, 'echo_request_headers'), - ]) - http = credentials.authorize(http) - resp, content = http.request('http://example.org') - self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) - - def test_credentials_to_from_json(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( + ]) + http = credentials.authorize(http) + resp, content = http.request('http://example.org') + self.assertEqual(b'Bearer 1/3w', content[b'Authorization']) + + def test_credentials_to_from_json(self): + private_key = datafile('privatekey.%s' % self.format) + credentials = SignedJwtAssertionCredentials( 'some_account@example.com', private_key, scope='read+write', sub='joe@example.org') - json = credentials.to_json() - restored = Credentials.new_from_json(json) - self.assertEqual(credentials.private_key, restored.private_key) - self.assertEqual(credentials.private_key_password, + json = credentials.to_json() + restored = Credentials.new_from_json(json) + self.assertEqual(credentials.private_key, restored.private_key) + self.assertEqual(credentials.private_key_password, restored.private_key_password) - self.assertEqual(credentials.kwargs, restored.kwargs) + self.assertEqual(credentials.kwargs, restored.kwargs) - def _credentials_refresh(self, credentials): - http = HttpMockSequence([ + def _credentials_refresh(self, credentials): + http = HttpMockSequence([ ({'status': '200'}, b'{"access_token":"1/3w","expires_in":3600}'), ({'status': '401'}, b''), ({'status': '200'}, b'{"access_token":"3/3w","expires_in":3600}'), ({'status': '200'}, 'echo_request_headers'), - ]) - http = credentials.authorize(http) - _, content = http.request('http://example.org') - return content - - def test_credentials_refresh_without_storage(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( + ]) + http = credentials.authorize(http) + _, content = http.request('http://example.org') + return content + + def test_credentials_refresh_without_storage(self): + private_key = datafile('privatekey.%s' % self.format) + credentials = SignedJwtAssertionCredentials( 'some_account@example.com', private_key, scope='read+write', sub='joe@example.org') - content = self._credentials_refresh(credentials) + content = self._credentials_refresh(credentials) - self.assertEqual(b'Bearer 3/3w', content[b'Authorization']) + self.assertEqual(b'Bearer 3/3w', content[b'Authorization']) - def test_credentials_refresh_with_storage(self): - private_key = datafile('privatekey.%s' % self.format) - credentials = SignedJwtAssertionCredentials( + def test_credentials_refresh_with_storage(self): + private_key = datafile('privatekey.%s' % self.format) + credentials = SignedJwtAssertionCredentials( 'some_account@example.com', private_key, scope='read+write', sub='joe@example.org') - (filehandle, filename) = tempfile.mkstemp() - os.close(filehandle) - store = Storage(filename) - store.put(credentials) - credentials.set_store(store) + (filehandle, filename) = tempfile.mkstemp() + os.close(filehandle) + store = Storage(filename) + store.put(credentials) + credentials.set_store(store) - content = self._credentials_refresh(credentials) + content = self._credentials_refresh(credentials) - self.assertEqual(b'Bearer 3/3w', content[b'Authorization']) - os.unlink(filename) + self.assertEqual(b'Bearer 3/3w', content[b'Authorization']) + os.unlink(filename) class PEMSignedJwtAssertionCredentialsOpenSSLTests( SignedJwtAssertionCredentialsTests): - def setUp(self): - self.format = 'pem' - crypt.Signer = crypt.OpenSSLSigner + def setUp(self): + self.format = 'pem' + crypt.Signer = crypt.OpenSSLSigner class PEMSignedJwtAssertionCredentialsPyCryptoTests( SignedJwtAssertionCredentialsTests): - def setUp(self): - self.format = 'pem' - crypt.Signer = crypt.PyCryptoSigner + def setUp(self): + self.format = 'pem' + crypt.Signer = crypt.PyCryptoSigner class PKCSSignedJwtAssertionCredentialsPyCryptoTests(unittest.TestCase): - def test_for_failure(self): - crypt.Signer = crypt.PyCryptoSigner - private_key = datafile('privatekey.p12') - credentials = SignedJwtAssertionCredentials( + def test_for_failure(self): + crypt.Signer = crypt.PyCryptoSigner + private_key = datafile('privatekey.p12') + credentials = SignedJwtAssertionCredentials( 'some_account@example.com', private_key, scope='read+write', sub='joe@example.org') - try: - credentials._generate_assertion() - self.fail() - except NotImplementedError: - pass + try: + credentials._generate_assertion() + self.fail() + except NotImplementedError: + pass class TestHasOpenSSLFlag(unittest.TestCase): - def test_true(self): - self.assertEqual(True, HAS_OPENSSL) - self.assertEqual(True, HAS_CRYPTO) + def test_true(self): + self.assertEqual(True, HAS_OPENSSL) + self.assertEqual(True, HAS_CRYPTO) if __name__ == '__main__': - unittest.main() + unittest.main() |