// Serenity Operating System
// Snapshot taken at master (258 lines, 11 kB in the original raw view).
1/* 2 * Copyright (c) 2021-2022, Ali Mohammad Pur <mpfard@serenityos.org> 3 * Copyright (c) 2022, the SerenityOS developers. 4 * 5 * SPDX-License-Identifier: BSD-2-Clause 6 */ 7 8#pragma once 9 10#include <AK/Debug.h> 11#include <AK/HashMap.h> 12#include <AK/URL.h> 13#include <AK/Vector.h> 14#include <LibCore/ElapsedTimer.h> 15#include <LibCore/EventLoop.h> 16#include <LibCore/NetworkJob.h> 17#include <LibCore/SOCKSProxyClient.h> 18#include <LibCore/Timer.h> 19#include <LibTLS/TLSv12.h> 20 21namespace RequestServer { 22 23enum class CacheLevel { 24 ResolveOnly, 25 CreateConnection, 26}; 27 28} 29 30namespace RequestServer::ConnectionCache { 31 32struct Proxy { 33 Core::ProxyData data; 34 OwnPtr<Core::SOCKSProxyClient> proxy_client_storage {}; 35 36 template<typename SocketType, typename StorageType, typename... Args> 37 ErrorOr<NonnullOwnPtr<StorageType>> tunnel(URL const& url, Args&&... args) 38 { 39 if (data.type == Core::ProxyData::Direct) { 40 return TRY(SocketType::connect(url.host(), url.port_or_default(), forward<Args>(args)...)); 41 } 42 if (data.type == Core::ProxyData::SOCKS5) { 43 if constexpr (requires { SocketType::connect(declval<DeprecatedString>(), *proxy_client_storage, forward<Args>(args)...); }) { 44 proxy_client_storage = TRY(Core::SOCKSProxyClient::connect(data.host_ipv4, data.port, Core::SOCKSProxyClient::Version::V5, url.host(), url.port_or_default())); 45 return TRY(SocketType::connect(url.host(), *proxy_client_storage, forward<Args>(args)...)); 46 } else if constexpr (IsSame<SocketType, Core::TCPSocket>) { 47 return TRY(Core::SOCKSProxyClient::connect(data.host_ipv4, data.port, Core::SOCKSProxyClient::Version::V5, url.host(), url.port_or_default())); 48 } else { 49 return Error::from_string_literal("SOCKS5 not supported for this socket type"); 50 } 51 } 52 VERIFY_NOT_REACHED(); 53 } 54}; 55 56template<typename Socket, typename SocketStorageType = Socket> 57struct Connection { 58 struct JobData { 59 Function<void(Core::Socket&)> start {}; 60 
Function<void(Core::NetworkJob::Error)> fail {}; 61 Function<Vector<TLS::Certificate>()> provide_client_certificates {}; 62 63 template<typename T> 64 static JobData create(T& job) 65 { 66 // Clang-format _really_ messes up formatting this, so just format it manually. 67 // clang-format off 68 return JobData { 69 .start = [&job](auto& socket) { 70 job.start(socket); 71 }, 72 .fail = [&job](auto error) { 73 job.fail(error); 74 }, 75 .provide_client_certificates = [&job] { 76 if constexpr (requires { job.on_certificate_requested; }) { 77 if (job.on_certificate_requested) 78 return job.on_certificate_requested(); 79 } else { 80 // "use" `job`, otherwise clang gets sad. 81 (void)job; 82 } 83 return Vector<TLS::Certificate> {}; 84 }, 85 }; 86 // clang-format on 87 } 88 }; 89 using QueueType = Vector<JobData>; 90 using SocketType = Socket; 91 using StorageType = SocketStorageType; 92 93 NonnullOwnPtr<Core::BufferedSocket<SocketStorageType>> socket; 94 QueueType request_queue; 95 NonnullRefPtr<Core::Timer> removal_timer; 96 bool has_started { false }; 97 URL current_url {}; 98 Core::ElapsedTimer timer {}; 99 JobData job_data {}; 100 Proxy proxy {}; 101}; 102 103struct ConnectionKey { 104 DeprecatedString hostname; 105 u16 port { 0 }; 106 Core::ProxyData proxy_data {}; 107 108 bool operator==(ConnectionKey const&) const = default; 109}; 110 111}; 112 113template<> 114struct AK::Traits<RequestServer::ConnectionCache::ConnectionKey> : public AK::GenericTraits<RequestServer::ConnectionCache::ConnectionKey> { 115 static u32 hash(RequestServer::ConnectionCache::ConnectionKey const& key) 116 { 117 return pair_int_hash(pair_int_hash(key.proxy_data.host_ipv4, key.proxy_data.port), pair_int_hash(key.hostname.hash(), key.port)); 118 } 119}; 120 121namespace RequestServer::ConnectionCache { 122 123extern HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<Core::TCPSocket, Core::Socket>>>>> g_tcp_connection_cache; 124extern HashMap<ConnectionKey, 
NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<TLS::TLSv12>>>>> g_tls_connection_cache; 125 126void request_did_finish(URL const&, Core::Socket const*); 127void dump_jobs(); 128 129constexpr static size_t MaxConcurrentConnectionsPerURL = 4; 130constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 10'000; 131 132template<typename T> 133ErrorOr<void> recreate_socket_if_needed(T& connection, URL const& url) 134{ 135 using SocketType = typename T::SocketType; 136 using SocketStorageType = typename T::StorageType; 137 138 if (!connection.socket->is_open() || connection.socket->is_eof()) { 139 // Create another socket for the connection. 140 auto set_socket = [&](auto socket) -> ErrorOr<void> { 141 connection.socket = TRY(Core::BufferedSocket<SocketStorageType>::create(move(socket))); 142 return {}; 143 }; 144 145 if constexpr (IsSame<TLS::TLSv12, SocketType>) { 146 TLS::Options options; 147 options.set_alert_handler([&connection](TLS::AlertDescription alert) { 148 Core::NetworkJob::Error reason; 149 if (alert == TLS::AlertDescription::HandshakeFailure) 150 reason = Core::NetworkJob::Error::ProtocolFailed; 151 else if (alert == TLS::AlertDescription::DecryptError) 152 reason = Core::NetworkJob::Error::ConnectionFailed; 153 else 154 reason = Core::NetworkJob::Error::TransmissionFailed; 155 156 if (connection.job_data.fail) 157 connection.job_data.fail(reason); 158 }); 159 options.set_certificate_provider([&connection]() -> Vector<TLS::Certificate> { 160 if (connection.job_data.provide_client_certificates) 161 return connection.job_data.provide_client_certificates(); 162 return {}; 163 }); 164 TRY(set_socket(TRY((connection.proxy.template tunnel<SocketType, SocketStorageType>(url, move(options)))))); 165 } else { 166 TRY(set_socket(TRY((connection.proxy.template tunnel<SocketType, SocketStorageType>(url))))); 167 } 168 dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket); 169 } 170 return {}; 171} 172 173decltype(auto) 
get_or_create_connection(auto& cache, URL const& url, auto& job, Core::ProxyData proxy_data = {}) 174{ 175 using CacheEntryType = RemoveCVReference<decltype(*cache.begin()->value)>; 176 auto& sockets_for_url = *cache.ensure({ url.host(), url.port_or_default(), proxy_data }, [] { return make<CacheEntryType>(); }); 177 178 Proxy proxy { proxy_data }; 179 180 using ReturnType = decltype(sockets_for_url[0].ptr()); 181 auto it = sockets_for_url.find_if([](auto& connection) { return connection->request_queue.is_empty(); }); 182 auto did_add_new_connection = false; 183 auto failed_to_find_a_socket = it.is_end(); 184 if (failed_to_find_a_socket && sockets_for_url.size() < ConnectionCache::MaxConcurrentConnectionsPerURL) { 185 using ConnectionType = RemoveCVReference<decltype(*cache.begin()->value->at(0))>; 186 auto connection_result = proxy.tunnel<typename ConnectionType::SocketType, typename ConnectionType::StorageType>(url); 187 if (connection_result.is_error()) { 188 dbgln("ConnectionCache: Connection to {} failed: {}", url, connection_result.error()); 189 Core::deferred_invoke([&job] { 190 job.fail(Core::NetworkJob::Error::ConnectionFailed); 191 }); 192 return ReturnType { nullptr }; 193 } 194 auto socket_result = Core::BufferedSocket<typename ConnectionType::StorageType>::create(connection_result.release_value()); 195 if (socket_result.is_error()) { 196 dbgln("ConnectionCache: Failed to make a buffered socket for {}: {}", url, socket_result.error()); 197 Core::deferred_invoke([&job] { 198 job.fail(Core::NetworkJob::Error::ConnectionFailed); 199 }); 200 return ReturnType { nullptr }; 201 } 202 sockets_for_url.append(make<ConnectionType>( 203 socket_result.release_value(), 204 typename ConnectionType::QueueType {}, 205 Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr).release_value_but_fixme_should_propagate_errors())); 206 sockets_for_url.last()->proxy = move(proxy); 207 did_add_new_connection = true; 208 } 209 size_t index; 210 if 
(failed_to_find_a_socket) { 211 if (did_add_new_connection) { 212 index = sockets_for_url.size() - 1; 213 } else { 214 // Find the least backed-up connection (based on how many entries are in their request queue). 215 index = 0; 216 auto min_queue_size = (size_t)-1; 217 for (auto it = sockets_for_url.begin(); it != sockets_for_url.end(); ++it) { 218 if (auto queue_size = (*it)->request_queue.size(); min_queue_size > queue_size) { 219 index = it.index(); 220 min_queue_size = queue_size; 221 } 222 } 223 } 224 } else { 225 index = it.index(); 226 } 227 if (sockets_for_url.is_empty()) { 228 Core::deferred_invoke([&job] { 229 job.fail(Core::NetworkJob::Error::ConnectionFailed); 230 }); 231 return ReturnType { nullptr }; 232 } 233 234 auto& connection = *sockets_for_url[index]; 235 if (!connection.has_started) { 236 if (auto result = recreate_socket_if_needed(connection, url); result.is_error()) { 237 dbgln("ConnectionCache: request failed to start, failed to make a socket: {}", result.error()); 238 Core::deferred_invoke([&job] { 239 job.fail(Core::NetworkJob::Error::ConnectionFailed); 240 }); 241 return ReturnType { nullptr }; 242 } 243 dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket); 244 connection.has_started = true; 245 connection.removal_timer->stop(); 246 connection.timer.start(); 247 connection.current_url = url; 248 connection.job_data = decltype(connection.job_data)::create(job); 249 connection.socket->set_notifications_enabled(true); 250 connection.job_data.start(*connection.socket); 251 } else { 252 dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket); 253 connection.request_queue.append(decltype(connection.job_data)::create(job)); 254 } 255 return &connection; 256} 257 258}