That fuck shit the fascists are using
at master 405 lines 13 kB view raw
1package org.tm.archive.net; 2 3import android.text.TextUtils; 4 5import androidx.annotation.NonNull; 6 7import com.annimon.stream.Stream; 8import com.bumptech.glide.util.ContentLengthInputStream; 9 10import org.signal.core.util.concurrent.SignalExecutors; 11import org.signal.core.util.logging.Log; 12import org.signal.libsignal.protocol.util.Pair; 13import org.tm.archive.util.Util; 14 15import java.io.FilterInputStream; 16import java.io.IOException; 17import java.io.InputStream; 18import java.security.SecureRandom; 19import java.util.ArrayList; 20import java.util.LinkedList; 21import java.util.List; 22import java.util.Optional; 23 24import okhttp3.CacheControl; 25import okhttp3.Call; 26import okhttp3.OkHttpClient; 27import okhttp3.Request; 28import okhttp3.Response; 29 30public class ChunkedDataFetcher { 31 32 private static final String TAG = Log.tag(ChunkedDataFetcher.class); 33 34 private static final CacheControl NO_CACHE = new CacheControl.Builder().noCache().build(); 35 36 private static final long MB = 1024 * 1024; 37 private static final long KB = 1024; 38 39 private final OkHttpClient client; 40 41 public ChunkedDataFetcher(@NonNull OkHttpClient client) { 42 this.client = client; 43 } 44 45 public RequestController fetch(@NonNull String url, long contentLength, @NonNull Callback callback) { 46 if (contentLength <= 0) { 47 return fetchChunksWithUnknownTotalSize(url, callback); 48 } 49 50 CompositeRequestController compositeController = new CompositeRequestController(); 51 fetchChunks(url, contentLength, Optional.empty(), compositeController, callback); 52 return compositeController; 53 } 54 55 private RequestController fetchChunksWithUnknownTotalSize(@NonNull String url, @NonNull Callback callback) { 56 CompositeRequestController compositeController = new CompositeRequestController(); 57 58 long chunkSize = new SecureRandom().nextInt(1024) + 1024; 59 Request request = new Request.Builder() 60 .url(url) 61 .cacheControl(NO_CACHE) 62 .addHeader("Range", "bytes=0-" + (chunkSize - 1)) 63 .addHeader("Accept-Encoding", "identity") 64 .build(); 65 66 Call firstChunkCall = client.newCall(request); 67 compositeController.addController(new CallRequestController(firstChunkCall)); 68 69 firstChunkCall.enqueue(new okhttp3.Callback() { 70 @Override 71 public void onFailure(@NonNull Call call, @NonNull IOException e) { 72 if (!compositeController.isCanceled()) { 73 callback.onFailure(e); 74 compositeController.cancel(); 75 } 76 } 77 78 @Override 79 public void onResponse(@NonNull Call call, @NonNull Response response) { 80 String contentRange = response.header("Content-Range"); 81 82 if (!response.isSuccessful()) { 83 Log.w(TAG, "Non-successful response code: " + response.code()); 84 callback.onFailure(new IOException("Non-successful response code: " + response.code())); 85 compositeController.cancel(); 86 if (response.body() != null) response.body().close(); 87 return; 88 } 89 90 if (TextUtils.isEmpty(contentRange)) { 91 Log.w(TAG, "Missing Content-Range header."); 92 callback.onFailure(new IOException("Missing Content-Length header.")); 93 compositeController.cancel(); 94 if (response.body() != null) response.body().close(); 95 return; 96 } 97 98 if (response.body() == null) { 99 Log.w(TAG, "Missing body."); 100 callback.onFailure(new IOException("Missing body on initial request.")); 101 compositeController.cancel(); 102 return; 103 } 104 105 Optional<Long> contentLength = parseLengthFromContentRange(contentRange); 106 107 if (!contentLength.isPresent()) { 108 Log.w(TAG, "Unable to parse length from Content-Range."); 109 callback.onFailure(new IOException("Unable to get parse length from Content-Range.")); 110 compositeController.cancel(); 111 return; 112 } 113 114 if (chunkSize >= contentLength.get()) { 115 try { 116 callback.onSuccess(response.body().byteStream()); 117 } catch (IOException e) { 118 callback.onFailure(e); 119 compositeController.cancel(); 120 } 121 } else { 122 InputStream stream = ContentLengthInputStream.obtain(response.body().byteStream(), chunkSize); 123 fetchChunks(url, contentLength.get(), Optional.of(new Pair<>(stream, chunkSize)), compositeController, callback); 124 } 125 } 126 }); 127 128 return compositeController; 129 } 130 131 private void fetchChunks(@NonNull String url, 132 long contentLength, 133 Optional<Pair<InputStream, Long>> firstChunk, 134 CompositeRequestController compositeController, 135 Callback callback) 136 { 137 List<ByteRange> requestPattern; 138 try { 139 if (firstChunk.isPresent()) { 140 requestPattern = Stream.of(getRequestPattern(contentLength - firstChunk.get().second())) 141 .map(b -> new ByteRange(b.start + firstChunk.get().second(), 142 b.end + firstChunk.get().second(), 143 b.ignoreFirst)) 144 .toList(); 145 } else { 146 requestPattern = getRequestPattern(contentLength); 147 } 148 } catch (IOException e) { 149 callback.onFailure(e); 150 compositeController.cancel(); 151 return; 152 } 153 154 SignalExecutors.UNBOUNDED.execute(() -> { 155 List<CallRequestController> controllers = Stream.of(requestPattern).map(range -> makeChunkRequest(client, url, range)).toList(); 156 List<InputStream> streams = new ArrayList<>(controllers.size() + (firstChunk.isPresent() ? 1 : 0)); 157 158 if (firstChunk.isPresent()) { 159 streams.add(firstChunk.get().first()); 160 } 161 162 Stream.of(controllers).forEach(compositeController::addController); 163 164 for (CallRequestController controller : controllers) { 165 Optional<InputStream> stream = controller.getStream(); 166 167 if (!stream.isPresent()) { 168 Log.w(TAG, "Stream was canceled."); 169 callback.onFailure(new IOException("Failure")); 170 compositeController.cancel(); 171 return; 172 } 173 174 streams.add(stream.get()); 175 } 176 177 try { 178 callback.onSuccess(new InputStreamList(streams)); 179 } catch (IOException e) { 180 callback.onFailure(e); 181 compositeController.cancel(); 182 } 183 }); 184 } 185 186 private CallRequestController makeChunkRequest(@NonNull OkHttpClient client, @NonNull String url, @NonNull ByteRange range) { 187 Request request = new Request.Builder() 188 .url(url) 189 .cacheControl(NO_CACHE) 190 .addHeader("Range", "bytes=" + range.start + "-" + range.end) 191 .addHeader("Accept-Encoding", "identity") 192 .build(); 193 194 Call call = client.newCall(request); 195 CallRequestController callController = new CallRequestController(call); 196 197 call.enqueue(new okhttp3.Callback() { 198 @Override 199 public void onFailure(@NonNull Call call, @NonNull IOException e) { 200 callController.cancel(); 201 } 202 203 @Override 204 public void onResponse(@NonNull Call call, @NonNull Response response) { 205 if (!response.isSuccessful()) { 206 callController.cancel(); 207 if (response.body() != null) response.body().close(); 208 return; 209 } 210 211 if (response.body() == null) { 212 callController.cancel(); 213 if (response.body() != null) response.body().close(); 214 return; 215 } 216 217 InputStream stream = new SkippingInputStream(ContentLengthInputStream.obtain(response.body().byteStream(), response.body().contentLength()), range.ignoreFirst); 218 callController.setStream(stream); 219 } 220 }); 221 222 return callController; 223 } 224 225 private Optional<Long> parseLengthFromContentRange(@NonNull String contentRange) { 226 int totalStartPos = contentRange.indexOf('/'); 227 228 if (totalStartPos >= 0 && contentRange.length() > totalStartPos + 1) { 229 String totalString = contentRange.substring(totalStartPos + 1); 230 231 try { 232 return Optional.of(Long.parseLong(totalString)); 233 } catch (NumberFormatException e) { 234 return Optional.empty(); 235 } 236 } 237 238 return Optional.empty(); 239 } 240 241 private List<ByteRange> getRequestPattern(long size) throws IOException { 242 if (size > MB) return getRequestPattern(size, MB); 243 else if (size > 500 * KB) return getRequestPattern(size, 500 * KB); 244 else if (size > 100 * KB) return getRequestPattern(size, 100 * KB); 245 else if (size > 50 * KB) return getRequestPattern(size, 50 * KB); 246 else if (size > 10 * KB) return getRequestPattern(size, 10 * KB); 247 else if (size > KB) return getRequestPattern(size, KB); 248 249 throw new IOException("Unsupported size: " + size); 250 } 251 252 private List<ByteRange> getRequestPattern(long size, long increment) { 253 List<ByteRange> results = new LinkedList<>(); 254 255 long offset = 0; 256 257 while (size - offset > increment) { 258 results.add(new ByteRange(offset, offset + increment - 1, 0)); 259 offset += increment; 260 } 261 262 if (size - offset > 0) { 263 results.add(new ByteRange(size - increment, size-1, increment - (size - offset))); 264 } 265 266 return results; 267 } 268 269 private static class ByteRange { 270 private final long start; 271 private final long end; 272 private final long ignoreFirst; 273 274 private ByteRange(long start, long end, long ignoreFirst) { 275 this.start = start; 276 this.end = end; 277 this.ignoreFirst = ignoreFirst; 278 } 279 } 280 281 private static class SkippingInputStream extends FilterInputStream { 282 283 private long skip; 284 285 SkippingInputStream(InputStream in, long skip) { 286 super(in); 287 this.skip = skip; 288 } 289 290 @Override 291 public int read() throws IOException { 292 if (skip != 0) { 293 skipFully(skip); 294 skip = 0; 295 } 296 297 return super.read(); 298 } 299 300 @Override 301 public int read(@NonNull byte[] buffer) throws IOException { 302 if (skip != 0) { 303 skipFully(skip); 304 skip = 0; 305 } 306 307 return super.read(buffer); 308 } 309 310 @Override 311 public int read(@NonNull byte[] buffer, int offset, int length) throws IOException { 312 if (skip != 0) { 313 skipFully(skip); 314 skip = 0; 315 } 316 317 return super.read(buffer, offset, length); 318 } 319 320 @Override 321 public int available() throws IOException { 322 return Util.toIntExact(super.available() - skip); 323 } 324 325 private void skipFully(long amount) throws IOException { 326 byte[] buffer = new byte[4096]; 327 328 while (amount > 0) { 329 int read = super.read(buffer, 0, Math.min(buffer.length, Util.toIntExact(amount))); 330 331 if (read != -1) amount -= read; 332 else return; 333 } 334 } 335 } 336 337 private static class InputStreamList extends InputStream { 338 339 private final List<InputStream> inputStreams; 340 341 private int currentStreamIndex = 0; 342 343 InputStreamList(List<InputStream> inputStreams) { 344 this.inputStreams = inputStreams; 345 } 346 347 @Override 348 public int read() throws IOException { 349 while (currentStreamIndex < inputStreams.size()) { 350 int result = inputStreams.get(currentStreamIndex).read(); 351 352 if (result == -1) currentStreamIndex++; 353 else return result; 354 } 355 356 return -1; 357 } 358 359 @Override 360 public int read(@NonNull byte[] buffer, int offset, int length) throws IOException { 361 while (currentStreamIndex < inputStreams.size()) { 362 int result = inputStreams.get(currentStreamIndex).read(buffer, offset, length); 363 364 if (result == -1) currentStreamIndex++; 365 else return result; 366 } 367 368 return -1; 369 } 370 371 @Override 372 public int read(@NonNull byte[] buffer) throws IOException { 373 return read(buffer, 0, buffer.length); 374 } 375 376 @Override 377 public void close() throws IOException { 378 for (InputStream stream : inputStreams) { 379 try { 380 stream.close(); 381 } catch (IOException ignored) {} 382 } 383 } 384 385 @Override 386 public int available() { 387 int total = 0; 388 389 for (int i=currentStreamIndex;i<inputStreams.size();i++) { 390 try { 391 int available = inputStreams.get(i).available(); 392 393 if (available != -1) total += available; 394 } catch (IOException ignored) {} 395 } 396 397 return total; 398 } 399 } 400 401 public interface Callback { 402 void onSuccess(InputStream stream) throws IOException; 403 void onFailure(Exception e); 404 } 405}