// Serenity Operating System
1/*
2 * Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
3 * Copyright (c) 2022, the SerenityOS developers.
4 *
5 * SPDX-License-Identifier: BSD-2-Clause
6 */
7
8#include <AK/CharacterTypes.h>
9#include <AK/Debug.h>
10#include <AK/JsonObject.h>
11#include <AK/MemoryStream.h>
12#include <AK/Try.h>
13#include <LibCompress/Brotli.h>
14#include <LibCompress/Gzip.h>
15#include <LibCompress/Zlib.h>
16#include <LibCore/Event.h>
17#include <LibHTTP/HttpResponse.h>
18#include <LibHTTP/Job.h>
19#include <stdio.h>
20#include <unistd.h>
21
22namespace HTTP {
23
24static ErrorOr<ByteBuffer> handle_content_encoding(ByteBuffer const& buf, DeprecatedString const& content_encoding)
25{
26 dbgln_if(JOB_DEBUG, "Job::handle_content_encoding: buf has content_encoding={}", content_encoding);
27
28 // FIXME: Actually do the decompression of the data using streams, instead of all at once when everything has been
29 // received. This will require that some of the decompression algorithms are implemented in a streaming way.
30 // Gzip and Deflate are implemented using Stream, while Brotli uses the newer Core::Stream. The Gzip and
31 // Deflate implementations will likely need to be changed to LibCore::Stream for this to work easily.
32
33 if (content_encoding == "gzip") {
34 if (!Compress::GzipDecompressor::is_likely_compressed(buf)) {
35 dbgln("Job::handle_content_encoding: buf is not gzip compressed!");
36 }
37
38 dbgln_if(JOB_DEBUG, "Job::handle_content_encoding: buf is gzip compressed!");
39
40 auto uncompressed = TRY(Compress::GzipDecompressor::decompress_all(buf));
41
42 if constexpr (JOB_DEBUG) {
43 dbgln("Job::handle_content_encoding: Gzip::decompress() successful.");
44 dbgln(" Input size: {}", buf.size());
45 dbgln(" Output size: {}", uncompressed.size());
46 }
47
48 return uncompressed;
49 } else if (content_encoding == "deflate") {
50 dbgln_if(JOB_DEBUG, "Job::handle_content_encoding: buf is deflate compressed!");
51
52 // Even though the content encoding is "deflate", it's actually deflate with the zlib wrapper.
53 // https://tools.ietf.org/html/rfc7230#section-4.2.2
54 auto uncompressed = Compress::ZlibDecompressor::decompress_all(buf);
55 if (!uncompressed.has_value()) {
56 // From the RFC:
57 // "Note: Some non-conformant implementations send the "deflate"
58 // compressed data without the zlib wrapper."
59 dbgln_if(JOB_DEBUG, "Job::handle_content_encoding: ZlibDecompressor::decompress_all() failed. Trying DeflateDecompressor::decompress_all()");
60 uncompressed = TRY(Compress::DeflateDecompressor::decompress_all(buf));
61 }
62
63 if constexpr (JOB_DEBUG) {
64 dbgln("Job::handle_content_encoding: Deflate decompression successful.");
65 dbgln(" Input size: {}", buf.size());
66 dbgln(" Output size: {}", uncompressed.value().size());
67 }
68
69 return uncompressed.release_value();
70 } else if (content_encoding == "br") {
71 dbgln_if(JOB_DEBUG, "Job::handle_content_encoding: buf is brotli compressed!");
72
73 FixedMemoryStream bufstream { buf };
74 auto brotli_stream = Compress::BrotliDecompressionStream { bufstream };
75
76 auto uncompressed = TRY(brotli_stream.read_until_eof());
77 if constexpr (JOB_DEBUG) {
78 dbgln("Job::handle_content_encoding: Brotli::decompress() successful.");
79 dbgln(" Input size: {}", buf.size());
80 dbgln(" Output size: {}", uncompressed.size());
81 }
82
83 return uncompressed;
84 }
85
86 return buf;
87}
88
// Constructs a Job that will perform `request` and stream the response body
// into `output_stream` (via the Core::NetworkJob base).
Job::Job(HttpRequest&& request, Stream& output_stream)
    : Core::NetworkJob(output_stream)
    , m_request(move(request))
{
}
94
95void Job::start(Core::Socket& socket)
96{
97 VERIFY(!m_socket);
98 m_socket = static_cast<Core::BufferedSocketBase*>(&socket);
99 dbgln_if(HTTPJOB_DEBUG, "Reusing previous connection for {}", url());
100 deferred_invoke([this] {
101 dbgln_if(HTTPJOB_DEBUG, "HttpJob: on_connected callback");
102 on_socket_connected();
103 });
104}
105
106void Job::shutdown(ShutdownMode mode)
107{
108 if (!m_socket)
109 return;
110 if (mode == ShutdownMode::CloseSocket) {
111 m_socket->close();
112 m_socket->on_ready_to_read = nullptr;
113 } else {
114 m_socket->on_ready_to_read = nullptr;
115 m_socket = nullptr;
116 }
117}
118
// Writes as much buffered response data as possible to the output stream.
// Whatever cannot be written right now stays queued in m_received_buffers
// (with m_buffered_size tracking the total) for a later flush.
void Job::flush_received_buffers()
{
    // Nothing to do while streaming is disabled (e.g. Content-Encoding is set) or nothing is buffered.
    if (!m_can_stream_response || m_buffered_size == 0)
        return;
    dbgln_if(JOB_DEBUG, "Job: Flushing received buffers: have {} bytes in {} buffers for {}", m_buffered_size, m_received_buffers.size(), m_request.url());
    for (size_t i = 0; i < m_received_buffers.size(); ++i) {
        auto& payload = m_received_buffers[i]->pending_flush;
        auto result = do_write(payload);
        if (result.is_error()) {
            if (!result.error().is_errno()) {
                // Non-errno failure: log and move on to the next buffer.
                dbgln_if(JOB_DEBUG, "Job: Failed to flush received buffers: {}", result.error());
                continue;
            }
            if (result.error().code() == EINTR) {
                // Interrupted by a signal: retry the same buffer (undo the loop increment).
                i--;
                continue;
            }
            // Any other errno: stop flushing for now; a later flush will retry.
            break;
        }
        auto written = result.release_value();
        m_buffered_size -= written;
        if (written == payload.size()) {
            // Whole buffer written: drop it and compensate the index for the shifted list.
            // FIXME: Make this a take-first-friendly object?
            (void)m_received_buffers.take_first();
            --i;
            continue;
        }
        // Partial write: keep only the unwritten tail and stop for now.
        VERIFY(written < payload.size());
        payload = payload.slice(written, payload.size() - written);
        break;
    }
    dbgln_if(JOB_DEBUG, "Job: Flushing received buffers done: have {} bytes in {} buffers for {}", m_buffered_size, m_received_buffers.size(), m_request.url());
}
152
// Installs `callback` as the socket's read-readiness handler. After each run,
// if the buffered socket still holds readable data and the job is neither
// finished nor failed, the handler re-arms itself via deferred_invoke() so
// that already-buffered data is drained even without a new notification.
void Job::register_on_ready_to_read(Function<void()> callback)
{
    m_socket->on_ready_to_read = [this, callback = move(callback)] {
        callback();

        // As `m_socket` is a buffered object, we might not get notifications for data in the buffer
        // so exhaust the buffer to ensure we don't end up waiting forever.
        auto can_read_without_blocking = m_socket->can_read_without_blocking();
        if (can_read_without_blocking.is_error())
            return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
        if (can_read_without_blocking.value() && m_state != State::Finished && !has_error()) {
            deferred_invoke([this] {
                // The socket or the handler may have been torn down before this runs.
                if (m_socket && m_socket->on_ready_to_read)
                    m_socket->on_ready_to_read();
            });
        }
    };
}
171
172ErrorOr<DeprecatedString> Job::read_line(size_t size)
173{
174 auto buffer = TRY(ByteBuffer::create_uninitialized(size));
175 auto bytes_read = TRY(m_socket->read_until(buffer, "\r\n"sv));
176 return DeprecatedString::copy(bytes_read);
177}
178
179ErrorOr<ByteBuffer> Job::receive(size_t size)
180{
181 if (size == 0)
182 return ByteBuffer {};
183
184 auto buffer = TRY(ByteBuffer::create_uninitialized(size));
185 size_t nread;
186 do {
187 auto result = m_socket->read_some(buffer);
188 if (result.is_error() && result.error().is_errno() && result.error().code() == EINTR)
189 continue;
190 nread = TRY(result).size();
191 break;
192 } while (true);
193 return buffer.slice(0, nread);
194}
195
196void Job::on_socket_connected()
197{
198 auto raw_request = m_request.to_raw_request().release_value_but_fixme_should_propagate_errors();
199
200 if constexpr (JOB_DEBUG) {
201 dbgln("Job: raw_request:");
202 dbgln("{}", DeprecatedString::copy(raw_request));
203 }
204
205 bool success = !m_socket->write_until_depleted(raw_request).is_error();
206 if (!success)
207 deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
208
209 register_on_ready_to_read([&] {
210 dbgln_if(JOB_DEBUG, "Ready to read for {}, state = {}, cancelled = {}", m_request.url(), to_underlying(m_state), is_cancelled());
211 if (is_cancelled())
212 return;
213
214 if (m_state == State::Finished) {
215 // We have everything we want, at this point, we can either get an EOF, or a bunch of extra newlines
216 // (unless "Connection: close" isn't specified)
217 // So just ignore everything after this.
218 return;
219 }
220
221 if (m_socket->is_eof()) {
222 dbgln_if(JOB_DEBUG, "Read failure: Actually EOF!");
223 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::ProtocolFailed); });
224 }
225
226 while (m_state == State::InStatus) {
227 auto can_read_line = m_socket->can_read_line();
228 if (can_read_line.is_error()) {
229 dbgln_if(JOB_DEBUG, "Job {} could not figure out whether we could read a line", m_request.url());
230 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
231 }
232
233 if (!can_read_line.value()) {
234 dbgln_if(JOB_DEBUG, "Job {} cannot read a full line", m_request.url());
235 // TODO: Should we retry here instead of failing instantly?
236 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
237 }
238
239 auto maybe_line = read_line(PAGE_SIZE);
240 if (maybe_line.is_error()) {
241 dbgln_if(JOB_DEBUG, "Job {} could not read line: {}", m_request.url(), maybe_line.error());
242 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
243 }
244
245 auto line = maybe_line.release_value();
246
247 dbgln_if(JOB_DEBUG, "Job {} read line of length {}", m_request.url(), line.length());
248 if (line.is_null()) {
249 dbgln("Job: Expected HTTP status");
250 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
251 }
252 auto parts = line.split_view(' ');
253 if (parts.size() < 2) {
254 dbgln("Job: Expected 2-part or 3-part HTTP status line, got '{}'", line);
255 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::ProtocolFailed); });
256 }
257
258 if (!parts[0].matches("HTTP/?.?"sv, CaseSensitivity::CaseSensitive) || !is_ascii_digit(parts[0][5]) || !is_ascii_digit(parts[0][7])) {
259 dbgln("Job: Expected HTTP-Version to be of the form 'HTTP/X.Y', got '{}'", parts[0]);
260 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::ProtocolFailed); });
261 }
262 auto http_major_version = parse_ascii_digit(parts[0][5]);
263 auto http_minor_version = parse_ascii_digit(parts[0][7]);
264 m_legacy_connection = http_major_version < 1 || (http_major_version == 1 && http_minor_version == 0);
265
266 auto code = parts[1].to_uint();
267 if (!code.has_value()) {
268 dbgln("Job: Expected numeric HTTP status");
269 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::ProtocolFailed); });
270 }
271 m_code = code.value();
272 m_state = State::InHeaders;
273
274 auto can_read_without_blocking = m_socket->can_read_without_blocking();
275 if (can_read_without_blocking.is_error())
276 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
277
278 if (!can_read_without_blocking.value())
279 return;
280 }
281 while (m_state == State::InHeaders || m_state == State::Trailers) {
282 auto can_read_line = m_socket->can_read_line();
283 if (can_read_line.is_error()) {
284 dbgln_if(JOB_DEBUG, "Job {} could not figure out whether we could read a line", m_request.url());
285 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
286 }
287
288 if (!can_read_line.value()) {
289 dbgln_if(JOB_DEBUG, "Can't read lines anymore :(");
290 return;
291 }
292
293 // There's no max limit defined on headers, but for our sanity, let's limit it to 32K.
294 auto maybe_line = read_line(32 * KiB);
295 if (maybe_line.is_error()) {
296 dbgln_if(JOB_DEBUG, "Job {} could not read a header line: {}", m_request.url(), maybe_line.error());
297 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
298 }
299 auto line = maybe_line.release_value();
300
301 if (line.is_null()) {
302 if (m_state == State::Trailers) {
303 // Some servers like to send two ending chunks
304 // use this fact as an excuse to ignore anything after the last chunk
305 // that is not a valid trailing header.
306 return finish_up();
307 }
308 dbgln("Job: Expected HTTP header");
309 return did_fail(Core::NetworkJob::Error::ProtocolFailed);
310 }
311 if (line.is_empty()) {
312 if (m_state == State::Trailers) {
313 return finish_up();
314 }
315 if (on_headers_received) {
316 if (!m_set_cookie_headers.is_empty())
317 m_headers.set("Set-Cookie", JsonArray { m_set_cookie_headers }.to_deprecated_string());
318 on_headers_received(m_headers, m_code > 0 ? m_code : Optional<u32> {});
319 }
320 m_state = State::InBody;
321
322 // We've reached the end of the headers, there's a possibility that the server
323 // responds with nothing (content-length = 0 with normal encoding); if that's the case,
324 // quit early as we won't be reading anything anyway.
325 if (auto result = m_headers.get("Content-Length"sv).value_or(""sv).to_uint(); result.has_value()) {
326 if (result.value() == 0 && !m_headers.get("Transfer-Encoding"sv).value_or(""sv).view().trim_whitespace().equals_ignoring_ascii_case("chunked"sv))
327 return finish_up();
328 }
329 // There's also the possibility that the server responds with 204 (No Content),
330 // and manages to set a Content-Length anyway, in such cases ignore Content-Length and quit early;
331 // As the HTTP spec explicitly prohibits presence of Content-Length when the response code is 204.
332 if (m_code == 204)
333 return finish_up();
334
335 break;
336 }
337 auto parts = line.split_view(':');
338 if (parts.is_empty()) {
339 if (m_state == State::Trailers) {
340 // Some servers like to send two ending chunks
341 // use this fact as an excuse to ignore anything after the last chunk
342 // that is not a valid trailing header.
343 return finish_up();
344 }
345 dbgln("Job: Expected HTTP header with key/value");
346 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::ProtocolFailed); });
347 }
348 auto name = parts[0];
349 if (line.length() < name.length() + 2) {
350 if (m_state == State::Trailers) {
351 // Some servers like to send two ending chunks
352 // use this fact as an excuse to ignore anything after the last chunk
353 // that is not a valid trailing header.
354 return finish_up();
355 }
356 dbgln("Job: Malformed HTTP header: '{}' ({})", line, line.length());
357 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::ProtocolFailed); });
358 }
359 auto value = line.substring(name.length() + 2, line.length() - name.length() - 2);
360 if (name.equals_ignoring_ascii_case("Set-Cookie"sv)) {
361 dbgln_if(JOB_DEBUG, "Job: Received Set-Cookie header: '{}'", value);
362 m_set_cookie_headers.append(move(value));
363
364 auto can_read_without_blocking = m_socket->can_read_without_blocking();
365 if (can_read_without_blocking.is_error())
366 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
367 if (!can_read_without_blocking.value())
368 return;
369 } else if (auto existing_value = m_headers.get(name); existing_value.has_value()) {
370 StringBuilder builder;
371 builder.append(existing_value.value());
372 builder.append(',');
373 builder.append(value);
374 m_headers.set(name, builder.to_deprecated_string());
375 } else {
376 m_headers.set(name, value);
377 }
378 if (name.equals_ignoring_ascii_case("Content-Encoding"sv)) {
379 // Assume that any content-encoding means that we can't decode it as a stream :(
380 dbgln_if(JOB_DEBUG, "Content-Encoding {} detected, cannot stream output :(", value);
381 m_can_stream_response = false;
382 } else if (name.equals_ignoring_ascii_case("Content-Length"sv)) {
383 auto length = value.to_uint();
384 if (length.has_value())
385 m_content_length = length.value();
386 }
387 dbgln_if(JOB_DEBUG, "Job: [{}] = '{}'", name, value);
388
389 auto can_read_without_blocking = m_socket->can_read_without_blocking();
390 if (can_read_without_blocking.is_error())
391 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
392 if (!can_read_without_blocking.value()) {
393 dbgln_if(JOB_DEBUG, "Can't read headers anymore, byebye :(");
394 return;
395 }
396 }
397 VERIFY(m_state == State::InBody);
398
399 while (true) {
400 auto can_read_without_blocking = m_socket->can_read_without_blocking();
401 if (can_read_without_blocking.is_error())
402 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
403 if (!can_read_without_blocking.value())
404 break;
405
406 auto read_size = 64 * KiB;
407 if (m_current_chunk_remaining_size.has_value()) {
408 read_chunk_size:;
409 auto remaining = m_current_chunk_remaining_size.value();
410 if (remaining == -1) {
411 // read size
412 auto maybe_size_data = read_line(PAGE_SIZE);
413 if (maybe_size_data.is_error()) {
414 dbgln_if(JOB_DEBUG, "Job: Could not receive chunk: {}", maybe_size_data.error());
415 }
416 auto size_data = maybe_size_data.release_value();
417
418 if (m_should_read_chunk_ending_line) {
419 VERIFY(size_data.is_empty());
420 m_should_read_chunk_ending_line = false;
421 continue;
422 }
423 auto size_lines = size_data.view().lines();
424 dbgln_if(JOB_DEBUG, "Job: Received a chunk with size '{}'", size_data);
425 if (size_lines.size() == 0) {
426 if (!m_socket->is_eof())
427 break;
428 dbgln("Job: Reached end of stream");
429 finish_up();
430 break;
431 } else {
432 auto chunk = size_lines[0].split_view(';', SplitBehavior::KeepEmpty);
433 DeprecatedString size_string = chunk[0];
434 char* endptr;
435 auto size = strtoul(size_string.characters(), &endptr, 16);
436 if (*endptr) {
437 // invalid number
438 deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
439 break;
440 }
441 if (size == 0) {
442 // This is the last chunk
443 // '0' *[; chunk-ext-name = chunk-ext-value]
444 // We're going to ignore _all_ chunk extensions
445 read_size = 0;
446 m_current_chunk_total_size = 0;
447 m_current_chunk_remaining_size = 0;
448
449 dbgln_if(JOB_DEBUG, "Job: Received the last chunk with extensions '{}'", size_string.substring_view(1, size_string.length() - 1));
450 } else {
451 m_current_chunk_total_size = size;
452 m_current_chunk_remaining_size = size;
453 read_size = size;
454
455 dbgln_if(JOB_DEBUG, "Job: Chunk of size '{}' started", size);
456 }
457 }
458 } else {
459 read_size = remaining;
460
461 dbgln_if(JOB_DEBUG, "Job: Resuming chunk with '{}' bytes left over", remaining);
462 }
463 } else {
464 auto transfer_encoding = m_headers.get("Transfer-Encoding");
465 if (transfer_encoding.has_value()) {
466 // HTTP/1.1 3.3.3.3:
467 // If a message is received with both a Transfer-Encoding and a Content-Length header field, the Transfer-Encoding overrides the Content-Length. [...]
468 // https://httpwg.org/specs/rfc7230.html#message.body.length
469 m_content_length = {};
470
471 // Note: Some servers add extra spaces around 'chunked', see #6302.
472 auto encoding = transfer_encoding.value().trim_whitespace();
473
474 dbgln_if(JOB_DEBUG, "Job: This content has transfer encoding '{}'", encoding);
475 if (encoding.equals_ignoring_ascii_case("chunked"sv)) {
476 m_current_chunk_remaining_size = -1;
477 goto read_chunk_size;
478 } else {
479 dbgln("Job: Unknown transfer encoding '{}', the result will likely be wrong!", encoding);
480 }
481 }
482 }
483
484 can_read_without_blocking = m_socket->can_read_without_blocking();
485 if (can_read_without_blocking.is_error())
486 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
487 if (!can_read_without_blocking.value())
488 break;
489
490 dbgln_if(JOB_DEBUG, "Waiting for payload for {}", m_request.url());
491 auto maybe_payload = receive(read_size);
492 if (maybe_payload.is_error()) {
493 dbgln_if(JOB_DEBUG, "Could not read the payload: {}", maybe_payload.error());
494 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
495 }
496
497 auto payload = maybe_payload.release_value();
498
499 if (payload.is_empty() && m_socket->is_eof()) {
500 finish_up();
501 break;
502 }
503
504 bool read_everything = false;
505 if (m_content_length.has_value()) {
506 auto length = m_content_length.value();
507 if (m_received_size + payload.size() >= length) {
508 payload.resize(length - m_received_size);
509 read_everything = true;
510 }
511 }
512
513 m_received_buffers.append(make<ReceivedBuffer>(payload));
514 m_buffered_size += payload.size();
515 m_received_size += payload.size();
516 flush_received_buffers();
517
518 deferred_invoke([this] { did_progress(m_content_length, m_received_size); });
519
520 if (read_everything) {
521 VERIFY(m_received_size <= m_content_length.value());
522 finish_up();
523 break;
524 }
525
526 // Check after reading all the buffered data if we have reached the end of stream
527 // for cases where the server didn't send a content length, chunked encoding but is
528 // directly closing the connection.
529 if (!m_content_length.has_value() && !m_current_chunk_remaining_size.has_value() && m_socket->is_eof()) {
530 finish_up();
531 break;
532 }
533
534 if (m_current_chunk_remaining_size.has_value()) {
535 auto size = m_current_chunk_remaining_size.value() - payload.size();
536
537 dbgln_if(JOB_DEBUG, "Job: We have {} bytes left over in this chunk", size);
538 if (size == 0) {
539 dbgln_if(JOB_DEBUG, "Job: Finished a chunk of {} bytes", m_current_chunk_total_size.value());
540
541 if (m_current_chunk_total_size.value() == 0) {
542 m_state = State::Trailers;
543 break;
544 }
545
546 // we've read everything, now let's get the next chunk
547 size = -1;
548
549 auto can_read_line = m_socket->can_read_line();
550 if (can_read_line.is_error())
551 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
552 if (can_read_line.value()) {
553 auto maybe_line = read_line(PAGE_SIZE);
554 if (maybe_line.is_error()) {
555 return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
556 }
557
558 VERIFY(maybe_line.value().is_empty());
559 } else {
560 m_should_read_chunk_ending_line = true;
561 }
562 }
563 m_current_chunk_remaining_size = size;
564 }
565 }
566
567 if (!m_socket->is_open()) {
568 dbgln_if(JOB_DEBUG, "Connection appears to have closed, finishing up");
569 finish_up();
570 }
571 });
572}
573
574void Job::timer_event(Core::TimerEvent& event)
575{
576 event.accept();
577 finish_up();
578 if (m_buffered_size == 0)
579 stop_timer();
580}
581
// Completes the job: flattens and (if Content-Encoding was set) decodes the
// buffered body, flushes it to the client, and — once all bytes have been
// consumed — schedules did_finish(). May be re-entered via the timer until
// the backlog drains.
void Job::finish_up()
{
    VERIFY(!m_has_scheduled_finish);
    m_state = State::Finished;
    if (!m_can_stream_response) {
        // Concatenate every pending buffer into one contiguous buffer so the
        // whole body can be handed to the decompressor at once.
        auto maybe_flattened_buffer = ByteBuffer::create_uninitialized(m_buffered_size);
        if (maybe_flattened_buffer.is_error())
            return did_fail(Core::NetworkJob::Error::TransmissionFailed);
        auto flattened_buffer = maybe_flattened_buffer.release_value();

        u8* flat_ptr = flattened_buffer.data();
        for (auto& received_buffer : m_received_buffers) {
            memcpy(flat_ptr, received_buffer->pending_flush.data(), received_buffer->pending_flush.size());
            flat_ptr += received_buffer->pending_flush.size();
        }
        m_received_buffers.clear();

        // For the time being, we cannot stream stuff with content-encoding set to _anything_.
        // FIXME: LibCompress exposes a streaming interface, so this can be resolved
        auto content_encoding = m_headers.get("Content-Encoding");
        if (content_encoding.has_value()) {
            if (auto result = handle_content_encoding(flattened_buffer, content_encoding.value()); !result.is_error())
                flattened_buffer = result.release_value();
            else
                return did_fail(Core::NetworkJob::Error::TransmissionFailed);
        }

        // Re-queue the (possibly decoded) body as a single buffer and let it stream out.
        m_buffered_size = flattened_buffer.size();
        m_received_buffers.append(make<ReceivedBuffer>(move(flattened_buffer)));
        m_can_stream_response = true;
    }

    flush_received_buffers();
    if (m_buffered_size != 0) {
        // We have to wait for the client to consume all the downloaded data
        // before we can actually call `did_finish`. in a normal flow, this should
        // never be hit since the client is reading as we are writing, unless there
        // are too many concurrent downloads going on.
        dbgln_if(JOB_DEBUG, "Flush finished with {} bytes remaining, will try again later", m_buffered_size);
        if (!has_timer())
            start_timer(50);
        return;
    }

    m_has_scheduled_finish = true;
    auto response = HttpResponse::create(m_code, move(m_headers), m_received_size);
    deferred_invoke([this, response = move(response)] {
        // If the server responded with "Connection: close", close the connection
        // as the server may or may not want to close the socket. Also, if this is
        // a legacy HTTP server (1.0 or older), assume close is the default value.
        if (auto result = response->headers().get("Connection"sv); result.has_value() ? result->equals_ignoring_ascii_case("close"sv) : m_legacy_connection)
            shutdown(ShutdownMode::CloseSocket);
        did_finish(response);
    });
}
637}