gleam HTTP server. because it glistens on a web
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 833 lines 26 kB view raw
1import gleam/bit_array 2import gleam/bytes_tree.{type BytesTree} 3import gleam/erlang.{rescue} 4import gleam/erlang/process.{type ProcessDown, type Selector, type Subject} 5import gleam/function 6import gleam/http.{type Scheme, Http, Https} as gleam_http 7import gleam/http/request.{type Request} 8import gleam/http/response.{type Response} 9import gleam/int 10import gleam/io 11import gleam/list 12import gleam/option.{type Option, None, Some} 13import gleam/otp/actor 14import gleam/otp/supervisor 15import gleam/result 16import gleam/string 17import gleam/string_tree.{type StringTree} 18import gleam/yielder.{type Yielder} 19import glisten 20import glisten/transport 21import gramps/websocket.{BinaryFrame, Data, TextFrame} as gramps_websocket 22import logging 23import mist/internal/buffer.{type Buffer, Buffer} 24import mist/internal/encoder 25import mist/internal/file 26import mist/internal/handler 27import mist/internal/http.{ 28 type Connection as InternalConnection, 29 type ResponseData as InternalResponseData, Bytes as InternalBytes, 30 Chunked as InternalChunked, File as InternalFile, 31 ServerSentEvents as InternalServerSentEvents, Websocket as InternalWebsocket, 32} 33import mist/internal/websocket.{ 34 type HandlerMessage, type WebsocketConnection as InternalWebsocketConnection, 35 Internal, User, 36} 37 38/// Re-exported type that represents the default `Request` body type. See 39/// `mist.read_body` to convert this type into a `BitString`. The `Connection` 40/// also holds some additional information about the request. Currently, the 41/// only useful field is `client_ip` which is a `Result` with a tuple of 42/// integers representing the IPv4 address. 43pub type Connection = 44 InternalConnection 45 46/// When accessing client information, these are the possible shapes of the IP 47/// addresses. A best effort will be made to determine whether IPv4 is most 48/// relevant. 49pub type IpAddress { 50 IpV4(Int, Int, Int, Int) 51 IpV6(Int, Int, Int, Int, Int, Int, Int, Int) 52} 53 54/// Convenience function for printing the `IpAddress` type. It will convert the 55/// IPv6 loopback to the short-hand `::1`. 56pub fn ip_address_to_string(address: IpAddress) -> String { 57 glisten.ip_address_to_string(to_glisten_ip_address(address)) 58} 59 60fn to_mist_ip_address(ip: glisten.IpAddress) -> IpAddress { 61 case ip { 62 glisten.IpV4(a, b, c, d) -> IpV4(a, b, c, d) 63 glisten.IpV6(a, b, c, d, e, f, g, h) -> IpV6(a, b, c, d, e, f, g, h) 64 } 65} 66 67fn to_glisten_ip_address(ip: IpAddress) -> glisten.IpAddress { 68 case ip { 69 IpV4(a, b, c, d) -> glisten.IpV4(a, b, c, d) 70 IpV6(a, b, c, d, e, f, g, h) -> glisten.IpV6(a, b, c, d, e, f, g, h) 71 } 72} 73 74pub type ConnectionInfo { 75 ConnectionInfo(port: Int, ip_address: IpAddress) 76} 77 78/// Tries to get the IP address and port of a connected client. 79pub fn get_client_info(conn: Connection) -> Result(ConnectionInfo, Nil) { 80 transport.peername(conn.transport, conn.socket) 81 |> result.map(fn(pair) { 82 ConnectionInfo( 83 ip_address: pair.0 84 |> glisten.convert_ip_address 85 |> to_mist_ip_address, 86 port: pair.1, 87 ) 88 }) 89} 90 91/// The response body type. This allows `mist` to handle these different cases 92/// for you. `Bytes` is the regular data return. `Websocket` will upgrade the 93/// socket to websockets, but should not be used directly. See the 94/// `mist.upgrade` function for usage. `Chunked` will use 95/// `Transfer-Encoding: chunked` to send an iterator in chunks. `File` will use 96/// Erlang's `sendfile` to more efficiently return a file to the client. 97pub type ResponseData { 98 Websocket(Selector(ProcessDown)) 99 Bytes(BytesTree) 100 Chunked(Yielder(BytesTree)) 101 /// See `mist.send_file` to use this response type. 102 File(descriptor: file.FileDescriptor, offset: Int, length: Int) 103 ServerSentEvents(Selector(ProcessDown)) 104} 105 106/// Potential errors when opening a file to send. This list is 107/// currently not exhaustive with POSIX errors. 108pub type FileError { 109 IsDir 110 NoAccess 111 NoEntry 112 UnknownFileError 113} 114 115fn convert_file_errors(err: file.FileError) -> FileError { 116 case err { 117 file.IsDir -> IsDir 118 file.NoAccess -> NoAccess 119 file.NoEntry -> NoEntry 120 file.UnknownFileError -> UnknownFileError 121 } 122} 123 124/// To respond with a file using Erlang's `sendfile`, use this function 125/// with the specified offset and limit (optional). It will attempt to open the 126/// file for reading, get its file size, and then send the file. If the read 127/// errors, this will return the relevant `FileError`. Generally, this will be 128/// more memory efficient than manually doing this process with `mist.Bytes`. 129pub fn send_file( 130 path: String, 131 offset offset: Int, 132 limit limit: Option(Int), 133) -> Result(ResponseData, FileError) { 134 path 135 |> bit_array.from_string 136 |> file.stat 137 |> result.map_error(convert_file_errors) 138 |> result.map(fn(stat) { 139 File( 140 descriptor: stat.descriptor, 141 offset: offset, 142 length: option.unwrap(limit, stat.file_size), 143 ) 144 }) 145} 146 147/// The possible errors from reading the request body. If the size is larger 148/// than the provided value, `ExcessBody` is returned. If there is an error 149/// reading the body from the socket or the body is malformed (i.e a chunked 150/// request with invalid sizes), `MalformedBody` is returned. 151pub type ReadError { 152 ExcessBody 153 MalformedBody 154} 155 156/// The request body is not pulled from the socket until requested. The 157/// `content-length` header is used to determine whether the socket is read 158/// from or not. The read may also fail, and a `ReadError` is raised. 159pub fn read_body( 160 req: Request(Connection), 161 max_body_limit max_body_limit: Int, 162) -> Result(Request(BitArray), ReadError) { 163 req 164 |> request.get_header("content-length") 165 |> result.then(int.parse) 166 |> result.unwrap(0) 167 |> fn(content_length) { 168 case content_length { 169 value if value <= max_body_limit -> { 170 http.read_body(req) 171 |> result.replace_error(MalformedBody) 172 } 173 _ -> { 174 Error(ExcessBody) 175 } 176 } 177 } 178} 179 180/// The values returning from streaming the request body. The `Chunk` 181/// variant gives back some data and the next token. `Done` signifies 182/// that we have completed reading the body. 183pub type Chunk { 184 Chunk(data: BitArray, consume: fn(Int) -> Result(Chunk, ReadError)) 185 Done 186} 187 188fn do_stream( 189 req: Request(Connection), 190 buffer: Buffer, 191) -> fn(Int) -> Result(Chunk, ReadError) { 192 fn(size) { 193 let socket = req.body.socket 194 let transport = req.body.transport 195 let byte_size = bit_array.byte_size(buffer.data) 196 197 case buffer.remaining, byte_size { 198 0, 0 -> Ok(Done) 199 200 0, _buffer_size -> { 201 let #(data, rest) = buffer.slice(buffer, size) 202 Ok(Chunk(data, do_stream(req, buffer.new(rest)))) 203 } 204 205 _, buffer_size if buffer_size >= size -> { 206 let #(data, rest) = buffer.slice(buffer, size) 207 let new_buffer = Buffer(..buffer, data: rest) 208 Ok(Chunk(data, do_stream(req, new_buffer))) 209 } 210 211 _, _buffer_size -> { 212 http.read_data(socket, transport, buffer.empty(), http.InvalidBody) 213 |> result.replace_error(MalformedBody) 214 |> result.map(fn(data) { 215 let fetched_data = bit_array.byte_size(data) 216 let new_buffer = 217 Buffer( 218 data: bit_array.append(buffer.data, data), 219 remaining: int.max(0, buffer.remaining - fetched_data), 220 ) 221 let #(new_data, rest) = buffer.slice(new_buffer, size) 222 Chunk(new_data, do_stream(req, Buffer(..new_buffer, data: rest))) 223 }) 224 } 225 } 226 } 227} 228 229type ChunkState { 230 ChunkState(data_buffer: Buffer, chunk_buffer: Buffer, done: Bool) 231} 232 233fn do_stream_chunked( 234 req: Request(Connection), 235 state: ChunkState, 236) -> fn(Int) -> Result(Chunk, ReadError) { 237 let socket = req.body.socket 238 let transport = req.body.transport 239 240 fn(size) { 241 case fetch_chunks_until(socket, transport, state, size) { 242 Ok(#(data, ChunkState(done: True, ..))) -> { 243 Ok(Chunk(data, fn(_size) { Ok(Done) })) 244 } 245 Ok(#(data, state)) -> { 246 Ok(Chunk(data, do_stream_chunked(req, state))) 247 } 248 Error(_) -> Error(MalformedBody) 249 } 250 } 251} 252 253fn fetch_chunks_until( 254 socket: glisten.Socket, 255 transport: transport.Transport, 256 state: ChunkState, 257 byte_size: Int, 258) -> Result(#(BitArray, ChunkState), ReadError) { 259 let data_size = bit_array.byte_size(state.data_buffer.data) 260 case state.done, data_size { 261 _, size if size >= byte_size -> { 262 let #(value, rest) = buffer.slice(state.data_buffer, byte_size) 263 Ok(#(value, ChunkState(..state, data_buffer: buffer.new(rest)))) 264 } 265 266 True, _ -> { 267 Ok(#(state.data_buffer.data, ChunkState(..state, done: True))) 268 } 269 270 False, _ -> { 271 case http.parse_chunk(state.chunk_buffer.data) { 272 http.Complete -> { 273 let updated_state = 274 ChunkState(..state, chunk_buffer: buffer.empty(), done: True) 275 fetch_chunks_until(socket, transport, updated_state, byte_size) 276 } 277 http.Chunk(<<>>, next_buffer) -> { 278 http.read_data(socket, transport, next_buffer, http.InvalidBody) 279 |> result.replace_error(MalformedBody) 280 |> result.then(fn(new_data) { 281 let updated_state = 282 ChunkState(..state, chunk_buffer: buffer.new(new_data)) 283 fetch_chunks_until(socket, transport, updated_state, byte_size) 284 }) 285 } 286 http.Chunk(data, next_buffer) -> { 287 let updated_state = 288 ChunkState( 289 ..state, 290 data_buffer: buffer.append(state.data_buffer, data), 291 chunk_buffer: next_buffer, 292 ) 293 fetch_chunks_until(socket, transport, updated_state, byte_size) 294 } 295 } 296 } 297 } 298} 299 300/// Rather than explicitly reading either the whole body (optionally up to 301/// `N` bytes), this function allows you to consume a stream of the request 302/// body. Any errors reading the body will propagate out, or `Chunk`s will be 303/// emitted. This provides a `consume` method to attempt to grab the next 304/// `size` chunk from the socket. 305pub fn stream( 306 req: Request(Connection), 307) -> Result(fn(Int) -> Result(Chunk, ReadError), ReadError) { 308 let continue = 309 req 310 |> http.handle_continue 311 |> result.replace_error(MalformedBody) 312 313 use _nil <- result.map(continue) 314 315 let is_chunked = case request.get_header(req, "transfer-encoding") { 316 Ok("chunked") -> True 317 _ -> False 318 } 319 320 let assert http.Initial(data) = req.body.body 321 322 case is_chunked { 323 True -> { 324 let state = ChunkState(buffer.new(<<>>), buffer.new(data), False) 325 do_stream_chunked(req, state) 326 } 327 False -> { 328 let content_length = 329 req 330 |> request.get_header("content-length") 331 |> result.then(int.parse) 332 |> result.unwrap(0) 333 334 let initial_size = bit_array.byte_size(data) 335 336 let buffer = 337 Buffer(data: data, remaining: int.max(0, content_length - initial_size)) 338 339 do_stream(req, buffer) 340 } 341 } 342} 343 344pub opaque type Builder(request_body, response_body) { 345 Builder( 346 port: Int, 347 handler: fn(Request(request_body)) -> Response(response_body), 348 after_start: fn(Int, Scheme, IpAddress) -> Nil, 349 interface: String, 350 ipv6_support: Bool, 351 ) 352} 353 354/// Create a new `mist` handler with a given function. The default port is 355/// 4000. 356pub fn new(handler: fn(Request(in)) -> Response(out)) -> Builder(in, out) { 357 Builder( 358 port: 4000, 359 handler: handler, 360 interface: "localhost", 361 ipv6_support: False, 362 after_start: fn(port, scheme, interface) { 363 let address = case interface { 364 IpV6(..) -> "[" <> ip_address_to_string(interface) <> "]" 365 _ -> ip_address_to_string(interface) 366 } 367 let message = 368 "Listening on " 369 <> gleam_http.scheme_to_string(scheme) 370 <> "://" 371 <> address 372 <> ":" 373 <> int.to_string(port) 374 io.println(message) 375 }, 376 ) 377} 378 379/// Assign a different listening port to the service. 380pub fn port(builder: Builder(in, out), port: Int) -> Builder(in, out) { 381 Builder(..builder, port: port) 382} 383 384/// This function allows for implicitly reading the body of requests up 385/// to a given size. If the size is too large, or the read fails, the provided 386/// `failure_response` will be sent back as the response. 387pub fn read_request_body( 388 builder: Builder(BitArray, out), 389 bytes_limit bytes_limit: Int, 390 failure_response failure_response: Response(out), 391) -> Builder(Connection, out) { 392 let handler = fn(request) { 393 case read_body(request, bytes_limit) { 394 Ok(request) -> builder.handler(request) 395 Error(_) -> failure_response 396 } 397 } 398 Builder( 399 builder.port, 400 handler, 401 builder.after_start, 402 builder.interface, 403 builder.ipv6_support, 404 ) 405} 406 407/// Override the default function to be called after the service starts. The 408/// default is to log a message with the listening port. 409pub fn after_start( 410 builder: Builder(in, out), 411 after_start: fn(Int, Scheme, IpAddress) -> Nil, 412) -> Builder(in, out) { 413 Builder(..builder, after_start: after_start) 414} 415 416/// Specify an interface to listen on. This is a string that can have the 417/// following values: "localhost", a valid IPv4 address (i.e. "127.0.0.1"), or 418/// a valid IPv6 address (i.e. "::1"). An invalid value will cause the 419/// application to crash. 420pub fn bind(builder: Builder(in, out), interface: String) -> Builder(in, out) { 421 Builder(..builder, interface: interface) 422} 423 424/// By default, `mist` will listen on `localhost` over IPv4. If you specify an 425/// IPv4 address to bind to, it will still only serve over IPv4. Calling this 426/// function will listen on both IPv4 and IPv6 for the given interface. If it is 427/// not supported, your application will crash. If you provide an IPv6 address 428/// to `mist.bind`, this function will have no effect. 429pub fn with_ipv6(builder: Builder(in, out)) -> Builder(in, out) { 430 Builder(..builder, ipv6_support: True) 431} 432 433fn convert_body_types( 434 resp: Response(ResponseData), 435) -> Response(InternalResponseData) { 436 let new_body = case resp.body { 437 Websocket(selector) -> InternalWebsocket(selector) 438 Bytes(data) -> InternalBytes(data) 439 File(descriptor, offset, length) -> InternalFile(descriptor, offset, length) 440 Chunked(iter) -> InternalChunked(iter) 441 ServerSentEvents(selector) -> InternalServerSentEvents(selector) 442 } 443 response.set_body(resp, new_body) 444} 445 446pub type Port { 447 Assigned 448 Provided(Int) 449} 450 451pub opaque type Server { 452 Server( 453 supervisor: Subject(supervisor.Message), 454 port: Int, 455 ip_address: IpAddress, 456 ) 457} 458 459pub fn get_supervisor(server: Server) -> Subject(supervisor.Message) { 460 server.supervisor 461} 462 463pub fn get_port(server: Server) -> Int { 464 server.port 465} 466 467/// Start a `mist` service over HTTP with the provided builder. 468pub fn start_http( 469 builder: Builder(Connection, ResponseData), 470) -> Result(Subject(supervisor.Message), glisten.StartError) { 471 start_http_server(builder) 472 |> result.map(get_supervisor) 473} 474 475/// See the documentation for `start_http`. For now, you almost certainly 476/// want to use that. In the future, this will allow access to things like 477/// OS-provided ports, graceful shutdown, etc. 478pub fn start_http_server( 479 builder: Builder(Connection, ResponseData), 480) -> Result(Server, glisten.StartError) { 481 fn(req) { convert_body_types(builder.handler(req)) } 482 |> handler.with_func 483 |> glisten.handler(handler.init, _) 484 |> glisten.bind(builder.interface) 485 |> fn(handler) { 486 case builder.ipv6_support { 487 True -> glisten.with_ipv6(handler) 488 False -> handler 489 } 490 } 491 |> glisten.start_server(builder.port) 492 |> result.map(fn(server) { 493 case glisten.get_server_info(server, 5000) { 494 Ok(info) -> { 495 let ip_address = to_mist_ip_address(info.ip_address) 496 builder.after_start(info.port, Http, ip_address) 497 Server( 498 supervisor: glisten.get_supervisor(server), 499 port: info.port, 500 ip_address: ip_address, 501 ) 502 } 503 Error(reason) -> { 504 logging.log( 505 logging.Error, 506 "Failed to read port from socket: " <> string.inspect(reason), 507 ) 508 panic 509 } 510 } 511 }) 512} 513 514/// These are the types of errors raised by trying to read the certificate and 515/// key files. 516pub type CertificateError { 517 NoCertificate 518 NoKey 519 NoKeyOrCertificate 520} 521 522/// These are the possible errors raised when trying to start an Https server. 523/// If there are issues reading the certificate or key files, those will be 524/// returned. 525pub type HttpsError { 526 GlistenError(glisten.StartError) 527 CertificateError(CertificateError) 528} 529 530/// Start a `mist` service over HTTPS with the provided builder. This method 531/// requires both a certificate file and a key file. The library will attempt 532/// to read these files off of the disk. 533pub fn start_https( 534 builder: Builder(Connection, ResponseData), 535 certfile certfile: String, 536 keyfile keyfile: String, 537) -> Result(Subject(supervisor.Message), HttpsError) { 538 start_https_server(builder, certfile, keyfile) 539 |> result.map(get_supervisor) 540} 541 542/// See the documentation for `start_https`. For now, you almost certainly 543/// want to use that. In the future, this will allow access to things like 544/// OS-provided ports, graceful shutdown, etc. 545pub fn start_https_server( 546 builder: Builder(Connection, ResponseData), 547 certfile certfile: String, 548 keyfile keyfile: String, 549) -> Result(Server, HttpsError) { 550 let cert = file.open(bit_array.from_string(certfile)) 551 let key = file.open(bit_array.from_string(keyfile)) 552 553 let res = case cert, key { 554 Error(_), Error(_) -> Error(CertificateError(NoKeyOrCertificate)) 555 Ok(_), Error(_) -> Error(CertificateError(NoKey)) 556 Error(_), Ok(_) -> Error(CertificateError(NoCertificate)) 557 Ok(_), Ok(_) -> Ok(Nil) 558 } 559 560 use _ <- result.then(res) 561 562 fn(req) { convert_body_types(builder.handler(req)) } 563 |> handler.with_func 564 |> glisten.handler(handler.init, _) 565 |> glisten.bind(builder.interface) 566 |> glisten.start_ssl_server(builder.port, certfile, keyfile) 567 |> result.map(fn(server) { 568 case glisten.get_server_info(server, 1000) { 569 Ok(info) -> { 570 let ip_address = to_mist_ip_address(info.ip_address) 571 builder.after_start(info.port, Https, ip_address) 572 Server( 573 supervisor: glisten.get_supervisor(server), 574 port: info.port, 575 ip_address: ip_address, 576 ) 577 } 578 Error(reason) -> { 579 logging.log( 580 logging.Error, 581 "Failed to read port from socket: " <> string.inspect(reason), 582 ) 583 panic 584 } 585 } 586 }) 587 |> result.map_error(GlistenError) 588} 589 590/// These are the types of messages that a websocket handler may receive. 591pub type WebsocketMessage(custom) { 592 Text(String) 593 Binary(BitArray) 594 Closed 595 Shutdown 596 Custom(custom) 597} 598 599fn internal_to_public_ws_message( 600 msg: HandlerMessage(custom), 601) -> Result(WebsocketMessage(custom), Nil) { 602 case msg { 603 Internal(Data(TextFrame(_length, data))) -> { 604 data 605 |> bit_array.to_string 606 |> result.map(Text) 607 } 608 Internal(Data(BinaryFrame(_length, data))) -> Ok(Binary(data)) 609 User(msg) -> Ok(Custom(msg)) 610 _ -> Error(Nil) 611 } 612} 613 614/// Upgrade a request to handle websockets. If the request is 615/// malformed, or the websocket process fails to initialize, an empty 616/// 400 response will be sent to the client. 617/// 618/// The `on_init` method will be called when the actual WebSocket process 619/// is started, and the return value is the initial state and an optional 620/// selector for receiving user messages. 621/// 622/// The `on_close` method is called when the WebSocket process shuts down 623/// for any reason, valid or otherwise. 624pub fn websocket( 625 request request: Request(Connection), 626 handler handler: fn(state, WebsocketConnection, WebsocketMessage(message)) -> 627 actor.Next(message, state), 628 on_init on_init: fn(WebsocketConnection) -> 629 #(state, Option(process.Selector(message))), 630 on_close on_close: fn(state) -> Nil, 631) -> Response(ResponseData) { 632 let handler = fn(state, connection, message) { 633 message 634 |> internal_to_public_ws_message 635 |> result.map(handler(state, connection, _)) 636 |> result.unwrap(actor.continue(state)) 637 } 638 let extensions = 639 request 640 |> request.get_header("sec-websocket-extensions") 641 |> result.map(fn(header) { string.split(header, ";") }) 642 |> result.unwrap([]) 643 644 let socket = request.body.socket 645 let transport = request.body.transport 646 request 647 |> http.upgrade(socket, transport, extensions, _) 648 |> result.then(fn(_nil) { 649 websocket.initialize_connection( 650 on_init, 651 on_close, 652 handler, 653 socket, 654 transport, 655 extensions, 656 ) 657 }) 658 |> result.map(fn(subj) { 659 let ws_process = process.subject_owner(subj) 660 let monitor = process.monitor_process(ws_process) 661 let selector = 662 process.new_selector() 663 |> process.selecting_process_down(monitor, function.identity) 664 response.new(200) 665 |> response.set_body(Websocket(selector)) 666 }) 667 |> result.lazy_unwrap(fn() { 668 response.new(400) 669 |> response.set_body(Bytes(bytes_tree.new())) 670 }) 671} 672 673pub type WebsocketConnection = 674 InternalWebsocketConnection 675 676/// Sends a binary frame across the websocket. 677pub fn send_binary_frame( 678 connection: WebsocketConnection, 679 frame: BitArray, 680) -> Result(Nil, glisten.SocketReason) { 681 let binary_frame = 682 rescue(fn() { 683 gramps_websocket.to_binary_frame(frame, connection.deflate, None) 684 }) 685 case binary_frame { 686 Ok(binary_frame) -> { 687 transport.send(connection.transport, connection.socket, binary_frame) 688 } 689 Error(reason) -> { 690 logging.log( 691 logging.Error, 692 "Cannot send messages from a different process than the WebSocket: " 693 <> string.inspect(reason), 694 ) 695 panic as "Exiting due to sending WebSocket message from non-owning process" 696 } 697 } 698} 699 700/// Sends a text frame across the websocket. 701pub fn send_text_frame( 702 connection: WebsocketConnection, 703 frame: String, 704) -> Result(Nil, glisten.SocketReason) { 705 let text_frame = 706 rescue(fn() { 707 gramps_websocket.to_text_frame(frame, connection.deflate, None) 708 }) 709 case text_frame { 710 Ok(text_frame) -> { 711 transport.send(connection.transport, connection.socket, text_frame) 712 } 713 Error(reason) -> { 714 logging.log( 715 logging.Error, 716 "Cannot send messages from a different process than the WebSocket: " 717 <> string.inspect(reason), 718 ) 719 panic as "Exiting due to sending WebSocket message from non-owning process" 720 } 721 } 722} 723 724// Returned by `init_server_sent_events`. This type must be passed to 725// `send_event` since we need to enforce that the correct headers / data shapw 726// is provided. 727pub opaque type SSEConnection { 728 SSEConnection(Connection) 729} 730 731// Represents each event. Only `data` is required. The `event` name will 732// default to `message`. If an `id` is provided, it will be included in the 733// event received by the client. 734pub opaque type SSEEvent { 735 SSEEvent(id: Option(String), event: Option(String), data: StringTree) 736} 737 738// Builder for generating the base event 739pub fn event(data: StringTree) -> SSEEvent { 740 SSEEvent(id: None, event: None, data: data) 741} 742 743// Adds an `id` to the event 744pub fn event_id(event: SSEEvent, id: String) -> SSEEvent { 745 SSEEvent(..event, id: Some(id)) 746} 747 748// Sets the `event` name field 749pub fn event_name(event: SSEEvent, name: String) -> SSEEvent { 750 SSEEvent(..event, event: Some(name)) 751} 752 753/// Sets up the connection for server-sent events. The initial response provided 754/// here will have its headers included in the SSE setup. The body is discarded. 755/// The `init` and `loop` parameters follow the same shape as the 756/// `gleam/otp/actor` module. 757/// 758/// NOTE: There is no proper way within the spec for the server to "close" the 759/// SSE connection. There are ways around it. 760/// 761/// See: `examples/eventz` for a sample usage. 762pub fn server_sent_events( 763 request req: Request(Connection), 764 initial_response resp: Response(discard), 765 init init: fn() -> actor.InitResult(state, message), 766 loop loop: fn(message, SSEConnection, state) -> actor.Next(message, state), 767) -> Response(ResponseData) { 768 let with_default_headers = 769 resp 770 |> response.set_header("content-type", "text/event-stream") 771 |> response.set_header("cache-control", "no-cache") 772 |> response.set_header("connection", "keep-alive") 773 774 transport.send( 775 req.body.transport, 776 req.body.socket, 777 encoder.response_builder(200, with_default_headers.headers, "1.1"), 778 ) 779 |> result.replace_error(Nil) 780 |> result.then(fn(_nil) { 781 actor.start_spec( 782 actor.Spec(init: init, init_timeout: 1000, loop: fn(state, message) { 783 loop(state, SSEConnection(req.body), message) 784 }), 785 ) 786 |> result.replace_error(Nil) 787 }) 788 |> result.map(fn(subj) { 789 let sse_process = process.subject_owner(subj) 790 let monitor = process.monitor_process(sse_process) 791 let selector = 792 process.new_selector() 793 |> process.selecting_process_down(monitor, function.identity) 794 response.new(200) 795 |> response.set_body(ServerSentEvents(selector)) 796 }) 797 |> result.lazy_unwrap(fn() { 798 response.new(400) 799 |> response.set_body(Bytes(bytes_tree.new())) 800 }) 801} 802 803// This constructs an event from the provided type. If `id` or `event` are 804// provided, they are included in the message. The data provided is split 805// across newlines, which I think is per the spec? The `Result` returned here 806// can be used to determine whether the event send has succeeded. 807pub fn send_event(conn: SSEConnection, event: SSEEvent) -> Result(Nil, Nil) { 808 let SSEConnection(conn) = conn 809 let id = 810 event.id 811 |> option.map(fn(id) { "id: " <> id <> "\n" }) 812 |> option.unwrap("") 813 let event_name = 814 event.event 815 |> option.map(fn(name) { "event: " <> name <> "\n" }) 816 |> option.unwrap("") 817 let data = 818 event.data 819 |> string_tree.split("\n") 820 |> list.map(fn(row) { string_tree.prepend(row, "data: ") }) 821 |> string_tree.join("\n") 822 823 let message = 824 data 825 |> string_tree.prepend(event_name) 826 |> string_tree.prepend(id) 827 |> string_tree.append("\n\n") 828 |> bytes_tree.from_string_tree 829 830 transport.send(conn.transport, conn.socket, message) 831 |> result.replace(Nil) 832 |> result.replace_error(Nil) 833}