···11+-- migrate:up
22+33+-- Add validator_js column to lexicon table for JavaScript validators
44+ALTER TABLE lexicon ADD COLUMN validator_js TEXT;
55+66+-- migrate:down
77+88+-- SQLite doesn't support DROP COLUMN in older versions, so we recreate the table
99+CREATE TABLE lexicon_backup AS SELECT id, json, created_at FROM lexicon;
1010+DROP TABLE lexicon;
1111+CREATE TABLE lexicon (
1212+ id TEXT PRIMARY KEY NOT NULL,
1313+ json TEXT NOT NULL,
1414+ created_at TEXT NOT NULL DEFAULT (datetime('now'))
1515+);
1616+INSERT INTO lexicon SELECT * FROM lexicon_backup;
1717+DROP TABLE lexicon_backup;
1818+CREATE INDEX IF NOT EXISTS idx_lexicon_created_at ON lexicon(created_at DESC);
···11+-- migrate:up
22+33+-- Add validator_js column to lexicon table for JavaScript validators
44+ALTER TABLE lexicon ADD COLUMN validator_js TEXT;
55+66+-- migrate:down
77+88+ALTER TABLE lexicon DROP COLUMN validator_js;
+3-2
server/db/schema.sql
···2626 id TEXT PRIMARY KEY NOT NULL,
2727 json TEXT NOT NULL,
2828 created_at TEXT NOT NULL DEFAULT (datetime('now'))
2929-);
2929+, validator_js TEXT);
3030CREATE INDEX idx_lexicon_created_at ON lexicon(created_at DESC);
3131CREATE TABLE config (
3232 key TEXT PRIMARY KEY NOT NULL,
···243243 ('20241210000001'),
244244 ('20241227000001'),
245245 ('20251229000001'),
246246- ('20251230000001');
246246+ ('20251230000001'),
247247+ ('20260120000001');
···4040import mist
4141import pubsub
4242import stats_pubsub
4343+import validator
4344import wisp
4445import wisp/wisp_mist
4546···9495 stats_pubsub.start()
9596 logging.log(logging.Info, "[server] Stats PubSub registry initialized")
96979898+ // Start persistent validator worker for fast JavaScript validation
9999+ // If Node.js is not available, start_worker logs a warning and validation is disabled
100100+ case validator.start_worker() {
101101+ Ok(_) ->
102102+ case validator.is_node_available() {
103103+ True ->
104104+ logging.log(
105105+ logging.Info,
106106+ "[server] Validator worker started (persistent Node.js process)",
107107+ )
108108+ False ->
109109+ // Warning already logged by start_worker
110110+ Nil
111111+ }
112112+ Error(err) ->
113113+ logging.log(
114114+ logging.Warning,
115115+ "[server] Failed to start validator worker: " <> err,
116116+ )
117117+ }
118118+97119 // Start activity cleanup scheduler
98120 case activity_cleanup.start(db) {
99121 Ok(_cleanup_subject) ->
+74
server/src/validator.gleam
···11+/// JavaScript validator execution module
22+/// Allows running JavaScript validation functions against records during ingestion
33+/// Result of running a validator
44+pub type ValidatorResult {
55+ /// Validator returned true - record is valid
66+ Valid
77+ /// Validator returned false - record is invalid
88+ Invalid
99+ /// Validator failed to execute
1010+ ValidationError(String)
1111+}
1212+1313+/// Start the persistent validator worker process
1414+/// This keeps a Node.js process running for fast validation
1515+/// Returns Ok(Nil) if started successfully, Error if already running or failed
1616+pub fn start_worker() -> Result(Nil, String) {
1717+ case do_start_worker() {
1818+ Ok(_) -> Ok(Nil)
1919+ Error(reason) -> Error(reason)
2020+ }
2121+}
2222+2323+/// Stop the validator worker process
2424+pub fn stop_worker() -> Nil {
2525+ do_stop_worker()
2626+}
2727+2828+@external(erlang, "validator_ffi", "start_worker")
2929+fn do_start_worker() -> Result(Nil, String)
3030+3131+@external(erlang, "validator_ffi", "stop_worker")
3232+fn do_stop_worker() -> Nil
3333+3434+/// Run a JavaScript validator against a record JSON
3535+/// The validator_js should export a default function that takes the record object
3636+/// and returns true if valid, false if invalid
3737+pub fn run_validator(
3838+ validator_js: String,
3939+ record_json: String,
4040+) -> ValidatorResult {
4141+ case do_run_validator(validator_js, record_json) {
4242+ Ok(True) -> Valid
4343+ Ok(False) -> Invalid
4444+ Error(reason) -> ValidationError(reason)
4545+ }
4646+}
4747+4848+/// Validate that a JavaScript validator script is well-formed and can be executed
4949+/// Tests the validator with an empty object {} to ensure it returns a boolean
5050+/// Returns Ok(Nil) if valid, Error(reason) if invalid
5151+pub fn validate_script(validator_js: String) -> Result(Nil, String) {
5252+ do_validate_script(validator_js)
5353+}
5454+5555+/// FFI to Erlang validator runner
5656+@external(erlang, "validator_ffi", "run_validator")
5757+fn do_run_validator(
5858+ validator_js: String,
5959+ record_json: String,
6060+) -> Result(Bool, String)
6161+6262+/// FFI to Erlang validator script checker
6363+@external(erlang, "validator_ffi", "validate_script")
6464+fn do_validate_script(validator_js: String) -> Result(Nil, String)
6565+6666+/// Check if Node.js is available on the system
6767+/// Returns True if node is available, False otherwise
6868+/// The result is cached after the first check
6969+pub fn is_node_available() -> Bool {
7070+ do_is_node_available()
7171+}
7272+7373+@external(erlang, "validator_ffi", "is_node_available")
7474+fn do_is_node_available() -> Bool
+152
server/src/validator_ffi.erl
···11+-module(validator_ffi).
22+-export([run_validator/2, validate_script/1, start_worker/0, stop_worker/0]).
33+-export([is_node_available/0]).
44+55+-define(NODE_AVAILABLE_KEY, {?MODULE, node_available}).
66+77+%% Run a JavaScript validator against a record
88+%% validator_js: The JavaScript code (default export must be a function returning boolean)
99+%% record_json: The JSON string of the record to validate
1010+%% Returns: {ok, true} | {ok, false} | {error, Reason}
1111+run_validator(ValidatorJs, RecordJson) ->
1212+ case is_node_available() of
1313+ false ->
1414+ %% Node.js not available, skip validation (treat as valid)
1515+ {ok, true};
1616+ true ->
1717+ case try_worker_validate(ValidatorJs, RecordJson) of
1818+ {ok, Result} -> {ok, Result};
1919+ {error, _} = Err -> Err;
2020+ worker_not_running ->
2121+ %% Worker not running, skip validation (treat as valid)
2222+ {ok, true}
2323+ end
2424+ end.
2525+2626+%% Validate that a JavaScript validator script is well-formed
2727+%% Tests that the script exports a function
2828+%% Returns: {ok, nil} | {error, Reason}
2929+validate_script(ValidatorJs) ->
3030+ case is_node_available() of
3131+ false ->
3232+ %% Node.js not available, skip validation
3333+ {ok, nil};
3434+ true ->
3535+ case try_worker_check(ValidatorJs) of
3636+ {ok, nil} -> {ok, nil};
3737+ {error, _} = Err -> Err;
3838+ worker_not_running ->
3939+ %% Worker not running, skip validation
4040+ {ok, nil}
4141+ end
4242+ end.
4343+4444+%% ===================================================================
4545+%% Worker-based validation (fast path)
4646+%% ===================================================================
4747+4848+try_worker_validate(ValidatorJs, RecordJson) ->
4949+ case whereis(validator_worker) of
5050+ undefined -> worker_not_running;
5151+ _Pid ->
5252+ try
5353+ validator_worker:run_validator(ValidatorJs, RecordJson)
5454+ catch
5555+ exit:{noproc, _} -> worker_not_running;
5656+ exit:{timeout, _} -> {error, <<"Validator timeout">>};
5757+ _:_ -> worker_not_running
5858+ end
5959+ end.
6060+6161+try_worker_check(ValidatorJs) ->
6262+ case whereis(validator_worker) of
6363+ undefined -> worker_not_running;
6464+ _Pid ->
6565+ try
6666+ validator_worker:check_script(ValidatorJs)
6767+ catch
6868+ exit:{noproc, _} -> worker_not_running;
6969+ exit:{timeout, _} -> {error, <<"Validator timeout">>};
7070+ _:_ -> worker_not_running
7171+ end
7272+ end.
7373+7474+%% ===================================================================
7575+%% Worker lifecycle management
7676+%% ===================================================================
7777+7878+%% Start the persistent validator worker
7979+%% First checks if Node.js is available, logs warning and returns ok if not
8080+start_worker() ->
8181+ case check_and_cache_node_availability() of
8282+ false ->
8383+ %% Node.js not available - already logged warning in check
8484+ {ok, nil};
8585+ true ->
8686+ case whereis(validator_worker) of
8787+ undefined ->
8888+ case validator_worker:start_link() of
8989+ {ok, _Pid} -> {ok, nil};
9090+ {error, {already_started, _Pid}} -> {ok, nil};
9191+ {error, Reason} -> {error, iolist_to_binary(io_lib:format("~p", [Reason]))}
9292+ end;
9393+ _Pid ->
9494+ {ok, nil}
9595+ end
9696+ end.
9797+9898+%% Stop the persistent validator worker
9999+stop_worker() ->
100100+ case whereis(validator_worker) of
101101+ undefined -> nil;
102102+ _Pid ->
103103+ try
104104+ validator_worker:stop(),
105105+ nil
106106+ catch
107107+ _:_ -> nil
108108+ end
109109+ end.
110110+111111+%% ===================================================================
112112+%% Node.js availability checking
113113+%% ===================================================================
114114+115115+%% Check if Node.js is available (cached result)
116116+%% Returns true if available, false otherwise
117117+is_node_available() ->
118118+ try
119119+ persistent_term:get(?NODE_AVAILABLE_KEY)
120120+ catch
121121+ error:badarg ->
122122+ %% Not yet checked, do the check now
123123+ check_and_cache_node_availability()
124124+ end.
125125+126126+%% Check if Node.js is available and cache the result
127127+%% Logs a warning if not available
128128+check_and_cache_node_availability() ->
129129+ Result = do_check_node_available(),
130130+ persistent_term:put(?NODE_AVAILABLE_KEY, Result),
131131+ case Result of
132132+ false ->
133133+ logger:warning(<<"[validator] Node.js not found - custom validators are disabled. Install Node.js to enable validation.">>);
134134+ true ->
135135+ ok
136136+ end,
137137+ Result.
138138+139139+%% Actually check if node is available by running 'node --version'
140140+do_check_node_available() ->
141141+ %% Use 'which node' or 'node --version' to check availability
142142+ %% os:cmd returns "" if the command isn't found (depending on shell)
143143+ %% We'll try 'node --version' and check if output starts with 'v'
144144+ try
145145+ Output = os:cmd("node --version 2>/dev/null"),
146146+ case string:trim(Output) of
147147+ [$v | _] -> true; %% Version string like "v18.0.0"
148148+ _ -> false
149149+ end
150150+ catch
151151+ _:_ -> false
152152+ end.
+194
server/src/validator_worker.erl
···11+-module(validator_worker).
22+-behaviour(gen_server).
33+44+%% API
55+-export([start_link/0, start_link/1, stop/0]).
66+-export([run_validator/2, check_script/1]).
77+88+%% gen_server callbacks
99+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
1010+1111+-define(SERVER, ?MODULE).
1212+-define(TIMEOUT, 30000).
1313+1414+-record(state, {
1515+ port :: port() | undefined,
1616+ script_path :: string()
1717+}).
1818+1919+%% ===================================================================
2020+%% API
2121+%% ===================================================================
2222+2323+start_link() ->
2424+ start_link(default_script_path()).
2525+2626+start_link(ScriptPath) ->
2727+ gen_server:start_link({local, ?SERVER}, ?MODULE, [ScriptPath], []).
2828+2929+stop() ->
3030+ gen_server:stop(?SERVER).
3131+3232+%% Run a validator against a record
3333+%% Returns: {ok, true} | {ok, false} | {error, Reason}
3434+-spec run_validator(binary() | string(), binary() | string()) -> {ok, boolean()} | {error, binary()}.
3535+run_validator(JsCode, RecordJson) ->
3636+ gen_server:call(?SERVER, {run, to_binary(JsCode), to_binary(RecordJson)}, ?TIMEOUT).
3737+3838+%% Check if a script is a valid validator
3939+-spec check_script(binary() | string()) -> {ok, nil} | {error, binary()}.
4040+check_script(JsCode) ->
4141+ gen_server:call(?SERVER, {check, to_binary(JsCode)}, ?TIMEOUT).
4242+4343+%% ===================================================================
4444+%% gen_server callbacks
4545+%% ===================================================================
4646+4747+init([ScriptPath]) ->
4848+ process_flag(trap_exit, true),
4949+ State = #state{script_path = ScriptPath, port = undefined},
5050+ {ok, ensure_port(State)}.
5151+5252+handle_call({run, JsCode, RecordJson}, _From, State0) ->
5353+ State = ensure_port(State0),
5454+ %% Parse the record JSON to embed it in the request
5555+ try json:decode(RecordJson) of
5656+ Record when is_map(Record); is_list(Record) ->
5757+ Request = json:encode(#{<<"type">> => <<"run">>, <<"code">> => JsCode, <<"record">> => Record}),
5858+ case send_request(State#state.port, iolist_to_binary(Request)) of
5959+ {ok, #{<<"result">> := Result}} ->
6060+ {reply, {ok, Result}, State};
6161+ {ok, #{<<"error">> := Reason}} ->
6262+ {reply, {error, Reason}, State};
6363+ {error, Reason} ->
6464+ {reply, {error, Reason}, restart_port(State)}
6565+ end;
6666+ _ ->
6767+ {reply, {error, <<"Invalid JSON">>}, State}
6868+ catch
6969+ error:_ ->
7070+ {reply, {error, <<"Invalid JSON">>}, State}
7171+ end;
7272+7373+handle_call({check, JsCode}, _From, State0) ->
7474+ State = ensure_port(State0),
7575+ Request = json:encode(#{<<"type">> => <<"check">>, <<"code">> => JsCode}),
7676+ case send_request(State#state.port, iolist_to_binary(Request)) of
7777+ {ok, #{<<"ok">> := true}} ->
7878+ {reply, {ok, nil}, State};
7979+ {ok, #{<<"error">> := Reason}} ->
8080+ {reply, {error, Reason}, State};
8181+ {error, Reason} ->
8282+ {reply, {error, Reason}, restart_port(State)}
8383+ end;
8484+8585+handle_call(_Request, _From, State) ->
8686+ {reply, {error, <<"Unknown request">>}, State}.
8787+8888+handle_cast(_Msg, State) ->
8989+ {noreply, State}.
9090+9191+handle_info({Port, {exit_status, Status}}, #state{port = Port} = State) ->
9292+ logger:warning("[validator_worker] Node process exited with status ~p", [Status]),
9393+ {noreply, State#state{port = undefined}};
9494+9595+handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
9696+ logger:warning("[validator_worker] Port exited: ~p", [Reason]),
9797+ {noreply, State#state{port = undefined}};
9898+9999+handle_info({Port, {data, {eol, Data}}}, #state{port = Port} = State) ->
100100+ %% Log stderr output from validator
101101+ case Data of
102102+ <<"[log]", Rest/binary>> ->
103103+ logger:info(unicode:characters_to_binary(["[validator] ", Rest]));
104104+ <<"[error]", Rest/binary>> ->
105105+ logger:warning(unicode:characters_to_binary(["[validator] ", Rest]));
106106+ _ ->
107107+ ok
108108+ end,
109109+ {noreply, State};
110110+111111+handle_info({Port, {data, _}}, #state{port = Port} = State) ->
112112+ %% Other data from port, ignore
113113+ {noreply, State};
114114+115115+handle_info(_Info, State) ->
116116+ {noreply, State}.
117117+118118+terminate(_Reason, #state{port = undefined}) ->
119119+ ok;
120120+terminate(_Reason, #state{port = Port}) ->
121121+ catch port_close(Port),
122122+ ok.
123123+124124+%% ===================================================================
125125+%% Internal functions
126126+%% ===================================================================
127127+128128+default_script_path() ->
129129+ %% Find the priv directory
130130+ case code:priv_dir(server) of
131131+ {error, _} ->
132132+ %% Fallback for development
133133+ "priv/validator_worker.cjs";
134134+ PrivDir ->
135135+ filename:join(PrivDir, "validator_worker.cjs")
136136+ end.
137137+138138+ensure_port(#state{port = undefined} = State) ->
139139+ start_port(State);
140140+ensure_port(State) ->
141141+ State.
142142+143143+start_port(#state{script_path = ScriptPath} = State) ->
144144+ Cmd = "node " ++ ScriptPath,
145145+ Port = open_port({spawn, Cmd}, [
146146+ {line, 1048576}, %% 1MB line buffer for large validators/records
147147+ binary,
148148+ use_stdio,
149149+ exit_status
150150+ ]),
151151+ logger:info("[validator_worker] Started Node.js worker"),
152152+ State#state{port = Port}.
153153+154154+restart_port(State) ->
155155+ case State#state.port of
156156+ undefined -> ok;
157157+ Port -> catch port_close(Port)
158158+ end,
159159+ logger:info("[validator_worker] Restarting Node.js worker"),
160160+ start_port(State#state{port = undefined}).
161161+162162+send_request(Port, Request) ->
163163+ %% Send request as a line
164164+ port_command(Port, [Request, $\n]),
165165+ %% Wait for response
166166+ receive_response(Port, <<>>, 5000).
167167+168168+receive_response(Port, Acc, Timeout) ->
169169+ receive
170170+ {Port, {data, {eol, Data}}} ->
171171+ %% Complete line received
172172+ FullData = <<Acc/binary, Data/binary>>,
173173+ try json:decode(FullData) of
174174+ Response when is_map(Response) ->
175175+ {ok, Response};
176176+ _ ->
177177+ {error, <<"Invalid response from validator">>}
178178+ catch
179179+ error:_ ->
180180+ {error, <<"Invalid response from validator">>}
181181+ end;
182182+ {Port, {data, {noeol, Data}}} ->
183183+ %% Partial line, accumulate
184184+ receive_response(Port, <<Acc/binary, Data/binary>>, Timeout);
185185+ {Port, {exit_status, Status}} ->
186186+ {error, iolist_to_binary(io_lib:format("Node process exited with status ~p", [Status]))};
187187+ {'EXIT', Port, Reason} ->
188188+ {error, iolist_to_binary(io_lib:format("Port exited: ~p", [Reason]))}
189189+ after Timeout ->
190190+ {error, <<"Timeout waiting for validator response">>}
191191+ end.
192192+193193+to_binary(B) when is_binary(B) -> B;
194194+to_binary(L) when is_list(L) -> list_to_binary(L).
+5-5
server/test/blob_integration_test.gleam
···99import database/repositories/records
1010import gleam/http
1111import gleam/json
1212-import gleam/option
1212+import gleam/option.{None}
1313import gleam/string
1414import gleeunit/should
1515import handlers/graphql as graphql_handler
···72727373 // Insert profile lexicon with blob fields
7474 let lexicon = create_profile_lexicon()
7575- let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon)
7575+ let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon, None)
76767777 // Insert a profile record with avatar blob
7878 // AT Protocol blob format: { ref: { $link: "cid" }, mimeType: "...", size: 123 }
···171171172172 // Insert profile lexicon
173173 let lexicon = create_profile_lexicon()
174174- let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon)
174174+ let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon, None)
175175176176 // Insert a profile with banner blob
177177 let record_json =
···246246 let assert Ok(_) = test_helpers.create_record_table(exec)
247247248248 let lexicon = create_profile_lexicon()
249249- let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon)
249249+ let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon, None)
250250251251 let record_json =
252252 json.object([
···318318 let assert Ok(_) = test_helpers.create_record_table(exec)
319319320320 let lexicon = create_profile_lexicon()
321321- let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon)
321321+ let assert Ok(_) = lexicons.insert(exec, "app.test.profile", lexicon, None)
322322323323 // Insert record without avatar field
324324 let record_json =
···4949 "CREATE TABLE IF NOT EXISTS lexicon (
5050 id TEXT PRIMARY KEY NOT NULL,
5151 json TEXT NOT NULL,
5252- created_at TEXT NOT NULL DEFAULT (datetime('now'))
5252+ created_at TEXT NOT NULL DEFAULT (datetime('now')),
5353+ validator_js TEXT
5354 )",
5455 [],
5556 )