nixpkgs mirror (for testing)
github.com/NixOS/nixpkgs
nix
1diff --git a/pyproject.toml b/pyproject.toml
2index 87068b18..03f30491 100644
3--- a/pyproject.toml
4+++ b/pyproject.toml
5@@ -22,8 +22,6 @@ requires-python = ">= 3.9"
6 dependencies = [
7 "cryptography >=3.1",
8 "defusedxml",
9- "pyopenssl <24.3.0",
10- "python-dateutil",
11 "requests >=2.0.0,<3.0.0", # ^2 means compatible with 2.x
12 "xmlschema >=2.0.0,<3.0.0"
13 ]
14diff --git a/src/saml2/cert.py b/src/saml2/cert.py
15index e90651e4..926aec8a 100644
16--- a/src/saml2/cert.py
17+++ b/src/saml2/cert.py
18@@ -3,11 +3,13 @@ __author__ = "haho0032"
19 import base64
20 from os import remove
21 from os.path import join
22-from datetime import datetime
23-from datetime import timezone
24+from datetime import datetime, timedelta, timezone
25
26-from OpenSSL import crypto
27-import dateutil.parser
28+from cryptography import x509
29+from cryptography.exceptions import InvalidSignature
30+from cryptography.hazmat.primitives import hashes, serialization
31+from cryptography.hazmat.primitives.asymmetric import rsa
32+from cryptography.x509.oid import NameOID
33
34 import saml2.cryptography.pki
35
36@@ -36,7 +38,6 @@ class OpenSSLWrapper:
37 valid_to=315360000,
38 sn=1,
39 key_length=1024,
40- hash_alg="sha256",
41 write_to_file=False,
42 cert_dir="",
43 cipher_passphrase=None,
44@@ -87,8 +88,6 @@ class OpenSSLWrapper:
45 is 1.
46 :param key_length: Length of the key to be generated. Defaults
47 to 1024.
48- :param hash_alg: Hash algorithm to use for the key. Default
49- is sha256.
50 :param write_to_file: True if you want to write the certificate
51 to a file. The method will then return
52 a tuple with path to certificate file and
53@@ -131,49 +130,68 @@ class OpenSSLWrapper:
54 k_f = join(cert_dir, key_file)
55
56 # create a key pair
57- k = crypto.PKey()
58- k.generate_key(crypto.TYPE_RSA, key_length)
59+ k = rsa.generate_private_key(
60+ public_exponent=65537,
61+ key_size=key_length,
62+ )
63
64 # create a self-signed cert
65- cert = crypto.X509()
66+ builder = x509.CertificateBuilder()
67
68 if request:
69- cert = crypto.X509Req()
70+ builder = x509.CertificateSigningRequestBuilder()
71
72 if len(cert_info["country_code"]) != 2:
73 raise WrongInput("Country code must be two letters!")
74- cert.get_subject().C = cert_info["country_code"]
75- cert.get_subject().ST = cert_info["state"]
76- cert.get_subject().L = cert_info["city"]
77- cert.get_subject().O = cert_info["organization"] # noqa: E741
78- cert.get_subject().OU = cert_info["organization_unit"]
79- cert.get_subject().CN = cn
80+ subject_name = x509.Name([
81+ x509.NameAttribute(NameOID.COUNTRY_NAME,
82+ cert_info["country_code"]),
83+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME,
84+ cert_info["state"]),
85+ x509.NameAttribute(NameOID.LOCALITY_NAME,
86+ cert_info["city"]),
87+ x509.NameAttribute(NameOID.ORGANIZATION_NAME,
88+ cert_info["organization"]),
89+ x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME,
90+ cert_info["organization_unit"]),
91+ x509.NameAttribute(NameOID.COMMON_NAME, cn),
92+ ])
93+ builder = builder.subject_name(subject_name)
94 if not request:
95- cert.set_serial_number(sn)
96- cert.gmtime_adj_notBefore(valid_from) # Valid before present time
97- cert.gmtime_adj_notAfter(valid_to) # 3 650 days
98- cert.set_issuer(cert.get_subject())
99- cert.set_pubkey(k)
100- cert.sign(k, hash_alg)
101+ now = datetime.now(timezone.utc)
102+ builder = builder.serial_number(
103+ sn,
104+ ).not_valid_before(
105+ now + timedelta(seconds=valid_from),
106+ ).not_valid_after(
107+ now + timedelta(seconds=valid_to),
108+ ).issuer_name(
109+ subject_name,
110+ ).public_key(
111+ k.public_key(),
112+ )
113+ cert = builder.sign(k, hashes.SHA256())
114
115 try:
116- if request:
117- tmp_cert = crypto.dump_certificate_request(crypto.FILETYPE_PEM, cert)
118- else:
119- tmp_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
120- tmp_key = None
121+ tmp_cert = cert.public_bytes(serialization.Encoding.PEM)
122+ key_encryption = None
123 if cipher_passphrase is not None:
124 passphrase = cipher_passphrase["passphrase"]
125 if isinstance(cipher_passphrase["passphrase"], str):
126 passphrase = passphrase.encode("utf-8")
127- tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k, cipher_passphrase["cipher"], passphrase)
128+ key_encryption = serialization.BestAvailableEncryption(passphrase)
129 else:
130- tmp_key = crypto.dump_privatekey(crypto.FILETYPE_PEM, k)
131+ key_encryption = serialization.NoEncryption()
132+ tmp_key = k.private_bytes(
133+ encoding=serialization.Encoding.PEM,
134+ format=serialization.PrivateFormat.TraditionalOpenSSL,
135+ encryption_algorithm=key_encryption,
136+ )
137 if write_to_file:
138- with open(c_f, "w") as fc:
139- fc.write(tmp_cert.decode("utf-8"))
140- with open(k_f, "w") as fk:
141- fk.write(tmp_key.decode("utf-8"))
142+ with open(c_f, "wb") as fc:
143+ fc.write(tmp_cert)
144+ with open(k_f, "wb") as fk:
145+ fk.write(tmp_key)
146 return c_f, k_f
147 return tmp_cert, tmp_key
148 except Exception as ex:
149@@ -198,7 +216,6 @@ class OpenSSLWrapper:
150 sign_cert_str,
151 sign_key_str,
152 request_cert_str,
153- hash_alg="sha256",
154 valid_from=0,
155 valid_to=315360000,
156 sn=1,
157@@ -222,8 +239,6 @@ class OpenSSLWrapper:
158 the requested certificate. If you only have
159 a file use the method read_str_from_file
160 to get a string representation.
161- :param hash_alg: Hash algorithm to use for the key. Default
162- is sha256.
163 :param valid_from: When the certificate starts to be valid.
164 Amount of seconds from when the
165 certificate is generated.
166@@ -237,27 +252,29 @@ class OpenSSLWrapper:
167 :return: String representation of the signed
168 certificate.
169 """
170- ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, sign_cert_str)
171- ca_key = None
172- if passphrase is not None:
173- ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str, passphrase)
174- else:
175- ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, sign_key_str)
176- req_cert = crypto.load_certificate_request(crypto.FILETYPE_PEM, request_cert_str)
177-
178- cert = crypto.X509()
179- cert.set_subject(req_cert.get_subject())
180- cert.set_serial_number(sn)
181- cert.gmtime_adj_notBefore(valid_from)
182- cert.gmtime_adj_notAfter(valid_to)
183- cert.set_issuer(ca_cert.get_subject())
184- cert.set_pubkey(req_cert.get_pubkey())
185- cert.sign(ca_key, hash_alg)
186-
187- cert_dump = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
188- if isinstance(cert_dump, str):
189- return cert_dump
190- return cert_dump.decode("utf-8")
191+ if isinstance(sign_cert_str, str):
192+ sign_cert_str = sign_cert_str.encode("utf-8")
193+ ca_cert = x509.load_pem_x509_certificate(sign_cert_str)
194+ ca_key = serialization.load_pem_private_key(
195+ sign_key_str, password=passphrase)
196+ req_cert = x509.load_pem_x509_csr(request_cert_str)
197+
198+ now = datetime.now(timezone.utc)
199+ cert = x509.CertificateBuilder().subject_name(
200+ req_cert.subject,
201+ ).serial_number(
202+ sn,
203+ ).not_valid_before(
204+ now + timedelta(seconds=valid_from),
205+ ).not_valid_after(
206+ now + timedelta(seconds=valid_to),
207+ ).issuer_name(
208+ ca_cert.subject,
209+ ).public_key(
210+ req_cert.public_key(),
211+ ).sign(ca_key, hashes.SHA256())
212+
213+ return cert.public_bytes(serialization.Encoding.PEM).decode("utf-8")
214
215 def verify_chain(self, cert_chain_str_list, cert_str):
216 """
217@@ -276,13 +293,6 @@ class OpenSSLWrapper:
218 cert_str = tmp_cert_str
219 return (True, "Signed certificate is valid and correctly signed by CA " "certificate.")
220
221- def certificate_not_valid_yet(self, cert):
222- starts_to_be_valid = dateutil.parser.parse(cert.get_notBefore())
223- now = datetime.now(timezone.utc)
224- if starts_to_be_valid < now:
225- return False
226- return True
227-
228 def verify(self, signing_cert_str, cert_str):
229 """
230 Verifies if a certificate is valid and signed by a given certificate.
231@@ -303,34 +313,34 @@ class OpenSSLWrapper:
232 Message = Why the validation failed.
233 """
234 try:
235- ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM, signing_cert_str)
236- cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
237-
238- if self.certificate_not_valid_yet(ca_cert):
239+ if isinstance(signing_cert_str, str):
240+ signing_cert_str = signing_cert_str.encode("utf-8")
241+ if isinstance(cert_str, str):
242+ cert_str = cert_str.encode("utf-8")
243+ ca_cert = x509.load_pem_x509_certificate(signing_cert_str)
244+ cert = x509.load_pem_x509_certificate(cert_str)
245+ now = datetime.now(timezone.utc)
246+
247+ if ca_cert.not_valid_before_utc >= now:
248 return False, "CA certificate is not valid yet."
249
250- if ca_cert.has_expired() == 1:
251+ if ca_cert.not_valid_after_utc < now:
252 return False, "CA certificate is expired."
253
254- if cert.has_expired() == 1:
255+ if cert.not_valid_after_utc < now:
256 return False, "The signed certificate is expired."
257
258- if self.certificate_not_valid_yet(cert):
259+ if cert.not_valid_before_utc >= now:
260 return False, "The signed certificate is not valid yet."
261
262- if ca_cert.get_subject().CN == cert.get_subject().CN:
263+ if ca_cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME) == \
264+ cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME):
265 return False, ("CN may not be equal for CA certificate and the " "signed certificate.")
266
267- cert_algorithm = cert.get_signature_algorithm()
268- cert_algorithm = cert_algorithm.decode("ascii")
269- cert_str = cert_str.encode("ascii")
270-
271- cert_crypto = saml2.cryptography.pki.load_pem_x509_certificate(cert_str)
272-
273 try:
274- crypto.verify(ca_cert, cert_crypto.signature, cert_crypto.tbs_certificate_bytes, cert_algorithm)
275+ cert.verify_directly_issued_by(ca_cert)
276 return True, "Signed certificate is valid and correctly signed by CA certificate."
277- except crypto.Error as e:
278+ except (ValueError, TypeError, InvalidSignature) as e:
279 return False, f"Certificate is incorrectly signed: {str(e)}"
280 except Exception as e:
281 return False, f"Certificate is not valid for an unknown reason. {str(e)}"
282@@ -352,8 +362,14 @@ def read_cert_from_file(cert_file, cert_type="pem"):
283 data = fp.read()
284
285 try:
286- cert = saml2.cryptography.pki.load_x509_certificate(data, cert_type)
287- pem_data = saml2.cryptography.pki.get_public_bytes_from_cert(cert)
288+ cert = None
289+ if cert_type == "pem":
290+ cert = x509.load_pem_x509_certificate(data)
291+ elif cert_type == "der":
292+ cert = x509.load_der_x509_certificate(data)
293+ else:
294+ raise ValueError(f"cert-type {cert_type} not supported")
295+ pem_data = cert.public_bytes(serialization.Encoding.PEM).decode("utf-8")
296 except Exception as e:
297 raise CertificateError(e)
298
299diff --git a/src/saml2/sigver.py b/src/saml2/sigver.py
300index 738ac04b..60c83718 100644
301--- a/src/saml2/sigver.py
302+++ b/src/saml2/sigver.py
303@@ -18,8 +18,9 @@ from time import mktime
304 from urllib import parse
305 from uuid import uuid4 as gen_random_key
306
307-from OpenSSL import crypto
308-import dateutil
309+from urllib import parse
310+
311+from cryptography import x509
312
313 from saml2 import ExtensionElement
314 from saml2 import SamlBase
315@@ -373,14 +374,14 @@ def active_cert(key):
316 """
317 try:
318 cert_str = pem_format(key)
319- cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_str)
320+ cert = x509.load_pem_x509_certificate(cert_str)
321 except AttributeError:
322 return False
323
324- now = datetime.now(timezone.utc)
325- valid_from = dateutil.parser.parse(cert.get_notBefore())
326- valid_to = dateutil.parser.parse(cert.get_notAfter())
327- active = not cert.has_expired() and valid_from <= now < valid_to
328+ now = datetime.datetime.now(datetime.timezone.utc)
329+ valid_from = cert.not_valid_before_utc
330+ valid_to = cert.not_valid_after_utc
331+ active = valid_from <= now < valid_to
332 return active
333
334