Skip to content

LDSignature

A class for signing and verifying Linked Data signatures using the RSA signature algorithm.

Attributes:

Name Type Description
private_key RSAPrivateKey

The RSA private key used for signing.

public_key RSAPublicKey

The corresponding RSA public key.

Methods:

Name Description
sign

dict, creator: str, private_key: rsa.RSAPrivateKey, options: dict = None, created: datetime.datetime = None) -> dict: Signs the provided document using the specified RSA private key.

verify

dict, public_key: rsa.RSAPublicKey | str) -> bool: Verifies the signature of the provided document against the given public key.

Source code in libs/apsig/src/apsig/ld_signature.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class LDSignature:
    """A class for signing and verifying Linked Data signatures using the RSA signature algorithm. 

    Attributes:
        private_key (rsa.RSAPrivateKey): The RSA private key used for signing.
        public_key (rsa.RSAPublicKey): The corresponding RSA public key.

    Methods:
        sign(doc: dict, creator: str, private_key: rsa.RSAPrivateKey, options: dict = None, created: datetime.datetime = None) -> dict:
            Signs the provided document using the specified RSA private key.

        verify(doc: dict, public_key: rsa.RSAPublicKey | str) -> bool:
            Verifies the signature of the provided document against the given public key.
    """

    def __init__(self):
        pass

    def __normalized_hash(self, data):
        norm_form = jsonld.normalize(
            data, {"algorithm": "URDNA2015", "format": "application/n-quads"}
        )
        digest = hashes.Hash(hashes.SHA256())
        digest.update(norm_form.encode("utf8"))
        return digest.finalize().hex().encode("ascii")

    def sign(
        self,
        doc: dict,
        creator: str,
        private_key: rsa.RSAPrivateKey,
        options: dict = None,
        created: datetime.datetime = None,
    ):
        """Signs the provided document using the specified RSA private key.

        Args:
            doc (dict): The document to be signed.
            creator (str): The identifier of the creator of the document.
            private_key (rsa.RSAPrivateKey): The RSA private key used for signing.
            options (dict, optional): Additional signing options. Defaults to None.
            created (datetime.datetime, optional): The timestamp when the signature is created. 
                Defaults to the current UTC time if not provided.

        Returns:
            dict: The signed document containing the original data and the signature.
        """
        options: dict[str, str] = {
            "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld", # "https://w3id.org/identity/v1"
            "creator": creator,
            "created": created or utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
        }

        to_be_signed = self.__normalized_hash(options) + self.__normalized_hash(doc)

        signature = base64.b64encode(private_key.sign(
            to_be_signed, padding.PKCS1v15(), hashes.SHA256()
        ))

        return {
            **doc,
            "signature": {
                **options,
                "type": "RsaSignature2017",
                "signatureValue": signature.decode("ascii"),
            },
        }

    def verify(self, doc: dict, public_key: rsa.RSAPublicKey | str, raise_on_fail: bool = False) -> Union[str, None]:
        """Verifies the signature of the provided document against the given public key.

        Args:
            doc (dict): The signed document to verify.
            public_key (rsa.RSAPublicKey | str): The RSA public key in PEM format or as a multibase-encoded string.

        Returns:
            bool: True if the signature is valid; otherwise, an exception is raised.

        Raises:
            MissingSignature: If the signature section is missing in the document.
            UnknownSignature: If the signature type is not recognized.
            VerificationFailed: If the signature verification fails.
        """
        if isinstance(public_key, str):
            codec, data = multicodec.unwrap(multibase.decode(public_key))
            if codec.name != "rsa-pub":
                if raise_on_fail:
                    raise ValueError("public_key must be RSA PublicKey.")
                return None
            public_key = serialization.load_pem_public_key(data, backend=default_backend())
        try:
            document = doc.copy()
            signature = document.pop("signature")
            options = {
                "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld",
                "creator": signature["creator"],
                "created": signature["created"],
            }
        except KeyError:
            if raise_on_fail:
                raise MissingSignature("Invalid signature section")
            return None
        if signature["type"].lower() != "rsasignature2017":
            if raise_on_fail:
                raise UnknownSignature("Unknown signature type")
            return None
        final_hash = self.__normalized_hash(options) + self.__normalized_hash(document)
        try:
            public_key.verify(
                base64.b64decode(signature["signatureValue"]),
                final_hash,
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
            return signature["creator"]
        except InvalidSignature:
            if raise_on_fail:
                raise VerificationFailed("LDSignature mismatch")
            return None

sign(doc, creator, private_key, options=None, created=None)

Signs the provided document using the specified RSA private key.

Parameters:

Name Type Description Default
doc dict

The document to be signed.

required
creator str

The identifier of the creator of the document.

required
private_key RSAPrivateKey

The RSA private key used for signing.

required
options dict

Additional signing options. Defaults to None.

None
created datetime

The timestamp when the signature is created. Defaults to the current UTC time if not provided.

None

Returns:

Name Type Description
dict

The signed document containing the original data and the signature.

Source code in libs/apsig/src/apsig/ld_signature.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def sign(
    self,
    doc: dict,
    creator: str,
    private_key: rsa.RSAPrivateKey,
    options: dict = None,
    created: datetime.datetime = None,
):
    """Signs the provided document using the specified RSA private key.

    Args:
        doc (dict): The document to be signed.
        creator (str): The identifier of the creator of the document.
        private_key (rsa.RSAPrivateKey): The RSA private key used for signing.
        options (dict, optional): Additional signing options. Defaults to None.
        created (datetime.datetime, optional): The timestamp when the signature is created. 
            Defaults to the current UTC time if not provided.

    Returns:
        dict: The signed document containing the original data and the signature.
    """
    options: dict[str, str] = {
        "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld", # "https://w3id.org/identity/v1"
        "creator": creator,
        "created": created or utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
    }

    to_be_signed = self.__normalized_hash(options) + self.__normalized_hash(doc)

    signature = base64.b64encode(private_key.sign(
        to_be_signed, padding.PKCS1v15(), hashes.SHA256()
    ))

    return {
        **doc,
        "signature": {
            **options,
            "type": "RsaSignature2017",
            "signatureValue": signature.decode("ascii"),
        },
    }

verify(doc, public_key, raise_on_fail=False)

Verifies the signature of the provided document against the given public key.

Parameters:

Name Type Description Default
doc dict

The signed document to verify.

required
public_key RSAPublicKey | str

The RSA public key in PEM format or as a multibase-encoded string.

required

Returns:

Name Type Description
bool Union[str, None]

True if the signature is valid; otherwise, an exception is raised.

Raises:

Type Description
MissingSignature

If the signature section is missing in the document.

UnknownSignature

If the signature type is not recognized.

VerificationFailed

If the signature verification fails.

Source code in libs/apsig/src/apsig/ld_signature.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def verify(self, doc: dict, public_key: rsa.RSAPublicKey | str, raise_on_fail: bool = False) -> Union[str, None]:
    """Verifies the signature of the provided document against the given public key.

    Args:
        doc (dict): The signed document to verify.
        public_key (rsa.RSAPublicKey | str): The RSA public key in PEM format or as a multibase-encoded string.

    Returns:
        bool: True if the signature is valid; otherwise, an exception is raised.

    Raises:
        MissingSignature: If the signature section is missing in the document.
        UnknownSignature: If the signature type is not recognized.
        VerificationFailed: If the signature verification fails.
    """
    if isinstance(public_key, str):
        codec, data = multicodec.unwrap(multibase.decode(public_key))
        if codec.name != "rsa-pub":
            if raise_on_fail:
                raise ValueError("public_key must be RSA PublicKey.")
            return None
        public_key = serialization.load_pem_public_key(data, backend=default_backend())
    try:
        document = doc.copy()
        signature = document.pop("signature")
        options = {
            "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld",
            "creator": signature["creator"],
            "created": signature["created"],
        }
    except KeyError:
        if raise_on_fail:
            raise MissingSignature("Invalid signature section")
        return None
    if signature["type"].lower() != "rsasignature2017":
        if raise_on_fail:
            raise UnknownSignature("Unknown signature type")
        return None
    final_hash = self.__normalized_hash(options) + self.__normalized_hash(document)
    try:
        public_key.verify(
            base64.b64decode(signature["signatureValue"]),
            final_hash,
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
        return signature["creator"]
    except InvalidSignature:
        if raise_on_fail:
            raise VerificationFailed("LDSignature mismatch")
        return None

draftVerifier

Source code in libs/apsig/src/apsig/draft/verify.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class draftVerifier:
    @staticmethod
    @deprecated(
        "apsig.draft.verify.draftVerifier is deprecated; use apsig.draft.verify.Verifier instead. This will be removed in apsig 1.0."
    )
    def verify(
        public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
    ) -> tuple[bool, str]:
        """Verifies the digital signature of an HTTP request.

        Args:
            public_pem (str): The public key in PEM format used to verify the signature.
            method (str): The HTTP method (e.g., "GET", "POST").
            url (str): The URL of the request.
            headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
            body (bytes, optional): The request body. Defaults to an empty byte string.

        Returns:
            tuple: A tuple containing:
                - bool: True if the signature is valid, False otherwise.
                - str: A message indicating the result of the verification.

        Raises:
            ValueError: If the signature header is missing or if the algorithm is unsupported.
        """
        try:
            result = Verifier(
                public_pem=public_pem, method=method, url=url, headers=headers, body=body
            ).verify(raise_on_fail=True)
        except Exception as e:
            return False, str(e)
        if result:
            return True, "Signature is valid"

verify(public_pem, method, url, headers, body=b'') staticmethod

Verifies the digital signature of an HTTP request.

Parameters:

Name Type Description Default
public_pem str

The public key in PEM format used to verify the signature.

required
method str

The HTTP method (e.g., "GET", "POST").

required
url str

The URL of the request.

required
headers dict

A dictionary of HTTP headers, including the signature and other relevant information.

required
body bytes

The request body. Defaults to an empty byte string.

b''

Returns:

Name Type Description
tuple tuple[bool, str]

A tuple containing: - bool: True if the signature is valid, False otherwise. - str: A message indicating the result of the verification.

Raises:

Type Description
ValueError

If the signature header is missing or if the algorithm is unsupported.

Source code in libs/apsig/src/apsig/draft/verify.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@staticmethod
@deprecated(
    "apsig.draft.verify.draftVerifier is deprecated; use apsig.draft.verify.Verifier instead. This will be removed in apsig 1.0."
)
def verify(
    public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
) -> tuple[bool, str]:
    """Verifies the digital signature of an HTTP request.

    Args:
        public_pem (str): The public key in PEM format used to verify the signature.
        method (str): The HTTP method (e.g., "GET", "POST").
        url (str): The URL of the request.
        headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
        body (bytes, optional): The request body. Defaults to an empty byte string.

    Returns:
        tuple: A tuple containing:
            - bool: True if the signature is valid, False otherwise.
            - str: A message indicating the result of the verification.

    Raises:
        ValueError: If the signature header is missing or if the algorithm is unsupported.
    """
    try:
        result = Verifier(
            public_pem=public_pem, method=method, url=url, headers=headers, body=body
        ).verify(raise_on_fail=True)
    except Exception as e:
        return False, str(e)
    if result:
        return True, "Signature is valid"

draft

Signer

Source code in libs/apsig/src/apsig/draft/sign.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Signer:
    def __init__(self, headers: dict[Any, Any], private_key: rsa.RSAPrivateKey, method: str, url: str, key_id: str, body: bytes=b"") -> None:
        """Signs an HTTP request with a digital signature.

        Args:
            private_key (rsa.RSAPrivateKey): The RSA private key used to sign the request.
            method (str): The HTTP method (e.g., "GET", "POST").
            url (str): The URL of the request.
            headers (dict): A dictionary of HTTP headers that will be signed.
            key_id (str): The key identifier to include in the signature header.
            body (bytes, optional): The request body. Defaults to an empty byte string.

        Returns:
            dict: The HTTP headers with the signature added.

        Raises:
            ValueError: If the signing process fails due to invalid parameters.
        """
        if not headers.get("date") and not headers.get("Date"):
            headers["date"] = email.utils.formatdate(usegmt=True)
        self.parsed_url: ParseResult = urlparse(url)
        self.headers = {
            **headers,
            "(request-target)": f"{method.lower()} {self.parsed_url.path}"
        }
        self.private_key = private_key
        self.method = method
        self.url = url
        self.key_id = key_id
        self.body = body

        if not self.headers.get("Host"):
            self.headers["Host"] = self.parsed_url.netloc

        self.__generate_digest(self.body)

    def __generate_sign_header(self, signature: str):
        self.headers["Signature"] = signature
        self.headers["Authorization"] = f"Signature {signature}"

    def __sign_document(self, document: bytes):
        return base64.standard_b64encode(self.private_key.sign(document, padding.PKCS1v15(), hashes.SHA256())).decode("utf-8")

    def __generate_digest(self, body: bytes | str):
        if not self.headers.get("digest") and not self.headers.get("Digest"):
            self.headers["digest"] = calculate_digest(body)
        else:
            return self.headers.get("digest")

    def build_signature(self, key_id: str, signature: str, algorithm: str = "rsa-sha256"):
        if algorithm != "rsa-sha256":
            raise NotImplementedError(f"Unsuppored algorithm: {algorithm}")

        return ",".join([
            f'keyId="{key_id}"',
            f'algorithm="{algorithm}"',
            f'headers="{" ".join(key.lower() for key in self.headers.keys())}"',
            f'signature="{signature}"'
        ])

    def sign(self) -> dict:
        signature_string = build_string(self.headers).encode("utf-8")
        signature = self.__sign_document(signature_string)
        signed = self.build_signature(self.key_id, signature)
        self.__generate_sign_header(signed)

        headers = self.headers.copy()
        headers.pop("(request-target)")

        return headers

__init__(headers, private_key, method, url, key_id, body=b'')

Signs an HTTP request with a digital signature.

Parameters:

Name Type Description Default
private_key RSAPrivateKey

The RSA private key used to sign the request.

required
method str

The HTTP method (e.g., "GET", "POST").

required
url str

The URL of the request.

required
headers dict

A dictionary of HTTP headers that will be signed.

required
key_id str

The key identifier to include in the signature header.

required
body bytes

The request body. Defaults to an empty byte string.

b''

Returns:

Name Type Description
dict None

The HTTP headers with the signature added.

Raises:

Type Description
ValueError

If the signing process fails due to invalid parameters.

Source code in libs/apsig/src/apsig/draft/sign.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(self, headers: dict[Any, Any], private_key: rsa.RSAPrivateKey, method: str, url: str, key_id: str, body: bytes=b"") -> None:
    """Signs an HTTP request with a digital signature.

    Args:
        private_key (rsa.RSAPrivateKey): The RSA private key used to sign the request.
        method (str): The HTTP method (e.g., "GET", "POST").
        url (str): The URL of the request.
        headers (dict): A dictionary of HTTP headers that will be signed.
        key_id (str): The key identifier to include in the signature header.
        body (bytes, optional): The request body. Defaults to an empty byte string.

    Returns:
        dict: The HTTP headers with the signature added.

    Raises:
        ValueError: If the signing process fails due to invalid parameters.
    """
    if not headers.get("date") and not headers.get("Date"):
        headers["date"] = email.utils.formatdate(usegmt=True)
    self.parsed_url: ParseResult = urlparse(url)
    self.headers = {
        **headers,
        "(request-target)": f"{method.lower()} {self.parsed_url.path}"
    }
    self.private_key = private_key
    self.method = method
    self.url = url
    self.key_id = key_id
    self.body = body

    if not self.headers.get("Host"):
        self.headers["Host"] = self.parsed_url.netloc

    self.__generate_digest(self.body)

Verifier

Source code in libs/apsig/src/apsig/draft/verify.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class Verifier:
    def __init__(
        self, public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
    ) -> None:
        """
        Args:
            public_pem (str): The public key in PEM format used to verify the signature.
            method (str): The HTTP method (e.g., "GET", "POST").
            url (str): The URL of the request.
            headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
            body (bytes, optional): The request body. Defaults to an empty byte string.
        """
        self.public_pem = public_pem
        self.method = method
        self.url = url
        self.headers_raw = headers
        self.headers = {key.lower(): value for key, value in headers.items()}
        self.body = body

    def __decode_sign(self, signature):
        return base64.standard_b64decode(signature)

    def verify(self, raise_on_fail: bool = False) -> Union[str, None]:
        """Verifies the digital signature of an HTTP request.

        Args:
            raise_on_fail (bool, optional): Return error on failure. defaults to False.

        Returns:

        Raises:
            ValueError: If the signature header is missing or if the algorithm is unsupported.
        """
        headers = self.headers.copy()

        signature_header = headers.get("signature")
        if not signature_header:
            if raise_on_fail:
                raise MissingSignature(
                    "Signature header is missing"
                )
            return None

        signature_parts = {}
        for item in signature_header.split(","):
            key, value = item.split("=", 1)
            signature_parts[key.strip()] = value.strip().strip('"')

        signature = self.__decode_sign(signature_parts["signature"])
        key_id = signature_parts["keyId"]
        algorithm = signature_parts["algorithm"]

        if algorithm != "rsa-sha256":
            if raise_on_fail:
                raise UnknownSignature(
                    f"Unsupported algorithm. Algorithm must be rsa-sha256, but passed {algorithm}."
                )
            return None

        signed_headers = signature_parts["headers"].split()

        parsed_url = urlparse(self.url)

        signature_headers = headers.copy()
        signature_headers["(request-target)"] = (
            f"{self.method.lower()} {parsed_url.path}"
        )
        signature_string = build_string(
            signature_headers, headers=signed_headers
        ).encode("utf-8")

        public_key = serialization.load_pem_public_key(
            self.public_pem.encode("utf-8"), backend=default_backend()
        )

        try:
            public_key.verify(
                signature, signature_string, padding.PKCS1v15(), hashes.SHA256()
            )
        except InvalidSignature:
            if raise_on_fail:
                raise VerificationFailed(
                    "Invalid signature"
                )
            return None

        expected_digest = calculate_digest(self.body)
        if headers.get("digest") != expected_digest:
            if raise_on_fail:
                raise VerificationFailed(
                    "Digest mismatch"
                )
            return None

        date_header = headers.get("date")
        if date_header:
            date = datetime.datetime.datetime.strptime(
                date_header, "%a, %d %b %Y %H:%M:%S GMT"
            )
            gmt_tz = pytz.timezone('GMT')
            gmt_time = gmt_tz.localize(date)
            request_time = gmt_time.astimezone(pytz.utc)
            current_time = datetime.utcnow()
            if abs((current_time - request_time).total_seconds()) > 3600:
                if raise_on_fail:
                    raise VerificationFailed(
                        "Date header is too far from current time"
                    )
                return None

        return key_id

__init__(public_pem, method, url, headers, body=b'')

Parameters:

Name Type Description Default
public_pem str

The public key in PEM format used to verify the signature.

required
method str

The HTTP method (e.g., "GET", "POST").

required
url str

The URL of the request.

required
headers dict

A dictionary of HTTP headers, including the signature and other relevant information.

required
body bytes

The request body. Defaults to an empty byte string.

b''
Source code in libs/apsig/src/apsig/draft/verify.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self, public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
) -> None:
    """
    Args:
        public_pem (str): The public key in PEM format used to verify the signature.
        method (str): The HTTP method (e.g., "GET", "POST").
        url (str): The URL of the request.
        headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
        body (bytes, optional): The request body. Defaults to an empty byte string.
    """
    self.public_pem = public_pem
    self.method = method
    self.url = url
    self.headers_raw = headers
    self.headers = {key.lower(): value for key, value in headers.items()}
    self.body = body

verify(raise_on_fail=False)

Verifies the digital signature of an HTTP request.

Parameters:

Name Type Description Default
raise_on_fail bool

Return error on failure. defaults to False.

False

Returns:

Raises:

Type Description
ValueError

If the signature header is missing or if the algorithm is unsupported.

Source code in libs/apsig/src/apsig/draft/verify.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def verify(self, raise_on_fail: bool = False) -> Union[str, None]:
    """Verifies the digital signature of an HTTP request.

    Args:
        raise_on_fail (bool, optional): Return error on failure. defaults to False.

    Returns:

    Raises:
        ValueError: If the signature header is missing or if the algorithm is unsupported.
    """
    headers = self.headers.copy()

    signature_header = headers.get("signature")
    if not signature_header:
        if raise_on_fail:
            raise MissingSignature(
                "Signature header is missing"
            )
        return None

    signature_parts = {}
    for item in signature_header.split(","):
        key, value = item.split("=", 1)
        signature_parts[key.strip()] = value.strip().strip('"')

    signature = self.__decode_sign(signature_parts["signature"])
    key_id = signature_parts["keyId"]
    algorithm = signature_parts["algorithm"]

    if algorithm != "rsa-sha256":
        if raise_on_fail:
            raise UnknownSignature(
                f"Unsupported algorithm. Algorithm must be rsa-sha256, but passed {algorithm}."
            )
        return None

    signed_headers = signature_parts["headers"].split()

    parsed_url = urlparse(self.url)

    signature_headers = headers.copy()
    signature_headers["(request-target)"] = (
        f"{self.method.lower()} {parsed_url.path}"
    )
    signature_string = build_string(
        signature_headers, headers=signed_headers
    ).encode("utf-8")

    public_key = serialization.load_pem_public_key(
        self.public_pem.encode("utf-8"), backend=default_backend()
    )

    try:
        public_key.verify(
            signature, signature_string, padding.PKCS1v15(), hashes.SHA256()
        )
    except InvalidSignature:
        if raise_on_fail:
            raise VerificationFailed(
                "Invalid signature"
            )
        return None

    expected_digest = calculate_digest(self.body)
    if headers.get("digest") != expected_digest:
        if raise_on_fail:
            raise VerificationFailed(
                "Digest mismatch"
            )
        return None

    date_header = headers.get("date")
    if date_header:
        date = datetime.datetime.datetime.strptime(
            date_header, "%a, %d %b %Y %H:%M:%S GMT"
        )
        gmt_tz = pytz.timezone('GMT')
        gmt_time = gmt_tz.localize(date)
        request_time = gmt_time.astimezone(pytz.utc)
        current_time = datetime.utcnow()
        if abs((current_time - request_time).total_seconds()) > 3600:
            if raise_on_fail:
                raise VerificationFailed(
                    "Date header is too far from current time"
                )
            return None

    return key_id

sign

Signer

Source code in libs/apsig/src/apsig/draft/sign.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Signer:
    def __init__(self, headers: dict[Any, Any], private_key: rsa.RSAPrivateKey, method: str, url: str, key_id: str, body: bytes=b"") -> None:
        """Signs an HTTP request with a digital signature.

        Args:
            private_key (rsa.RSAPrivateKey): The RSA private key used to sign the request.
            method (str): The HTTP method (e.g., "GET", "POST").
            url (str): The URL of the request.
            headers (dict): A dictionary of HTTP headers that will be signed.
            key_id (str): The key identifier to include in the signature header.
            body (bytes, optional): The request body. Defaults to an empty byte string.

        Returns:
            dict: The HTTP headers with the signature added.

        Raises:
            ValueError: If the signing process fails due to invalid parameters.
        """
        if not headers.get("date") and not headers.get("Date"):
            headers["date"] = email.utils.formatdate(usegmt=True)
        self.parsed_url: ParseResult = urlparse(url)
        self.headers = {
            **headers,
            "(request-target)": f"{method.lower()} {self.parsed_url.path}"
        }
        self.private_key = private_key
        self.method = method
        self.url = url
        self.key_id = key_id
        self.body = body

        if not self.headers.get("Host"):
            self.headers["Host"] = self.parsed_url.netloc

        self.__generate_digest(self.body)

    def __generate_sign_header(self, signature: str):
        self.headers["Signature"] = signature
        self.headers["Authorization"] = f"Signature {signature}"

    def __sign_document(self, document: bytes):
        return base64.standard_b64encode(self.private_key.sign(document, padding.PKCS1v15(), hashes.SHA256())).decode("utf-8")

    def __generate_digest(self, body: bytes | str):
        if not self.headers.get("digest") and not self.headers.get("Digest"):
            self.headers["digest"] = calculate_digest(body)
        else:
            return self.headers.get("digest")

    def build_signature(self, key_id: str, signature: str, algorithm: str = "rsa-sha256"):
        if algorithm != "rsa-sha256":
            raise NotImplementedError(f"Unsuppored algorithm: {algorithm}")

        return ",".join([
            f'keyId="{key_id}"',
            f'algorithm="{algorithm}"',
            f'headers="{" ".join(key.lower() for key in self.headers.keys())}"',
            f'signature="{signature}"'
        ])

    def sign(self) -> dict:
        signature_string = build_string(self.headers).encode("utf-8")
        signature = self.__sign_document(signature_string)
        signed = self.build_signature(self.key_id, signature)
        self.__generate_sign_header(signed)

        headers = self.headers.copy()
        headers.pop("(request-target)")

        return headers

__init__(headers, private_key, method, url, key_id, body=b'')

Signs an HTTP request with a digital signature.

Parameters:

Name Type Description Default
private_key RSAPrivateKey

The RSA private key used to sign the request.

required
method str

The HTTP method (e.g., "GET", "POST").

required
url str

The URL of the request.

required
headers dict

A dictionary of HTTP headers that will be signed.

required
key_id str

The key identifier to include in the signature header.

required
body bytes

The request body. Defaults to an empty byte string.

b''

Returns:

Name Type Description
dict None

The HTTP headers with the signature added.

Raises:

Type Description
ValueError

If the signing process fails due to invalid parameters.

Source code in libs/apsig/src/apsig/draft/sign.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(self, headers: dict[Any, Any], private_key: rsa.RSAPrivateKey, method: str, url: str, key_id: str, body: bytes=b"") -> None:
    """Signs an HTTP request with a digital signature.

    Args:
        private_key (rsa.RSAPrivateKey): The RSA private key used to sign the request.
        method (str): The HTTP method (e.g., "GET", "POST").
        url (str): The URL of the request.
        headers (dict): A dictionary of HTTP headers that will be signed.
        key_id (str): The key identifier to include in the signature header.
        body (bytes, optional): The request body. Defaults to an empty byte string.

    Returns:
        dict: The HTTP headers with the signature added.

    Raises:
        ValueError: If the signing process fails due to invalid parameters.
    """
    if not headers.get("date") and not headers.get("Date"):
        headers["date"] = email.utils.formatdate(usegmt=True)
    self.parsed_url: ParseResult = urlparse(url)
    self.headers = {
        **headers,
        "(request-target)": f"{method.lower()} {self.parsed_url.path}"
    }
    self.private_key = private_key
    self.method = method
    self.url = url
    self.key_id = key_id
    self.body = body

    if not self.headers.get("Host"):
        self.headers["Host"] = self.parsed_url.netloc

    self.__generate_digest(self.body)

verify

Verifier

Source code in libs/apsig/src/apsig/draft/verify.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class Verifier:
    def __init__(
        self, public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
    ) -> None:
        """
        Args:
            public_pem (str): The public key in PEM format used to verify the signature.
            method (str): The HTTP method (e.g., "GET", "POST").
            url (str): The URL of the request.
            headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
            body (bytes, optional): The request body. Defaults to an empty byte string.
        """
        self.public_pem = public_pem
        self.method = method
        self.url = url
        self.headers_raw = headers
        self.headers = {key.lower(): value for key, value in headers.items()}
        self.body = body

    def __decode_sign(self, signature):
        return base64.standard_b64decode(signature)

    def verify(self, raise_on_fail: bool = False) -> Union[str, None]:
        """Verifies the digital signature of an HTTP request.

        Args:
            raise_on_fail (bool, optional): Return error on failure. defaults to False.

        Returns:

        Raises:
            ValueError: If the signature header is missing or if the algorithm is unsupported.
        """
        headers = self.headers.copy()

        signature_header = headers.get("signature")
        if not signature_header:
            if raise_on_fail:
                raise MissingSignature(
                    "Signature header is missing"
                )
            return None

        signature_parts = {}
        for item in signature_header.split(","):
            key, value = item.split("=", 1)
            signature_parts[key.strip()] = value.strip().strip('"')

        signature = self.__decode_sign(signature_parts["signature"])
        key_id = signature_parts["keyId"]
        algorithm = signature_parts["algorithm"]

        if algorithm != "rsa-sha256":
            if raise_on_fail:
                raise UnknownSignature(
                    f"Unsupported algorithm. Algorithm must be rsa-sha256, but passed {algorithm}."
                )
            return None

        signed_headers = signature_parts["headers"].split()

        parsed_url = urlparse(self.url)

        signature_headers = headers.copy()
        signature_headers["(request-target)"] = (
            f"{self.method.lower()} {parsed_url.path}"
        )
        signature_string = build_string(
            signature_headers, headers=signed_headers
        ).encode("utf-8")

        public_key = serialization.load_pem_public_key(
            self.public_pem.encode("utf-8"), backend=default_backend()
        )

        try:
            public_key.verify(
                signature, signature_string, padding.PKCS1v15(), hashes.SHA256()
            )
        except InvalidSignature:
            if raise_on_fail:
                raise VerificationFailed(
                    "Invalid signature"
                )
            return None

        expected_digest = calculate_digest(self.body)
        if headers.get("digest") != expected_digest:
            if raise_on_fail:
                raise VerificationFailed(
                    "Digest mismatch"
                )
            return None

        date_header = headers.get("date")
        if date_header:
            date = datetime.datetime.datetime.strptime(
                date_header, "%a, %d %b %Y %H:%M:%S GMT"
            )
            gmt_tz = pytz.timezone('GMT')
            gmt_time = gmt_tz.localize(date)
            request_time = gmt_time.astimezone(pytz.utc)
            current_time = datetime.utcnow()
            if abs((current_time - request_time).total_seconds()) > 3600:
                if raise_on_fail:
                    raise VerificationFailed(
                        "Date header is too far from current time"
                    )
                return None

        return key_id

__init__(public_pem, method, url, headers, body=b'')

Parameters:

Name Type Description Default
public_pem str

The public key in PEM format used to verify the signature.

required
method str

The HTTP method (e.g., "GET", "POST").

required
url str

The URL of the request.

required
headers dict

A dictionary of HTTP headers, including the signature and other relevant information.

required
body bytes

The request body. Defaults to an empty byte string.

b''
Source code in libs/apsig/src/apsig/draft/verify.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self, public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
) -> None:
    """
    Args:
        public_pem (str): The public key in PEM format used to verify the signature.
        method (str): The HTTP method (e.g., "GET", "POST").
        url (str): The URL of the request.
        headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
        body (bytes, optional): The request body. Defaults to an empty byte string.
    """
    self.public_pem = public_pem
    self.method = method
    self.url = url
    self.headers_raw = headers
    self.headers = {key.lower(): value for key, value in headers.items()}
    self.body = body

verify(raise_on_fail=False)

Verifies the digital signature of an HTTP request.

Parameters:

Name Type Description Default
raise_on_fail bool

Return error on failure. defaults to False.

False

Returns:

Raises:

Type Description
ValueError

If the signature header is missing or if the algorithm is unsupported.

Source code in libs/apsig/src/apsig/draft/verify.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def verify(self, raise_on_fail: bool = False) -> Union[str, None]:
    """Verifies the digital signature of an HTTP request.

    Args:
        raise_on_fail (bool, optional): Return error on failure. defaults to False.

    Returns:

    Raises:
        ValueError: If the signature header is missing or if the algorithm is unsupported.
    """
    headers = self.headers.copy()

    signature_header = headers.get("signature")
    if not signature_header:
        if raise_on_fail:
            raise MissingSignature(
                "Signature header is missing"
            )
        return None

    signature_parts = {}
    for item in signature_header.split(","):
        key, value = item.split("=", 1)
        signature_parts[key.strip()] = value.strip().strip('"')

    signature = self.__decode_sign(signature_parts["signature"])
    key_id = signature_parts["keyId"]
    algorithm = signature_parts["algorithm"]

    if algorithm != "rsa-sha256":
        if raise_on_fail:
            raise UnknownSignature(
                f"Unsupported algorithm. Algorithm must be rsa-sha256, but passed {algorithm}."
            )
        return None

    signed_headers = signature_parts["headers"].split()

    parsed_url = urlparse(self.url)

    signature_headers = headers.copy()
    signature_headers["(request-target)"] = (
        f"{self.method.lower()} {parsed_url.path}"
    )
    signature_string = build_string(
        signature_headers, headers=signed_headers
    ).encode("utf-8")

    public_key = serialization.load_pem_public_key(
        self.public_pem.encode("utf-8"), backend=default_backend()
    )

    try:
        public_key.verify(
            signature, signature_string, padding.PKCS1v15(), hashes.SHA256()
        )
    except InvalidSignature:
        if raise_on_fail:
            raise VerificationFailed(
                "Invalid signature"
            )
        return None

    expected_digest = calculate_digest(self.body)
    if headers.get("digest") != expected_digest:
        if raise_on_fail:
            raise VerificationFailed(
                "Digest mismatch"
            )
        return None

    date_header = headers.get("date")
    if date_header:
        date = datetime.datetime.datetime.strptime(
            date_header, "%a, %d %b %Y %H:%M:%S GMT"
        )
        gmt_tz = pytz.timezone('GMT')
        gmt_time = gmt_tz.localize(date)
        request_time = gmt_time.astimezone(pytz.utc)
        current_time = datetime.utcnow()
        if abs((current_time - request_time).total_seconds()) > 3600:
            if raise_on_fail:
                raise VerificationFailed(
                    "Date header is too far from current time"
                )
            return None

    return key_id

draftVerifier

Source code in libs/apsig/src/apsig/draft/verify.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class draftVerifier:
    @staticmethod
    @deprecated(
        "apsig.draft.verify.draftVerifier is deprecated; use apsig.draft.verify.Verifier instead. This will be removed in apsig 1.0."
    )
    def verify(
        public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
    ) -> tuple[bool, str]:
        """Verifies the digital signature of an HTTP request.

        Args:
            public_pem (str): The public key in PEM format used to verify the signature.
            method (str): The HTTP method (e.g., "GET", "POST").
            url (str): The URL of the request.
            headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
            body (bytes, optional): The request body. Defaults to an empty byte string.

        Returns:
            tuple: A tuple containing:
                - bool: True if the signature is valid, False otherwise.
                - str: A message indicating the result of the verification.

        Raises:
            ValueError: If the signature header is missing or if the algorithm is unsupported.
        """
        try:
            result = Verifier(
                public_pem=public_pem, method=method, url=url, headers=headers, body=body
            ).verify(raise_on_fail=True)
        except Exception as e:
            return False, str(e)
        if result:
            return True, "Signature is valid"

verify(public_pem, method, url, headers, body=b'') staticmethod

Verifies the digital signature of an HTTP request.

Parameters:

Name Type Description Default
public_pem str

The public key in PEM format used to verify the signature.

required
method str

The HTTP method (e.g., "GET", "POST").

required
url str

The URL of the request.

required
headers dict

A dictionary of HTTP headers, including the signature and other relevant information.

required
body bytes

The request body. Defaults to an empty byte string.

b''

Returns:

Name Type Description
tuple tuple[bool, str]

A tuple containing: - bool: True if the signature is valid, False otherwise. - str: A message indicating the result of the verification.

Raises:

Type Description
ValueError

If the signature header is missing or if the algorithm is unsupported.

Source code in libs/apsig/src/apsig/draft/verify.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@staticmethod
@deprecated(
    "apsig.draft.verify.draftVerifier is deprecated; use apsig.draft.verify.Verifier instead. This will be removed in apsig 1.0."
)
def verify(
    public_pem: str, method: str, url: str, headers: dict, body: bytes = b""
) -> tuple[bool, str]:
    """Verifies the digital signature of an HTTP request.

    Args:
        public_pem (str): The public key in PEM format used to verify the signature.
        method (str): The HTTP method (e.g., "GET", "POST").
        url (str): The URL of the request.
        headers (dict): A dictionary of HTTP headers, including the signature and other relevant information.
        body (bytes, optional): The request body. Defaults to an empty byte string.

    Returns:
        tuple: A tuple containing:
            - bool: True if the signature is valid, False otherwise.
            - str: A message indicating the result of the verification.

    Raises:
        ValueError: If the signature header is missing or if the algorithm is unsupported.
    """
    try:
        result = Verifier(
            public_pem=public_pem, method=method, url=url, headers=headers, body=body
        ).verify(raise_on_fail=True)
    except Exception as e:
        return False, str(e)
    if result:
        return True, "Signature is valid"

ld_signature

LDSignature

A class for signing and verifying Linked Data signatures using the RSA signature algorithm.

Attributes:

Name Type Description
private_key RSAPrivateKey

The RSA private key used for signing.

public_key RSAPublicKey

The corresponding RSA public key.

Methods:

Name Description
sign

dict, creator: str, private_key: rsa.RSAPrivateKey, options: dict = None, created: datetime.datetime = None) -> dict: Signs the provided document using the specified RSA private key.

verify

dict, public_key: rsa.RSAPublicKey | str) -> bool: Verifies the signature of the provided document against the given public key.

Source code in libs/apsig/src/apsig/ld_signature.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class LDSignature:
    """A class for signing and verifying Linked Data signatures using the RSA signature algorithm. 

    Attributes:
        private_key (rsa.RSAPrivateKey): The RSA private key used for signing.
        public_key (rsa.RSAPublicKey): The corresponding RSA public key.

    Methods:
        sign(doc: dict, creator: str, private_key: rsa.RSAPrivateKey, options: dict = None, created: datetime.datetime = None) -> dict:
            Signs the provided document using the specified RSA private key.

        verify(doc: dict, public_key: rsa.RSAPublicKey | str) -> bool:
            Verifies the signature of the provided document against the given public key.
    """

    def __init__(self):
        pass

    def __normalized_hash(self, data):
        norm_form = jsonld.normalize(
            data, {"algorithm": "URDNA2015", "format": "application/n-quads"}
        )
        digest = hashes.Hash(hashes.SHA256())
        digest.update(norm_form.encode("utf8"))
        return digest.finalize().hex().encode("ascii")

    def sign(
        self,
        doc: dict,
        creator: str,
        private_key: rsa.RSAPrivateKey,
        options: dict = None,
        created: datetime.datetime = None,
    ):
        """Signs the provided document using the specified RSA private key.

        Args:
            doc (dict): The document to be signed.
            creator (str): The identifier of the creator of the document.
            private_key (rsa.RSAPrivateKey): The RSA private key used for signing.
            options (dict, optional): Additional signing options. Defaults to None.
            created (datetime.datetime, optional): The timestamp when the signature is created. 
                Defaults to the current UTC time if not provided.

        Returns:
            dict: The signed document containing the original data and the signature.
        """
        options: dict[str, str] = {
            "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld", # "https://w3id.org/identity/v1"
            "creator": creator,
            "created": created or utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
        }

        to_be_signed = self.__normalized_hash(options) + self.__normalized_hash(doc)

        signature = base64.b64encode(private_key.sign(
            to_be_signed, padding.PKCS1v15(), hashes.SHA256()
        ))

        return {
            **doc,
            "signature": {
                **options,
                "type": "RsaSignature2017",
                "signatureValue": signature.decode("ascii"),
            },
        }

    def verify(self, doc: dict, public_key: rsa.RSAPublicKey | str, raise_on_fail: bool = False) -> Union[str, None]:
        """Verifies the signature of the provided document against the given public key.

        Args:
            doc (dict): The signed document to verify.
            public_key (rsa.RSAPublicKey | str): The RSA public key in PEM format or as a multibase-encoded string.

        Returns:
            bool: True if the signature is valid; otherwise, an exception is raised.

        Raises:
            MissingSignature: If the signature section is missing in the document.
            UnknownSignature: If the signature type is not recognized.
            VerificationFailed: If the signature verification fails.
        """
        if isinstance(public_key, str):
            codec, data = multicodec.unwrap(multibase.decode(public_key))
            if codec.name != "rsa-pub":
                if raise_on_fail:
                    raise ValueError("public_key must be RSA PublicKey.")
                return None
            public_key = serialization.load_pem_public_key(data, backend=default_backend())
        try:
            document = doc.copy()
            signature = document.pop("signature")
            options = {
                "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld",
                "creator": signature["creator"],
                "created": signature["created"],
            }
        except KeyError:
            if raise_on_fail:
                raise MissingSignature("Invalid signature section")
            return None
        if signature["type"].lower() != "rsasignature2017":
            if raise_on_fail:
                raise UnknownSignature("Unknown signature type")
            return None
        final_hash = self.__normalized_hash(options) + self.__normalized_hash(document)
        try:
            public_key.verify(
                base64.b64decode(signature["signatureValue"]),
                final_hash,
                padding.PKCS1v15(),
                hashes.SHA256(),
            )
            return signature["creator"]
        except InvalidSignature:
            if raise_on_fail:
                raise VerificationFailed("LDSignature mismatch")
            return None

sign(doc, creator, private_key, options=None, created=None)

Signs the provided document using the specified RSA private key.

Parameters:

Name Type Description Default
doc dict

The document to be signed.

required
creator str

The identifier of the creator of the document.

required
private_key RSAPrivateKey

The RSA private key used for signing.

required
options dict

Additional signing options. Defaults to None.

None
created datetime

The timestamp when the signature is created. Defaults to the current UTC time if not provided.

None

Returns:

Name Type Description
dict

The signed document containing the original data and the signature.

Source code in libs/apsig/src/apsig/ld_signature.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def sign(
    self,
    doc: dict,
    creator: str,
    private_key: rsa.RSAPrivateKey,
    options: dict = None,
    created: datetime.datetime = None,
):
    """Signs the provided document using the specified RSA private key.

    Args:
        doc (dict): The document to be signed.
        creator (str): The identifier of the creator of the document.
        private_key (rsa.RSAPrivateKey): The RSA private key used for signing.
        options (dict, optional): Additional signing options. Defaults to None.
        created (datetime.datetime, optional): The timestamp when the signature is created. 
            Defaults to the current UTC time if not provided.

    Returns:
        dict: The signed document containing the original data and the signature.
    """
    options: dict[str, str] = {
        "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld", # "https://w3id.org/identity/v1"
        "creator": creator,
        "created": created or utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
    }

    to_be_signed = self.__normalized_hash(options) + self.__normalized_hash(doc)

    signature = base64.b64encode(private_key.sign(
        to_be_signed, padding.PKCS1v15(), hashes.SHA256()
    ))

    return {
        **doc,
        "signature": {
            **options,
            "type": "RsaSignature2017",
            "signatureValue": signature.decode("ascii"),
        },
    }

verify(doc, public_key, raise_on_fail=False)

Verifies the signature of the provided document against the given public key.

Parameters:

Name Type Description Default
doc dict

The signed document to verify.

required
public_key RSAPublicKey | str

The RSA public key in PEM format or as a multibase-encoded string.

required

Returns:

Name Type Description
bool Union[str, None]

True if the signature is valid; otherwise, an exception is raised.

Raises:

Type Description
MissingSignature

If the signature section is missing in the document.

UnknownSignature

If the signature type is not recognized.

VerificationFailed

If the signature verification fails.

Source code in libs/apsig/src/apsig/ld_signature.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def verify(self, doc: dict, public_key: rsa.RSAPublicKey | str, raise_on_fail: bool = False) -> Union[str, None]:
    """Verifies the signature of the provided document against the given public key.

    Args:
        doc (dict): The signed document to verify.
        public_key (rsa.RSAPublicKey | str): The RSA public key in PEM format or as a multibase-encoded string.

    Returns:
        bool: True if the signature is valid; otherwise, an exception is raised.

    Raises:
        MissingSignature: If the signature section is missing in the document.
        UnknownSignature: If the signature type is not recognized.
        VerificationFailed: If the signature verification fails.
    """
    if isinstance(public_key, str):
        codec, data = multicodec.unwrap(multibase.decode(public_key))
        if codec.name != "rsa-pub":
            if raise_on_fail:
                raise ValueError("public_key must be RSA PublicKey.")
            return None
        public_key = serialization.load_pem_public_key(data, backend=default_backend())
    try:
        document = doc.copy()
        signature = document.pop("signature")
        options = {
            "@context": "https://w3c-ccg.github.io/security-vocab/contexts/security-v1.jsonld",
            "creator": signature["creator"],
            "created": signature["created"],
        }
    except KeyError:
        if raise_on_fail:
            raise MissingSignature("Invalid signature section")
        return None
    if signature["type"].lower() != "rsasignature2017":
        if raise_on_fail:
            raise UnknownSignature("Unknown signature type")
        return None
    final_hash = self.__normalized_hash(options) + self.__normalized_hash(document)
    try:
        public_key.verify(
            base64.b64decode(signature["signatureValue"]),
            final_hash,
            padding.PKCS1v15(),
            hashes.SHA256(),
        )
        return signature["creator"]
    except InvalidSignature:
        if raise_on_fail:
            raise VerificationFailed("LDSignature mismatch")
        return None