Highly ambitious ATProtocol AppView service and sdks
at main 9.2 kB view raw
1CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; 2 3-- The UDT for creating messages 4CREATE TYPE mq_new_t AS ( 5 -- Unique message ID 6 id UUID, 7 -- Delay before message is processed 8 delay INTERVAL, 9 -- Number of retries if initial processing fails 10 retries INT, 11 -- Initial backoff between retries 12 retry_backoff INTERVAL, 13 -- Name of channel 14 channel_name TEXT, 15 -- Arguments to channel 16 channel_args TEXT, 17 -- Interval for two-phase commit (or NULL to disable two-phase commit) 18 commit_interval INTERVAL, 19 -- Whether this message should be processed in order with respect to other 20 -- ordered messages. 21 ordered BOOLEAN, 22 -- Name of message 23 name TEXT, 24 -- JSON payload 25 payload_json TEXT, 26 -- Binary payload 27 payload_bytes BYTEA 28); 29 30-- Small, frequently updated table of messages 31CREATE TABLE mq_msgs ( 32 id UUID PRIMARY KEY, 33 created_at TIMESTAMPTZ DEFAULT NOW(), 34 attempt_at TIMESTAMPTZ DEFAULT NOW(), 35 attempts INT NOT NULL DEFAULT 5, 36 retry_backoff INTERVAL NOT NULL DEFAULT INTERVAL '1 second', 37 channel_name TEXT NOT NULL, 38 channel_args TEXT NOT NULL, 39 commit_interval INTERVAL, 40 after_message_id UUID DEFAULT '00000000-0000-0000-0000-000000000000'::uuid REFERENCES mq_msgs(id) ON DELETE SET DEFAULT 41); 42 43-- Insert dummy message so that the 'nil' UUID can be referenced 44INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES ('00000000-0000-0000-0000-000000000000'::uuid, '', '', NULL); 45 46-- Internal helper function to check that a UUID is neither NULL nor NIL 47CREATE FUNCTION mq_uuid_exists( 48 id UUID 49) RETURNS BOOLEAN AS $$ 50 SELECT id IS NOT NULL AND id != '00000000-0000-0000-0000-000000000000'::uuid 51$$ LANGUAGE SQL IMMUTABLE; 52 53-- Index for polling 54CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != '00000000-0000-0000-0000-000000000000'::uuid AND NOT mq_uuid_exists(after_message_id); 55-- Index for adding messages 56CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != '00000000-0000-0000-0000-000000000000'::uuid AND after_message_id IS NOT NULL; 57 58-- Index for ensuring strict message order 59CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id); 60 61 62-- Large, less frequently updated table of message payloads 63CREATE TABLE mq_payloads( 64 id UUID PRIMARY KEY, 65 name TEXT NOT NULL, 66 payload_json JSONB, 67 payload_bytes BYTEA 68); 69 70-- Internal helper function to return the most recently added message in a queue. 71CREATE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT) 72RETURNS UUID AS $$ 73 SELECT COALESCE( 74 ( 75 SELECT id FROM mq_msgs 76 WHERE channel_name = from_channel_name 77 AND channel_args = from_channel_args 78 AND after_message_id IS NOT NULL 79 AND id != '00000000-0000-0000-0000-000000000000'::uuid 80 ORDER BY created_at DESC, id DESC 81 LIMIT 1 82 ), 83 '00000000-0000-0000-0000-000000000000'::uuid 84 ) 85$$ LANGUAGE SQL STABLE; 86 87-- Internal helper function to randomly select a set of channels with "ready" messages. 88CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT) 89RETURNS TABLE(name TEXT, args TEXT) AS $$ 90 SELECT channel_name, channel_args 91 FROM mq_msgs 92 WHERE id != '00000000-0000-0000-0000-000000000000'::uuid 93 AND attempt_at <= NOW() 94 AND (channel_names IS NULL OR channel_name = ANY(channel_names)) 95 AND NOT mq_uuid_exists(after_message_id) 96 GROUP BY channel_name, channel_args 97 ORDER BY RANDOM() 98 LIMIT batch_size 99$$ LANGUAGE SQL STABLE; 100 101-- Main entry-point for job runner: pulls a batch of messages from the queue. 102CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) 103RETURNS TABLE( 104 id UUID, 105 is_committed BOOLEAN, 106 name TEXT, 107 payload_json TEXT, 108 payload_bytes BYTEA, 109 retry_backoff INTERVAL, 110 wait_time INTERVAL 111) AS $$ 112BEGIN 113 RETURN QUERY UPDATE mq_msgs 114 SET 115 attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, 116 attempts = mq_msgs.attempts - 1, 117 retry_backoff = mq_msgs.retry_backoff * 2 118 FROM ( 119 SELECT 120 msgs.id 121 FROM mq_active_channels(channel_names, batch_size) AS active_channels 122 INNER JOIN LATERAL ( 123 SELECT * FROM mq_msgs 124 WHERE mq_msgs.id != '00000000-0000-0000-0000-000000000000'::uuid 125 AND mq_msgs.attempt_at <= NOW() 126 AND mq_msgs.channel_name = active_channels.name 127 AND mq_msgs.channel_args = active_channels.args 128 AND NOT mq_uuid_exists(mq_msgs.after_message_id) 129 ORDER BY mq_msgs.attempt_at ASC 130 LIMIT batch_size 131 ) AS msgs ON TRUE 132 LIMIT batch_size 133 ) AS messages_to_update 134 LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id 135 WHERE mq_msgs.id = messages_to_update.id 136 RETURNING 137 mq_msgs.id, 138 mq_msgs.commit_interval IS NULL, 139 mq_payloads.name, 140 mq_payloads.payload_json::TEXT, 141 mq_payloads.payload_bytes, 142 mq_msgs.retry_backoff / 2, 143 interval '0' AS wait_time; 144 145 IF NOT FOUND THEN 146 RETURN QUERY SELECT 147 NULL::UUID, 148 NULL::BOOLEAN, 149 NULL::TEXT, 150 NULL::TEXT, 151 NULL::BYTEA, 152 NULL::INTERVAL, 153 MIN(mq_msgs.attempt_at) - NOW() 154 FROM mq_msgs 155 WHERE mq_msgs.id != '00000000-0000-0000-0000-000000000000'::uuid 156 AND NOT mq_uuid_exists(mq_msgs.after_message_id) 157 AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); 158 END IF; 159END; 160$$ LANGUAGE plpgsql; 161 162-- Creates new messages 163CREATE FUNCTION mq_insert(new_messages mq_new_t[]) 164RETURNS VOID AS $$ 165BEGIN 166 PERFORM pg_notify(CONCAT('mq_', channel_name), '') 167 FROM unnest(new_messages) AS new_msgs 168 GROUP BY channel_name; 169 170 IF FOUND THEN 171 PERFORM pg_notify('mq', ''); 172 END IF; 173 174 INSERT INTO mq_payloads ( 175 id, 176 name, 177 payload_json, 178 payload_bytes 179 ) SELECT 180 id, 181 name, 182 payload_json::JSONB, 183 payload_bytes 184 FROM UNNEST(new_messages); 185 186 INSERT INTO mq_msgs ( 187 id, 188 attempt_at, 189 attempts, 190 retry_backoff, 191 channel_name, 192 channel_args, 193 commit_interval, 194 after_message_id 195 ) 196 SELECT 197 id, 198 NOW() + delay + COALESCE(commit_interval, INTERVAL '0'), 199 retries + 1, 200 retry_backoff, 201 channel_name, 202 channel_args, 203 commit_interval, 204 CASE WHEN ordered 205 THEN 206 LAG(id, 1, mq_latest_message(channel_name, channel_args)) 207 OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id) 208 ELSE 209 NULL 210 END 211 FROM UNNEST(new_messages); 212END; 213$$ LANGUAGE plpgsql; 214 215-- Commits messages previously created with a non-NULL commit interval. 216CREATE FUNCTION mq_commit(msg_ids UUID[]) 217RETURNS VOID AS $$ 218BEGIN 219 UPDATE mq_msgs 220 SET 221 attempt_at = attempt_at - commit_interval, 222 commit_interval = NULL 223 WHERE id = ANY(msg_ids) 224 AND commit_interval IS NOT NULL; 225END; 226$$ LANGUAGE plpgsql; 227 228 229-- Deletes messages from the queue. This occurs when a message has been 230-- processed, or when it expires without being processed. 231CREATE FUNCTION mq_delete(msg_ids UUID[]) 232RETURNS VOID AS $$ 233BEGIN 234 PERFORM pg_notify(CONCAT('mq_', channel_name), '') 235 FROM mq_msgs 236 WHERE id = ANY(msg_ids) 237 AND after_message_id = '00000000-0000-0000-0000-000000000000'::uuid 238 GROUP BY channel_name; 239 240 IF FOUND THEN 241 PERFORM pg_notify('mq', ''); 242 END IF; 243 244 DELETE FROM mq_msgs WHERE id = ANY(msg_ids); 245 DELETE FROM mq_payloads WHERE id = ANY(msg_ids); 246END; 247$$ LANGUAGE plpgsql; 248 249 250-- Can be called during the initial commit interval, or when processing 251-- a message. Indicates that the caller is still active and will prevent either 252-- the commit interval elapsing or the message being retried for the specified 253-- interval. 254CREATE FUNCTION mq_keep_alive(msg_ids UUID[], duration INTERVAL) 255RETURNS VOID AS $$ 256 UPDATE mq_msgs 257 SET 258 attempt_at = NOW() + duration, 259 commit_interval = commit_interval + ((NOW() + duration) - attempt_at) 260 WHERE id = ANY(msg_ids) 261 AND attempt_at < NOW() + duration; 262$$ LANGUAGE SQL; 263 264 265-- Called during lengthy processing of a message to checkpoint the progress. 266-- As well as behaving like `mq_keep_alive`, the message payload can be 267-- updated. 268CREATE FUNCTION mq_checkpoint( 269 msg_id UUID, 270 duration INTERVAL, 271 new_payload_json TEXT, 272 new_payload_bytes BYTEA, 273 extra_retries INT 274) 275RETURNS VOID AS $$ 276 UPDATE mq_msgs 277 SET 278 attempt_at = GREATEST(attempt_at, NOW() + duration), 279 attempts = attempts + COALESCE(extra_retries, 0) 280 WHERE id = msg_id; 281 282 UPDATE mq_payloads 283 SET 284 payload_json = COALESCE(new_payload_json::JSONB, payload_json), 285 payload_bytes = COALESCE(new_payload_bytes, payload_bytes) 286 WHERE 287 id = msg_id; 288$$ LANGUAGE SQL; 289