Serenity Operating System
1/*
2 * Copyright (c) 2021-2022, Ali Mohammad Pur <mpfard@serenityos.org>
3 * Copyright (c) 2022, the SerenityOS developers.
4 *
5 * SPDX-License-Identifier: BSD-2-Clause
6 */
7
8#pragma once
9
10#include <AK/Debug.h>
11#include <AK/HashMap.h>
12#include <AK/URL.h>
13#include <AK/Vector.h>
14#include <LibCore/ElapsedTimer.h>
15#include <LibCore/EventLoop.h>
16#include <LibCore/NetworkJob.h>
17#include <LibCore/SOCKSProxyClient.h>
18#include <LibCore/Timer.h>
19#include <LibTLS/TLSv12.h>
20
21namespace RequestServer {
22
// Selects how far a connection should be prepared before it is actually needed.
// NOTE(review): nothing in this header consumes CacheLevel; the per-value notes below are
// inferred from the enumerator names only and should be confirmed against the callers.
enum class CacheLevel {
    // Presumably: only resolve the host name.
    ResolveOnly,
    // Presumably: go one step further and establish the connection as well.
    CreateConnection,
};
27
28}
29
30namespace RequestServer::ConnectionCache {
31
32struct Proxy {
33 Core::ProxyData data;
34 OwnPtr<Core::SOCKSProxyClient> proxy_client_storage {};
35
36 template<typename SocketType, typename StorageType, typename... Args>
37 ErrorOr<NonnullOwnPtr<StorageType>> tunnel(URL const& url, Args&&... args)
38 {
39 if (data.type == Core::ProxyData::Direct) {
40 return TRY(SocketType::connect(url.host(), url.port_or_default(), forward<Args>(args)...));
41 }
42 if (data.type == Core::ProxyData::SOCKS5) {
43 if constexpr (requires { SocketType::connect(declval<DeprecatedString>(), *proxy_client_storage, forward<Args>(args)...); }) {
44 proxy_client_storage = TRY(Core::SOCKSProxyClient::connect(data.host_ipv4, data.port, Core::SOCKSProxyClient::Version::V5, url.host(), url.port_or_default()));
45 return TRY(SocketType::connect(url.host(), *proxy_client_storage, forward<Args>(args)...));
46 } else if constexpr (IsSame<SocketType, Core::TCPSocket>) {
47 return TRY(Core::SOCKSProxyClient::connect(data.host_ipv4, data.port, Core::SOCKSProxyClient::Version::V5, url.host(), url.port_or_default()));
48 } else {
49 return Error::from_string_literal("SOCKS5 not supported for this socket type");
50 }
51 }
52 VERIFY_NOT_REACHED();
53 }
54};
55
56template<typename Socket, typename SocketStorageType = Socket>
57struct Connection {
58 struct JobData {
59 Function<void(Core::Socket&)> start {};
60 Function<void(Core::NetworkJob::Error)> fail {};
61 Function<Vector<TLS::Certificate>()> provide_client_certificates {};
62
63 template<typename T>
64 static JobData create(T& job)
65 {
66 // Clang-format _really_ messes up formatting this, so just format it manually.
67 // clang-format off
68 return JobData {
69 .start = [&job](auto& socket) {
70 job.start(socket);
71 },
72 .fail = [&job](auto error) {
73 job.fail(error);
74 },
75 .provide_client_certificates = [&job] {
76 if constexpr (requires { job.on_certificate_requested; }) {
77 if (job.on_certificate_requested)
78 return job.on_certificate_requested();
79 } else {
80 // "use" `job`, otherwise clang gets sad.
81 (void)job;
82 }
83 return Vector<TLS::Certificate> {};
84 },
85 };
86 // clang-format on
87 }
88 };
89 using QueueType = Vector<JobData>;
90 using SocketType = Socket;
91 using StorageType = SocketStorageType;
92
93 NonnullOwnPtr<Core::BufferedSocket<SocketStorageType>> socket;
94 QueueType request_queue;
95 NonnullRefPtr<Core::Timer> removal_timer;
96 bool has_started { false };
97 URL current_url {};
98 Core::ElapsedTimer timer {};
99 JobData job_data {};
100 Proxy proxy {};
101};
102
103struct ConnectionKey {
104 DeprecatedString hostname;
105 u16 port { 0 };
106 Core::ProxyData proxy_data {};
107
108 bool operator==(ConnectionKey const&) const = default;
109};
110
111};
112
113template<>
114struct AK::Traits<RequestServer::ConnectionCache::ConnectionKey> : public AK::GenericTraits<RequestServer::ConnectionCache::ConnectionKey> {
115 static u32 hash(RequestServer::ConnectionCache::ConnectionKey const& key)
116 {
117 return pair_int_hash(pair_int_hash(key.proxy_data.host_ipv4, key.proxy_data.port), pair_int_hash(key.hostname.hash(), key.port));
118 }
119};
120
121namespace RequestServer::ConnectionCache {
122
123extern HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<Core::TCPSocket, Core::Socket>>>>> g_tcp_connection_cache;
124extern HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<TLS::TLSv12>>>>> g_tls_connection_cache;
125
126void request_did_finish(URL const&, Core::Socket const*);
127void dump_jobs();
128
129constexpr static size_t MaxConcurrentConnectionsPerURL = 4;
130constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 10'000;
131
132template<typename T>
133ErrorOr<void> recreate_socket_if_needed(T& connection, URL const& url)
134{
135 using SocketType = typename T::SocketType;
136 using SocketStorageType = typename T::StorageType;
137
138 if (!connection.socket->is_open() || connection.socket->is_eof()) {
139 // Create another socket for the connection.
140 auto set_socket = [&](auto socket) -> ErrorOr<void> {
141 connection.socket = TRY(Core::BufferedSocket<SocketStorageType>::create(move(socket)));
142 return {};
143 };
144
145 if constexpr (IsSame<TLS::TLSv12, SocketType>) {
146 TLS::Options options;
147 options.set_alert_handler([&connection](TLS::AlertDescription alert) {
148 Core::NetworkJob::Error reason;
149 if (alert == TLS::AlertDescription::HandshakeFailure)
150 reason = Core::NetworkJob::Error::ProtocolFailed;
151 else if (alert == TLS::AlertDescription::DecryptError)
152 reason = Core::NetworkJob::Error::ConnectionFailed;
153 else
154 reason = Core::NetworkJob::Error::TransmissionFailed;
155
156 if (connection.job_data.fail)
157 connection.job_data.fail(reason);
158 });
159 options.set_certificate_provider([&connection]() -> Vector<TLS::Certificate> {
160 if (connection.job_data.provide_client_certificates)
161 return connection.job_data.provide_client_certificates();
162 return {};
163 });
164 TRY(set_socket(TRY((connection.proxy.template tunnel<SocketType, SocketStorageType>(url, move(options))))));
165 } else {
166 TRY(set_socket(TRY((connection.proxy.template tunnel<SocketType, SocketStorageType>(url)))));
167 }
168 dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket);
169 }
170 return {};
171}
172
173decltype(auto) get_or_create_connection(auto& cache, URL const& url, auto& job, Core::ProxyData proxy_data = {})
174{
175 using CacheEntryType = RemoveCVReference<decltype(*cache.begin()->value)>;
176 auto& sockets_for_url = *cache.ensure({ url.host(), url.port_or_default(), proxy_data }, [] { return make<CacheEntryType>(); });
177
178 Proxy proxy { proxy_data };
179
180 using ReturnType = decltype(sockets_for_url[0].ptr());
181 auto it = sockets_for_url.find_if([](auto& connection) { return connection->request_queue.is_empty(); });
182 auto did_add_new_connection = false;
183 auto failed_to_find_a_socket = it.is_end();
184 if (failed_to_find_a_socket && sockets_for_url.size() < ConnectionCache::MaxConcurrentConnectionsPerURL) {
185 using ConnectionType = RemoveCVReference<decltype(*cache.begin()->value->at(0))>;
186 auto connection_result = proxy.tunnel<typename ConnectionType::SocketType, typename ConnectionType::StorageType>(url);
187 if (connection_result.is_error()) {
188 dbgln("ConnectionCache: Connection to {} failed: {}", url, connection_result.error());
189 Core::deferred_invoke([&job] {
190 job.fail(Core::NetworkJob::Error::ConnectionFailed);
191 });
192 return ReturnType { nullptr };
193 }
194 auto socket_result = Core::BufferedSocket<typename ConnectionType::StorageType>::create(connection_result.release_value());
195 if (socket_result.is_error()) {
196 dbgln("ConnectionCache: Failed to make a buffered socket for {}: {}", url, socket_result.error());
197 Core::deferred_invoke([&job] {
198 job.fail(Core::NetworkJob::Error::ConnectionFailed);
199 });
200 return ReturnType { nullptr };
201 }
202 sockets_for_url.append(make<ConnectionType>(
203 socket_result.release_value(),
204 typename ConnectionType::QueueType {},
205 Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr).release_value_but_fixme_should_propagate_errors()));
206 sockets_for_url.last()->proxy = move(proxy);
207 did_add_new_connection = true;
208 }
209 size_t index;
210 if (failed_to_find_a_socket) {
211 if (did_add_new_connection) {
212 index = sockets_for_url.size() - 1;
213 } else {
214 // Find the least backed-up connection (based on how many entries are in their request queue).
215 index = 0;
216 auto min_queue_size = (size_t)-1;
217 for (auto it = sockets_for_url.begin(); it != sockets_for_url.end(); ++it) {
218 if (auto queue_size = (*it)->request_queue.size(); min_queue_size > queue_size) {
219 index = it.index();
220 min_queue_size = queue_size;
221 }
222 }
223 }
224 } else {
225 index = it.index();
226 }
227 if (sockets_for_url.is_empty()) {
228 Core::deferred_invoke([&job] {
229 job.fail(Core::NetworkJob::Error::ConnectionFailed);
230 });
231 return ReturnType { nullptr };
232 }
233
234 auto& connection = *sockets_for_url[index];
235 if (!connection.has_started) {
236 if (auto result = recreate_socket_if_needed(connection, url); result.is_error()) {
237 dbgln("ConnectionCache: request failed to start, failed to make a socket: {}", result.error());
238 Core::deferred_invoke([&job] {
239 job.fail(Core::NetworkJob::Error::ConnectionFailed);
240 });
241 return ReturnType { nullptr };
242 }
243 dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket);
244 connection.has_started = true;
245 connection.removal_timer->stop();
246 connection.timer.start();
247 connection.current_url = url;
248 connection.job_data = decltype(connection.job_data)::create(job);
249 connection.socket->set_notifications_enabled(true);
250 connection.job_data.start(*connection.socket);
251 } else {
252 dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket);
253 connection.request_queue.append(decltype(connection.job_data)::create(job));
254 }
255 return &connection;
256}
257
258}