Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 225 lines 9.2 kB view raw
1import base64 2import os 3 4import gevent 5 6from Plugin import PluginManager 7from Crypt import CryptBitcoin, CryptHash 8from Config import config 9import sslcrypto 10 11from . import CryptMessage 12 13curve = sslcrypto.ecc.get_curve("secp256k1") 14 15 16@PluginManager.registerTo("UiWebsocket") 17class UiWebsocketPlugin(object): 18 # - Actions - 19 20 # Returns user's public key unique to site 21 # Return: Public key 22 def actionUserPublickey(self, to, index=0): 23 self.response(to, self.user.getEncryptPublickey(self.site.address, index)) 24 25 # Encrypt a text using the publickey or user's sites unique publickey 26 # Return: Encrypted text using base64 encoding 27 def actionEciesEncrypt(self, to, text, publickey=0, return_aes_key=False): 28 if type(publickey) is int: # Encrypt using user's publickey 29 publickey = self.user.getEncryptPublickey(self.site.address, publickey) 30 aes_key, encrypted = CryptMessage.eciesEncrypt(text.encode("utf8"), publickey) 31 if return_aes_key: 32 self.response(to, [base64.b64encode(encrypted).decode("utf8"), base64.b64encode(aes_key).decode("utf8")]) 33 else: 34 self.response(to, base64.b64encode(encrypted).decode("utf8")) 35 36 # Decrypt a text using privatekey or the user's site unique private key 37 # Return: Decrypted text or list of decrypted texts 38 def actionEciesDecrypt(self, to, param, privatekey=0): 39 if type(privatekey) is int: # Decrypt using user's privatekey 40 privatekey = self.user.getEncryptPrivatekey(self.site.address, privatekey) 41 42 if type(param) == list: 43 encrypted_texts = param 44 else: 45 encrypted_texts = [param] 46 47 texts = CryptMessage.eciesDecryptMulti(encrypted_texts, privatekey) 48 49 if type(param) == list: 50 self.response(to, texts) 51 else: 52 self.response(to, texts[0]) 53 54 # Encrypt a text using AES 55 # Return: Iv, AES key, Encrypted text 56 def actionAesEncrypt(self, to, text, key=None): 57 if key: 58 key = base64.b64decode(key) 59 else: 60 key = sslcrypto.aes.new_key() 61 62 if text: 63 encrypted, iv = sslcrypto.aes.encrypt(text.encode("utf8"), key) 64 else: 65 encrypted, iv = b"", b"" 66 67 res = [base64.b64encode(item).decode("utf8") for item in [key, iv, encrypted]] 68 self.response(to, res) 69 70 # Decrypt a text using AES 71 # Return: Decrypted text 72 def actionAesDecrypt(self, to, *args): 73 if len(args) == 3: # Single decrypt 74 encrypted_texts = [(args[0], args[1])] 75 keys = [args[2]] 76 else: # Batch decrypt 77 encrypted_texts, keys = args 78 79 texts = [] # Decoded texts 80 for iv, encrypted_text in encrypted_texts: 81 encrypted_text = base64.b64decode(encrypted_text) 82 iv = base64.b64decode(iv) 83 text = None 84 for key in keys: 85 try: 86 decrypted = sslcrypto.aes.decrypt(encrypted_text, iv, base64.b64decode(key)) 87 if decrypted and decrypted.decode("utf8"): # Valid text decoded 88 text = decrypted.decode("utf8") 89 except Exception as err: 90 pass 91 texts.append(text) 92 93 if len(args) == 3: 94 self.response(to, texts[0]) 95 else: 96 self.response(to, texts) 97 98 # Sign data using ECDSA 99 # Return: Signature 100 def actionEcdsaSign(self, to, data, privatekey=None): 101 if privatekey is None: # Sign using user's privatekey 102 privatekey = self.user.getAuthPrivatekey(self.site.address) 103 104 self.response(to, CryptBitcoin.sign(data, privatekey)) 105 106 # Verify data using ECDSA (address is either a address or array of addresses) 107 # Return: bool 108 def actionEcdsaVerify(self, to, data, address, signature): 109 self.response(to, CryptBitcoin.verify(data, address, signature)) 110 111 # Gets the publickey of a given privatekey 112 def actionEccPrivToPub(self, to, privatekey): 113 self.response(to, curve.private_to_public(curve.wif_to_private(privatekey.encode()))) 114 115 # Gets the address of a given publickey 116 def actionEccPubToAddr(self, to, publickey): 117 self.response(to, curve.public_to_address(bytes.fromhex(publickey))) 118 119 120@PluginManager.registerTo("User") 121class UserPlugin(object): 122 def getEncryptPrivatekey(self, address, param_index=0): 123 if param_index < 0 or param_index > 1000: 124 raise Exception("Param_index out of range") 125 126 site_data = self.getSiteData(address) 127 128 if site_data.get("cert"): # Different privatekey for different cert provider 129 index = param_index + self.getAddressAuthIndex(site_data["cert"]) 130 else: 131 index = param_index 132 133 if "encrypt_privatekey_%s" % index not in site_data: 134 address_index = self.getAddressAuthIndex(address) 135 crypt_index = address_index + 1000 + index 136 site_data["encrypt_privatekey_%s" % index] = CryptBitcoin.hdPrivatekey(self.master_seed, crypt_index) 137 self.log.debug("New encrypt privatekey generated for %s:%s" % (address, index)) 138 return site_data["encrypt_privatekey_%s" % index] 139 140 def getEncryptPublickey(self, address, param_index=0): 141 if param_index < 0 or param_index > 1000: 142 raise Exception("Param_index out of range") 143 144 site_data = self.getSiteData(address) 145 146 if site_data.get("cert"): # Different privatekey for different cert provider 147 index = param_index + self.getAddressAuthIndex(site_data["cert"]) 148 else: 149 index = param_index 150 151 if "encrypt_publickey_%s" % index not in site_data: 152 privatekey = self.getEncryptPrivatekey(address, param_index).encode() 153 publickey = curve.private_to_public(curve.wif_to_private(privatekey) + b"\x01") 154 site_data["encrypt_publickey_%s" % index] = base64.b64encode(publickey).decode("utf8") 155 return site_data["encrypt_publickey_%s" % index] 156 157 158@PluginManager.registerTo("Actions") 159class ActionsPlugin: 160 publickey = "A3HatibU4S6eZfIQhVs2u7GLN5G9wXa9WwlkyYIfwYaj" 161 privatekey = "5JBiKFYBm94EUdbxtnuLi6cvNcPzcKymCUHBDf2B6aq19vvG3rL" 162 utf8_text = '\xc1rv\xedzt\xfbr\xf5t\xfck\xf6rf\xfar\xf3g\xe9p' 163 164 def getBenchmarkTests(self, online=False): 165 if hasattr(super(), "getBenchmarkTests"): 166 tests = super().getBenchmarkTests(online) 167 else: 168 tests = [] 169 170 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey) # Warm-up 171 tests.extend([ 172 {"func": self.testCryptEciesEncrypt, "kwargs": {}, "num": 100, "time_standard": 1.2}, 173 {"func": self.testCryptEciesDecrypt, "kwargs": {}, "num": 500, "time_standard": 1.3}, 174 {"func": self.testCryptEciesDecryptMulti, "kwargs": {}, "num": 5, "time_standard": 0.68}, 175 {"func": self.testCryptAesEncrypt, "kwargs": {}, "num": 10000, "time_standard": 0.27}, 176 {"func": self.testCryptAesDecrypt, "kwargs": {}, "num": 10000, "time_standard": 0.25} 177 ]) 178 return tests 179 180 def testCryptEciesEncrypt(self, num_run=1): 181 for i in range(num_run): 182 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey) 183 assert len(aes_key) == 32 184 yield "." 185 186 def testCryptEciesDecrypt(self, num_run=1): 187 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey) 188 for i in range(num_run): 189 assert len(aes_key) == 32 190 decrypted = CryptMessage.eciesDecrypt(base64.b64encode(encrypted), self.privatekey) 191 assert decrypted == self.utf8_text.encode("utf8"), "%s != %s" % (decrypted, self.utf8_text.encode("utf8")) 192 yield "." 193 194 def testCryptEciesDecryptMulti(self, num_run=1): 195 yield "x 100 (%s threads) " % config.threads_crypt 196 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey) 197 198 threads = [] 199 for i in range(num_run): 200 assert len(aes_key) == 32 201 threads.append(gevent.spawn( 202 CryptMessage.eciesDecryptMulti, [base64.b64encode(encrypted)] * 100, self.privatekey 203 )) 204 205 for thread in threads: 206 res = thread.get() 207 assert res[0] == self.utf8_text, "%s != %s" % (res[0], self.utf8_text) 208 assert res[0] == res[-1], "%s != %s" % (res[0], res[-1]) 209 yield "." 210 gevent.joinall(threads) 211 212 def testCryptAesEncrypt(self, num_run=1): 213 for i in range(num_run): 214 key = os.urandom(32) 215 encrypted = sslcrypto.aes.encrypt(self.utf8_text.encode("utf8"), key) 216 yield "." 217 218 def testCryptAesDecrypt(self, num_run=1): 219 key = os.urandom(32) 220 encrypted_text, iv = sslcrypto.aes.encrypt(self.utf8_text.encode("utf8"), key) 221 222 for i in range(num_run): 223 decrypted = sslcrypto.aes.decrypt(encrypted_text, iv, key).decode("utf8") 224 assert decrypted == self.utf8_text 225 yield "."