A Python port of the Invisible Internet Project (I2P)
at main 126 lines 4.2 kB view raw
1"""Garlic encryption types — DeliveryInstructions, GarlicClove, GarlicMessage.""" 2 3import enum 4import struct 5 6 7class DeliveryType(enum.IntEnum): 8 LOCAL = 0 9 DESTINATION = 1 10 ROUTER = 2 11 TUNNEL = 3 12 13 14class DeliveryInstructions: 15 """Delivery instructions for a garlic clove.""" 16 17 def __init__(self, delivery_type: DeliveryType, *, 18 destination: bytes | None = None, 19 router: bytes | None = None, 20 tunnel_id: int | None = None): 21 self.delivery_type = delivery_type 22 self.destination = destination 23 self.router = router 24 self.tunnel_id = tunnel_id 25 26 def to_bytes(self) -> bytes: 27 # Flag byte: delivery type in bits 5-6 28 flag = (self.delivery_type & 0x03) << 5 29 parts = [struct.pack("!B", flag)] 30 31 if self.delivery_type == DeliveryType.DESTINATION: 32 assert self.destination is not None 33 parts.append(self.destination) 34 elif self.delivery_type == DeliveryType.ROUTER: 35 assert self.router is not None 36 parts.append(self.router) 37 elif self.delivery_type == DeliveryType.TUNNEL: 38 assert self.router is not None 39 parts.append(self.router) 40 parts.append(struct.pack("!I", self.tunnel_id)) 41 42 return b"".join(parts) 43 44 @classmethod 45 def from_bytes(cls, data: bytes) -> tuple["DeliveryInstructions", int]: 46 flag = data[0] 47 delivery_type = DeliveryType((flag >> 5) & 0x03) 48 offset = 1 49 destination = None 50 router = None 51 tunnel_id = None 52 53 if delivery_type == DeliveryType.DESTINATION: 54 destination = data[offset:offset + 32] 55 offset += 32 56 elif delivery_type == DeliveryType.ROUTER: 57 router = data[offset:offset + 32] 58 offset += 32 59 elif delivery_type == DeliveryType.TUNNEL: 60 router = data[offset:offset + 32] 61 offset += 32 62 tunnel_id = struct.unpack("!I", data[offset:offset + 4])[0] 63 offset += 4 64 65 return cls(delivery_type, destination=destination, router=router, 66 tunnel_id=tunnel_id), offset 67 68 69class GarlicClove: 70 """A single garlic clove: delivery instructions + message + id + expiration.""" 71 72 def __init__(self, delivery_instructions: DeliveryInstructions, 73 message_data: bytes, clove_id: int, expiration: int): 74 self.delivery_instructions = delivery_instructions 75 self.message_data = message_data 76 self.clove_id = clove_id 77 self.expiration = expiration 78 self._size: int = 0 79 80 def to_bytes(self) -> bytes: 81 di_bytes = self.delivery_instructions.to_bytes() 82 return (di_bytes + 83 struct.pack("!I", len(self.message_data)) + 84 self.message_data + 85 struct.pack("!IQ", self.clove_id, self.expiration)) 86 87 @classmethod 88 def from_bytes(cls, data: bytes, offset: int = 0) -> "GarlicClove": 89 di, di_len = DeliveryInstructions.from_bytes(data[offset:]) 90 pos = offset + di_len 91 msg_len = struct.unpack("!I", data[pos:pos + 4])[0] 92 pos += 4 93 message_data = data[pos:pos + msg_len] 94 pos += msg_len 95 clove_id, expiration = struct.unpack("!IQ", data[pos:pos + 12]) 96 pos += 12 97 clove = cls(di, message_data, clove_id, expiration) 98 clove._size = pos - offset 99 return clove 100 101 102class GarlicMessage: 103 """Bundle of garlic cloves.""" 104 105 def __init__(self, cloves: list[GarlicClove]): 106 self.cloves = cloves 107 108 def clove_count(self) -> int: 109 return len(self.cloves) 110 111 def to_bytes(self) -> bytes: 112 parts = [struct.pack("!B", len(self.cloves))] 113 for clove in self.cloves: 114 parts.append(clove.to_bytes()) 115 return b"".join(parts) 116 117 @classmethod 118 def from_bytes(cls, data: bytes) -> "GarlicMessage": 119 count = data[0] 120 offset = 1 121 cloves = [] 122 for _ in range(count): 123 clove = GarlicClove.from_bytes(data, offset) 124 cloves.append(clove) 125 offset += clove._size 126 return cls(cloves)