diff options
author | Danny Hermes <daniel.j.hermes@gmail.com> | 2015-08-19 22:03:50 -0700 |
---|---|---|
committer | Danny Hermes <daniel.j.hermes@gmail.com> | 2015-08-21 08:04:14 -0700 |
commit | 34c1ff543dd16edf58dcf5b336076cf27de3721a (patch) | |
tree | c4bfda94143239939a56cec584a5c707b87bcced /tests/test_file.py | |
parent | 043e066e54d00bd3f8c5a90d2cb83292f30f8ede (diff) | |
download | oauth2client-34c1ff543dd16edf58dcf5b336076cf27de3721a.tar.gz |
Raw pep8ify changes.
Simply ran
pep8ify -w oauth2client/
pep8ify -w tests/
Diffstat (limited to 'tests/test_file.py')
-rw-r--r-- | tests/test_file.py | 528 |
1 files changed, 263 insertions, 265 deletions
diff --git a/tests/test_file.py b/tests/test_file.py index 89c3e02..c92c68f 100644 --- a/tests/test_file.py +++ b/tests/test_file.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - """Oauth2client.file tests Unit tests for oauth2client.file @@ -42,363 +41,362 @@ from oauth2client.client import AccessTokenCredentials from oauth2client.client import OAuth2Credentials from six.moves import http_client try: - # Python2 - from future_builtins import oct + # Python2 + from future_builtins import oct except: - pass - + pass FILENAME = tempfile.mktemp('oauth2client_test.data') class OAuth2ClientFileTests(unittest.TestCase): - def tearDown(self): - try: - os.unlink(FILENAME) - except OSError: - pass + def tearDown(self): + try: + os.unlink(FILENAME) + except OSError: + pass - def setUp(self): - try: - os.unlink(FILENAME) - except OSError: - pass + def setUp(self): + try: + os.unlink(FILENAME) + except OSError: + pass - def create_test_credentials(self, client_id='some_client_id', + def create_test_credentials(self, client_id='some_client_id', expiration=None): - access_token = 'foo' - client_secret = 'cOuDdkfjxxnv+' - refresh_token = '1/0/a.df219fjls0' - token_expiry = expiration or datetime.datetime.utcnow() - token_uri = 'https://www.google.com/accounts/o8/oauth2/token' - user_agent = 'refresh_checker/1.0' - - credentials = OAuth2Credentials( + access_token = 'foo' + client_secret = 'cOuDdkfjxxnv+' + refresh_token = '1/0/a.df219fjls0' + token_expiry = expiration or datetime.datetime.utcnow() + token_uri = 'https://www.google.com/accounts/o8/oauth2/token' + user_agent = 'refresh_checker/1.0' + + credentials = OAuth2Credentials( access_token, client_id, client_secret, refresh_token, token_expiry, token_uri, user_agent) - return credentials - - def test_non_existent_file_storage(self): - s = file.Storage(FILENAME) - credentials = s.get() - self.assertEquals(None, credentials) - - def test_no_sym_link_credentials(self): - if hasattr(os, 'symlink'): - SYMFILENAME = FILENAME + '.sym' - os.symlink(FILENAME, SYMFILENAME) - s = file.Storage(SYMFILENAME) - try: - s.get() - self.fail('Should have raised an exception.') - except file.CredentialsFileSymbolicLinkError: - pass - finally: - os.unlink(SYMFILENAME) - - def test_pickle_and_json_interop(self): - # Write a file with a pickled OAuth2Credentials. - credentials = self.create_test_credentials() - - f = open(FILENAME, 'wb') - pickle.dump(credentials, f) - f.close() - - # Storage should be not be able to read that object, as the capability to - # read and write credentials as pickled objects has been removed. - s = file.Storage(FILENAME) - read_credentials = s.get() - self.assertEquals(None, read_credentials) - - # Now write it back out and confirm it has been rewritten as JSON - s.put(credentials) - with open(FILENAME) as f: - data = json.load(f) - - self.assertEquals(data['access_token'], 'foo') - self.assertEquals(data['_class'], 'OAuth2Credentials') - self.assertEquals(data['_module'], OAuth2Credentials.__module__) - - def test_token_refresh_store_expired(self): - expiration = datetime.datetime.utcnow() - datetime.timedelta(minutes=15) - credentials = self.create_test_credentials(expiration=expiration) - - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() - new_cred = copy.copy(credentials) - new_cred.access_token = 'bar' - s.put(new_cred) - - access_token = '1/3w' - token_response = {'access_token': access_token, 'expires_in': 3600} - http = HttpMockSequence([ + return credentials + + def test_non_existent_file_storage(self): + s = file.Storage(FILENAME) + credentials = s.get() + self.assertEquals(None, credentials) + + def test_no_sym_link_credentials(self): + if hasattr(os, 'symlink'): + SYMFILENAME = FILENAME + '.sym' + os.symlink(FILENAME, SYMFILENAME) + s = file.Storage(SYMFILENAME) + try: + s.get() + self.fail('Should have raised an exception.') + except file.CredentialsFileSymbolicLinkError: + pass + finally: + os.unlink(SYMFILENAME) + + def test_pickle_and_json_interop(self): + # Write a file with a pickled OAuth2Credentials. + credentials = self.create_test_credentials() + + f = open(FILENAME, 'wb') + pickle.dump(credentials, f) + f.close() + + # Storage should be not be able to read that object, as the capability to + # read and write credentials as pickled objects has been removed. + s = file.Storage(FILENAME) + read_credentials = s.get() + self.assertEquals(None, read_credentials) + + # Now write it back out and confirm it has been rewritten as JSON + s.put(credentials) + with open(FILENAME) as f: + data = json.load(f) + + self.assertEquals(data['access_token'], 'foo') + self.assertEquals(data['_class'], 'OAuth2Credentials') + self.assertEquals(data['_module'], OAuth2Credentials.__module__) + + def test_token_refresh_store_expired(self): + expiration = datetime.datetime.utcnow() - datetime.timedelta(minutes=15) + credentials = self.create_test_credentials(expiration=expiration) + + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() + new_cred = copy.copy(credentials) + new_cred.access_token = 'bar' + s.put(new_cred) + + access_token = '1/3w' + token_response = {'access_token': access_token, 'expires_in': 3600} + http = HttpMockSequence([ ({'status': '200'}, json.dumps(token_response).encode('utf-8')), - ]) - - credentials._refresh(http.request) - self.assertEquals(credentials.access_token, access_token) - - def test_token_refresh_store_expires_soon(self): - # Tests the case where an access token that is valid when it is read from - # the store expires before the original request succeeds. - expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) - credentials = self.create_test_credentials(expiration=expiration) - - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() - new_cred = copy.copy(credentials) - new_cred.access_token = 'bar' - s.put(new_cred) - - access_token = '1/3w' - token_response = {'access_token': access_token, 'expires_in': 3600} - http = HttpMockSequence([ + ]) + + credentials._refresh(http.request) + self.assertEquals(credentials.access_token, access_token) + + def test_token_refresh_store_expires_soon(self): + # Tests the case where an access token that is valid when it is read from + # the store expires before the original request succeeds. + expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) + credentials = self.create_test_credentials(expiration=expiration) + + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() + new_cred = copy.copy(credentials) + new_cred.access_token = 'bar' + s.put(new_cred) + + access_token = '1/3w' + token_response = {'access_token': access_token, 'expires_in': 3600} + http = HttpMockSequence([ ({'status': str(http_client.UNAUTHORIZED)}, b'Initial token expired'), ({'status': str(http_client.UNAUTHORIZED)}, b'Store token expired'), ({'status': str(http_client.OK)}, json.dumps(token_response).encode('utf-8')), ({'status': str(http_client.OK)}, b'Valid response to original request') - ]) - - credentials.authorize(http) - http.request('https://example.com') - self.assertEqual(credentials.access_token, access_token) - - def test_token_refresh_good_store(self): - expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) - credentials = self.create_test_credentials(expiration=expiration) - - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() - new_cred = copy.copy(credentials) - new_cred.access_token = 'bar' - s.put(new_cred) - - credentials._refresh(lambda x: x) - self.assertEquals(credentials.access_token, 'bar') - - def test_token_refresh_stream_body(self): - expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) - credentials = self.create_test_credentials(expiration=expiration) - - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() - new_cred = copy.copy(credentials) - new_cred.access_token = 'bar' - s.put(new_cred) - - valid_access_token = '1/3w' - token_response = {'access_token': valid_access_token, 'expires_in': 3600} - http = HttpMockSequence([ + ]) + + credentials.authorize(http) + http.request('https://example.com') + self.assertEqual(credentials.access_token, access_token) + + def test_token_refresh_good_store(self): + expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) + credentials = self.create_test_credentials(expiration=expiration) + + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() + new_cred = copy.copy(credentials) + new_cred.access_token = 'bar' + s.put(new_cred) + + credentials._refresh(lambda x: x) + self.assertEquals(credentials.access_token, 'bar') + + def test_token_refresh_stream_body(self): + expiration = datetime.datetime.utcnow() + datetime.timedelta(minutes=15) + credentials = self.create_test_credentials(expiration=expiration) + + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() + new_cred = copy.copy(credentials) + new_cred.access_token = 'bar' + s.put(new_cred) + + valid_access_token = '1/3w' + token_response = {'access_token': valid_access_token, 'expires_in': 3600} + http = HttpMockSequence([ ({'status': str(http_client.UNAUTHORIZED)}, b'Initial token expired'), ({'status': str(http_client.UNAUTHORIZED)}, b'Store token expired'), ({'status': str(http_client.OK)}, json.dumps(token_response).encode('utf-8')), ({'status': str(http_client.OK)}, 'echo_request_body') - ]) + ]) - body = six.StringIO('streaming body') + body = six.StringIO('streaming body') - credentials.authorize(http) - _, content = http.request('https://example.com', body=body) - self.assertEqual(content, 'streaming body') - self.assertEqual(credentials.access_token, valid_access_token) + credentials.authorize(http) + _, content = http.request('https://example.com', body=body) + self.assertEqual(content, 'streaming body') + self.assertEqual(credentials.access_token, valid_access_token) - def test_credentials_delete(self): - credentials = self.create_test_credentials() + def test_credentials_delete(self): + credentials = self.create_test_credentials() - s = file.Storage(FILENAME) - s.put(credentials) - credentials = s.get() - self.assertNotEquals(None, credentials) - s.delete() - credentials = s.get() - self.assertEquals(None, credentials) + s = file.Storage(FILENAME) + s.put(credentials) + credentials = s.get() + self.assertNotEquals(None, credentials) + s.delete() + credentials = s.get() + self.assertEquals(None, credentials) - def test_access_token_credentials(self): - access_token = 'foo' - user_agent = 'refresh_checker/1.0' + def test_access_token_credentials(self): + access_token = 'foo' + user_agent = 'refresh_checker/1.0' - credentials = AccessTokenCredentials(access_token, user_agent) + credentials = AccessTokenCredentials(access_token, user_agent) - s = file.Storage(FILENAME) - credentials = s.put(credentials) - credentials = s.get() + s = file.Storage(FILENAME) + credentials = s.put(credentials) + credentials = s.get() - self.assertNotEquals(None, credentials) - self.assertEquals('foo', credentials.access_token) - mode = os.stat(FILENAME).st_mode + self.assertNotEquals(None, credentials) + self.assertEquals('foo', credentials.access_token) + mode = os.stat(FILENAME).st_mode - if os.name == 'posix': - self.assertEquals('0o600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) + if os.name == 'posix': + self.assertEquals('0o600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) - def test_read_only_file_fail_lock(self): - credentials = self.create_test_credentials() + def test_read_only_file_fail_lock(self): + credentials = self.create_test_credentials() - open(FILENAME, 'a+b').close() - os.chmod(FILENAME, 0o400) + open(FILENAME, 'a+b').close() + os.chmod(FILENAME, 0o400) - store = multistore_file.get_credential_storage( + store = multistore_file.get_credential_storage( FILENAME, credentials.client_id, credentials.user_agent, ['some-scope', 'some-other-scope']) - store.put(credentials) - if os.name == 'posix': - self.assertTrue(store._multistore._read_only) - os.chmod(FILENAME, 0o600) + store.put(credentials) + if os.name == 'posix': + self.assertTrue(store._multistore._read_only) + os.chmod(FILENAME, 0o600) - def test_multistore_no_symbolic_link_files(self): - if hasattr(os, 'symlink'): - SYMFILENAME = FILENAME + 'sym' - os.symlink(FILENAME, SYMFILENAME) - store = multistore_file.get_credential_storage( + def test_multistore_no_symbolic_link_files(self): + if hasattr(os, 'symlink'): + SYMFILENAME = FILENAME + 'sym' + os.symlink(FILENAME, SYMFILENAME) + store = multistore_file.get_credential_storage( SYMFILENAME, 'some_client_id', 'user-agent/1.0', ['some-scope', 'some-other-scope']) - try: - store.get() - self.fail('Should have raised an exception.') - except locked_file.CredentialsFileSymbolicLinkError: - pass - finally: - os.unlink(SYMFILENAME) - - def test_multistore_non_existent_file(self): - store = multistore_file.get_credential_storage( + try: + store.get() + self.fail('Should have raised an exception.') + except locked_file.CredentialsFileSymbolicLinkError: + pass + finally: + os.unlink(SYMFILENAME) + + def test_multistore_non_existent_file(self): + store = multistore_file.get_credential_storage( FILENAME, 'some_client_id', 'user-agent/1.0', ['some-scope', 'some-other-scope']) - credentials = store.get() - self.assertEquals(None, credentials) + credentials = store.get() + self.assertEquals(None, credentials) - def test_multistore_file(self): - credentials = self.create_test_credentials() + def test_multistore_file(self): + credentials = self.create_test_credentials() - store = multistore_file.get_credential_storage( + store = multistore_file.get_credential_storage( FILENAME, credentials.client_id, credentials.user_agent, ['some-scope', 'some-other-scope']) - store.put(credentials) - credentials = store.get() + store.put(credentials) + credentials = store.get() - self.assertNotEquals(None, credentials) - self.assertEquals('foo', credentials.access_token) + self.assertNotEquals(None, credentials) + self.assertEquals('foo', credentials.access_token) - store.delete() - credentials = store.get() + store.delete() + credentials = store.get() - self.assertEquals(None, credentials) + self.assertEquals(None, credentials) - if os.name == 'posix': - self.assertEquals('0o600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) + if os.name == 'posix': + self.assertEquals('0o600', oct(stat.S_IMODE(os.stat(FILENAME).st_mode))) - def test_multistore_file_custom_key(self): - credentials = self.create_test_credentials() + def test_multistore_file_custom_key(self): + credentials = self.create_test_credentials() - custom_key = {'myapp': 'testing', 'clientid': 'some client'} - store = multistore_file.get_credential_storage_custom_key( + custom_key = {'myapp': 'testing', 'clientid': 'some client'} + store = multistore_file.get_credential_storage_custom_key( FILENAME, custom_key) - store.put(credentials) - stored_credentials = store.get() + store.put(credentials) + stored_credentials = store.get() - self.assertNotEquals(None, stored_credentials) - self.assertEqual(credentials.access_token, stored_credentials.access_token) + self.assertNotEquals(None, stored_credentials) + self.assertEqual(credentials.access_token, stored_credentials.access_token) - store.delete() - stored_credentials = store.get() + store.delete() + stored_credentials = store.get() - self.assertEquals(None, stored_credentials) + self.assertEquals(None, stored_credentials) - def test_multistore_file_custom_string_key(self): - credentials = self.create_test_credentials() + def test_multistore_file_custom_string_key(self): + credentials = self.create_test_credentials() - # store with string key - store = multistore_file.get_credential_storage_custom_string_key( + # store with string key + store = multistore_file.get_credential_storage_custom_string_key( FILENAME, 'mykey') - store.put(credentials) - stored_credentials = store.get() + store.put(credentials) + stored_credentials = store.get() - self.assertNotEquals(None, stored_credentials) - self.assertEqual(credentials.access_token, stored_credentials.access_token) + self.assertNotEquals(None, stored_credentials) + self.assertEqual(credentials.access_token, stored_credentials.access_token) - # try retrieving with a dictionary - store_dict = multistore_file.get_credential_storage_custom_string_key( + # try retrieving with a dictionary + store_dict = multistore_file.get_credential_storage_custom_string_key( FILENAME, {'key': 'mykey'}) - stored_credentials = store.get() - self.assertNotEquals(None, stored_credentials) - self.assertEqual(credentials.access_token, stored_credentials.access_token) + stored_credentials = store.get() + self.assertNotEquals(None, stored_credentials) + self.assertEqual(credentials.access_token, stored_credentials.access_token) - store.delete() - stored_credentials = store.get() + store.delete() + stored_credentials = store.get() - self.assertEquals(None, stored_credentials) + self.assertEquals(None, stored_credentials) - def test_multistore_file_backwards_compatibility(self): - credentials = self.create_test_credentials() - scopes = ['scope1', 'scope2'] + def test_multistore_file_backwards_compatibility(self): + credentials = self.create_test_credentials() + scopes = ['scope1', 'scope2'] - # store the credentials using the legacy key method - store = multistore_file.get_credential_storage( + # store the credentials using the legacy key method + store = multistore_file.get_credential_storage( FILENAME, 'client_id', 'user_agent', scopes) - store.put(credentials) + store.put(credentials) - # retrieve the credentials using a custom key that matches the legacy key - key = {'clientId': 'client_id', 'userAgent': 'user_agent', + # retrieve the credentials using a custom key that matches the legacy key + key = {'clientId': 'client_id', 'userAgent': 'user_agent', 'scope': util.scopes_to_string(scopes)} - store = multistore_file.get_credential_storage_custom_key(FILENAME, key) - stored_credentials = store.get() + store = multistore_file.get_credential_storage_custom_key(FILENAME, key) + stored_credentials = store.get() - self.assertEqual(credentials.access_token, stored_credentials.access_token) + self.assertEqual(credentials.access_token, stored_credentials.access_token) - def test_multistore_file_get_all_keys(self): - # start with no keys - keys = multistore_file.get_all_credential_keys(FILENAME) - self.assertEquals([], keys) + def test_multistore_file_get_all_keys(self): + # start with no keys + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals([], keys) - # store credentials - credentials = self.create_test_credentials(client_id='client1') - custom_key = {'myapp': 'testing', 'clientid': 'client1'} - store1 = multistore_file.get_credential_storage_custom_key( + # store credentials + credentials = self.create_test_credentials(client_id='client1') + custom_key = {'myapp': 'testing', 'clientid': 'client1'} + store1 = multistore_file.get_credential_storage_custom_key( FILENAME, custom_key) - store1.put(credentials) + store1.put(credentials) - keys = multistore_file.get_all_credential_keys(FILENAME) - self.assertEquals([custom_key], keys) + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals([custom_key], keys) - # store more credentials - credentials = self.create_test_credentials(client_id='client2') - string_key = 'string_key' - store2 = multistore_file.get_credential_storage_custom_string_key( + # store more credentials + credentials = self.create_test_credentials(client_id='client2') + string_key = 'string_key' + store2 = multistore_file.get_credential_storage_custom_string_key( FILENAME, string_key) - store2.put(credentials) + store2.put(credentials) - keys = multistore_file.get_all_credential_keys(FILENAME) - self.assertEquals(2, len(keys)) - self.assertTrue(custom_key in keys) - self.assertTrue({'key': string_key} in keys) + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals(2, len(keys)) + self.assertTrue(custom_key in keys) + self.assertTrue({'key': string_key} in keys) - # back to no keys - store1.delete() - store2.delete() - keys = multistore_file.get_all_credential_keys(FILENAME) - self.assertEquals([], keys) + # back to no keys + store1.delete() + store2.delete() + keys = multistore_file.get_all_credential_keys(FILENAME) + self.assertEquals([], keys) if __name__ == '__main__': - unittest.main() + unittest.main() |