Skip to content

Commit

Permalink
Format code with black and isort (#639)
Browse files Browse the repository at this point in the history
* Format code with black and isort

* Restore cryptography 41
  • Loading branch information
pquentin committed May 24, 2024
1 parent b3a767f commit 6e7af24
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 147 deletions.
2 changes: 2 additions & 0 deletions lint-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ cryptography>=35.0.0
types-pyopenssl>=20.0.4
pytest>=6.2
idna>=3.2
black
isort
22 changes: 18 additions & 4 deletions lint-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,25 +1,39 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
# pip-compile lint-requirements.in
#
black==24.2.0
# via -r lint-requirements.in
cffi==1.16.0
# via cryptography
click==8.1.7
# via black
cryptography==41.0.7
# via
# -r lint-requirements.in
# types-pyopenssl
idna==3.4
idna==3.6
# via -r lint-requirements.in
iniconfig==2.0.0
# via pytest
isort==5.13.2
# via -r lint-requirements.in
mypy==0.910
# via -r lint-requirements.in
mypy-extensions==0.4.4
# via mypy
# via
# black
# mypy
packaging==23.2
# via pytest
# via
# black
# pytest
pathspec==0.12.1
# via black
platformdirs==4.2.0
# via black
pluggy==1.4.0
# via pytest
pycparser==2.21
Expand Down
3 changes: 2 additions & 1 deletion lint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ python -m pip --version
python -m pip install -Ur lint-requirements.txt

# Linting

black --check src/trustme tests
isort --profile black src/trustme tests
mypy src/trustme tests
133 changes: 70 additions & 63 deletions src/trustme/__init__.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
from __future__ import annotations

import datetime
import ipaddress
import os
import ssl
from enum import Enum
from base64 import urlsafe_b64encode
from contextlib import contextmanager
from enum import Enum
from tempfile import NamedTemporaryFile
from typing import Generator, List, Optional, Union, TYPE_CHECKING
from typing import TYPE_CHECKING, Generator, List, Optional, Union

import idna

from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.serialization import (
PrivateFormat, NoEncryption
Encoding,
NoEncryption,
PrivateFormat,
load_pem_private_key,
)
from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import load_pem_private_key

from ._version import __version__

if TYPE_CHECKING: # pragma: no cover
import OpenSSL.SSL

CERTIFICATE_PUBLIC_KEY_TYPES = Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]
CERTIFICATE_PRIVATE_KEY_TYPES = Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]

Expand All @@ -38,7 +40,11 @@
DEFAULT_NOT_BEFORE = datetime.datetime(2000, 1, 1)


def _name(name: str, organization_name: Optional[str] = None, common_name: Optional[str] = None) -> x509.Name:
def _name(
name: str,
organization_name: Optional[str] = None,
common_name: Optional[str] = None,
) -> x509.Name:
name_pieces = [
x509.NameAttribute(
NameOID.ORGANIZATION_NAME,
Expand All @@ -47,9 +53,7 @@ def _name(name: str, organization_name: Optional[str] = None, common_name: Optio
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, name),
]
if common_name is not None:
name_pieces.append(
x509.NameAttribute(NameOID.COMMON_NAME, common_name)
)
name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
return x509.Name(name_pieces)


Expand Down Expand Up @@ -138,16 +142,17 @@ class Blob:
`CA.cert_pem` or `LeafCert.private_key_and_cert_chain_pem`.
"""

def __init__(self, data: bytes) -> None:
self._data = data

def bytes(self) -> bytes:
"""Returns the data as a `bytes` object.
"""
"""Returns the data as a `bytes` object."""
return self._data

def write_to_path(self, path: Union[str, "os.PathLike[str]"], append: bool = False) -> None:
def write_to_path(
self, path: Union[str, "os.PathLike[str]"], append: bool = False
) -> None:
"""Writes the data to the file at the given path.
Args:
Expand Down Expand Up @@ -215,9 +220,7 @@ def _generate_key(self) -> CERTIFICATE_PRIVATE_KEY_TYPES:
# key_size needs to be a least 2048 to be accepted
# on Debian and pressumably other OSes

return rsa.generate_private_key(
public_exponent=65537, key_size=2048
)
return rsa.generate_private_key(public_exponent=65537, key_size=2048)
elif self is KeyType.ECDSA:
return ec.generate_private_key(ec.SECP256R1())
else: # pragma: no cover
Expand All @@ -226,6 +229,7 @@ def _generate_key(self) -> CERTIFICATE_PRIVATE_KEY_TYPES:

class CA:
"""A certificate authority."""

_certificate: x509.Certificate

def __init__(
Expand Down Expand Up @@ -276,8 +280,9 @@ def __init__(
key_cert_sign=True, # sign certs
crl_sign=True, # sign revocation lists
encipher_only=False,
decipher_only=False),
critical=True
decipher_only=False,
),
critical=True,
)
.sign(
private_key=sign_key,
Expand All @@ -297,11 +302,9 @@ def private_key_pem(self) -> Blob:
other certificates from this CA."""
return Blob(
self._private_key.private_bytes(
Encoding.PEM,
PrivateFormat.TraditionalOpenSSL,
NoEncryption()
)
Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption()
)
)

def create_child_ca(self, key_type: KeyType = KeyType.ECDSA) -> "CA":
"""Creates a child certificate authority
Expand Down Expand Up @@ -378,15 +381,16 @@ def issue_cert(
"""
if not identities and common_name is None:
raise ValueError(
"Must specify at least one identity or common name"
)
raise ValueError("Must specify at least one identity or common name")

key = key_type._generate_key()

ski_ext = self._certificate.extensions.get_extension_for_class(
x509.SubjectKeyIdentifier)
aki = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(ski_ext.value)
x509.SubjectKeyIdentifier
)
aki = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
ski_ext.value
)

cert = (
_cert_builder_common(
Expand Down Expand Up @@ -421,16 +425,19 @@ def issue_cert(
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False),
critical=True
decipher_only=False,
),
critical=True,
)
.add_extension(
x509.ExtendedKeyUsage([
ExtendedKeyUsageOID.CLIENT_AUTH,
ExtendedKeyUsageOID.SERVER_AUTH,
ExtendedKeyUsageOID.CODE_SIGNING,
]),
critical=True
x509.ExtendedKeyUsage(
[
ExtendedKeyUsageOID.CLIENT_AUTH,
ExtendedKeyUsageOID.SERVER_AUTH,
ExtendedKeyUsageOID.CODE_SIGNING,
]
),
critical=True,
)
.sign(
private_key=self._private_key,
Expand All @@ -445,14 +452,14 @@ def issue_cert(
ca = ca.parent_cert

return LeafCert(
key.private_bytes(
Encoding.PEM,
PrivateFormat.TraditionalOpenSSL,
NoEncryption(),
),
cert.public_bytes(Encoding.PEM),
chain_to_ca,
)
key.private_bytes(
Encoding.PEM,
PrivateFormat.TraditionalOpenSSL,
NoEncryption(),
),
cert.public_bytes(Encoding.PEM),
chain_to_ca,
)

# For backwards compatibility
issue_server_cert = issue_cert
Expand All @@ -466,18 +473,17 @@ def configure_trust(self, ctx: Union[ssl.SSLContext, OpenSSL.SSL.Context]) -> No
"""
if isinstance(ctx, ssl.SSLContext):
ctx.load_verify_locations(
cadata=self.cert_pem.bytes().decode("ascii"))
ctx.load_verify_locations(cadata=self.cert_pem.bytes().decode("ascii"))
elif _smells_like_pyopenssl(ctx):
from OpenSSL import crypto
cert = crypto.load_certificate(
crypto.FILETYPE_PEM, self.cert_pem.bytes())

cert = crypto.load_certificate(crypto.FILETYPE_PEM, self.cert_pem.bytes())
store = ctx.get_cert_store()
store.add_cert(cert)
else:
raise TypeError(
"unrecognized context type {!r}"
.format(ctx.__class__.__name__))
"unrecognized context type {!r}".format(ctx.__class__.__name__)
)

@classmethod
def from_pem(cls, cert_bytes: bytes, private_key_bytes: bytes) -> "CA":
Expand Down Expand Up @@ -517,12 +523,15 @@ class LeafCert:
cert chain.
"""
def __init__(self, private_key_pem: bytes, server_cert_pem: bytes, chain_to_ca: List[bytes]) -> None:

def __init__(
self, private_key_pem: bytes, server_cert_pem: bytes, chain_to_ca: List[bytes]
) -> None:
self.private_key_pem = Blob(private_key_pem)
self.cert_chain_pems = [
Blob(pem) for pem in [server_cert_pem] + chain_to_ca]
self.private_key_and_cert_chain_pem = (
Blob(private_key_pem + server_cert_pem + b''.join(chain_to_ca)))
self.cert_chain_pems = [Blob(pem) for pem in [server_cert_pem] + chain_to_ca]
self.private_key_and_cert_chain_pem = Blob(
private_key_pem + server_cert_pem + b"".join(chain_to_ca)
)

def configure_cert(self, ctx: Union[ssl.SSLContext, OpenSSL.SSL.Context]) -> None:
"""Configure the given context object to present this certificate.
Expand All @@ -537,18 +546,16 @@ def configure_cert(self, ctx: Union[ssl.SSLContext, OpenSSL.SSL.Context]) -> Non
with self.private_key_and_cert_chain_pem.tempfile() as path:
ctx.load_cert_chain(path)
elif _smells_like_pyopenssl(ctx):
from OpenSSL.crypto import (
load_privatekey, load_certificate, FILETYPE_PEM,
)
from OpenSSL.crypto import FILETYPE_PEM, load_certificate, load_privatekey

key = load_privatekey(FILETYPE_PEM, self.private_key_pem.bytes())
ctx.use_privatekey(key)
cert = load_certificate(FILETYPE_PEM,
self.cert_chain_pems[0].bytes())
cert = load_certificate(FILETYPE_PEM, self.cert_chain_pems[0].bytes())
ctx.use_certificate(cert)
for pem in self.cert_chain_pems[1:]:
cert = load_certificate(FILETYPE_PEM, pem.bytes())
ctx.add_extra_chain_cert(cert)
else:
raise TypeError(
"unrecognized context type {!r}"
.format(ctx.__class__.__name__))
"unrecognized context type {!r}".format(ctx.__class__.__name__)
)
19 changes: 13 additions & 6 deletions src/trustme/_cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import argparse
import os
import trustme
import sys
from typing import List, Optional
from datetime import datetime
from typing import List, Optional

import trustme

# ISO 8601
DATE_FORMAT = '%Y-%m-%d'
DATE_FORMAT = "%Y-%m-%d"


def main(argv: Optional[List[str]] = None) -> None:
if argv is None:
Expand Down Expand Up @@ -38,7 +39,7 @@ def main(argv: Optional[List[str]] = None) -> None:
"--expires-on",
default=None,
help="Set the date the certificate will expire on (in YYYY-MM-DD format).",
metavar='YYYY-MM-DD',
metavar="YYYY-MM-DD",
)
parser.add_argument(
"-q",
Expand All @@ -57,7 +58,11 @@ def main(argv: Optional[List[str]] = None) -> None:
cert_dir = args.dir
identities = [str(identity) for identity in args.identities]
common_name = str(args.common_name[0]) if args.common_name else None
expires_on = None if args.expires_on is None else datetime.strptime(args.expires_on, DATE_FORMAT)
expires_on = (
None
if args.expires_on is None
else datetime.strptime(args.expires_on, DATE_FORMAT)
)
quiet = args.quiet
key_type = trustme.KeyType[args.key_type]

Expand All @@ -68,7 +73,9 @@ def main(argv: Optional[List[str]] = None) -> None:

# Generate the CA certificate
ca = trustme.CA(key_type=key_type)
cert = ca.issue_cert(*identities, common_name=common_name, not_after=expires_on, key_type=key_type)
cert = ca.issue_cert(
*identities, common_name=common_name, not_after=expires_on, key_type=key_type
)

# Write the certificate and private key the server should use
server_key = os.path.join(cert_dir, "server.key")
Expand Down
Loading

0 comments on commit 6e7af24

Please sign in to comment.