Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import hmac
2import os
3from ._jacobian import JacobianCurve
4from .._ecc import ECC
5from .aes import aes
6from ._util import int_to_bytes, bytes_to_int, inverse, square_root_mod_prime
7
8
9class EllipticCurveBackend:
10 def __init__(self, p, n, a, b, g):
11 self.p, self.n, self.a, self.b, self.g = p, n, a, b, g
12 self.jacobian = JacobianCurve(p, n, a, b, g)
13
14 self.public_key_length = (len(bin(p).replace("0b", "")) + 7) // 8
15 self.order_bitlength = len(bin(n).replace("0b", ""))
16
17
18 def _int_to_bytes(self, raw, len=None):
19 return int_to_bytes(raw, len or self.public_key_length)
20
21
22 def decompress_point(self, public_key):
23 # Parse & load data
24 x = bytes_to_int(public_key[1:])
25 # Calculate Y
26 y_square = (pow(x, 3, self.p) + self.a * x + self.b) % self.p
27 try:
28 y = square_root_mod_prime(y_square, self.p)
29 except Exception:
30 raise ValueError("Invalid public key") from None
31 if y % 2 != public_key[0] - 0x02:
32 y = self.p - y
33 return self._int_to_bytes(x), self._int_to_bytes(y)
34
35
36 def new_private_key(self):
37 while True:
38 private_key = os.urandom(self.public_key_length)
39 if bytes_to_int(private_key) >= self.n:
40 continue
41 return private_key
42
43
44 def private_to_public(self, private_key):
45 raw = bytes_to_int(private_key)
46 x, y = self.jacobian.fast_multiply(self.g, raw)
47 return self._int_to_bytes(x), self._int_to_bytes(y)
48
49
50 def ecdh(self, private_key, public_key):
51 x, y = public_key
52 x, y = bytes_to_int(x), bytes_to_int(y)
53 private_key = bytes_to_int(private_key)
54 x, _ = self.jacobian.fast_multiply((x, y), private_key, secret=True)
55 return self._int_to_bytes(x)
56
57
58 def _subject_to_int(self, subject):
59 return bytes_to_int(subject[:(self.order_bitlength + 7) // 8])
60
61
62 def sign(self, subject, raw_private_key, recoverable, is_compressed, entropy):
63 z = self._subject_to_int(subject)
64 private_key = bytes_to_int(raw_private_key)
65 k = bytes_to_int(entropy)
66
67 # Fix k length to prevent Minerva. Increasing multiplier by a
68 # multiple of order doesn't break anything. This fix was ported
69 # from python-ecdsa
70 ks = k + self.n
71 kt = ks + self.n
72 ks_len = len(bin(ks).replace("0b", "")) // 8
73 kt_len = len(bin(kt).replace("0b", "")) // 8
74 if ks_len == kt_len:
75 k = kt
76 else:
77 k = ks
78 px, py = self.jacobian.fast_multiply(self.g, k, secret=True)
79
80 r = px % self.n
81 if r == 0:
82 # Invalid k
83 raise ValueError("Invalid k")
84
85 s = (inverse(k, self.n) * (z + (private_key * r))) % self.n
86 if s == 0:
87 # Invalid k
88 raise ValueError("Invalid k")
89
90 inverted = False
91 if s * 2 >= self.n:
92 s = self.n - s
93 inverted = True
94 rs_buf = self._int_to_bytes(r) + self._int_to_bytes(s)
95
96 if recoverable:
97 recid = (py % 2) ^ inverted
98 recid += 2 * int(px // self.n)
99 if is_compressed:
100 return bytes([31 + recid]) + rs_buf
101 else:
102 if recid >= 4:
103 raise ValueError("Too big recovery ID, use compressed address instead")
104 return bytes([27 + recid]) + rs_buf
105 else:
106 return rs_buf
107
108
109 def recover(self, signature, subject):
110 z = self._subject_to_int(subject)
111
112 recid = signature[0] - 27 if signature[0] < 31 else signature[0] - 31
113 r = bytes_to_int(signature[1:self.public_key_length + 1])
114 s = bytes_to_int(signature[self.public_key_length + 1:])
115
116 # Verify bounds
117 if not 0 <= recid < 2 * (self.p // self.n + 1):
118 raise ValueError("Invalid recovery ID")
119 if r >= self.n:
120 raise ValueError("r is out of bounds")
121 if s >= self.n:
122 raise ValueError("s is out of bounds")
123
124 rinv = inverse(r, self.n)
125 u1 = (-z * rinv) % self.n
126 u2 = (s * rinv) % self.n
127
128 # Recover R
129 rx = r + (recid // 2) * self.n
130 if rx >= self.p:
131 raise ValueError("Rx is out of bounds")
132
133 # Almost copied from decompress_point
134 ry_square = (pow(rx, 3, self.p) + self.a * rx + self.b) % self.p
135 try:
136 ry = square_root_mod_prime(ry_square, self.p)
137 except Exception:
138 raise ValueError("Invalid recovered public key") from None
139
140 # Ensure the point is correct
141 if ry % 2 != recid % 2:
142 # Fix Ry sign
143 ry = self.p - ry
144
145 x, y = self.jacobian.fast_shamir(self.g, u1, (rx, ry), u2)
146 return self._int_to_bytes(x), self._int_to_bytes(y)
147
148
149 def verify(self, signature, subject, public_key):
150 z = self._subject_to_int(subject)
151
152 r = bytes_to_int(signature[:self.public_key_length])
153 s = bytes_to_int(signature[self.public_key_length:])
154
155 # Verify bounds
156 if r >= self.n:
157 raise ValueError("r is out of bounds")
158 if s >= self.n:
159 raise ValueError("s is out of bounds")
160
161 public_key = [bytes_to_int(c) for c in public_key]
162
163 # Ensure that the public key is correct
164 if not self.jacobian.is_on_curve(public_key):
165 raise ValueError("Public key is not on curve")
166
167 sinv = inverse(s, self.n)
168 u1 = (z * sinv) % self.n
169 u2 = (r * sinv) % self.n
170
171 x1, _ = self.jacobian.fast_shamir(self.g, u1, public_key, u2)
172 if r != x1 % self.n:
173 raise ValueError("Invalid signature")
174
175 return True
176
177
178 def derive_child(self, seed, child):
179 # Round 1
180 h = hmac.new(key=b"Bitcoin seed", msg=seed, digestmod="sha512").digest()
181 private_key1 = h[:32]
182 x, y = self.private_to_public(private_key1)
183 public_key1 = bytes([0x02 + (y[-1] % 2)]) + x
184 private_key1 = bytes_to_int(private_key1)
185
186 # Round 2
187 msg = public_key1 + self._int_to_bytes(child, 4)
188 h = hmac.new(key=h[32:], msg=msg, digestmod="sha512").digest()
189 private_key2 = bytes_to_int(h[:32])
190
191 return self._int_to_bytes((private_key1 + private_key2) % self.n)
192
193
194 @classmethod
195 def get_backend(cls):
196 return "fallback"
197
198
199ecc = ECC(EllipticCurveBackend, aes)