Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import base64
2import os
3
4import gevent
5
6from Plugin import PluginManager
7from Crypt import CryptBitcoin, CryptHash
8from Config import config
9import sslcrypto
10
11from . import CryptMessage
12
13curve = sslcrypto.ecc.get_curve("secp256k1")
14
15
16@PluginManager.registerTo("UiWebsocket")
17class UiWebsocketPlugin(object):
18 # - Actions -
19
20 # Returns user's public key unique to site
21 # Return: Public key
22 def actionUserPublickey(self, to, index=0):
23 self.response(to, self.user.getEncryptPublickey(self.site.address, index))
24
25 # Encrypt a text using the publickey or user's sites unique publickey
26 # Return: Encrypted text using base64 encoding
27 def actionEciesEncrypt(self, to, text, publickey=0, return_aes_key=False):
28 if type(publickey) is int: # Encrypt using user's publickey
29 publickey = self.user.getEncryptPublickey(self.site.address, publickey)
30 aes_key, encrypted = CryptMessage.eciesEncrypt(text.encode("utf8"), publickey)
31 if return_aes_key:
32 self.response(to, [base64.b64encode(encrypted).decode("utf8"), base64.b64encode(aes_key).decode("utf8")])
33 else:
34 self.response(to, base64.b64encode(encrypted).decode("utf8"))
35
36 # Decrypt a text using privatekey or the user's site unique private key
37 # Return: Decrypted text or list of decrypted texts
38 def actionEciesDecrypt(self, to, param, privatekey=0):
39 if type(privatekey) is int: # Decrypt using user's privatekey
40 privatekey = self.user.getEncryptPrivatekey(self.site.address, privatekey)
41
42 if type(param) == list:
43 encrypted_texts = param
44 else:
45 encrypted_texts = [param]
46
47 texts = CryptMessage.eciesDecryptMulti(encrypted_texts, privatekey)
48
49 if type(param) == list:
50 self.response(to, texts)
51 else:
52 self.response(to, texts[0])
53
54 # Encrypt a text using AES
55 # Return: Iv, AES key, Encrypted text
56 def actionAesEncrypt(self, to, text, key=None):
57 if key:
58 key = base64.b64decode(key)
59 else:
60 key = sslcrypto.aes.new_key()
61
62 if text:
63 encrypted, iv = sslcrypto.aes.encrypt(text.encode("utf8"), key)
64 else:
65 encrypted, iv = b"", b""
66
67 res = [base64.b64encode(item).decode("utf8") for item in [key, iv, encrypted]]
68 self.response(to, res)
69
70 # Decrypt a text using AES
71 # Return: Decrypted text
72 def actionAesDecrypt(self, to, *args):
73 if len(args) == 3: # Single decrypt
74 encrypted_texts = [(args[0], args[1])]
75 keys = [args[2]]
76 else: # Batch decrypt
77 encrypted_texts, keys = args
78
79 texts = [] # Decoded texts
80 for iv, encrypted_text in encrypted_texts:
81 encrypted_text = base64.b64decode(encrypted_text)
82 iv = base64.b64decode(iv)
83 text = None
84 for key in keys:
85 try:
86 decrypted = sslcrypto.aes.decrypt(encrypted_text, iv, base64.b64decode(key))
87 if decrypted and decrypted.decode("utf8"): # Valid text decoded
88 text = decrypted.decode("utf8")
89 except Exception as err:
90 pass
91 texts.append(text)
92
93 if len(args) == 3:
94 self.response(to, texts[0])
95 else:
96 self.response(to, texts)
97
98 # Sign data using ECDSA
99 # Return: Signature
100 def actionEcdsaSign(self, to, data, privatekey=None):
101 if privatekey is None: # Sign using user's privatekey
102 privatekey = self.user.getAuthPrivatekey(self.site.address)
103
104 self.response(to, CryptBitcoin.sign(data, privatekey))
105
106 # Verify data using ECDSA (address is either a address or array of addresses)
107 # Return: bool
108 def actionEcdsaVerify(self, to, data, address, signature):
109 self.response(to, CryptBitcoin.verify(data, address, signature))
110
111 # Gets the publickey of a given privatekey
112 def actionEccPrivToPub(self, to, privatekey):
113 self.response(to, curve.private_to_public(curve.wif_to_private(privatekey.encode())))
114
115 # Gets the address of a given publickey
116 def actionEccPubToAddr(self, to, publickey):
117 self.response(to, curve.public_to_address(bytes.fromhex(publickey)))
118
119
120@PluginManager.registerTo("User")
121class UserPlugin(object):
122 def getEncryptPrivatekey(self, address, param_index=0):
123 if param_index < 0 or param_index > 1000:
124 raise Exception("Param_index out of range")
125
126 site_data = self.getSiteData(address)
127
128 if site_data.get("cert"): # Different privatekey for different cert provider
129 index = param_index + self.getAddressAuthIndex(site_data["cert"])
130 else:
131 index = param_index
132
133 if "encrypt_privatekey_%s" % index not in site_data:
134 address_index = self.getAddressAuthIndex(address)
135 crypt_index = address_index + 1000 + index
136 site_data["encrypt_privatekey_%s" % index] = CryptBitcoin.hdPrivatekey(self.master_seed, crypt_index)
137 self.log.debug("New encrypt privatekey generated for %s:%s" % (address, index))
138 return site_data["encrypt_privatekey_%s" % index]
139
140 def getEncryptPublickey(self, address, param_index=0):
141 if param_index < 0 or param_index > 1000:
142 raise Exception("Param_index out of range")
143
144 site_data = self.getSiteData(address)
145
146 if site_data.get("cert"): # Different privatekey for different cert provider
147 index = param_index + self.getAddressAuthIndex(site_data["cert"])
148 else:
149 index = param_index
150
151 if "encrypt_publickey_%s" % index not in site_data:
152 privatekey = self.getEncryptPrivatekey(address, param_index).encode()
153 publickey = curve.private_to_public(curve.wif_to_private(privatekey) + b"\x01")
154 site_data["encrypt_publickey_%s" % index] = base64.b64encode(publickey).decode("utf8")
155 return site_data["encrypt_publickey_%s" % index]
156
157
158@PluginManager.registerTo("Actions")
159class ActionsPlugin:
160 publickey = "A3HatibU4S6eZfIQhVs2u7GLN5G9wXa9WwlkyYIfwYaj"
161 privatekey = "5JBiKFYBm94EUdbxtnuLi6cvNcPzcKymCUHBDf2B6aq19vvG3rL"
162 utf8_text = '\xc1rv\xedzt\xfbr\xf5t\xfck\xf6rf\xfar\xf3g\xe9p'
163
164 def getBenchmarkTests(self, online=False):
165 if hasattr(super(), "getBenchmarkTests"):
166 tests = super().getBenchmarkTests(online)
167 else:
168 tests = []
169
170 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey) # Warm-up
171 tests.extend([
172 {"func": self.testCryptEciesEncrypt, "kwargs": {}, "num": 100, "time_standard": 1.2},
173 {"func": self.testCryptEciesDecrypt, "kwargs": {}, "num": 500, "time_standard": 1.3},
174 {"func": self.testCryptEciesDecryptMulti, "kwargs": {}, "num": 5, "time_standard": 0.68},
175 {"func": self.testCryptAesEncrypt, "kwargs": {}, "num": 10000, "time_standard": 0.27},
176 {"func": self.testCryptAesDecrypt, "kwargs": {}, "num": 10000, "time_standard": 0.25}
177 ])
178 return tests
179
180 def testCryptEciesEncrypt(self, num_run=1):
181 for i in range(num_run):
182 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey)
183 assert len(aes_key) == 32
184 yield "."
185
186 def testCryptEciesDecrypt(self, num_run=1):
187 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey)
188 for i in range(num_run):
189 assert len(aes_key) == 32
190 decrypted = CryptMessage.eciesDecrypt(base64.b64encode(encrypted), self.privatekey)
191 assert decrypted == self.utf8_text.encode("utf8"), "%s != %s" % (decrypted, self.utf8_text.encode("utf8"))
192 yield "."
193
194 def testCryptEciesDecryptMulti(self, num_run=1):
195 yield "x 100 (%s threads) " % config.threads_crypt
196 aes_key, encrypted = CryptMessage.eciesEncrypt(self.utf8_text.encode("utf8"), self.publickey)
197
198 threads = []
199 for i in range(num_run):
200 assert len(aes_key) == 32
201 threads.append(gevent.spawn(
202 CryptMessage.eciesDecryptMulti, [base64.b64encode(encrypted)] * 100, self.privatekey
203 ))
204
205 for thread in threads:
206 res = thread.get()
207 assert res[0] == self.utf8_text, "%s != %s" % (res[0], self.utf8_text)
208 assert res[0] == res[-1], "%s != %s" % (res[0], res[-1])
209 yield "."
210 gevent.joinall(threads)
211
212 def testCryptAesEncrypt(self, num_run=1):
213 for i in range(num_run):
214 key = os.urandom(32)
215 encrypted = sslcrypto.aes.encrypt(self.utf8_text.encode("utf8"), key)
216 yield "."
217
218 def testCryptAesDecrypt(self, num_run=1):
219 key = os.urandom(32)
220 encrypted_text, iv = sslcrypto.aes.encrypt(self.utf8_text.encode("utf8"), key)
221
222 for i in range(num_run):
223 decrypted = sslcrypto.aes.decrypt(encrypted_text, iv, key).decode("utf8")
224 assert decrypted == self.utf8_text
225 yield "."