-16
src/atpasser/blob/__init__.py
-16
src/atpasser/blob/__init__.py
···
1
-
import cid
2
-
import multihash, hashlib
3
-
4
-
5
-
def generateCID(file):
6
-
hasher = hashlib.new("sha-256")
7
-
while True:
8
-
chunk = file.read(8192)
9
-
if not chunk:
10
-
break
11
-
hasher.update(chunk)
12
-
13
-
digest = hasher.digest
14
-
mh = multihash.encode(digest, "sha-256")
15
-
16
-
return cid.CIDv1(codec="raw", multihash=mh)
-1
src/atpasser/data/__init__.py
-1
src/atpasser/data/__init__.py
···
1
-
from ._wrapper import *
-76
src/atpasser/data/_data.py
-76
src/atpasser/data/_data.py
···
1
-
import base64
2
-
from cid import CIDv0, CIDv1, cid, make_cid
3
-
import json
4
-
5
-
6
-
class Data:
7
-
"""
8
-
A class representing data with "$type" key.
9
-
10
-
Attributes:
11
-
type (str): The type of the data.
12
-
json (str): Original object in JSON format.
13
-
"""
14
-
15
-
def __init__(self, dataType: str, json: str = "{}") -> None:
16
-
"""
17
-
Initalizes data object.
18
-
19
-
Parameters:
20
-
type (str): The type of the data.
21
-
json (str): Original object in JSON format.
22
-
"""
23
-
self.type = dataType
24
-
self.json = json
25
-
26
-
def data(self):
27
-
"""
28
-
Loads data as a Python-friendly format.
29
-
30
-
Returns:
31
-
dict: Converted data from JSON object.
32
-
"""
33
-
return json.loads(self.json, object_hook=dataHook)
34
-
35
-
36
-
def dataHook(data: dict):
37
-
"""
38
-
Treated as `JSONDecoder`'s `object_hook`
39
-
40
-
Parameters:
41
-
data: data in format that `JSONDecoder` like ;)
42
-
"""
43
-
if "$bytes" in data:
44
-
return base64.b64decode(data["$bytes"])
45
-
elif "$link" in data:
46
-
return make_cid(data["$link"])
47
-
elif "$type" in data:
48
-
dataType = data["$type"]
49
-
del data["$type"]
50
-
return Data(dataType, json.dumps(data))
51
-
else:
52
-
return data
53
-
54
-
55
-
def _convertDataToFakeJSON(data):
56
-
if isinstance(data, bytes):
57
-
return {"$bytes": base64.b64encode(data)}
58
-
elif isinstance(data, (CIDv0, CIDv1)):
59
-
return {"link": data.encode()}
60
-
elif isinstance(data, dict):
61
-
for item in data:
62
-
data[item] = _convertDataToFakeJSON(data[item])
63
-
elif isinstance(data, (tuple, list, set)):
64
-
return [_convertDataToFakeJSON(item) for item in data]
65
-
else:
66
-
return data
67
-
68
-
69
-
class DataEncoder(json.JSONEncoder):
70
-
"""
71
-
A superset of `JSONEncoder` to support ATProto data.
72
-
"""
73
-
74
-
def default(self, o):
75
-
result = _convertDataToFakeJSON(o)
76
-
return super().default(result)
-61
src/atpasser/data/_wrapper.py
-61
src/atpasser/data/_wrapper.py
···
1
-
from json import loads
2
-
from typing import Callable, Any
3
-
from ._data import *
4
-
import functools
5
-
6
-
# Pyright did the whole job. Thank it.
7
-
8
-
9
-
class DataDecoder(json.JSONDecoder):
10
-
"""
11
-
A superset of `JSONDecoder` to support ATProto data.
12
-
"""
13
-
14
-
def __init__(
15
-
self,
16
-
*,
17
-
object_hook: Callable[[dict[str, Any]], Any] | None = dataHook,
18
-
parse_float: Callable[[str], Any] | None = None,
19
-
parse_int: Callable[[str], Any] | None = None,
20
-
parse_constant: Callable[[str], Any] | None = None,
21
-
strict: bool = True,
22
-
object_pairs_hook: Callable[[list[tuple[str, Any]]], Any] | None = None,
23
-
) -> None:
24
-
super().__init__(
25
-
object_hook=object_hook,
26
-
parse_float=parse_float,
27
-
parse_int=parse_int,
28
-
parse_constant=parse_constant,
29
-
strict=strict,
30
-
object_pairs_hook=object_pairs_hook,
31
-
)
32
-
33
-
34
-
# Screw it. I have to make 4 `json`-like functions.
35
-
36
-
37
-
def _dataDecoratorForDump(func):
38
-
@functools.wraps(func)
39
-
def wrapper(obj, *args, **kwargs):
40
-
kwargs.setdefault("cls", DataEncoder)
41
-
return func(obj, *args, **kwargs)
42
-
43
-
return wrapper
44
-
45
-
46
-
def _dataDecoratorForLoad(func):
47
-
@functools.wraps(func)
48
-
def wrapper(obj, *args, **kwargs):
49
-
kwargs.setdefault("cls", DataDecoder)
50
-
return func(obj, *args, **kwargs)
51
-
52
-
return wrapper
53
-
54
-
55
-
dump = _dataDecoratorForDump(json.dump)
56
-
dumps = _dataDecoratorForDump(json.dumps)
57
-
load = _dataDecoratorForLoad(json.load)
58
-
loads = _dataDecoratorForLoad(json.loads)
59
-
"""
60
-
Wrapper of the JSON functions to support ATProto data.
61
-
"""
-137
src/atpasser/data/cbor.py
-137
src/atpasser/data/cbor.py
···
1
-
from datetime import tzinfo
2
-
import typing
3
-
import cbor2
4
-
import cid
5
-
6
-
from .data import dataHook, Data
7
-
8
-
9
-
def tagHook(decoder: cbor2.CBORDecoder, tag: cbor2.CBORTag, shareable_index=None):
10
-
"""
11
-
A simple tag hook for CID support.
12
-
"""
13
-
return cid.from_bytes(tag.value) if tag.tag == 42 else tag
14
-
15
-
16
-
class CBOREncoder(cbor2.CBOREncoder):
17
-
"""
18
-
Wrapper of cbor2.CBOREncoder.
19
-
"""
20
-
21
-
def __init__(
22
-
self,
23
-
fp: typing.IO[bytes],
24
-
datetime_as_timestamp: bool = False,
25
-
timezone: tzinfo | None = None,
26
-
value_sharing: bool = False,
27
-
default: (
28
-
typing.Callable[[cbor2.CBOREncoder, typing.Any], typing.Any] | None
29
-
) = None,
30
-
canonical: bool = False,
31
-
date_as_datetime: bool = False,
32
-
string_referencing: bool = False,
33
-
indefinite_containers: bool = False,
34
-
):
35
-
super().__init__(
36
-
fp,
37
-
datetime_as_timestamp,
38
-
timezone,
39
-
value_sharing,
40
-
default,
41
-
canonical,
42
-
date_as_datetime,
43
-
string_referencing,
44
-
indefinite_containers,
45
-
)
46
-
47
-
@cbor2.shareable_encoder
48
-
def cidOrDataEncoder(self: cbor2.CBOREncoder, value: cid.CIDv0 | cid.CIDv1 | Data):
49
-
"""
50
-
Encode CID or Data to CBOR Tag.
51
-
"""
52
-
if isinstance(value, (cid.CIDv0, cid.CIDv1)):
53
-
self.encode(cbor2.CBORTag(42, value.encode()))
54
-
elif isinstance(value, Data):
55
-
self.encode(value.data())
56
-
57
-
58
-
def _cborObjectHook(decoder: cbor2.CBORDecoder, value):
59
-
return dataHook(value)
60
-
61
-
62
-
class CBORDecoder(cbor2.CBORDecoder):
63
-
"""
64
-
Wrapper of cbor2.CBORDecoder.
65
-
"""
66
-
67
-
def __init__(
68
-
self,
69
-
fp: typing.IO[bytes],
70
-
tag_hook: (
71
-
typing.Callable[[cbor2.CBORDecoder, cbor2.CBORTag], typing.Any] | None
72
-
) = tagHook,
73
-
object_hook: (
74
-
typing.Callable[
75
-
[cbor2.CBORDecoder, dict[typing.Any, typing.Any]], typing.Any
76
-
]
77
-
| None
78
-
) = _cborObjectHook,
79
-
str_errors: typing.Literal["strict", "error", "replace"] = "strict",
80
-
):
81
-
super().__init__(fp, tag_hook, object_hook, str_errors)
82
-
83
-
84
-
# Make things for CBOR again.
85
-
86
-
from io import BytesIO
87
-
88
-
89
-
def dumps(
90
-
obj: object,
91
-
datetime_as_timestamp: bool = False,
92
-
timezone: tzinfo | None = None,
93
-
value_sharing: bool = False,
94
-
default: typing.Callable[[cbor2.CBOREncoder, typing.Any], typing.Any] | None = None,
95
-
canonical: bool = False,
96
-
date_as_datetime: bool = False,
97
-
string_referencing: bool = False,
98
-
indefinite_containers: bool = False,
99
-
) -> bytes:
100
-
with BytesIO() as fp:
101
-
CBOREncoder(
102
-
fp,
103
-
datetime_as_timestamp=datetime_as_timestamp,
104
-
timezone=timezone,
105
-
value_sharing=value_sharing,
106
-
default=default,
107
-
canonical=canonical,
108
-
date_as_datetime=date_as_datetime,
109
-
string_referencing=string_referencing,
110
-
indefinite_containers=indefinite_containers,
111
-
).encode(obj)
112
-
return fp.getvalue()
113
-
114
-
115
-
def dump(
116
-
obj: object,
117
-
fp: typing.IO[bytes],
118
-
datetime_as_timestamp: bool = False,
119
-
timezone: tzinfo | None = None,
120
-
value_sharing: bool = False,
121
-
default: typing.Callable[[cbor2.CBOREncoder, typing.Any], typing.Any] | None = None,
122
-
canonical: bool = False,
123
-
date_as_datetime: bool = False,
124
-
string_referencing: bool = False,
125
-
indefinite_containers: bool = False,
126
-
) -> None:
127
-
CBOREncoder(
128
-
fp,
129
-
datetime_as_timestamp=datetime_as_timestamp,
130
-
timezone=timezone,
131
-
value_sharing=value_sharing,
132
-
default=default,
133
-
canonical=canonical,
134
-
date_as_datetime=date_as_datetime,
135
-
string_referencing=string_referencing,
136
-
indefinite_containers=indefinite_containers,
137
-
).encode(obj)
-4
tests/__init__.py
-4
tests/__init__.py
-179
tests/_strings.py
-179
tests/_strings.py
···
1
-
from atpasser import did, handle, nsid, rKey, uri
2
-
3
-
4
-
testStrings, testMethods = {}, {}
5
-
6
-
7
-
testStrings[
8
-
"did"
9
-
10
-
] = """did:plc:z72i7hdynmk6r22z27h6tvur
11
-
12
-
did:web:blueskyweb.xyz
13
-
14
-
did:method:val:two
15
-
16
-
did:m:v
17
-
18
-
did:method::::val
19
-
20
-
did:method:-:_:.
21
-
22
-
did:key:zQ3shZc2QzApp2oymGvQbzP8eKheVshBHbU4ZYjeXqwSKEn6N
23
-
24
-
did:METHOD:val
25
-
26
-
did:m123:val
27
-
28
-
DID:method:val
29
-
did:method:
30
-
31
-
did:method:val/two
32
-
33
-
did:method:val?two
34
-
35
-
did:method:val#two"""
36
-
37
-
testMethods["did"] = did.DID
38
-
39
-
40
-
testStrings[
41
-
"handle"
42
-
43
-
] = """jay.bsky.social
44
-
45
-
8.cn
46
-
47
-
name.t--t
48
-
49
-
XX.LCS.MIT.EDU
50
-
a.co
51
-
52
-
xn--notarealidn.com
53
-
54
-
xn--fiqa61au8b7zsevnm8ak20mc4a87e.xn--fiqs8s
55
-
56
-
xn--ls8h.test
57
-
example.t
58
-
59
-
jo@hn.test
60
-
61
-
💩.tes
62
-
t
63
-
john..test
64
-
65
-
xn--bcher-.tld
66
-
67
-
john.0
68
-
69
-
cn.8
70
-
71
-
www.masełkowski.pl.com
72
-
73
-
org
74
-
75
-
name.org.
76
-
77
-
2gzyxa5ihm7nsggfxnu52rck2vv4rvmdlkiu3zzui5du4xyclen53wid.onion
78
-
laptop.local
79
-
80
-
blah.arpa"""
81
-
82
-
testMethods["handle"] = handle.Handle
83
-
84
-
85
-
testStrings[
86
-
"nsid"
87
-
88
-
] = """com.example.fooBar
89
-
90
-
net.users.bob.ping
91
-
92
-
a-0.b-1.c
93
-
94
-
a.b.c
95
-
96
-
com.example.fooBarV2
97
-
98
-
cn.8.lex.stuff
99
-
100
-
com.exa💩ple.thin
101
-
com.example
102
-
103
-
com.example.3"""
104
-
105
-
testMethods["nsid"] = nsid.NSID
106
-
107
-
108
-
testStrings[
109
-
110
-
"rkey"
111
-
112
-
] = """3jui7kd54zh2y
113
-
self
114
-
example.com
115
-
116
-
~1.2-3_
117
-
118
-
dHJ1ZQ
119
-
pre:fix
120
-
121
-
_
122
-
123
-
alpha/beta
124
-
.
125
-
..
126
-
127
-
#extra
128
-
129
-
@handle
130
-
131
-
any space
132
-
133
-
any+space
134
-
135
-
number[3]
136
-
137
-
number(3)
138
-
139
-
"quote"
140
-
141
-
dHJ1ZQ=="""
142
-
143
-
testMethods["rkey"] = rKey.RKey
144
-
145
-
146
-
testStrings[
147
-
"uri"
148
-
149
-
] = """at://foo.com/com.example.foo/123
150
-
151
-
at://foo.com/example/123
152
-
153
-
at://computer
154
-
155
-
at://example.com:3000
156
-
157
-
at://foo.com/
158
-
159
-
at://user:pass@foo.com"""
160
-
161
-
testMethods["uri"] = uri.URI
162
-
163
-
164
-
for item in testMethods:
165
-
166
-
print(f"START TEST {item}")
167
-
168
-
for value in testStrings[item].splitlines():
169
-
170
-
print(f"Value: {value}")
171
-
172
-
try:
173
-
174
-
print(f"str(): {str(testMethods[item](value))}")
175
-
176
-
except Exception as e:
177
-
178
-
print(f"× {e}")
179
-