unoffical wafrn mirror
wafrn.net
atproto
social-network
activitypub
1import express, { Request, Response } from "express";
2import cors from "cors";
3import bodyParser from "body-parser";
4import { logger } from "./utils/logger.js";
5
6import {
7 workerInbox,
8 workerPrepareSendPost,
9 workerGetUser,
10 workerSendPostChunk,
11 workerProcessFirehose,
12 workerDeletePost,
13 workerProcessRemotePostView,
14 workerProcessRemoteMediaData,
15 workerGenerateUserKeyPair,
16} from "./utils/workers.js";
17
18import { SignedRequest } from "./interfaces/fediverse/signedRequest.js";
19import { activityPubRoutes } from "./routes/activitypub/activitypub.js";
20import { wellKnownRoutes } from "./routes/activitypub/well-known.js";
21import adminRoutes from "./routes/admin.js";
22import blockRoutes from "./routes/blocks.js";
23import blockUserServerRoutes from "./routes/blockUserServer.js";
24import dashboardRoutes from "./routes/dashboard.js";
25import deletePost from "./routes/deletePost.js";
26import emojiReactRoutes from "./routes/emojiReact.js";
27import emojiRoutes from "./routes/emojis.js";
28import followsRoutes from "./routes/follows.js";
29import forumRoutes from "./routes/forum.js";
30import { frontend } from "./routes/frontend.js";
31import likeRoutes from "./routes/like.js";
32import biteRoutes from "./routes/bite.js";
33import listRoutes from "./routes/lists.js";
34import mediaRoutes from "./routes/media.js";
35import muteRoutes from "./routes/mute.js";
36import { notificationRoutes } from "./routes/notifications.js";
37import pollRoutes from "./routes/polls.js";
38import postsRoutes from "./routes/posts.js";
39import searchRoutes from "./routes/search.js";
40import silencePostRoutes from "./routes/silencePost.js";
41import statusRoutes from "./routes/status.js";
42import { userRoutes } from "./routes/users.js";
43import checkIpBlocked from "./utils/checkIpBlocked.js";
44import overrideContentType from "./utils/overrideContentType.js";
45import swagger from "swagger-ui-express";
46import { readFile } from "fs/promises";
47import { Worker } from "bullmq";
48import expressWs from "express-ws";
49import websocketRoutes from "./routes/websocket.js";
50import { followHashtagRoutes } from "./routes/followHashtags.js";
51import { completeEnvironment } from "./utils/backendOptions.js";
52import cron from "node-cron";
53import { nukeBannedUsers } from "./utils/maintenanceTasks/nukeBannedUsers.js";
54import { sequelize } from "./models/sequelize.js";
55import { Op, Sequelize } from "sequelize";
56import { Post } from "./models/post.js";
57import { User } from "./models/index.js";
58import { follow } from "./utils/follow.js";
59import { getAdminUser } from "./utils/getAdminAndDeletedUser.js";
60
61const PORT = completeEnvironment.port;
62const app = express();
63const wsServer = expressWs(app);
64const server = wsServer.app;
65websocketRoutes(server);
66
67cron.schedule("0 2 * * *", async () => {
68 // maintenance tasks
69 sequelize.query("VACUUM ANALYZE").then(() => {
70 logger.info(`postgres vacuum analyze executed`);
71 });
72 nukeBannedUsers().then(() => {
73 logger.info(`NukeBannedUsers Done`);
74 });
75});
76
77server.listen(PORT, completeEnvironment.listenIp, () => {
78 logger.info("started websocket");
79});
80
81const queryInterface = sequelize.getQueryInterface();
82
83if (completeEnvironment.autoFollowAdmin) {
84 try {
85 const users = await User.findAll({
86 where: {
87 banned: {
88 [Op.ne]: true,
89 },
90 email: {
91 [Op.ne]: null,
92 },
93 },
94 });
95 const adminUser = await getAdminUser();
96 await Promise.all(users.map((x) => follow(x.id, adminUser.id)));
97 } catch {}
98}
99let postIndexes = await queryInterface.showIndex("posts");
100
101if (
102 !(postIndexes as Array<any>).some((index) => index.name === "post_bsky_uri")
103) {
104 logger.info(
105 `ATTENTION: your server doesnt seem to have an unique index on bskyuri. this is a bug. we will investigate soon in a future release`
106 );
107 // clearDuplicatedBskyUris().then(async (res) => {
108 // // well turns out that we dont have indexes
109 // // we have cleaned duplicated before. if a duplicate apears here we just crash and do it again :3
110 // await queryInterface.sequelize.query(
111 // `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS post_bsky_uri ON "posts" ("bskyUri");`
112 // );
113 // });
114}
115
116async function clearDuplicatedBskyUris(): Promise<boolean> {
117 let duplicatedURIs: any = await queryInterface.sequelize.query(
118 `SELECT "a"."id" FROM "posts" as "a" LEFT JOIN "posts" as "b" ON "a"."bskyUri" = "b"."bskyUri" WHERE "a"."bskyUri" IS NOT NULL AND "a"."id" != "b"."id" LIMIT 50`
119 );
120 while (duplicatedURIs[1].rowCount) {
121 const postIds: string[] = duplicatedURIs[0].map((elem: any) => elem.id);
122 for await (const postId of postIds) {
123 const originalPost = await Post.findByPk(postId);
124 if (originalPost && originalPost.bskyUri) {
125 const duplicatePosts = await Post.findAll({
126 where: {
127 id: {
128 [Op.ne]: postId,
129 },
130 bskyUri: originalPost.bskyUri,
131 },
132 });
133 if (duplicatePosts && duplicatePosts.length > 0) {
134 // TIME TO CLEAN.
135 const duplicatedPostIds = duplicatePosts.map((elem) => elem.id);
136 await Post.update(
137 {
138 parentId: postId,
139 },
140 {
141 where: {
142 parentId: {
143 [Op.in]: duplicatedPostIds,
144 },
145 },
146 }
147 );
148 await Post.destroy({
149 where: {
150 id: {
151 [Op.in]: duplicatedPostIds,
152 },
153 },
154 });
155 }
156 }
157 }
158 duplicatedURIs = await queryInterface.sequelize.query(
159 `SELECT "a"."id" FROM "posts" as "a" LEFT JOIN "posts" as "b" ON "a"."bskyUri" = "b"."bskyUri" WHERE "a"."bskyUri" IS NOT NULL AND "a"."id" != "b"."id" LIMIT 50`
160 );
161 }
162
163 return true;
164}