gleam HTTP server. because it glistens on a web
fork
Configure Feed
Select the types of activity you want to include in your feed.
1import gleam/bit_array
2import gleam/bytes_tree.{type BytesTree}
3import gleam/erlang.{rescue}
4import gleam/erlang/process.{type ProcessDown, type Selector, type Subject}
5import gleam/function
6import gleam/http.{type Scheme, Http, Https} as gleam_http
7import gleam/http/request.{type Request}
8import gleam/http/response.{type Response}
9import gleam/int
10import gleam/io
11import gleam/list
12import gleam/option.{type Option, None, Some}
13import gleam/otp/actor
14import gleam/otp/supervisor
15import gleam/result
16import gleam/string
17import gleam/string_tree.{type StringTree}
18import gleam/yielder.{type Yielder}
19import glisten
20import glisten/transport
21import gramps/websocket.{BinaryFrame, Data, TextFrame} as gramps_websocket
22import logging
23import mist/internal/buffer.{type Buffer, Buffer}
24import mist/internal/encoder
25import mist/internal/file
26import mist/internal/handler
27import mist/internal/http.{
28 type Connection as InternalConnection,
29 type ResponseData as InternalResponseData, Bytes as InternalBytes,
30 Chunked as InternalChunked, File as InternalFile,
31 ServerSentEvents as InternalServerSentEvents, Websocket as InternalWebsocket,
32}
33import mist/internal/websocket.{
34 type HandlerMessage, type WebsocketConnection as InternalWebsocketConnection,
35 Internal, User,
36}
37
38/// Re-exported type that represents the default `Request` body type. See
39/// `mist.read_body` to convert this type into a `BitString`. The `Connection`
40/// also holds some additional information about the request. Currently, the
41/// only useful field is `client_ip` which is a `Result` with a tuple of
42/// integers representing the IPv4 address.
43pub type Connection =
44 InternalConnection
45
46/// When accessing client information, these are the possible shapes of the IP
47/// addresses. A best effort will be made to determine whether IPv4 is most
48/// relevant.
49pub type IpAddress {
50 IpV4(Int, Int, Int, Int)
51 IpV6(Int, Int, Int, Int, Int, Int, Int, Int)
52}
53
54/// Convenience function for printing the `IpAddress` type. It will convert the
55/// IPv6 loopback to the short-hand `::1`.
56pub fn ip_address_to_string(address: IpAddress) -> String {
57 glisten.ip_address_to_string(to_glisten_ip_address(address))
58}
59
60fn to_mist_ip_address(ip: glisten.IpAddress) -> IpAddress {
61 case ip {
62 glisten.IpV4(a, b, c, d) -> IpV4(a, b, c, d)
63 glisten.IpV6(a, b, c, d, e, f, g, h) -> IpV6(a, b, c, d, e, f, g, h)
64 }
65}
66
67fn to_glisten_ip_address(ip: IpAddress) -> glisten.IpAddress {
68 case ip {
69 IpV4(a, b, c, d) -> glisten.IpV4(a, b, c, d)
70 IpV6(a, b, c, d, e, f, g, h) -> glisten.IpV6(a, b, c, d, e, f, g, h)
71 }
72}
73
74pub type ConnectionInfo {
75 ConnectionInfo(port: Int, ip_address: IpAddress)
76}
77
78/// Tries to get the IP address and port of a connected client.
79pub fn get_client_info(conn: Connection) -> Result(ConnectionInfo, Nil) {
80 transport.peername(conn.transport, conn.socket)
81 |> result.map(fn(pair) {
82 ConnectionInfo(
83 ip_address: pair.0
84 |> glisten.convert_ip_address
85 |> to_mist_ip_address,
86 port: pair.1,
87 )
88 })
89}
90
91/// The response body type. This allows `mist` to handle these different cases
92/// for you. `Bytes` is the regular data return. `Websocket` will upgrade the
93/// socket to websockets, but should not be used directly. See the
94/// `mist.upgrade` function for usage. `Chunked` will use
95/// `Transfer-Encoding: chunked` to send an iterator in chunks. `File` will use
96/// Erlang's `sendfile` to more efficiently return a file to the client.
97pub type ResponseData {
98 Websocket(Selector(ProcessDown))
99 Bytes(BytesTree)
100 Chunked(Yielder(BytesTree))
101 /// See `mist.send_file` to use this response type.
102 File(descriptor: file.FileDescriptor, offset: Int, length: Int)
103 ServerSentEvents(Selector(ProcessDown))
104}
105
106/// Potential errors when opening a file to send. This list is
107/// currently not exhaustive with POSIX errors.
108pub type FileError {
109 IsDir
110 NoAccess
111 NoEntry
112 UnknownFileError
113}
114
115fn convert_file_errors(err: file.FileError) -> FileError {
116 case err {
117 file.IsDir -> IsDir
118 file.NoAccess -> NoAccess
119 file.NoEntry -> NoEntry
120 file.UnknownFileError -> UnknownFileError
121 }
122}
123
124/// To respond with a file using Erlang's `sendfile`, use this function
125/// with the specified offset and limit (optional). It will attempt to open the
126/// file for reading, get its file size, and then send the file. If the read
127/// errors, this will return the relevant `FileError`. Generally, this will be
128/// more memory efficient than manually doing this process with `mist.Bytes`.
129pub fn send_file(
130 path: String,
131 offset offset: Int,
132 limit limit: Option(Int),
133) -> Result(ResponseData, FileError) {
134 path
135 |> bit_array.from_string
136 |> file.stat
137 |> result.map_error(convert_file_errors)
138 |> result.map(fn(stat) {
139 File(
140 descriptor: stat.descriptor,
141 offset: offset,
142 length: option.unwrap(limit, stat.file_size),
143 )
144 })
145}
146
147/// The possible errors from reading the request body. If the size is larger
148/// than the provided value, `ExcessBody` is returned. If there is an error
149/// reading the body from the socket or the body is malformed (i.e a chunked
150/// request with invalid sizes), `MalformedBody` is returned.
151pub type ReadError {
152 ExcessBody
153 MalformedBody
154}
155
156/// The request body is not pulled from the socket until requested. The
157/// `content-length` header is used to determine whether the socket is read
158/// from or not. The read may also fail, and a `ReadError` is raised.
159pub fn read_body(
160 req: Request(Connection),
161 max_body_limit max_body_limit: Int,
162) -> Result(Request(BitArray), ReadError) {
163 req
164 |> request.get_header("content-length")
165 |> result.then(int.parse)
166 |> result.unwrap(0)
167 |> fn(content_length) {
168 case content_length {
169 value if value <= max_body_limit -> {
170 http.read_body(req)
171 |> result.replace_error(MalformedBody)
172 }
173 _ -> {
174 Error(ExcessBody)
175 }
176 }
177 }
178}
179
180/// The values returning from streaming the request body. The `Chunk`
181/// variant gives back some data and the next token. `Done` signifies
182/// that we have completed reading the body.
183pub type Chunk {
184 Chunk(data: BitArray, consume: fn(Int) -> Result(Chunk, ReadError))
185 Done
186}
187
188fn do_stream(
189 req: Request(Connection),
190 buffer: Buffer,
191) -> fn(Int) -> Result(Chunk, ReadError) {
192 fn(size) {
193 let socket = req.body.socket
194 let transport = req.body.transport
195 let byte_size = bit_array.byte_size(buffer.data)
196
197 case buffer.remaining, byte_size {
198 0, 0 -> Ok(Done)
199
200 0, _buffer_size -> {
201 let #(data, rest) = buffer.slice(buffer, size)
202 Ok(Chunk(data, do_stream(req, buffer.new(rest))))
203 }
204
205 _, buffer_size if buffer_size >= size -> {
206 let #(data, rest) = buffer.slice(buffer, size)
207 let new_buffer = Buffer(..buffer, data: rest)
208 Ok(Chunk(data, do_stream(req, new_buffer)))
209 }
210
211 _, _buffer_size -> {
212 http.read_data(socket, transport, buffer.empty(), http.InvalidBody)
213 |> result.replace_error(MalformedBody)
214 |> result.map(fn(data) {
215 let fetched_data = bit_array.byte_size(data)
216 let new_buffer =
217 Buffer(
218 data: bit_array.append(buffer.data, data),
219 remaining: int.max(0, buffer.remaining - fetched_data),
220 )
221 let #(new_data, rest) = buffer.slice(new_buffer, size)
222 Chunk(new_data, do_stream(req, Buffer(..new_buffer, data: rest)))
223 })
224 }
225 }
226 }
227}
228
229type ChunkState {
230 ChunkState(data_buffer: Buffer, chunk_buffer: Buffer, done: Bool)
231}
232
233fn do_stream_chunked(
234 req: Request(Connection),
235 state: ChunkState,
236) -> fn(Int) -> Result(Chunk, ReadError) {
237 let socket = req.body.socket
238 let transport = req.body.transport
239
240 fn(size) {
241 case fetch_chunks_until(socket, transport, state, size) {
242 Ok(#(data, ChunkState(done: True, ..))) -> {
243 Ok(Chunk(data, fn(_size) { Ok(Done) }))
244 }
245 Ok(#(data, state)) -> {
246 Ok(Chunk(data, do_stream_chunked(req, state)))
247 }
248 Error(_) -> Error(MalformedBody)
249 }
250 }
251}
252
253fn fetch_chunks_until(
254 socket: glisten.Socket,
255 transport: transport.Transport,
256 state: ChunkState,
257 byte_size: Int,
258) -> Result(#(BitArray, ChunkState), ReadError) {
259 let data_size = bit_array.byte_size(state.data_buffer.data)
260 case state.done, data_size {
261 _, size if size >= byte_size -> {
262 let #(value, rest) = buffer.slice(state.data_buffer, byte_size)
263 Ok(#(value, ChunkState(..state, data_buffer: buffer.new(rest))))
264 }
265
266 True, _ -> {
267 Ok(#(state.data_buffer.data, ChunkState(..state, done: True)))
268 }
269
270 False, _ -> {
271 case http.parse_chunk(state.chunk_buffer.data) {
272 http.Complete -> {
273 let updated_state =
274 ChunkState(..state, chunk_buffer: buffer.empty(), done: True)
275 fetch_chunks_until(socket, transport, updated_state, byte_size)
276 }
277 http.Chunk(<<>>, next_buffer) -> {
278 http.read_data(socket, transport, next_buffer, http.InvalidBody)
279 |> result.replace_error(MalformedBody)
280 |> result.then(fn(new_data) {
281 let updated_state =
282 ChunkState(..state, chunk_buffer: buffer.new(new_data))
283 fetch_chunks_until(socket, transport, updated_state, byte_size)
284 })
285 }
286 http.Chunk(data, next_buffer) -> {
287 let updated_state =
288 ChunkState(
289 ..state,
290 data_buffer: buffer.append(state.data_buffer, data),
291 chunk_buffer: next_buffer,
292 )
293 fetch_chunks_until(socket, transport, updated_state, byte_size)
294 }
295 }
296 }
297 }
298}
299
300/// Rather than explicitly reading either the whole body (optionally up to
301/// `N` bytes), this function allows you to consume a stream of the request
302/// body. Any errors reading the body will propagate out, or `Chunk`s will be
303/// emitted. This provides a `consume` method to attempt to grab the next
304/// `size` chunk from the socket.
305pub fn stream(
306 req: Request(Connection),
307) -> Result(fn(Int) -> Result(Chunk, ReadError), ReadError) {
308 let continue =
309 req
310 |> http.handle_continue
311 |> result.replace_error(MalformedBody)
312
313 use _nil <- result.map(continue)
314
315 let is_chunked = case request.get_header(req, "transfer-encoding") {
316 Ok("chunked") -> True
317 _ -> False
318 }
319
320 let assert http.Initial(data) = req.body.body
321
322 case is_chunked {
323 True -> {
324 let state = ChunkState(buffer.new(<<>>), buffer.new(data), False)
325 do_stream_chunked(req, state)
326 }
327 False -> {
328 let content_length =
329 req
330 |> request.get_header("content-length")
331 |> result.then(int.parse)
332 |> result.unwrap(0)
333
334 let initial_size = bit_array.byte_size(data)
335
336 let buffer =
337 Buffer(data: data, remaining: int.max(0, content_length - initial_size))
338
339 do_stream(req, buffer)
340 }
341 }
342}
343
344pub opaque type Builder(request_body, response_body) {
345 Builder(
346 port: Int,
347 handler: fn(Request(request_body)) -> Response(response_body),
348 after_start: fn(Int, Scheme, IpAddress) -> Nil,
349 interface: String,
350 ipv6_support: Bool,
351 )
352}
353
354/// Create a new `mist` handler with a given function. The default port is
355/// 4000.
356pub fn new(handler: fn(Request(in)) -> Response(out)) -> Builder(in, out) {
357 Builder(
358 port: 4000,
359 handler: handler,
360 interface: "localhost",
361 ipv6_support: False,
362 after_start: fn(port, scheme, interface) {
363 let address = case interface {
364 IpV6(..) -> "[" <> ip_address_to_string(interface) <> "]"
365 _ -> ip_address_to_string(interface)
366 }
367 let message =
368 "Listening on "
369 <> gleam_http.scheme_to_string(scheme)
370 <> "://"
371 <> address
372 <> ":"
373 <> int.to_string(port)
374 io.println(message)
375 },
376 )
377}
378
379/// Assign a different listening port to the service.
380pub fn port(builder: Builder(in, out), port: Int) -> Builder(in, out) {
381 Builder(..builder, port: port)
382}
383
384/// This function allows for implicitly reading the body of requests up
385/// to a given size. If the size is too large, or the read fails, the provided
386/// `failure_response` will be sent back as the response.
387pub fn read_request_body(
388 builder: Builder(BitArray, out),
389 bytes_limit bytes_limit: Int,
390 failure_response failure_response: Response(out),
391) -> Builder(Connection, out) {
392 let handler = fn(request) {
393 case read_body(request, bytes_limit) {
394 Ok(request) -> builder.handler(request)
395 Error(_) -> failure_response
396 }
397 }
398 Builder(
399 builder.port,
400 handler,
401 builder.after_start,
402 builder.interface,
403 builder.ipv6_support,
404 )
405}
406
407/// Override the default function to be called after the service starts. The
408/// default is to log a message with the listening port.
409pub fn after_start(
410 builder: Builder(in, out),
411 after_start: fn(Int, Scheme, IpAddress) -> Nil,
412) -> Builder(in, out) {
413 Builder(..builder, after_start: after_start)
414}
415
416/// Specify an interface to listen on. This is a string that can have the
417/// following values: "localhost", a valid IPv4 address (i.e. "127.0.0.1"), or
418/// a valid IPv6 address (i.e. "::1"). An invalid value will cause the
419/// application to crash.
420pub fn bind(builder: Builder(in, out), interface: String) -> Builder(in, out) {
421 Builder(..builder, interface: interface)
422}
423
424/// By default, `mist` will listen on `localhost` over IPv4. If you specify an
425/// IPv4 address to bind to, it will still only serve over IPv4. Calling this
426/// function will listen on both IPv4 and IPv6 for the given interface. If it is
427/// not supported, your application will crash. If you provide an IPv6 address
428/// to `mist.bind`, this function will have no effect.
429pub fn with_ipv6(builder: Builder(in, out)) -> Builder(in, out) {
430 Builder(..builder, ipv6_support: True)
431}
432
433fn convert_body_types(
434 resp: Response(ResponseData),
435) -> Response(InternalResponseData) {
436 let new_body = case resp.body {
437 Websocket(selector) -> InternalWebsocket(selector)
438 Bytes(data) -> InternalBytes(data)
439 File(descriptor, offset, length) -> InternalFile(descriptor, offset, length)
440 Chunked(iter) -> InternalChunked(iter)
441 ServerSentEvents(selector) -> InternalServerSentEvents(selector)
442 }
443 response.set_body(resp, new_body)
444}
445
446pub type Port {
447 Assigned
448 Provided(Int)
449}
450
451pub opaque type Server {
452 Server(
453 supervisor: Subject(supervisor.Message),
454 port: Int,
455 ip_address: IpAddress,
456 )
457}
458
459pub fn get_supervisor(server: Server) -> Subject(supervisor.Message) {
460 server.supervisor
461}
462
463pub fn get_port(server: Server) -> Int {
464 server.port
465}
466
467/// Start a `mist` service over HTTP with the provided builder.
468pub fn start_http(
469 builder: Builder(Connection, ResponseData),
470) -> Result(Subject(supervisor.Message), glisten.StartError) {
471 start_http_server(builder)
472 |> result.map(get_supervisor)
473}
474
475/// See the documentation for `start_http`. For now, you almost certainly
476/// want to use that. In the future, this will allow access to things like
477/// OS-provided ports, graceful shutdown, etc.
478pub fn start_http_server(
479 builder: Builder(Connection, ResponseData),
480) -> Result(Server, glisten.StartError) {
481 fn(req) { convert_body_types(builder.handler(req)) }
482 |> handler.with_func
483 |> glisten.handler(handler.init, _)
484 |> glisten.bind(builder.interface)
485 |> fn(handler) {
486 case builder.ipv6_support {
487 True -> glisten.with_ipv6(handler)
488 False -> handler
489 }
490 }
491 |> glisten.start_server(builder.port)
492 |> result.map(fn(server) {
493 case glisten.get_server_info(server, 5000) {
494 Ok(info) -> {
495 let ip_address = to_mist_ip_address(info.ip_address)
496 builder.after_start(info.port, Http, ip_address)
497 Server(
498 supervisor: glisten.get_supervisor(server),
499 port: info.port,
500 ip_address: ip_address,
501 )
502 }
503 Error(reason) -> {
504 logging.log(
505 logging.Error,
506 "Failed to read port from socket: " <> string.inspect(reason),
507 )
508 panic
509 }
510 }
511 })
512}
513
514/// These are the types of errors raised by trying to read the certificate and
515/// key files.
516pub type CertificateError {
517 NoCertificate
518 NoKey
519 NoKeyOrCertificate
520}
521
522/// These are the possible errors raised when trying to start an Https server.
523/// If there are issues reading the certificate or key files, those will be
524/// returned.
525pub type HttpsError {
526 GlistenError(glisten.StartError)
527 CertificateError(CertificateError)
528}
529
530/// Start a `mist` service over HTTPS with the provided builder. This method
531/// requires both a certificate file and a key file. The library will attempt
532/// to read these files off of the disk.
533pub fn start_https(
534 builder: Builder(Connection, ResponseData),
535 certfile certfile: String,
536 keyfile keyfile: String,
537) -> Result(Subject(supervisor.Message), HttpsError) {
538 start_https_server(builder, certfile, keyfile)
539 |> result.map(get_supervisor)
540}
541
542/// See the documentation for `start_https`. For now, you almost certainly
543/// want to use that. In the future, this will allow access to things like
544/// OS-provided ports, graceful shutdown, etc.
545pub fn start_https_server(
546 builder: Builder(Connection, ResponseData),
547 certfile certfile: String,
548 keyfile keyfile: String,
549) -> Result(Server, HttpsError) {
550 let cert = file.open(bit_array.from_string(certfile))
551 let key = file.open(bit_array.from_string(keyfile))
552
553 let res = case cert, key {
554 Error(_), Error(_) -> Error(CertificateError(NoKeyOrCertificate))
555 Ok(_), Error(_) -> Error(CertificateError(NoKey))
556 Error(_), Ok(_) -> Error(CertificateError(NoCertificate))
557 Ok(_), Ok(_) -> Ok(Nil)
558 }
559
560 use _ <- result.then(res)
561
562 fn(req) { convert_body_types(builder.handler(req)) }
563 |> handler.with_func
564 |> glisten.handler(handler.init, _)
565 |> glisten.bind(builder.interface)
566 |> glisten.start_ssl_server(builder.port, certfile, keyfile)
567 |> result.map(fn(server) {
568 case glisten.get_server_info(server, 1000) {
569 Ok(info) -> {
570 let ip_address = to_mist_ip_address(info.ip_address)
571 builder.after_start(info.port, Https, ip_address)
572 Server(
573 supervisor: glisten.get_supervisor(server),
574 port: info.port,
575 ip_address: ip_address,
576 )
577 }
578 Error(reason) -> {
579 logging.log(
580 logging.Error,
581 "Failed to read port from socket: " <> string.inspect(reason),
582 )
583 panic
584 }
585 }
586 })
587 |> result.map_error(GlistenError)
588}
589
590/// These are the types of messages that a websocket handler may receive.
591pub type WebsocketMessage(custom) {
592 Text(String)
593 Binary(BitArray)
594 Closed
595 Shutdown
596 Custom(custom)
597}
598
599fn internal_to_public_ws_message(
600 msg: HandlerMessage(custom),
601) -> Result(WebsocketMessage(custom), Nil) {
602 case msg {
603 Internal(Data(TextFrame(_length, data))) -> {
604 data
605 |> bit_array.to_string
606 |> result.map(Text)
607 }
608 Internal(Data(BinaryFrame(_length, data))) -> Ok(Binary(data))
609 User(msg) -> Ok(Custom(msg))
610 _ -> Error(Nil)
611 }
612}
613
614/// Upgrade a request to handle websockets. If the request is
615/// malformed, or the websocket process fails to initialize, an empty
616/// 400 response will be sent to the client.
617///
618/// The `on_init` method will be called when the actual WebSocket process
619/// is started, and the return value is the initial state and an optional
620/// selector for receiving user messages.
621///
622/// The `on_close` method is called when the WebSocket process shuts down
623/// for any reason, valid or otherwise.
624pub fn websocket(
625 request request: Request(Connection),
626 handler handler: fn(state, WebsocketConnection, WebsocketMessage(message)) ->
627 actor.Next(message, state),
628 on_init on_init: fn(WebsocketConnection) ->
629 #(state, Option(process.Selector(message))),
630 on_close on_close: fn(state) -> Nil,
631) -> Response(ResponseData) {
632 let handler = fn(state, connection, message) {
633 message
634 |> internal_to_public_ws_message
635 |> result.map(handler(state, connection, _))
636 |> result.unwrap(actor.continue(state))
637 }
638 let extensions =
639 request
640 |> request.get_header("sec-websocket-extensions")
641 |> result.map(fn(header) { string.split(header, ";") })
642 |> result.unwrap([])
643
644 let socket = request.body.socket
645 let transport = request.body.transport
646 request
647 |> http.upgrade(socket, transport, extensions, _)
648 |> result.then(fn(_nil) {
649 websocket.initialize_connection(
650 on_init,
651 on_close,
652 handler,
653 socket,
654 transport,
655 extensions,
656 )
657 })
658 |> result.map(fn(subj) {
659 let ws_process = process.subject_owner(subj)
660 let monitor = process.monitor_process(ws_process)
661 let selector =
662 process.new_selector()
663 |> process.selecting_process_down(monitor, function.identity)
664 response.new(200)
665 |> response.set_body(Websocket(selector))
666 })
667 |> result.lazy_unwrap(fn() {
668 response.new(400)
669 |> response.set_body(Bytes(bytes_tree.new()))
670 })
671}
672
673pub type WebsocketConnection =
674 InternalWebsocketConnection
675
676/// Sends a binary frame across the websocket.
677pub fn send_binary_frame(
678 connection: WebsocketConnection,
679 frame: BitArray,
680) -> Result(Nil, glisten.SocketReason) {
681 let binary_frame =
682 rescue(fn() {
683 gramps_websocket.to_binary_frame(frame, connection.deflate, None)
684 })
685 case binary_frame {
686 Ok(binary_frame) -> {
687 transport.send(connection.transport, connection.socket, binary_frame)
688 }
689 Error(reason) -> {
690 logging.log(
691 logging.Error,
692 "Cannot send messages from a different process than the WebSocket: "
693 <> string.inspect(reason),
694 )
695 panic as "Exiting due to sending WebSocket message from non-owning process"
696 }
697 }
698}
699
700/// Sends a text frame across the websocket.
701pub fn send_text_frame(
702 connection: WebsocketConnection,
703 frame: String,
704) -> Result(Nil, glisten.SocketReason) {
705 let text_frame =
706 rescue(fn() {
707 gramps_websocket.to_text_frame(frame, connection.deflate, None)
708 })
709 case text_frame {
710 Ok(text_frame) -> {
711 transport.send(connection.transport, connection.socket, text_frame)
712 }
713 Error(reason) -> {
714 logging.log(
715 logging.Error,
716 "Cannot send messages from a different process than the WebSocket: "
717 <> string.inspect(reason),
718 )
719 panic as "Exiting due to sending WebSocket message from non-owning process"
720 }
721 }
722}
723
724// Returned by `init_server_sent_events`. This type must be passed to
725// `send_event` since we need to enforce that the correct headers / data shapw
726// is provided.
727pub opaque type SSEConnection {
728 SSEConnection(Connection)
729}
730
731// Represents each event. Only `data` is required. The `event` name will
732// default to `message`. If an `id` is provided, it will be included in the
733// event received by the client.
734pub opaque type SSEEvent {
735 SSEEvent(id: Option(String), event: Option(String), data: StringTree)
736}
737
738// Builder for generating the base event
739pub fn event(data: StringTree) -> SSEEvent {
740 SSEEvent(id: None, event: None, data: data)
741}
742
743// Adds an `id` to the event
744pub fn event_id(event: SSEEvent, id: String) -> SSEEvent {
745 SSEEvent(..event, id: Some(id))
746}
747
748// Sets the `event` name field
749pub fn event_name(event: SSEEvent, name: String) -> SSEEvent {
750 SSEEvent(..event, event: Some(name))
751}
752
753/// Sets up the connection for server-sent events. The initial response provided
754/// here will have its headers included in the SSE setup. The body is discarded.
755/// The `init` and `loop` parameters follow the same shape as the
756/// `gleam/otp/actor` module.
757///
758/// NOTE: There is no proper way within the spec for the server to "close" the
759/// SSE connection. There are ways around it.
760///
761/// See: `examples/eventz` for a sample usage.
762pub fn server_sent_events(
763 request req: Request(Connection),
764 initial_response resp: Response(discard),
765 init init: fn() -> actor.InitResult(state, message),
766 loop loop: fn(message, SSEConnection, state) -> actor.Next(message, state),
767) -> Response(ResponseData) {
768 let with_default_headers =
769 resp
770 |> response.set_header("content-type", "text/event-stream")
771 |> response.set_header("cache-control", "no-cache")
772 |> response.set_header("connection", "keep-alive")
773
774 transport.send(
775 req.body.transport,
776 req.body.socket,
777 encoder.response_builder(200, with_default_headers.headers, "1.1"),
778 )
779 |> result.replace_error(Nil)
780 |> result.then(fn(_nil) {
781 actor.start_spec(
782 actor.Spec(init: init, init_timeout: 1000, loop: fn(state, message) {
783 loop(state, SSEConnection(req.body), message)
784 }),
785 )
786 |> result.replace_error(Nil)
787 })
788 |> result.map(fn(subj) {
789 let sse_process = process.subject_owner(subj)
790 let monitor = process.monitor_process(sse_process)
791 let selector =
792 process.new_selector()
793 |> process.selecting_process_down(monitor, function.identity)
794 response.new(200)
795 |> response.set_body(ServerSentEvents(selector))
796 })
797 |> result.lazy_unwrap(fn() {
798 response.new(400)
799 |> response.set_body(Bytes(bytes_tree.new()))
800 })
801}
802
803// This constructs an event from the provided type. If `id` or `event` are
804// provided, they are included in the message. The data provided is split
805// across newlines, which I think is per the spec? The `Result` returned here
806// can be used to determine whether the event send has succeeded.
807pub fn send_event(conn: SSEConnection, event: SSEEvent) -> Result(Nil, Nil) {
808 let SSEConnection(conn) = conn
809 let id =
810 event.id
811 |> option.map(fn(id) { "id: " <> id <> "\n" })
812 |> option.unwrap("")
813 let event_name =
814 event.event
815 |> option.map(fn(name) { "event: " <> name <> "\n" })
816 |> option.unwrap("")
817 let data =
818 event.data
819 |> string_tree.split("\n")
820 |> list.map(fn(row) { string_tree.prepend(row, "data: ") })
821 |> string_tree.join("\n")
822
823 let message =
824 data
825 |> string_tree.prepend(event_name)
826 |> string_tree.prepend(id)
827 |> string_tree.append("\n\n")
828 |> bytes_tree.from_string_tree
829
830 transport.send(conn.transport, conn.socket, message)
831 |> result.replace(Nil)
832 |> result.replace_error(Nil)
833}