nixpkgs mirror (for testing) github.com/NixOS/nixpkgs
nix
at python-updates 334 lines 14 kB view raw
1diff --git a/pyproject.toml b/pyproject.toml 2index 87068b18..03f30491 100644 3--- a/pyproject.toml 4+++ b/pyproject.toml 5@@ -22,8 +22,6 @@ requires-python = ">= 3.9" 6 dependencies = [ 7 "cryptography >=3.1", 8 "defusedxml", 9- "pyopenssl <24.3.0", 10- "python-dateutil", 11 "requests >=2.0.0,<3.0.0", # ^2 means compatible with 2.x 12 "xmlschema >=2.0.0,<3.0.0" 13 ] 14diff --git a/src/saml2/cert.py b/src/saml2/cert.py 15index e90651e4..926aec8a 100644 16--- a/src/saml2/cert.py 17+++ b/src/saml2/cert.py 18@@ -3,11 +3,13 @@ __author__ = "haho0032" 19 import base64 20 from os import remove 21 from os.path import join 22-from datetime import datetime 23-from datetime import timezone 24+from datetime import datetime, timedelta, timezone 25 26-from OpenSSL import crypto 27-import dateutil.parser 28+from cryptography import x509 29+from cryptography.exceptions import InvalidSignature 30+from cryptography.hazmat.primitives import hashes, serialization 31+from cryptography.hazmat.primitives.asymmetric import rsa 32+from cryptography.x509.oid import NameOID 33 34 import saml2.cryptography.pki 35 36@@ -36,7 +38,6 @@ class OpenSSLWrapper: 37 valid_to=315360000, 38 sn=1, 39 key_length=1024, 40- hash_alg="sha256", 41 write_to_file=False, 42 cert_dir="", 43 cipher_passphrase=None, 44@@ -87,8 +88,6 @@ class OpenSSLWrapper: 45 is 1. 46 :param key_length: Length of the key to be generated. Defaults 47 to 1024. 48- :param hash_alg: Hash algorithm to use for the key. Default 49- is sha256. 50 :param write_to_file: True if you want to write the certificate 51 to a file. The method will then return 52 a tuple with path to certificate file and 53@@ -131,49 +130,68 @@ class OpenSSLWrapper: 54 k_f = join(cert_dir, key_file) 55 56 # create a key pair 57- k = crypto.PKey() 58- k.generate_key(crypto.TYPE_RSA, key_length) 59+ k = rsa.generate_private_key( 60+ public_exponent=65537, 61+ key_size=key_length, 62+ ) 63 64 # create a self-signed cert 65- cert = crypto.X509() 66+ builder = x509.CertificateBuilder() 67 68 if request: 69- cert = crypto.X509Req() 70+ builder = x509.CertificateSigningRequestBuilder() 71 72 if len(cert_info["country_code"]) != 2: 73 raise WrongInput("Country code must be two letters!") 74- cert.get_subject().C = cert_info["country_code"] 75- cert.get_subject().ST = cert_info["state"] 76- cert.get_subject().L = cert_info["city"] 77- cert.get_subject().O = cert_info["organization"] # noqa: E741 78- cert.get_subject().OU = cert_info["organization_unit"] 79- cert.get_subject().CN = cn 80+ subject_name = x509.Name([ 81+ x509.NameAttribute(NameOID.COUNTRY_NAME, 82+ cert_info["country_code"]), 83+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, 84+ cert_info["state"]), 85+ x509.NameAttribute(NameOID.LOCALITY_NAME, 86+ cert_info["city"]), 87+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, 88+ cert_info["organization"]), 89+ x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, 90+ cert_info["organization_unit"]), 91+ x509.NameAttribute(NameOID.COMMON_NAME, cn), 92+ ]) 93+ builder = builder.subject_name(subject_name) 94 if not request: 95- cert.set_serial_number(sn) 96- cert.gmtime_adj_notBefore(valid_from) # Valid before present time 97- cert.gmtime_adj_notAfter(valid_to) # 3 650 days 98- cert.set_issuer(cert.get_subject()) 99- cert.set_pubkey(k) 100- cert.sign(k, hash_alg) 101+ now = datetime.now(timezone.utc) 102+ builder = builder.serial_number( 103+ sn, 104+ ).not_valid_before( 105+ now + timedelta(seconds=valid_from), 106+ ).not_valid_after( 107+ now + timedelta(seconds=valid_to), 108+ ).issuer_name( 109+ subject_name, 110+ ).public_key( 111+ k.public_key(), 112+ ) 113+ cert = builder.sign(k, hashes.SHA256()) 114 115 try: 116- if request: 117- tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM, cert) 118- else: 119- tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) 120- tmp_key = None 121+ tmp_cert = cert.public_bytes(serialization.Encoding.PEM) 122+ key_encryption = None 123 if cipher_passphrase is not None: 124 passphrase = cipher_passphrase["passphrase"] 125 if isinstance(cipher_passphrase["passphrase"], str): 126 passphrase = passphrase.encode("utf-8") 127- tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k, cipher_passphrase["cipher"], passphrase) 128+ key_encryption = serialization.BestAvailableEncryption(passphrase) 129 else: 130- tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k) 131+ key_encryption = serialization.NoEncryption() 132+ tmp_key = k.private_bytes( 133+ encoding=serialization.Encoding.PEM, 134+ format=serialization.PrivateFormat.TraditionalOpenSSL, 135+ encryption_algorithm=key_encryption, 136+ ) 137 if write_to_file: 138- with open(c_f, "w") as fc: 139- fc.write(tmp_cert.decode("utf-8")) 140- with open(k_f, "w") as fk: 141- fk.write(tmp_key.decode("utf-8")) 142+ with open(c_f, "wb") as fc: 143+ fc.write(tmp_cert) 144+ with open(k_f, "wb") as fk: 145+ fk.write(tmp_key) 146 return c_f, k_f 147 return tmp_cert, tmp_key 148 except Exception as ex: 149@@ -198,7 +216,6 @@ class OpenSSLWrapper: 150 sign_cert_str, 151 sign_key_str, 152 request_cert_str, 153- hash_alg="sha256", 154 valid_from=0, 155 valid_to=315360000, 156 sn=1, 157@@ -222,8 +239,6 @@ class OpenSSLWrapper: 158 the requested certificate. If you only have 159 a file use the method read_str_from_file 160 to get a string representation. 161- :param hash_alg: Hash algorithm to use for the key. Default 162- is sha256. 163 :param valid_from: When the certificate starts to be valid. 164 Amount of seconds from when the 165 certificate is generated. 166@@ -237,27 +252,29 @@ class OpenSSLWrapper: 167 :return: String representation of the signed 168 certificate. 169 """ 170- ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str) 171- ca_key = None 172- if passphrase is not None: 173- ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str, passphrase) 174- else: 175- ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str) 176- req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM, request_cert_str) 177- 178- cert = crypto.X509() 179- cert.set_subject(req_cert.get_subject()) 180- cert.set_serial_number(sn) 181- cert.gmtime_adj_notBefore(valid_from) 182- cert.gmtime_adj_notAfter(valid_to) 183- cert.set_issuer(ca_cert.get_subject()) 184- cert.set_pubkey(req_cert.get_pubkey()) 185- cert.sign(ca_key, hash_alg) 186- 187- cert_dump = crypto.dump_certificate(crypto.FILETYPE_PEM, cert) 188- if isinstance(cert_dump, str): 189- return cert_dump 190- return cert_dump.decode("utf-8") 191+ if isinstance(sign_cert_str, str): 192+ sign_cert_str = sign_cert_str.encode("utf-8") 193+ ca_cert = x509.load_pem_x509_certificate(sign_cert_str) 194+ ca_key = serialization.load_pem_private_key( 195+ sign_key_str, password=passphrase) 196+ req_cert = x509.load_pem_x509_csr(request_cert_str) 197+ 198+ now = datetime.now(timezone.utc) 199+ cert = x509.CertificateBuilder().subject_name( 200+ req_cert.subject, 201+ ).serial_number( 202+ sn, 203+ ).not_valid_before( 204+ now + timedelta(seconds=valid_from), 205+ ).not_valid_after( 206+ now + timedelta(seconds=valid_to), 207+ ).issuer_name( 208+ ca_cert.subject, 209+ ).public_key( 210+ req_cert.public_key(), 211+ ).sign(ca_key, hashes.SHA256()) 212+ 213+ return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") 214 215 def verify_chain(self, cert_chain_str_list, cert_str): 216 """ 217@@ -276,13 +293,6 @@ class OpenSSLWrapper: 218 cert_str = tmp_cert_str 219 return (True, "Signed certificate is valid and correctly signed by CA " "certificate.") 220 221- def certificate_not_valid_yet(self, cert): 222- starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore()) 223- now = datetime.now(timezone.utc) 224- if starts_to_be_valid < now: 225- return False 226- return True 227- 228 def verify(self, signing_cert_str, cert_str): 229 """ 230 Verifies if a certificate is valid and signed by a given certificate. 231@@ -303,34 +313,34 @@ class OpenSSLWrapper: 232 Message = Why the validation failed. 233 """ 234 try: 235- ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, signing_cert_str) 236- cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str) 237- 238- if self.certificate_not_valid_yet(ca_cert): 239+ if isinstance(signing_cert_str, str): 240+ signing_cert_str = signing_cert_str.encode("utf-8") 241+ if isinstance(cert_str, str): 242+ cert_str = cert_str.encode("utf-8") 243+ ca_cert = x509.load_pem_x509_certificate(signing_cert_str) 244+ cert = x509.load_pem_x509_certificate(cert_str) 245+ now = datetime.now(timezone.utc) 246+ 247+ if ca_cert.not_valid_before_utc >= now: 248 return False, "CA certificate is not valid yet." 249 250- if ca_cert.has_expired() == 1: 251+ if ca_cert.not_valid_after_utc < now: 252 return False, "CA certificate is expired." 253 254- if cert.has_expired() == 1: 255+ if cert.not_valid_after_utc < now: 256 return False, "The signed certificate is expired." 257 258- if self.certificate_not_valid_yet(cert): 259+ if cert.not_valid_before_utc >= now: 260 return False, "The signed certificate is not valid yet." 261 262- if ca_cert.get_subject().CN == cert.get_subject().CN: 263+ if ca_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) == \ 264+ cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME): 265 return False, ("CN may not be equal for CA certificate and the " "signed certificate.") 266 267- cert_algorithm = cert.get_signature_algorithm() 268- cert_algorithm = cert_algorithm.decode("ascii") 269- cert_str = cert_str.encode("ascii") 270- 271- cert_crypto = saml2.cryptography.pki.load_pem_x509_certificate(cert_str) 272- 273 try: 274- crypto.verify(ca_cert, cert_crypto.signature, cert_crypto.tbs_certificate_bytes, cert_algorithm) 275+ cert.verify_directly_issued_by(ca_cert) 276 return True, "Signed certificate is valid and correctly signed by CA certificate." 277- except crypto.Error as e: 278+ except (ValueError, TypeError, InvalidSignature) as e: 279 return False, f"Certificate is incorrectly signed: {str(e)}" 280 except Exception as e: 281 return False, f"Certificate is not valid for an unknown reason. {str(e)}" 282@@ -352,8 +362,14 @@ def read_cert_from_file(cert_file, cert_type="pem"): 283 data = fp.read() 284 285 try: 286- cert = saml2.cryptography.pki.load_x509_certificate(data, cert_type) 287- pem_data = saml2.cryptography.pki.get_public_bytes_from_cert(cert) 288+ cert = None 289+ if cert_type == "pem": 290+ cert = x509.load_pem_x509_certificate(data) 291+ elif cert_type == "der": 292+ cert = x509.load_der_x509_certificate(data) 293+ else: 294+ raise ValueError(f"cert-type {cert_type} not supported") 295+ pem_data = cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") 296 except Exception as e: 297 raise CertificateError(e) 298 299diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py 300index 738ac04b..60c83718 100644 301--- a/src/saml2/sigver.py 302+++ b/src/saml2/sigver.py 303@@ -18,8 +18,9 @@ from time import mktime 304 from urllib import parse 305 from uuid import uuid4 as gen_random_key 306 307-from OpenSSL import crypto 308-import dateutil 309+from urllib import parse 310+ 311+from cryptography import x509 312 313 from saml2 import ExtensionElement 314 from saml2 import SamlBase 315@@ -373,14 +374,14 @@ def active_cert(key): 316 """ 317 try: 318 cert_str = pem_format(key) 319- cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str) 320+ cert = x509.load_pem_x509_certificate(cert_str) 321 except AttributeError: 322 return False 323 324- now = datetime.now(timezone.utc) 325- valid_from = dateutil.parser.parse(cert.get_notBefore()) 326- valid_to = dateutil.parser.parse(cert.get_notAfter()) 327- active = not cert.has_expired() and valid_from <= now < valid_to 328+ now = datetime.datetime.now(datetime.timezone.utc) 329+ valid_from = cert.not_valid_before_utc 330+ valid_to = cert.not_valid_after_utc 331+ active = valid_from <= now < valid_to 332 return active 333 334