-- Highly ambitious ATProtocol AppView service and SDKs
-- Make sure the uuid-ossp extension is installed; this is a no-op when it
-- already exists.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
2
-- Composite type describing a single message to enqueue. `mq_insert` accepts
-- an array of these values.
CREATE TYPE mq_new_t AS (
    id              UUID,      -- unique message ID
    delay           INTERVAL,  -- delay before the message is processed
    retries         INT,       -- number of retries if initial processing fails
    retry_backoff   INTERVAL,  -- initial backoff between retries
    channel_name    TEXT,      -- name of channel
    channel_args    TEXT,      -- arguments to channel
    commit_interval INTERVAL,  -- two-phase commit interval (NULL disables two-phase commit)
    ordered         BOOLEAN,   -- process in order relative to other ordered messages?
    name            TEXT,      -- name of message
    payload_json    TEXT,      -- JSON payload
    payload_bytes   BYTEA      -- binary payload
);
29
-- Queue bookkeeping: a small row per message that is rewritten frequently
-- (on every poll, keep-alive, checkpoint and commit).
CREATE TABLE mq_msgs (
    id UUID PRIMARY KEY,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    -- Earliest time the message may next be handed out by `mq_poll`; that
    -- function sets it to NULL when the final attempt is taken.
    attempt_at TIMESTAMPTZ DEFAULT NOW(),
    -- Remaining attempts; decremented by `mq_poll` each time the message is returned.
    attempts INT NOT NULL DEFAULT 5,
    -- Backoff applied before the next attempt; doubled by `mq_poll` on each attempt.
    retry_backoff INTERVAL NOT NULL DEFAULT INTERVAL '1 second',
    channel_name TEXT NOT NULL,
    channel_args TEXT NOT NULL,
    -- Non-NULL while the message still awaits `mq_commit`.
    commit_interval INTERVAL,
    -- NULL for unordered messages. For ordered messages: the message that must
    -- be deleted first, or the nil UUID when nothing is ahead of it. Deleting
    -- the referenced message resets this column to the nil UUID.
    after_message_id UUID
        DEFAULT '00000000-0000-0000-0000-000000000000'::uuid
        REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
);
42
-- Seed a placeholder row carrying the 'nil' UUID so that other rows can
-- reference it through after_message_id.
INSERT INTO mq_msgs (
    id,
    channel_name,
    channel_args,
    after_message_id
)
VALUES (
    '00000000-0000-0000-0000-000000000000'::uuid,
    '',
    '',
    NULL
);
45
-- Internal helper: TRUE only when `id` refers to a real message, i.e. it is
-- neither NULL nor the all-zero 'nil' UUID.
CREATE FUNCTION mq_uuid_exists(
    id UUID
) RETURNS BOOLEAN AS $$
    -- The comparison yields NULL for a NULL id; COALESCE maps that to FALSE.
    SELECT COALESCE(id != '00000000-0000-0000-0000-000000000000'::uuid, FALSE)
$$ LANGUAGE SQL IMMUTABLE;
52
-- Polling index: covers real rows whose after_message_id is NULL or the nil
-- UUID, i.e. messages that are not waiting on another message.
CREATE INDEX ON mq_msgs (channel_name, channel_args, attempt_at)
    WHERE id != '00000000-0000-0000-0000-000000000000'::uuid
      AND NOT mq_uuid_exists(after_message_id);

-- Insertion index: covers ordered messages (non-NULL after_message_id), which
-- is the set `mq_latest_message` searches.
CREATE INDEX ON mq_msgs (channel_name, channel_args, created_at, id)
    WHERE id != '00000000-0000-0000-0000-000000000000'::uuid
      AND after_message_id IS NOT NULL;

-- Strict ordering guarantee: within a channel, at most one message may follow
-- any given message.
CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx
    ON mq_msgs (channel_name, channel_args, after_message_id);
60
61
-- Message bodies: potentially large rows that are written far less often than
-- the bookkeeping rows in mq_msgs. Rows share their id with mq_msgs.
CREATE TABLE mq_payloads (
    id            UUID PRIMARY KEY,
    name          TEXT NOT NULL,  -- name of message
    payload_json  JSONB,          -- optional JSON payload
    payload_bytes BYTEA           -- optional binary payload
);
69
-- Internal helper function to return the tail of a channel's ordered queue:
-- the ordered message that no other message is waiting on yet. Returns the
-- nil UUID when the channel has no ordered messages.
--
-- The tail cannot be identified by created_at alone: created_at defaults to
-- NOW(), which is the start time of the inserting transaction. Two inserts
-- in one transaction share a timestamp, and a transaction that started
-- earlier may insert later, so the newest-looking row may already have a
-- successor. Chaining onto such a row would violate the unique index on
-- (channel_name, channel_args, after_message_id). Hence the NOT EXISTS check.
CREATE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
RETURNS UUID AS $$
    SELECT COALESCE(
        (
            SELECT tail.id
            FROM mq_msgs AS tail
            WHERE tail.channel_name = from_channel_name
              AND tail.channel_args = from_channel_args
              AND tail.after_message_id IS NOT NULL
              AND tail.id != '00000000-0000-0000-0000-000000000000'::uuid
              -- Skip any message that already has a successor. Successors are
              -- always created in the same channel, so the channel columns
              -- are included to let the unique index serve this probe.
              AND NOT EXISTS (
                  SELECT 1
                  FROM mq_msgs AS successor
                  WHERE successor.channel_name = from_channel_name
                    AND successor.channel_args = from_channel_args
                    AND successor.after_message_id = tail.id
              )
            -- Deterministic tie-break should more than one candidate remain.
            ORDER BY tail.created_at DESC, tail.id DESC
            LIMIT 1
        ),
        '00000000-0000-0000-0000-000000000000'::uuid
    )
$$ LANGUAGE SQL STABLE;
86
-- Internal helper: picks up to `batch_size` distinct (channel_name,
-- channel_args) pairs, in random order, that currently hold at least one
-- message that is due and not waiting on another message.
-- A NULL `channel_names` means "any channel".
CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT)
RETURNS TABLE(name TEXT, args TEXT) AS $$
    SELECT
        ready.channel_name,
        ready.channel_args
    FROM mq_msgs AS ready
    WHERE NOT mq_uuid_exists(ready.after_message_id)
      AND ready.attempt_at <= NOW()
      AND ready.id != '00000000-0000-0000-0000-000000000000'::uuid
      AND (channel_names IS NULL OR ready.channel_name = ANY(channel_names))
    GROUP BY
        ready.channel_name,
        ready.channel_args
    ORDER BY RANDOM()
    LIMIT batch_size
$$ LANGUAGE SQL STABLE;
100
-- Main entry-point for the job runner: claims and returns a batch of messages.
--
-- Each returned message has its remaining attempts decremented, its backoff
-- doubled and its next attempt scheduled (or cleared when this was the final
-- attempt). When nothing is ready, a single all-NULL row is returned whose
-- `wait_time` is the time until the earliest scheduled attempt (NULL if there
-- is none).
--
-- Every column reference is qualified because the output column names are
-- also PL/pgSQL variables inside this function.
CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE(
    id UUID,
    is_committed BOOLEAN,
    name TEXT,
    payload_json TEXT,
    payload_bytes BYTEA,
    retry_backoff INTERVAL,
    wait_time INTERVAL
) AS $$
BEGIN
    RETURN QUERY
    UPDATE mq_msgs
    SET
        -- No further attempt is scheduled once the last one is handed out.
        attempt_at = CASE
            WHEN mq_msgs.attempts = 1 THEN NULL
            ELSE NOW() + mq_msgs.retry_backoff
        END,
        attempts = mq_msgs.attempts - 1,
        retry_backoff = mq_msgs.retry_backoff * 2
    FROM (
        -- Up to batch_size due messages per active channel, capped at
        -- batch_size messages overall.
        SELECT ready.id
        FROM mq_active_channels(channel_names, batch_size) AS chan
        CROSS JOIN LATERAL (
            SELECT candidate.id
            FROM mq_msgs AS candidate
            WHERE candidate.id != '00000000-0000-0000-0000-000000000000'::uuid
              AND candidate.attempt_at <= NOW()
              AND candidate.channel_name = chan.name
              AND candidate.channel_args = chan.args
              AND NOT mq_uuid_exists(candidate.after_message_id)
            ORDER BY candidate.attempt_at ASC
            LIMIT batch_size
        ) AS ready
        LIMIT batch_size
    ) AS picked
    LEFT JOIN mq_payloads ON mq_payloads.id = picked.id
    WHERE mq_msgs.id = picked.id
    RETURNING
        mq_msgs.id,
        mq_msgs.commit_interval IS NULL,
        mq_payloads.name,
        mq_payloads.payload_json::TEXT,
        mq_payloads.payload_bytes,
        -- Report the backoff as it was before the doubling above.
        mq_msgs.retry_backoff / 2,
        INTERVAL '0' AS wait_time;

    IF NOT FOUND THEN
        -- Nothing was claimed: tell the caller how long to wait instead.
        RETURN QUERY
        SELECT
            NULL::UUID,
            NULL::BOOLEAN,
            NULL::TEXT,
            NULL::TEXT,
            NULL::BYTEA,
            NULL::INTERVAL,
            MIN(pending.attempt_at) - NOW()
        FROM mq_msgs AS pending
        WHERE pending.id != '00000000-0000-0000-0000-000000000000'::uuid
          AND NOT mq_uuid_exists(pending.after_message_id)
          AND (channel_names IS NULL OR pending.channel_name = ANY(channel_names));
    END IF;
END;
$$ LANGUAGE plpgsql;
161
-- Creates new messages.
--
-- For each distinct channel name in `new_messages`, a notification is sent on
-- the channel named 'mq_' || channel_name, followed by one on 'mq' when the
-- batch is non-empty. Payloads are stored in mq_payloads and bookkeeping
-- rows in mq_msgs.
--
-- Scheduling: the first attempt is due at NOW() + delay, pushed back by the
-- commit interval when two-phase commit is requested. A NULL `delay` is
-- treated as zero; otherwise attempt_at would become NULL and the message
-- could never be selected by `mq_poll`.
--
-- Ordering: ordered messages are chained through after_message_id. Within
-- the batch they are chained in id order per channel, and the first one is
-- attached to the channel's current latest message. Unordered messages get
-- a NULL after_message_id.
CREATE FUNCTION mq_insert(new_messages mq_new_t[])
RETURNS VOID AS $$
BEGIN
    -- One notification per distinct channel name in the batch.
    PERFORM pg_notify(CONCAT('mq_', msg.channel_name), '')
    FROM UNNEST(new_messages) AS msg
    GROUP BY msg.channel_name;

    -- FOUND is true when the PERFORM above produced at least one row.
    IF FOUND THEN
        PERFORM pg_notify('mq', '');
    END IF;

    INSERT INTO mq_payloads (
        id,
        name,
        payload_json,
        payload_bytes
    )
    SELECT
        msg.id,
        msg.name,
        msg.payload_json::JSONB,
        msg.payload_bytes
    FROM UNNEST(new_messages) AS msg;

    INSERT INTO mq_msgs (
        id,
        attempt_at,
        attempts,
        retry_backoff,
        channel_name,
        channel_args,
        commit_interval,
        after_message_id
    )
    SELECT
        msg.id,
        NOW()
            + COALESCE(msg.delay, INTERVAL '0')
            + COALESCE(msg.commit_interval, INTERVAL '0'),
        -- The initial attempt plus the requested number of retries.
        msg.retries + 1,
        msg.retry_backoff,
        msg.channel_name,
        msg.channel_args,
        msg.commit_interval,
        CASE
            WHEN msg.ordered THEN
                LAG(msg.id, 1, mq_latest_message(msg.channel_name, msg.channel_args))
                    OVER (
                        PARTITION BY msg.channel_name, msg.channel_args, msg.ordered
                        ORDER BY msg.id
                    )
            ELSE
                NULL
        END
    FROM UNNEST(new_messages) AS msg;
END;
$$ LANGUAGE plpgsql;
214
-- Commits messages that were created with a non-NULL commit interval: the
-- interval is subtracted from attempt_at again and then cleared. Messages
-- that have no commit interval are left untouched.
CREATE FUNCTION mq_commit(msg_ids UUID[])
RETURNS VOID AS $$
BEGIN
    UPDATE mq_msgs AS pending
    SET
        attempt_at = pending.attempt_at - pending.commit_interval,
        commit_interval = NULL
    WHERE pending.commit_interval IS NOT NULL
      AND pending.id = ANY(msg_ids);
END;
$$ LANGUAGE plpgsql;
227
228
-- Removes messages from the queue, either because they were processed or
-- because they expired unprocessed.
--
-- Before deleting, a notification is sent on 'mq_' || channel_name for every
-- channel in which one of the messages has the nil UUID as its
-- after_message_id, followed by one on 'mq' if any such message was found.
CREATE FUNCTION mq_delete(msg_ids UUID[])
RETURNS VOID AS $$
BEGIN
    PERFORM pg_notify(CONCAT('mq_', head.channel_name), '')
    FROM mq_msgs AS head
    WHERE head.after_message_id = '00000000-0000-0000-0000-000000000000'::uuid
      AND head.id = ANY(msg_ids)
    GROUP BY head.channel_name;

    -- FOUND is true when the PERFORM above produced at least one row.
    IF FOUND THEN
        PERFORM pg_notify('mq', '');
    END IF;

    DELETE FROM mq_msgs AS doomed_msg
    WHERE doomed_msg.id = ANY(msg_ids);

    DELETE FROM mq_payloads AS doomed_payload
    WHERE doomed_payload.id = ANY(msg_ids);
END;
$$ LANGUAGE plpgsql;
248
249
-- Signals that the caller is still working on the given messages. Usable
-- both during the initial commit interval and while a message is being
-- processed: the next attempt is pushed out to NOW() + duration, so neither
-- the commit interval elapses nor a retry starts before then.
--
-- Only rows whose attempt_at is earlier than NOW() + duration are touched;
-- rows with a NULL attempt_at never match the comparison and are skipped.
-- commit_interval grows by the same amount attempt_at moved (it stays NULL
-- when it was NULL).
CREATE FUNCTION mq_keep_alive(msg_ids UUID[], duration INTERVAL)
RETURNS VOID AS $$
    UPDATE mq_msgs AS active
    SET
        attempt_at = NOW() + duration,
        commit_interval = active.commit_interval + ((NOW() + duration) - active.attempt_at)
    WHERE active.attempt_at < NOW() + duration
      AND active.id = ANY(msg_ids);
$$ LANGUAGE SQL;
263
264
-- Checkpoints progress during lengthy processing of one message. Like
-- `mq_keep_alive` it pushes the next attempt out to at least
-- NOW() + duration; in addition it can grant extra retries and replace the
-- stored payload. NULL payload arguments keep the stored value, and a NULL
-- `extra_retries` adds nothing.
CREATE FUNCTION mq_checkpoint(
    msg_id UUID,
    duration INTERVAL,
    new_payload_json TEXT,
    new_payload_bytes BYTEA,
    extra_retries INT
)
RETURNS VOID AS $$
    UPDATE mq_msgs AS checkpointed
    SET
        -- NOTE(review): GREATEST ignores NULL arguments, so a NULL attempt_at
        -- is replaced by NOW() + duration here, unlike in mq_keep_alive.
        -- Confirm this is intended.
        attempt_at = GREATEST(checkpointed.attempt_at, NOW() + duration),
        attempts = checkpointed.attempts + COALESCE(extra_retries, 0)
    WHERE checkpointed.id = msg_id;

    UPDATE mq_payloads AS stored
    SET
        payload_json = COALESCE(new_payload_json::JSONB, stored.payload_json),
        payload_bytes = COALESCE(new_payload_bytes, stored.payload_bytes)
    WHERE stored.id = msg_id;
$$ LANGUAGE SQL;
289