aboutsummaryrefslogtreecommitdiff
path: root/oauth2client/crypt.py
blob: 70bef8c184f92bc39c4778a1277d7990ab240ffe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# -*- coding: utf-8 -*-
#
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Crypto-related routines for oauth2client."""

import json
import logging
import time

from oauth2client._helpers import _from_bytes
from oauth2client._helpers import _json_encode
from oauth2client._helpers import _to_bytes
from oauth2client._helpers import _urlsafe_b64decode
from oauth2client._helpers import _urlsafe_b64encode
from oauth2client._pure_python_crypt import RsaSigner
from oauth2client._pure_python_crypt import RsaVerifier


# Tolerance, in seconds, applied to both ends of a token's validity window
# when its ``iat``/``exp`` values are checked against the local clock.
CLOCK_SKEW_SECS = 300  # 5 minutes in seconds
# Not referenced by the code visible in this module; presumably a default
# token lifetime used by callers -- TODO confirm.
AUTH_TOKEN_LIFETIME_SECS = 300  # 5 minutes in seconds
# Upper bound on how far past "now" a token's ``exp`` value may lie.
MAX_TOKEN_LIFETIME_SECS = 86400  # 1 day in seconds

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class AppIdentityError(Exception):
    """Error to indicate crypto failure.

    Raised by the JWT verification helpers in this module when a token
    cannot be parsed, when no certificate confirms its signature, when its
    issued-at / expiration values are missing or unacceptable, or when its
    audience does not match the expected one.
    """


def _bad_pkcs12_key_as_pem(*args, **kwargs):
    raise NotImplementedError('pkcs12_key_as_pem requires OpenSSL.')


# Optional backend: signer/verifier classes and the ``pkcs12_key_as_pem``
# helper from ``_openssl_crypt``. When the import fails, the class names are
# set to None and ``pkcs12_key_as_pem`` is bound to a stub that raises
# NotImplementedError when called.
try:
    from oauth2client._openssl_crypt import OpenSSLVerifier
    from oauth2client._openssl_crypt import OpenSSLSigner
    from oauth2client._openssl_crypt import pkcs12_key_as_pem
except ImportError:  # pragma: NO COVER
    OpenSSLVerifier = None
    OpenSSLSigner = None
    pkcs12_key_as_pem = _bad_pkcs12_key_as_pem

# Optional backend: signer/verifier classes from ``_pycrypto_crypt``. When
# the import fails, both names are set to None.
try:
    from oauth2client._pycrypto_crypt import PyCryptoVerifier
    from oauth2client._pycrypto_crypt import PyCryptoSigner
except ImportError:  # pragma: NO COVER
    PyCryptoVerifier = None
    PyCryptoSigner = None


# Choose the implementations exported as ``Signer`` / ``Verifier``, in order
# of preference: the OpenSSL classes, then the PyCrypto classes, then the
# ``RsaSigner`` / ``RsaVerifier`` classes imported from
# ``_pure_python_crypt``. Only the signer name is tested; each verifier is
# imported alongside its signer in the same ``try`` block above.
if OpenSSLSigner:
    Signer = OpenSSLSigner
    Verifier = OpenSSLVerifier
elif PyCryptoSigner:  # pragma: NO COVER
    Signer = PyCryptoSigner
    Verifier = PyCryptoVerifier
else:  # pragma: NO COVER
    Signer = RsaSigner
    Verifier = RsaVerifier


def make_signed_jwt(signer, payload, key_id=None):
    """Make a signed JWT.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    Args:
        signer: crypt.Signer, Cryptographic signer.
        payload: dict, Dictionary of data to convert to JSON and then sign.
        key_id: string, (Optional) Key ID header.

    Returns:
        bytes, The JWT for the payload: the encoded header, payload and
        signature segments joined with ``b'.'``.
    """
    header = {'typ': 'JWT', 'alg': 'RS256'}
    if key_id is not None:
        header['kid'] = key_id

    segments = [
        _urlsafe_b64encode(_json_encode(header)),
        _urlsafe_b64encode(_json_encode(payload)),
    ]
    # The signature is computed over "<header>.<payload>".
    signing_input = b'.'.join(segments)

    signature = signer.sign(signing_input)
    segments.append(_urlsafe_b64encode(signature))

    # Hand the list to the logger as an argument so that it is only rendered
    # to text when DEBUG logging is actually enabled.
    # NOTE(review): this records the complete signed token, signature
    # included, in the debug log -- confirm that is acceptable for deployments
    # that enable DEBUG logging.
    logger.debug('%s', segments)

    return b'.'.join(segments)


def _verify_signature(message, signature, certs):
    """Check a signature against each certificate until one confirms it.

    A verifier is built from each PEM certificate in turn; checking stops at
    the first certificate whose verifier accepts the signature.

    Args:
        message: string or bytes, The message to verify.
        signature: string or bytes, The signature on the message.
        certs: iterable, certificates in PEM format.

    Raises:
        AppIdentityError: If none of the certificates can verify the message
                          against the signature.
    """
    confirmed = any(
        Verifier.from_string(cert_pem, is_x509_cert=True).verify(
            message, signature)
        for cert_pem in certs
    )
    if not confirmed:
        # Every certificate was tried (or there were none) without success.
        raise AppIdentityError('Invalid token signature')


def _check_audience(payload_dict, audience):
    """Checks audience field from a JWT payload.

    Does nothing if the passed in ``audience`` is null.

    Args:
        payload_dict: dict, A dictionary containing a JWT payload.
        audience: string or NoneType, an audience to check for in
                  the JWT payload.

    Raises:
        AppIdentityError: If there is no ``'aud'`` field in the payload
                          dictionary but there is an ``audience`` to check.
        AppIdentityError: If the ``'aud'`` field in the payload dictionary
                          does not match the ``audience``.
    """
    if audience is None:
        return

    audience_in_payload = payload_dict.get('aud')
    if audience_in_payload is None:
        raise AppIdentityError('No aud field in token: %s' %
                               (payload_dict,))
    if audience_in_payload != audience:
        raise AppIdentityError('Wrong recipient, %s != %s: %s' %
                               (audience_in_payload, audience, payload_dict))


def _verify_time_range(payload_dict):
    """Check the ``'iat'`` and ``'exp'`` values of a JWT payload.

    The current time (from ``time.time()``, truncated to whole seconds) must
    lie between the issued-at and expiration values, each widened by
    ``CLOCK_SKEW_SECS``. The expiration must also be less than
    ``MAX_TOKEN_LIFETIME_SECS`` past the current time.

    Args:
        payload_dict: dict, A dictionary containing a JWT payload.

    Raises:
        AppIdentityError: If there is no ``'iat'`` field in the payload
                          dictionary.
        AppIdentityError: If there is no ``'exp'`` field in the payload
                          dictionary.
        AppIdentityError: If the expiration is ``MAX_TOKEN_LIFETIME_SECS`` or
                          more past the current time.
        AppIdentityError: If the token appears to have been issued in the
                          future (up to clock skew).
        AppIdentityError: If the token appears to have expired in the past
                          (up to clock skew).
    """
    # Sample the clock once so every comparison uses the same instant.
    current_time = int(time.time())

    # Both timestamps are mandatory.
    token_issued = payload_dict.get('iat')
    if token_issued is None:
        message = 'No iat field in token: %s' % (payload_dict,)
        raise AppIdentityError(message)

    token_expires = payload_dict.get('exp')
    if token_expires is None:
        message = 'No exp field in token: %s' % (payload_dict,)
        raise AppIdentityError(message)

    # Reject expirations that imply an over-long token lifetime.
    if token_expires >= current_time + MAX_TOKEN_LIFETIME_SECS:
        message = 'exp field too far in future: %s' % (payload_dict,)
        raise AppIdentityError(message)

    # Lower bound: the token must not come from the future (allowing skew).
    not_before = token_issued - CLOCK_SKEW_SECS
    if current_time < not_before:
        message = 'Token used too early, %d < %d: %s' % (
            current_time, not_before, payload_dict)
        raise AppIdentityError(message)

    # Upper bound: the token must not have expired yet (allowing skew).
    not_after = token_expires + CLOCK_SKEW_SECS
    if current_time > not_after:
        message = 'Token used too late, %d > %d: %s' % (
            current_time, not_after, payload_dict)
        raise AppIdentityError(message)


def verify_signed_jwt_with_certs(jwt, certs, audience=None):
    """Verify a JWT against public certs.

    See http://self-issued.info/docs/draft-jones-json-web-token.html.

    The payload is decoded and parsed first; then the signature, the
    issued-at / expiration range and (optionally) the audience are checked,
    in that order. Exceptions raised by ``_urlsafe_b64decode`` on a
    malformed segment propagate unchanged.

    Args:
        jwt: string, A JWT.
        certs: dict, Dictionary whose values are public keys in PEM format.
               The keys are not used by this function.
        audience: string, The audience, 'aud', that this JWT should contain. If
                  None then the JWT's 'aud' parameter is not verified.

    Returns:
        dict, The deserialized JSON payload in the JWT.

    Raises:
        AppIdentityError: if any checks are failed.
    """
    jwt = _to_bytes(jwt)

    # A JWT is exactly three dot-separated segments.
    if jwt.count(b'.') != 2:
        raise AppIdentityError(
            'Wrong number of segments in token: %s' % (jwt,))

    header, payload, signature = jwt.split(b'.')
    # The signature covers the still-encoded "<header>.<payload>" bytes.
    message_to_sign = header + b'.' + payload
    signature = _urlsafe_b64decode(signature)

    # Parse token.
    payload_bytes = _urlsafe_b64decode(payload)
    try:
        payload_dict = json.loads(_from_bytes(payload_bytes))
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt and SystemExit are not turned into a token error.
        raise AppIdentityError('Can\'t parse token: %s' % (payload_bytes,))

    # Verify that the signature matches the message.
    _verify_signature(message_to_sign, signature, certs.values())

    # Verify the issued at and expiration times in the payload.
    _verify_time_range(payload_dict)

    # Check audience.
    _check_audience(payload_dict, audience)

    return payload_dict