A Python port of the Invisible Internet Project (I2P)
1"""Garlic encryption types — DeliveryInstructions, GarlicClove, GarlicMessage."""
2
3import enum
4import struct
5
6
7class DeliveryType(enum.IntEnum):
8 LOCAL = 0
9 DESTINATION = 1
10 ROUTER = 2
11 TUNNEL = 3
12
13
14class DeliveryInstructions:
15 """Delivery instructions for a garlic clove."""
16
17 def __init__(self, delivery_type: DeliveryType, *,
18 destination: bytes | None = None,
19 router: bytes | None = None,
20 tunnel_id: int | None = None):
21 self.delivery_type = delivery_type
22 self.destination = destination
23 self.router = router
24 self.tunnel_id = tunnel_id
25
26 def to_bytes(self) -> bytes:
27 # Flag byte: delivery type in bits 5-6
28 flag = (self.delivery_type & 0x03) << 5
29 parts = [struct.pack("!B", flag)]
30
31 if self.delivery_type == DeliveryType.DESTINATION:
32 assert self.destination is not None
33 parts.append(self.destination)
34 elif self.delivery_type == DeliveryType.ROUTER:
35 assert self.router is not None
36 parts.append(self.router)
37 elif self.delivery_type == DeliveryType.TUNNEL:
38 assert self.router is not None
39 parts.append(self.router)
40 parts.append(struct.pack("!I", self.tunnel_id))
41
42 return b"".join(parts)
43
44 @classmethod
45 def from_bytes(cls, data: bytes) -> tuple["DeliveryInstructions", int]:
46 flag = data[0]
47 delivery_type = DeliveryType((flag >> 5) & 0x03)
48 offset = 1
49 destination = None
50 router = None
51 tunnel_id = None
52
53 if delivery_type == DeliveryType.DESTINATION:
54 destination = data[offset:offset + 32]
55 offset += 32
56 elif delivery_type == DeliveryType.ROUTER:
57 router = data[offset:offset + 32]
58 offset += 32
59 elif delivery_type == DeliveryType.TUNNEL:
60 router = data[offset:offset + 32]
61 offset += 32
62 tunnel_id = struct.unpack("!I", data[offset:offset + 4])[0]
63 offset += 4
64
65 return cls(delivery_type, destination=destination, router=router,
66 tunnel_id=tunnel_id), offset
67
68
class GarlicClove:
    """One garlic clove: delivery instructions, a message, an id and an expiration.

    Serialized as the delivery instructions, a 4-byte big-endian message
    length, the message bytes, then a 4-byte clove id and an 8-byte
    expiration (both big-endian, unsigned).
    """

    def __init__(self, delivery_instructions: DeliveryInstructions,
                 message_data: bytes, clove_id: int, expiration: int):
        """Keep the four clove fields as given.

        ``_size`` starts at 0 and is only filled in by ``from_bytes``.
        """
        self.delivery_instructions = delivery_instructions
        self.message_data = message_data
        self.clove_id = clove_id
        self.expiration = expiration
        self._size: int = 0

    def to_bytes(self) -> bytes:
        """Return the serialized clove."""
        pieces = [
            self.delivery_instructions.to_bytes(),
            struct.pack("!I", len(self.message_data)),
            self.message_data,
            struct.pack("!IQ", self.clove_id, self.expiration),
        ]
        return b"".join(pieces)

    @classmethod
    def from_bytes(cls, data: bytes, offset: int = 0) -> "GarlicClove":
        """Parse one clove from ``data`` beginning at ``offset``.

        The returned clove's ``_size`` holds the number of bytes the clove
        occupied, so a caller can step to whatever follows it.
        """
        instructions, consumed = DeliveryInstructions.from_bytes(data[offset:])

        # 4-byte length prefix, then the message body it describes.
        length_at = offset + consumed
        (body_len,) = struct.unpack("!I", data[length_at:length_at + 4])
        body_start = length_at + 4
        body_end = body_start + body_len
        payload = data[body_start:body_end]

        # Fixed 12-byte trailer: clove id followed by expiration.
        trailer_end = body_end + 12
        clove_id, expiration = struct.unpack("!IQ", data[body_end:trailer_end])

        result = cls(instructions, payload, clove_id, expiration)
        result._size = trailer_end - offset
        return result
100
101
class GarlicMessage:
    """A collection of garlic cloves serialized behind a one-byte count."""

    def __init__(self, cloves: list[GarlicClove]):
        """Wrap the given list of cloves (stored as-is, not copied)."""
        self.cloves = cloves

    def clove_count(self) -> int:
        """Return how many cloves the message holds."""
        return len(self.cloves)

    def to_bytes(self) -> bytes:
        """Return the count byte followed by each clove's serialized form."""
        header = struct.pack("!B", len(self.cloves))
        return header + b"".join(item.to_bytes() for item in self.cloves)

    @classmethod
    def from_bytes(cls, data: bytes) -> "GarlicMessage":
        """Parse a message: read the count byte, then that many cloves.

        Any bytes after the last clove are ignored.
        """
        expected = data[0]
        cursor = 1
        parsed = []
        while len(parsed) < expected:
            item = GarlicClove.from_bytes(data, cursor)
            parsed.append(item)
            # Each parsed clove records how many bytes it occupied.
            cursor += item._size
        return cls(parsed)
126 return cls(cloves)