+11 atproto/util.py
+URI = "at://"
+URI_LEN = len(URI)
+
+
+class AtUri:
+    @classmethod
+    def record_uri(cls, uri: str) -> tuple[str, str, str]:
+        did, collection, rid = uri[URI_LEN:].split("/")
+        if not (did and collection and rid):
+            raise ValueError(f"Invalid record uri {uri}!")
+        return did, collection, rid
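For context, a minimal usage sketch of the new helper; the DID, collection, and record key below are illustrative:

```python
from atproto.util import AtUri

# An at:// record URI has the shape "at://<did>/<collection NSID>/<record key>".
did, collection, rid = AtUri.record_uri(
    "at://did:plc:abc123/app.bsky.feed.post/3kxyz"  # illustrative URI
)
assert did == "did:plc:abc123"
assert collection == "app.bsky.feed.post"
assert rid == "3kxyz"
```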
+4 -4 bluesky/info.py
 from abc import ABC, abstractmethod
-from typing import Any
+from typing import Any, override
 
 from atproto.identity import did_resolver, handle_resolver
 from cross.service import Service
-from util.util import LOGGER, normalize_service_url
+from util.util import normalize_service_url
 
 SERVICE = "https://bsky.app"
 
···
         if not did:
             if not handle:
                 raise KeyError("No did: or atproto handle provided!")
-            LOGGER.info("Resolving ATP identity for %s...", handle)
+            self.log.info("Resolving ATP identity for %s...", handle)
             self.did = handle_resolver.resolve_handle(handle)
 
         if not pds:
-            LOGGER.info("Resolving PDS from %s DID document...", did)
+            self.log.info("Resolving PDS from %s DID document...", did)
             atp_pds = did_resolver.resolve_did(self.did).get_atproto_pds()
             if not atp_pds:
                 raise Exception("Failed to resolve atproto pds for %s")
+102 -7 bluesky/input.py
···
 
 import websockets
 
+from atproto.util import AtUri
 from bluesky.info import SERVICE, BlueskyService, validate_and_transform
+from cross.attachments import LabelsAttachment, LanguagesAttachment, RemoteUrlAttachment
+from cross.post import Post
 from cross.service import InputService
 from database.connection import DatabasePool
-from util.util import LOGGER, normalize_service_url
+from util.util import normalize_service_url
 
 
 @dataclass(kw_only=True)
···
         super().__init__(SERVICE, db)
 
     def _on_post(self, record: dict[str, Any]):
-        LOGGER.info(record) # TODO
+        post_uri = cast(str, record["$xpost.strongRef"]["uri"])
+        post_cid = cast(str, record["$xpost.strongRef"]["cid"])
+
+        parent_uri = cast(
+            str | None,
+            None if not record.get("reply") else record["reply"]["parent"]["uri"]
+        )
+        parent = None
+        if parent_uri:
+            parent = self._get_post(self.url, self.did, parent_uri)
+            if not parent:
+                self.log.info("Skipping %s, parent %s not found in db", post_uri, parent_uri)
+                return
+
+        post = Post(id=post_uri, parent_id=parent_uri, text=record["text"])
+        did, _, rid = AtUri.record_uri(post_uri)
+        post.attachments.put(RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}"))
+
+        # TODO Media Attachments
+        embed = record.get("embed", {})
+        if embed:
+            match cast(str, embed["$type"]):
+                case "app.bsky.embed.record" | "app.bsky.embed.recordWithMedia":
+                    _, collection, _ = AtUri.record_uri(
+                        cast(str, embed["record"]["uri"])
+                    )
+                    if collection == "app.bsky.feed.post":
+                        self.log.info("Skipping quote post '%s'", post_uri)
+                        return
+                case _:
+                    self.log.warning("Unhandled embed type %s", embed["$type"])
+                    pass
+
+        if "langs" in record:
+            post.attachments.put(
+                LanguagesAttachment(langs=record["langs"])
+            )
+        if "labels" in record:
+            post.attachments.put(
+                LabelsAttachment(
+                    labels=[
+                        label["val"].replace("-", " ") for label in record["labels"]["values"]
+                    ]
+                ),
+            )
+
+        if parent:
+            self._insert_post({
+                "user": self.did,
+                "service": self.url,
+                "identifier": post_uri,
+                "parent": parent['id'],
+                "root": parent['id'] if not parent['root'] else parent['root'],
+                "extra_data": json.dumps({'cid': post_cid})
+            })
+        else:
+            self._insert_post({
+                "user": self.did,
+                "service": self.url,
+                "identifier": post_uri,
+                "extra_data": json.dumps({'cid': post_cid})
+            })
+
+        for out in self.outputs:
+            self.submitter(lambda out=out: out.accept_post(post))  # bind the loop variable
 
     def _on_repost(self, record: dict[str, Any]):
-        LOGGER.info(record) # TODO
+        post_uri = cast(str, record["$xpost.strongRef"]["uri"])
+        post_cid = cast(str, record["$xpost.strongRef"]["cid"])
+
+        reposted_uri = cast(str, record["subject"]["uri"])
+        reposted = self._get_post(self.url, self.did, reposted_uri)
+        if not reposted:
+            self.log.info("Skipping repost '%s' as reposted post '%s' was not found in the db.", post_uri, reposted_uri)
+            return
+
+        self._insert_post({
+            "user": self.did,
+            "service": self.url,
+            "identifier": post_uri,
+            "reposted": reposted['id'],
+            "extra_data": json.dumps({'cid': post_cid})
+        })
+
+        for out in self.outputs:
+            self.submitter(lambda out=out: out.accept_repost(post_uri, reposted_uri))
 
     def _on_delete_post(self, post_id: str, repost: bool):
-        LOGGER.info("%s | %s", post_id, repost) # TODO
+        post = self._get_post(self.url, self.did, post_id)
+        if not post:
+            return
+
+        if repost:
+            for output in self.outputs:
+                self.submitter(lambda output=output: output.delete_repost(post_id))
+        else:
+            for output in self.outputs:
+                self.submitter(lambda output=output: output.delete_post(post_id))
+        self._delete_post_by_id(post['id'])
 
 
 class BlueskyJetstreamInputService(BlueskyBaseInputService):
···
 
         async for ws in websockets.connect(url):
             try:
-                LOGGER.info("Listening to %s...", self.options.jetstream)
+                self.log.info("Listening to %s...", self.options.jetstream)
 
                 async def listen_for_messages():
                     async for msg in ws:
···
 
                 _ = await asyncio.gather(listen)
             except websockets.ConnectionClosedError as e:
-                LOGGER.error(e, stack_info=True, exc_info=True)
-                LOGGER.info("Reconnecting to %s...", self.options.jetstream)
+                self.log.error(e, stack_info=True, exc_info=True)
+                self.log.info("Reconnecting to %s...", self.options.jetstream)
                 continue
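For reference, a sketch of the record shape `_on_post` consumes, reduced to the keys the handler actually reads; the `$xpost.strongRef` entry appears to be attached earlier in this codebase's pipeline, and all concrete values are made up:

```python
# Illustrative commit record; only keys read by _on_post are shown, values are invented.
record = {
    "$xpost.strongRef": {  # presumably injected upstream: URI/CID of the post itself
        "uri": "at://did:plc:abc123/app.bsky.feed.post/3kxyz",
        "cid": "bafyrei...",
    },
    "text": "hello from bluesky",
    "langs": ["en"],                                   # optional -> LanguagesAttachment
    "reply": {                                         # optional; parent must already be in the db
        "parent": {"uri": "at://did:plc:abc123/app.bsky.feed.post/3kwvu"},
    },
    "labels": {"values": [{"val": "graphic-media"}]},  # optional self-labels -> LabelsAttachment
}
```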
+8 -8 cross/attachments.py
 from dataclasses import dataclass
 
 
-@dataclass
+@dataclass(kw_only=True)
 class Attachment:
     pass
 
 
-@dataclass
-class SpoilerAttachment(Attachment):
-    spoiler: str
+@dataclass(kw_only=True)
+class LabelsAttachment(Attachment):
+    labels: list[str]
 
 
-@dataclass
+@dataclass(kw_only=True)
 class LanguagesAttachment(Attachment):
     langs: list[str]
 
 
-@dataclass
+@dataclass(kw_only=True)
 class SensitiveAttachment(Attachment):
     sensitive: bool
 
 
-@dataclass
+@dataclass(kw_only=True)
 class RemoteUrlAttachment(Attachment):
     url: str
 
 
-@dataclass
+@dataclass(kw_only=True)
 class QuoteAttachment(Attachment):
     quoted_id: str
+4 -4 cross/fragments.py
 from dataclasses import dataclass
 
 
-@dataclass
+@dataclass(kw_only=True)
 class Fragment:
     start: int
     end: int
 
 
-@dataclass
+@dataclass(kw_only=True)
 class LinkFragment(Fragment):
     url: str
 
 
-@dataclass
+@dataclass(kw_only=True)
 class TagFragment(Fragment):
     tag: str
 
 
-@dataclass
+@dataclass(kw_only=True)
 class MentionFragment(Fragment):
     uri: str
+6 -3 cross/post.py
···
     def __init__(self) -> None:
         self._map: dict[type, Attachment] = {}
 
-    def put(self, cls: type[T], attachment: T) -> None:
-        self._map[cls] = attachment
+    def put(self, attachment: Attachment) -> None:
+        self._map[attachment.__class__] = attachment
 
     def get(self, cls: type[T]) -> T | None:
         instance = self._map.get(cls)
···
             raise TypeError(f"Expected {cls.__name__}, got {type(instance).__name__}")
         return instance
 
+    def __repr__(self) -> str:
+        return f"AttachmentKeeper(_map={self._map.values()})"
+
 
 @dataclass
 class Post:
     id: str
     parent_id: str | None
     text: str  # utf-8 text
-    attachments: AttachmentKeeper
+    attachments: AttachmentKeeper = field(default_factory=AttachmentKeeper)
     fragments: list[Fragment] = field(default_factory=list)
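A minimal sketch of how the reworked `Post`/`AttachmentKeeper` API fits together after this change; the URI, URL, and fragment offsets are illustrative:

```python
from cross.attachments import LanguagesAttachment, RemoteUrlAttachment
from cross.fragments import LinkFragment
from cross.post import Post

# attachments now defaults to an empty AttachmentKeeper, and put() keys the
# attachment by its own class instead of taking an explicit type argument.
post = Post(id="at://did:plc:abc123/app.bsky.feed.post/3kxyz", parent_id=None, text="hi example.com")
post.attachments.put(LanguagesAttachment(langs=["en"]))
post.attachments.put(RemoteUrlAttachment(url="https://bsky.app/profile/did:plc:abc123/post/3kxyz"))
post.fragments.append(LinkFragment(start=3, end=14, url="https://example.com"))

langs = post.attachments.get(LanguagesAttachment)  # LanguagesAttachment(langs=['en']) or None
```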
+56 -9 cross/service.py
 import sqlite3
 from abc import ABC, abstractmethod
-from typing import Callable, cast
+from typing import Any, Callable, cast
+import logging
 
 from cross.post import Post
 from database.connection import DatabasePool
-from util.util import LOGGER
+
+columns: list[str] = [
+    "user",
+    "service",
+    "identifier",
+    "parent",
+    "root",
+    "reposted",
+    "extra_data",
+]
+placeholders: str = ", ".join(["?" for _ in columns])
+column_names: str = ", ".join(columns)
 
 
 class Service:
     def __init__(self, url: str, db: DatabasePool) -> None:
         self.url: str = url
         self.db: DatabasePool = db
+        self.log: logging.Logger = logging.getLogger(self.__class__.__name__)
         # self._lock: threading.Lock = threading.Lock()
 
-    def get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
+    def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
         cursor = self.db.get_conn().cursor()
         _ = cursor.execute(
             """
             SELECT * FROM posts
             WHERE service = ?
-            AND user_id = ?
+            AND user = ?
             AND identifier = ?
             """,
             (url, user, identifier),
         )
         return cast(sqlite3.Row, cursor.fetchone())
 
-    def get_post_by_id(self, id: int) -> sqlite3.Row | None:
+    def _get_post_by_id(self, id: int) -> sqlite3.Row | None:
         cursor = self.db.get_conn().cursor()
         _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
         return cast(sqlite3.Row, cursor.fetchone())
 
+    def _insert_post(self, post_data: dict[str, Any]):
+        values = [post_data.get(col) for col in columns]
+        cursor = self.db.get_conn().cursor()
+        _ = cursor.execute(
+            f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values
+        )
+
+    def _insert_post_mapping(self, original: int, mapped: int):
+        cursor = self.db.get_conn().cursor()
+        _ = cursor.execute(
+            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
+            (original, mapped),
+        )
+        _ = cursor.execute(
+            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
+            (mapped, original),
+        )
+
+    def _delete_post(self, url: str, user: str, identifier: str):
+        cursor = self.db.get_conn().cursor()
+        _ = cursor.execute(
+            """
+            DELETE FROM posts
+            WHERE identifier = ?
+            AND service = ?
+            AND user = ?
+            """,
+            (identifier, url, user),
+        )
+
+    def _delete_post_by_id(self, id: int):
+        cursor = self.db.get_conn().cursor()
+        _ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,))
+
 
 class OutputService(Service):
     def accept_post(self, post: Post):
-        LOGGER.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)
+        self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)
 
     def delete_post(self, post_id: str):
-        LOGGER.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)
+        self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)
 
     def accept_repost(self, repost_id: str, reposted_id: str):
-        LOGGER.warning(
+        self.log.warning(
             "NOT IMPLEMENTED (%s), accept_repost %s of %s",
             self.url,
             repost_id,
···
         )
 
     def delete_repost(self, repost_id: str):
-        LOGGER.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)
+        self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)
 
 
 class InputService(ABC, Service):
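To make the new `_insert_post` plumbing concrete, here is what it builds for a top-level post; the DID, URI, and CID values are illustrative:

```python
from cross.service import columns

# Module-level in cross/service.py:
#   column_names == "user, service, identifier, parent, root, reposted, extra_data"
#   placeholders == "?, ?, ?, ?, ?, ?, ?"
post_data = {
    "user": "did:plc:abc123",                                      # illustrative
    "service": "https://bsky.app",
    "identifier": "at://did:plc:abc123/app.bsky.feed.post/3kxyz",
    "extra_data": '{"cid": "bafyrei..."}',
}
values = [post_data.get(col) for col in columns]
# -> ["did:plc:abc123", "https://bsky.app", "at://did:plc:abc123/app.bsky.feed.post/3kxyz",
#     None, None, None, '{"cid": "bafyrei..."}']
# Absent keys (parent, root, reposted) are bound as NULL by:
#   INSERT INTO posts (user, service, identifier, parent, root, reposted, extra_data)
#   VALUES (?, ?, ?, ?, ?, ?, ?)
```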
+3 -3 mastodon/info.py
···
 import requests
 
 from cross.service import Service
-from util.util import LOGGER, normalize_service_url
+from util.util import normalize_service_url
 
 def validate_and_transform(data: dict[str, Any]):
     if 'token' not in data or 'instance' not in data:
···
             headers={"Authorization": f"Bearer {token}"},
         )
         if responce.status_code != 200:
-            LOGGER.error("Failed to validate user credentials!")
+            self.log.error("Failed to validate user credentials!")
         responce.raise_for_status()
         return dict(responce.json())
 
···
             headers={"Authorization": f"Bearer {token}"},
         )
         if responce.status_code != 200:
-            LOGGER.error("Failed to get instance info!")
+            self.log.error("Failed to get instance info!")
         responce.raise_for_status()
         return dict(responce.json())
 
+7 -8 mastodon/input.py
···
 from cross.service import InputService
 from database.connection import DatabasePool
 from mastodon.info import MastodonService, validate_and_transform
-from util.util import LOGGER
 
 ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
 
···
         super().__init__(options.instance, db)
         self.options: MastodonInputOptions = options
 
-        LOGGER.info("Verifying %s credentails...", self.url)
+        self.log.info("Verifying %s credentials...", self.url)
         responce = self.verify_credentials()
         self.user_id: str = responce["id"]
 
-        LOGGER.info("Getting %s configuration...", self.url)
+        self.log.info("Getting %s configuration...", self.url)
         responce = self.fetch_instance_info()
         self.streaming_url: str = responce["urls"]["streaming_api"]
 
···
         return self.options.token
 
     def _on_create_post(self, status: dict[str, Any]):
-        LOGGER.info(status) # TODO
+        self.log.info(status) # TODO
 
     def _on_delete_post(self, status_id: str):
-        LOGGER.info(status_id) # TODO
+        self.log.info(status_id) # TODO
 
     def _accept_msg(self, msg: websockets.Data) -> None:
         data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
···
             url, additional_headers={"Authorization": f"Bearer {self.options.token}"}
         ):
             try:
-                LOGGER.info("Listening to %s...", self.streaming_url)
+                self.log.info("Listening to %s...", self.streaming_url)
 
                 async def listen_for_messages():
                     async for msg in ws:
···
 
                 _ = await asyncio.gather(listen)
             except websockets.ConnectionClosedError as e:
-                LOGGER.error(e, stack_info=True, exc_info=True)
-                LOGGER.info("Reconnecting to %s...", self.streaming_url)
+                self.log.error(e, stack_info=True, exc_info=True)
+                self.log.info("Reconnecting to %s...", self.streaming_url)
                 continue
+2 -3 mastodon/output.py
···
 from cross.service import OutputService
 from database.connection import DatabasePool
 from mastodon.info import InstanceInfo, MastodonService, validate_and_transform
-from util.util import LOGGER
 
 ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]
 
···
         super().__init__(options.instance, db)
         self.options: MastodonOutputOptions = options
 
-        LOGGER.info("Verifying %s credentails...", self.url)
+        self.log.info("Verifying %s credentials...", self.url)
         responce = self.verify_credentials()
         self.user_id: str = responce["id"]
 
-        LOGGER.info("Getting %s configuration...", self.url)
+        self.log.info("Getting %s configuration...", self.url)
         responce = self.fetch_instance_info()
         self.instance_info: InstanceInfo = InstanceInfo.from_api(responce)
 
+3 -2 migrations/001_initdb.sql
 CREATE TABLE IF NOT EXISTS posts (
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    id INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
     user TEXT NOT NULL,
     service TEXT NOT NULL,
     identifier TEXT NOT NULL,
···
 
 CREATE TABLE IF NOT EXISTS mappings (
     original INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
-    mapped INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE
+    mapped INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
+    UNIQUE(original, mapped)
 );
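The new UNIQUE(original, mapped) constraint is what makes the bidirectional `_insert_post_mapping` in cross/service.py idempotent: re-running the pair of INSERT OR IGNORE statements becomes a no-op. A small sqlite3 sketch against a trimmed, in-memory copy of the schema (ids and identifiers are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE posts (id INTEGER PRIMARY KEY AUTOINCREMENT, identifier TEXT);
CREATE TABLE mappings (
    original INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
    mapped INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
    UNIQUE(original, mapped)
);
""")
conn.executemany("INSERT INTO posts (identifier) VALUES (?)", [("a",), ("b",)])

for _ in range(2):  # run the mapping insert twice; the second pass is ignored
    conn.execute("INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?)", (1, 2))
    conn.execute("INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?)", (2, 1))

print(conn.execute("SELECT COUNT(*) FROM mappings").fetchone())  # (2,)
```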
+5 migrations/002_add_indexes.sql
+1 -2 misskey/info.py
···
 import requests
 
 from cross.service import Service
-from util.util import LOGGER
 
 
 class MisskeyService(ABC, Service):
···
             headers={"Content-Type": "application/json"},
         )
         if responce.status_code != 200:
-            LOGGER.error("Failed to validate user credentials!")
+            self.log.error("Failed to validate user credentials!")
         responce.raise_for_status()
         return dict(responce.json())
 
+7 -7 misskey/input.py
···
 from cross.service import InputService
 from database.connection import DatabasePool
 from misskey.info import MisskeyService
-from util.util import LOGGER, normalize_service_url
+from util.util import normalize_service_url
 
 ALLOWED_VISIBILITY = ["public", "home"]
 
···
         super().__init__(options.instance, db)
         self.options: MisskeyInputOptions = options
 
-        LOGGER.info("Verifying %s credentails...", self.url)
+        self.log.info("Verifying %s credentials...", self.url)
         responce = self.verify_credentials()
         self.user_id: str = responce["id"]
 
···
         return self.options.token
 
     def _on_note(self, note: dict[str, Any]):
-        LOGGER.info(note) # TODO
+        self.log.info(note) # TODO
 
     def _accept_msg(self, msg: websockets.Data) -> None:
         data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
···
                 }
             )
         )
-        LOGGER.info("Subscribed to 'homeTimeline' channel...")
+        self.log.info("Subscribed to 'homeTimeline' channel...")
 
     @override
     async def listen(self):
···
 
         async for ws in websockets.connect(url):
             try:
-                LOGGER.info("Listening to %s...", streaming)
+                self.log.info("Listening to %s...", streaming)
                 await self._subscribe_to_home(ws)
 
                 async def listen_for_messages():
···
 
                 _ = await asyncio.gather(listen)
             except websockets.ConnectionClosedError as e:
-                LOGGER.error(e, stack_info=True, exc_info=True)
-                LOGGER.info("Reconnecting to %s...", streaming)
+                self.log.error(e, stack_info=True, exc_info=True)
+                self.log.info("Reconnecting to %s...", streaming)
                 continue
+3 -1 registry_bootstrap.py
···
         options_class = getattr(module, self.options_class_name)
         return service_class(db, options_class.from_dict(d))
 
-
 def bootstrap():
     input_factories["mastodon-wss"] = LazyFactory(
         "mastodon.input", "MastodonInputService", "MastodonInputOptions"
···
     input_factories["bluesky-jetstream"] = LazyFactory(
         "bluesky.input", "BlueskyJetstreamInputService", "BlueskyJetstreamInputOptions"
     )
+    output_factories['stderr'] = LazyFactory(
+        "util.dummy", "StderrOutputService", "DummyOptions"
+    )
+29 util/dummy.py
+from typing import override
+from cross.post import Post
+from cross.service import OutputService
+from database.connection import DatabasePool
+
+class DummyOptions:
+    @classmethod
+    def from_dict(cls, obj) -> 'DummyOptions':
+        return DummyOptions()
+
+class StderrOutputService(OutputService):
+    def __init__(self, db: DatabasePool, options: DummyOptions) -> None:
+        super().__init__("http://localhost", db)
+
+    @override
+    def accept_post(self, post: Post):
+        self.log.info("%s", post)
+
+    @override
+    def accept_repost(self, repost_id: str, reposted_id: str):
+        self.log.info("%s, %s", repost_id, reposted_id)
+
+    @override
+    def delete_post(self, post_id: str):
+        self.log.info("%s", post_id)
+
+    @override
+    def delete_repost(self, repost_id: str):
+        self.log.info("%s", repost_id)
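A quick sketch of wiring the dummy output in directly (outside the registry); `db` is assumed to be an already-initialized DatabasePool with the migrations applied, and the post values are illustrative:

```python
from cross.post import Post
from util.dummy import DummyOptions, StderrOutputService

# db: DatabasePool  # assumed to exist; construction is elided here
out = StderrOutputService(db, DummyOptions.from_dict({}))
out.accept_post(Post(id="example-post-1", parent_id=None, text="hello world"))
out.delete_post("example-post-1")
# Via the registry, the same service is reachable as output_factories["stderr"]
# once bootstrap() has run.
```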