Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 199 lines 6.4 kB view raw
"""Elliptic-curve backend built on JacobianCurve, exposed as ``ecc``."""

import hmac
import os
from ._jacobian import JacobianCurve
from .._ecc import ECC
from .aes import aes
from ._util import int_to_bytes, bytes_to_int, inverse, square_root_mod_prime


class EllipticCurveBackend:
    """Elliptic-curve operations over the curve y^2 = x^3 + a*x + b (mod p).

    Point arithmetic is delegated to ``JacobianCurve``. Keys, coordinates
    and signatures cross the API boundary as byte strings and are converted
    with ``bytes_to_int`` / ``int_to_bytes``.
    """

    def __init__(self, p, n, a, b, g):
        """Store the curve parameters and derive the encoding sizes.

        p is the field modulus, n the modulus used for scalars (the group
        order), a and b the curve coefficients and g the base point that is
        passed to ``JacobianCurve`` multiplications.
        """
        self.p, self.n, self.a, self.b, self.g = p, n, a, b, g
        self.jacobian = JacobianCurve(p, n, a, b, g)

        # Number of bytes needed to hold a value as large as p.
        self.public_key_length = (p.bit_length() + 7) // 8
        # Number of bits needed to hold a value as large as n.
        self.order_bitlength = n.bit_length()

    def _int_to_bytes(self, raw, len=None):
        """Encode ``raw`` with ``int_to_bytes``.

        The length defaults to ``public_key_length`` when ``len`` is falsy.
        The parameter name shadows the builtin but is kept because it is
        part of the method's interface.
        """
        return int_to_bytes(raw, len or self.public_key_length)

    def decompress_point(self, public_key):
        """Recover ``(x, y)`` as bytes from a compressed public key.

        ``public_key[0]`` is the prefix byte and the remainder is the X
        coordinate. The prefix is not validated here; the Y root whose
        parity equals ``prefix - 0x02`` is selected.

        Raises:
            ValueError: if computing the square root of the curve equation's
                right-hand side fails.
        """
        # Parse & load data
        x = bytes_to_int(public_key[1:])
        # Calculate Y from the curve equation
        y_square = (pow(x, 3, self.p) + self.a * x + self.b) % self.p
        try:
            y = square_root_mod_prime(y_square, self.p)
        except Exception:
            raise ValueError("Invalid public key") from None
        # Pick the root with the parity requested by the prefix byte
        if y % 2 != public_key[0] - 0x02:
            y = self.p - y
        return self._int_to_bytes(x), self._int_to_bytes(y)

    def new_private_key(self):
        """Return ``public_key_length`` random bytes encoding a scalar in [1, n-1].

        Candidates are drawn from ``os.urandom`` and rejected until one
        falls inside the valid range. Zero is rejected as well, since it is
        not a usable private key.
        """
        while True:
            private_key = os.urandom(self.public_key_length)
            if 0 < bytes_to_int(private_key) < self.n:
                return private_key

    def private_to_public(self, private_key):
        """Multiply the base point by ``private_key`` and return ``(x, y)`` as bytes."""
        raw = bytes_to_int(private_key)
        x, y = self.jacobian.fast_multiply(self.g, raw)
        return self._int_to_bytes(x), self._int_to_bytes(y)

    def ecdh(self, private_key, public_key):
        """Return the X coordinate (bytes) of ``private_key * public_key``.

        ``public_key`` is an ``(x, y)`` pair of byte strings. The
        multiplication is requested with ``secret=True``.
        """
        x, y = public_key
        x, y = bytes_to_int(x), bytes_to_int(y)
        private_key = bytes_to_int(private_key)
        x, _ = self.jacobian.fast_multiply((x, y), private_key, secret=True)
        return self._int_to_bytes(x)

    def _subject_to_int(self, subject):
        """Convert the leading bytes of ``subject`` to an integer.

        Only as many bytes as are needed to hold ``order_bitlength`` bits
        are used; anything beyond that is ignored.
        """
        return bytes_to_int(subject[:(self.order_bitlength + 7) // 8])

    def sign(self, subject, raw_private_key, recoverable, is_compressed, entropy):
        """Produce an ECDSA signature over ``subject``.

        Args:
            subject: bytes to sign (see ``_subject_to_int`` for truncation).
            raw_private_key: private key as bytes.
            recoverable: when true, prefix the signature with a header byte
                that carries the recovery ID.
            is_compressed: selects the header base (31 when true, 27
                otherwise); only consulted when ``recoverable`` is true.
            entropy: bytes converted to the nonce ``k``.

        Returns:
            ``r || s`` (each ``public_key_length`` bytes), preceded by the
            header byte when ``recoverable`` is true.

        Raises:
            ValueError: if ``r`` or ``s`` comes out as zero ("Invalid k"), or
                if the recovery ID is 4 or more for an uncompressed header.
        """
        z = self._subject_to_int(subject)
        private_key = bytes_to_int(raw_private_key)
        k = bytes_to_int(entropy)

        # Fix k length to prevent Minerva. Increasing multiplier by a
        # multiple of order doesn't break anything. This fix was ported
        # from python-ecdsa
        # NOTE(review): the lengths compared below are whole-byte counts
        # (bit length // 8), not bit lengths -- confirm this is intended.
        ks = k + self.n
        kt = ks + self.n
        ks_len = ks.bit_length() // 8
        kt_len = kt.bit_length() // 8
        if ks_len == kt_len:
            k = kt
        else:
            k = ks
        px, py = self.jacobian.fast_multiply(self.g, k, secret=True)

        r = px % self.n
        if r == 0:
            # Invalid k
            raise ValueError("Invalid k")

        s = (inverse(k, self.n) * (z + (private_key * r))) % self.n
        if s == 0:
            # Invalid k
            raise ValueError("Invalid k")

        # Keep s in the lower half of the range; remember the flip because
        # it toggles the parity bit stored in the recovery ID.
        inverted = False
        if s * 2 >= self.n:
            s = self.n - s
            inverted = True
        rs_buf = self._int_to_bytes(r) + self._int_to_bytes(s)

        if recoverable:
            # Bit 0: parity of R's Y coordinate (adjusted for the s flip).
            # Higher bits: how many times n was subtracted from R's X to get r.
            recid = (py % 2) ^ inverted
            recid += 2 * int(px // self.n)
            if is_compressed:
                return bytes([31 + recid]) + rs_buf
            else:
                if recid >= 4:
                    raise ValueError("Too big recovery ID, use compressed address instead")
                return bytes([27 + recid]) + rs_buf
        else:
            return rs_buf

    def recover(self, signature, subject):
        """Recover the signer's public key from a recoverable signature.

        ``signature`` is the header byte followed by ``r`` and ``s`` as
        produced by ``sign(..., recoverable=True, ...)``.

        Returns:
            ``(x, y)`` public key coordinates as bytes.

        Raises:
            ValueError: if the recovery ID is out of range, if ``r`` or ``s``
                is outside [1, n-1], if the reconstructed X coordinate is not
                below p, or if no Y coordinate can be computed for it.
        """
        z = self._subject_to_int(subject)

        recid = signature[0] - 27 if signature[0] < 31 else signature[0] - 31
        r = bytes_to_int(signature[1:self.public_key_length + 1])
        s = bytes_to_int(signature[self.public_key_length + 1:])

        # Verify bounds
        if not 0 <= recid < 2 * (self.p // self.n + 1):
            raise ValueError("Invalid recovery ID")
        # r and s must lie in [1, n-1]; zero has no modular inverse
        if not 0 < r < self.n:
            raise ValueError("r is out of bounds")
        if not 0 < s < self.n:
            raise ValueError("s is out of bounds")

        rinv = inverse(r, self.n)
        u1 = (-z * rinv) % self.n
        u2 = (s * rinv) % self.n

        # Recover R
        rx = r + (recid // 2) * self.n
        if rx >= self.p:
            raise ValueError("Rx is out of bounds")

        # Almost copied from decompress_point
        ry_square = (pow(rx, 3, self.p) + self.a * rx + self.b) % self.p
        try:
            ry = square_root_mod_prime(ry_square, self.p)
        except Exception:
            raise ValueError("Invalid recovered public key") from None

        # Ensure the point is correct
        if ry % 2 != recid % 2:
            # Fix Ry sign
            ry = self.p - ry

        # Public key = u1 * G + u2 * R
        x, y = self.jacobian.fast_shamir(self.g, u1, (rx, ry), u2)
        return self._int_to_bytes(x), self._int_to_bytes(y)

    def verify(self, signature, subject, public_key):
        """Check an ``r || s`` signature over ``subject`` against ``public_key``.

        ``public_key`` is an iterable of coordinates as byte strings.

        Returns:
            True when the signature is valid.

        Raises:
            ValueError: if ``r`` or ``s`` is outside [1, n-1], if the public
                key is not on the curve, or if the signature does not match.
        """
        z = self._subject_to_int(subject)

        r = bytes_to_int(signature[:self.public_key_length])
        s = bytes_to_int(signature[self.public_key_length:])

        # Verify bounds: r and s must lie in [1, n-1]
        if not 0 < r < self.n:
            raise ValueError("r is out of bounds")
        if not 0 < s < self.n:
            raise ValueError("s is out of bounds")

        public_key = [bytes_to_int(c) for c in public_key]

        # Ensure that the public key is correct
        if not self.jacobian.is_on_curve(public_key):
            raise ValueError("Public key is not on curve")

        sinv = inverse(s, self.n)
        u1 = (z * sinv) % self.n
        u2 = (r * sinv) % self.n

        # The signature is valid when the X coordinate of
        # u1 * G + u2 * public_key reduces to r modulo n.
        x1, _ = self.jacobian.fast_shamir(self.g, u1, public_key, u2)
        if r != x1 % self.n:
            raise ValueError("Invalid signature")

        return True

    def derive_child(self, seed, child):
        """Derive a child private key from ``seed`` and the integer ``child``.

        Two HMAC-SHA512 rounds are performed: the first is keyed with
        b"Bitcoin seed" over ``seed``; the second is keyed with the upper
        half of the first digest over the compressed first-round public key
        followed by ``child`` encoded in 4 bytes.

        Returns:
            The sum of both rounds' 32-byte private values modulo n, encoded
            in ``public_key_length`` bytes.
        """
        # Round 1
        h = hmac.new(key=b"Bitcoin seed", msg=seed, digestmod="sha512").digest()
        private_key1 = h[:32]
        x, y = self.private_to_public(private_key1)
        # Compressed encoding: prefix 0x02/0x03 by Y parity, then X
        public_key1 = bytes([0x02 + (y[-1] % 2)]) + x
        private_key1 = bytes_to_int(private_key1)

        # Round 2
        msg = public_key1 + self._int_to_bytes(child, 4)
        h = hmac.new(key=h[32:], msg=msg, digestmod="sha512").digest()
        private_key2 = bytes_to_int(h[:32])

        return self._int_to_bytes((private_key1 + private_key2) % self.n)

    @classmethod
    def get_backend(cls):
        """Return the identifier of this backend, ``"fallback"``."""
        return "fallback"


ecc = ECC(EllipticCurveBackend, aes)