A game about forced loneliness, made by TACStudios
1using System;
2using System.Diagnostics;
3using Unity.Collections.LowLevel.Unsafe;
4using Unity.Burst;
5using Unity.Jobs;
6using UnityEngine.Assertions;
7using System.Runtime.CompilerServices;
8
9namespace Unity.Collections
10{
11 /// <summary>
12 /// A set of untyped, append-only buffers. Allows for concurrent reading and concurrent writing without synchronization.
13 /// </summary>
14 /// <remarks>
15 /// As long as each individual buffer is written in one thread and read in one thread, multiple
16 /// threads can read and write the stream concurrently, *e.g.*
17 /// while thread *A* reads from buffer *X* of a stream, thread *B* can read from
18 /// buffer *Y* of the same stream.
19 ///
20 /// Each buffer is stored as a chain of blocks. When a write exceeds a buffer's current capacity, another block
21 /// is allocated and added to the end of the chain. Effectively, expanding the buffer never requires copying the existing
22 /// data (unlike with <see cref="NativeList{T}"/>, for example).
23 ///
24 /// **All writing to a stream should be completed before the stream is first read. Do not write to a stream after the first read.**
25 /// Violating these rules won't *necessarily* cause any problems, but they are the intended usage pattern.
26 ///
27 /// Writing is done with <see cref="NativeStream.Writer"/>, and reading is done with <see cref="NativeStream.Reader"/>.
28 /// An individual reader or writer cannot be used concurrently across threads: each thread must use its own.
29 ///
30 /// The data written to an individual buffer can be heterogeneous in type, and the data written
31 /// to different buffers of a stream can be entirely different in type, number, and order. Just make sure
32 /// that the code reading from a particular buffer knows what to expect to read from it.
33 /// </remarks>
34 [NativeContainer]
35 [GenerateTestsForBurstCompatibility]
36 public unsafe struct NativeStream : INativeDisposable
37 {
38 UnsafeStream m_Stream;
39
40#if ENABLE_UNITY_COLLECTIONS_CHECKS
41 AtomicSafetyHandle m_Safety;
42 internal static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<NativeStream>();
43#endif
44
45 /// <summary>
46 /// Initializes and returns an instance of NativeStream.
47 /// </summary>
48 /// <param name="bufferCount">The number of buffers to give the stream. You usually want
49 /// one buffer for each thread that will read or write the stream.</param>
50 /// <param name="allocator">The allocator to use.</param>
51 public NativeStream(int bufferCount, AllocatorManager.AllocatorHandle allocator)
52 {
53 AllocateBlock(out this, allocator);
54 m_Stream.AllocateForEach(bufferCount);
55 }
56
57 /// <summary>
58 /// Creates and schedules a job to allocate a new stream.
59 /// </summary>
60 /// <remarks>The stream can be used on the main thread after completing the returned job or used in other jobs that depend upon the returned job.
61 ///
62 /// Using a job to allocate the buffers can be more efficient, particularly for a stream with many buffers.
63 /// </remarks>
64 /// <typeparam name="T">Ignored.</typeparam>
65 /// <param name="stream">Outputs the new stream.</param>
66 /// <param name="bufferCount">A list whose length determines the number of buffers in the stream.</param>
67 /// <param name="dependency">A job handle. The new job will depend upon this handle.</param>
68 /// <param name="allocator">The allocator to use.</param>
69 /// <returns>The handle of the new job.</returns>
70 [GenerateTestsForBurstCompatibility(GenericTypeArguments = new[] { typeof(int) })]
71 public static JobHandle ScheduleConstruct<T>(out NativeStream stream, NativeList<T> bufferCount, JobHandle dependency, AllocatorManager.AllocatorHandle allocator)
72 where T : unmanaged
73 {
74 AllocateBlock(out stream, allocator);
75 var jobData = new ConstructJobList { List = (UntypedUnsafeList*)bufferCount.GetUnsafeList(), Container = stream };
76 return jobData.Schedule(dependency);
77 }
78
79 /// <summary>
80 /// Creates and schedules a job to allocate a new stream.
81 /// </summary>
82 /// <remarks>The stream can be used...
83 /// - after completing the returned job
84 /// - or in other jobs that depend upon the returned job.
85 ///
86 /// Allocating the buffers in a job can be more efficient, particularly for a stream with many buffers.
87 /// </remarks>
88 /// <param name="stream">Outputs the new stream.</param>
89 /// <param name="bufferCount">An array whose value at index 0 determines the number of buffers in the stream.</param>
90 /// <param name="dependency">A job handle. The new job will depend upon this handle.</param>
91 /// <param name="allocator">The allocator to use.</param>
92 /// <returns>The handle of the new job.</returns>
93 public static JobHandle ScheduleConstruct(out NativeStream stream, NativeArray<int> bufferCount, JobHandle dependency, AllocatorManager.AllocatorHandle allocator)
94 {
95 AllocateBlock(out stream, allocator);
96 var jobData = new ConstructJob { Length = bufferCount, Container = stream };
97 return jobData.Schedule(dependency);
98 }
99
100 /// <summary>
101 /// Returns true if this stream is empty.
102 /// </summary>
103 /// <returns>True if this stream is empty or the stream has not been constructed.</returns>
104 public readonly bool IsEmpty()
105 {
106 CheckRead();
107 return m_Stream.IsEmpty();
108 }
109
110 /// <summary>
111 /// Whether this stream has been allocated (and not yet deallocated).
112 /// </summary>
113 /// <remarks>Does not necessarily reflect whether the buffers of the stream have themselves been allocated.</remarks>
114 /// <value>True if this stream has been allocated (and not yet deallocated).</value>
115 public readonly bool IsCreated
116 {
117 [MethodImpl(MethodImplOptions.AggressiveInlining)]
118 get => m_Stream.IsCreated;
119 }
120
121 /// <summary>
122 /// The number of buffers in this stream.
123 /// </summary>
124 /// <value>The number of buffers in this stream.</value>
125 public readonly int ForEachCount
126 {
127 get
128 {
129 CheckRead();
130 return m_Stream.ForEachCount;
131 }
132 }
133
134 /// <summary>
135 /// Returns a reader of this stream.
136 /// </summary>
137 /// <returns>A reader of this stream.</returns>
138 public Reader AsReader()
139 {
140 return new Reader(ref this);
141 }
142
143 /// <summary>
144 /// Returns a writer of this stream.
145 /// </summary>
146 /// <returns>A writer of this stream.</returns>
147 public Writer AsWriter()
148 {
149 return new Writer(ref this);
150 }
151
152 /// <summary>
153 /// Returns the total number of items in the buffers of this stream.
154 /// </summary>
155 /// <remarks>Each <see cref="Writer.Write{T}"/> and <see cref="Writer.Allocate"/> call increments this number.</remarks>
156 /// <returns>The total number of items in the buffers of this stream.</returns>
157 public int Count()
158 {
159 CheckRead();
160 return m_Stream.Count();
161 }
162
163 /// <summary>
164 /// Returns a new NativeArray copy of this stream's data.
165 /// </summary>
166 /// <remarks>The length of the array will equal the count of this stream.
167 ///
168 /// Each buffer of this stream is copied to the array, one after the other.
169 /// </remarks>
170 /// <typeparam name="T">The type of values in the array.</typeparam>
171 /// <param name="allocator">The allocator to use.</param>
172 /// <returns>A new NativeArray copy of this stream's data.</returns>
173 [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
174 public NativeArray<T> ToNativeArray<T>(AllocatorManager.AllocatorHandle allocator) where T : unmanaged
175 {
176 CheckRead();
177 return m_Stream.ToNativeArray<T>(allocator);
178 }
179
180 /// <summary>
181 /// Releases all resources (memory and safety handles).
182 /// </summary>
183 public void Dispose()
184 {
185#if ENABLE_UNITY_COLLECTIONS_CHECKS
186 if (!AtomicSafetyHandle.IsDefaultValue(m_Safety))
187 {
188 AtomicSafetyHandle.CheckExistsAndThrow(m_Safety);
189 }
190#endif
191 if (!IsCreated)
192 {
193 return;
194 }
195
196#if ENABLE_UNITY_COLLECTIONS_CHECKS
197 CollectionHelper.DisposeSafetyHandle(ref m_Safety);
198#endif
199 m_Stream.Dispose();
200 }
201
202 /// <summary>
203 /// Creates and schedules a job that will release all resources (memory and safety handles) of this stream.
204 /// </summary>
205 /// <param name="inputDeps">A job handle which the newly scheduled job will depend upon.</param>
206 /// <returns>The handle of a new job that will release all resources (memory and safety handles) of this stream.</returns>
207 public JobHandle Dispose(JobHandle inputDeps)
208 {
209#if ENABLE_UNITY_COLLECTIONS_CHECKS
210 if (!AtomicSafetyHandle.IsDefaultValue(m_Safety))
211 {
212 AtomicSafetyHandle.CheckExistsAndThrow(m_Safety);
213 }
214#endif
215 if (!IsCreated)
216 {
217 return inputDeps;
218 }
219
220#if ENABLE_UNITY_COLLECTIONS_CHECKS
221 var jobHandle = new NativeStreamDisposeJob { Data = new NativeStreamDispose { m_StreamData = m_Stream, m_Safety = m_Safety } }.Schedule(inputDeps);
222 AtomicSafetyHandle.Release(m_Safety);
223#else
224 var jobHandle = new NativeStreamDisposeJob { Data = new NativeStreamDispose { m_StreamData = m_Stream } }.Schedule(inputDeps);
225#endif
226 m_Stream = default;
227
228 return jobHandle;
229 }
230
231 [BurstCompile]
232 struct ConstructJobList : IJob
233 {
234 public NativeStream Container;
235
236 [ReadOnly]
237 [NativeDisableUnsafePtrRestriction]
238 public UntypedUnsafeList* List;
239
240 public void Execute()
241 {
242 Container.AllocateForEach(List->m_length);
243 }
244 }
245
246 [BurstCompile]
247 struct ConstructJob : IJob
248 {
249 public NativeStream Container;
250
251 [ReadOnly]
252 public NativeArray<int> Length;
253
254 public void Execute()
255 {
256 Container.AllocateForEach(Length[0]);
257 }
258 }
259
260 static void AllocateBlock(out NativeStream stream, AllocatorManager.AllocatorHandle allocator)
261 {
262 CollectionHelper.CheckAllocator(allocator);
263
264 UnsafeStream.AllocateBlock(out stream.m_Stream, allocator);
265
266#if ENABLE_UNITY_COLLECTIONS_CHECKS
267 stream.m_Safety = CollectionHelper.CreateSafetyHandle(allocator);
268
269 CollectionHelper.SetStaticSafetyId(ref stream.m_Safety, ref s_staticSafetyId.Data, "Unity.Collections.NativeStream");
270#endif
271 }
272
273 void AllocateForEach(int forEachCount)
274 {
275#if ENABLE_UNITY_COLLECTIONS_CHECKS
276 CheckForEachCountGreaterThanZero(forEachCount);
277
278 var blockData = (UnsafeStreamBlockData*)m_Stream.m_BlockData.Range.Pointer;
279 var ranges = (UnsafeStreamRange*)blockData->Ranges.Range.Pointer;
280
281 Assert.IsTrue(ranges == null);
282 Assert.AreEqual(0, blockData->RangeCount);
283 Assert.AreNotEqual(0, blockData->BlockCount);
284#endif
285
286 m_Stream.AllocateForEach(forEachCount);
287 }
288
289 /// <summary>
290 /// Writes data into a buffer of a <see cref="NativeStream"/>.
291 /// </summary>
292 /// <remarks>An individual writer can only be used for one buffer of one stream.
293 /// Do not create more than one writer for an individual buffer.</remarks>
294 [NativeContainer]
295 [NativeContainerSupportsMinMaxWriteRestriction]
296 [GenerateTestsForBurstCompatibility]
297 public unsafe struct Writer
298 {
299 UnsafeStream.Writer m_Writer;
300
301#if ENABLE_UNITY_COLLECTIONS_CHECKS
302 AtomicSafetyHandle m_Safety;
303 internal static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<Writer>();
304#pragma warning disable CS0414 // warning CS0414: The field 'NativeStream.Writer.m_Length' is assigned but its value is never used
305 int m_Length;
306#pragma warning restore CS0414
307 int m_MinIndex;
308 int m_MaxIndex;
309#endif
310#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
311 [NativeDisableUnsafePtrRestriction]
312 void* m_PassByRefCheck;
313#endif
314
315 internal Writer(ref NativeStream stream)
316 {
317 m_Writer = stream.m_Stream.AsWriter();
318
319#if ENABLE_UNITY_COLLECTIONS_CHECKS
320 m_Safety = stream.m_Safety;
321 CollectionHelper.SetStaticSafetyId(ref m_Safety, ref s_staticSafetyId.Data, "Unity.Collections.NativeStream.Writer");
322 m_Length = int.MaxValue;
323 m_MinIndex = int.MinValue;
324 m_MaxIndex = int.MinValue;
325#endif
326#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
327 m_PassByRefCheck = null;
328#endif
329 }
330
331 /// <summary>
332 /// The number of buffers in the stream of this writer.
333 /// </summary>
334 /// <value>The number of buffers in the stream of this writer.</value>
335 public int ForEachCount
336 {
337 get
338 {
339#if ENABLE_UNITY_COLLECTIONS_CHECKS
340 AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
341#endif
342 return m_Writer.ForEachCount;
343 }
344 }
345
346 /// <summary>
347 /// For internal use only.
348 /// </summary>
349 /// <param name="foreEachIndex"></param>
350 public void PatchMinMaxRange(int foreEachIndex)
351 {
352#if ENABLE_UNITY_COLLECTIONS_CHECKS
353 m_MinIndex = foreEachIndex;
354 m_MaxIndex = foreEachIndex;
355#endif
356 }
357
358 /// <summary>
359 /// Readies this writer to write to a particular buffer of the stream.
360 /// </summary>
361 /// <remarks>Must be called before using this writer. For an individual writer, call this method only once.
362 ///
363 /// After calling BeginForEachIndex on this writer, passing this writer into functions must be passed by reference.
364 ///
365 /// When done using this writer, you must call <see cref="EndForEachIndex"/>.</remarks>
366 /// <param name="foreachIndex">The index of the buffer to write.</param>
367 public void BeginForEachIndex(int foreachIndex)
368 {
369 CheckBeginForEachIndex(foreachIndex);
370 m_Writer.BeginForEachIndex(foreachIndex);
371 }
372
373 /// <summary>
374 /// Readies the buffer written by this writer for reading.
375 /// </summary>
376 /// <remarks>Must be called before reading the buffer written by this writer.</remarks>
377 public void EndForEachIndex()
378 {
379 CheckEndForEachIndex();
380 m_Writer.EndForEachIndex();
381
382#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
383 m_Writer.m_ForeachIndex = int.MinValue;
384#endif
385 }
386
387 /// <summary>
388 /// Write a value to a buffer.
389 /// </summary>
390 /// <remarks>The value is written to the buffer which was specified
391 /// with <see cref="BeginForEachIndex"/>.
392 /// </remarks>
393 /// <typeparam name="T">The type of value to write.</typeparam>
394 /// <param name="value">The value to write.</param>
395 /// <exception cref="ArgumentException">Thrown if BeginForEachIndex was not called.</exception>
396 /// <exception cref="ArgumentException">Thrown when the NativeStream.Writer instance has been passed by value instead of by reference.</exception>
397 [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
398 public void Write<T>(T value) where T : unmanaged
399 {
400 ref T dst = ref Allocate<T>();
401 dst = value;
402 }
403
404 /// <summary>
405 /// Allocate space in a buffer.
406 /// </summary>
407 /// <remarks>The space is allocated in the buffer which was specified
408 /// with <see cref="BeginForEachIndex"/>.
409 /// </remarks>
410 /// <typeparam name="T">The type of value to allocate space for.</typeparam>
411 /// <returns>A reference to the allocation.</returns>
412 /// <exception cref="ArgumentException">Thrown if BeginForEachIndex was not called.</exception>
413 /// <exception cref="ArgumentException">Thrown when the NativeStream.Writer instance has been passed by value instead of by reference.</exception>
414 [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
415 public ref T Allocate<T>() where T : unmanaged
416 {
417#if ENABLE_UNITY_COLLECTIONS_CHECKS
418 if (UnsafeUtility.IsNativeContainerType<T>())
419 AtomicSafetyHandle.SetNestedContainer(m_Safety, true);
420#endif
421 int size = UnsafeUtility.SizeOf<T>();
422 return ref UnsafeUtility.AsRef<T>(Allocate(size));
423 }
424
425 /// <summary>
426 /// Allocate space in a buffer.
427 /// </summary>
428 /// <remarks>The space is allocated in the buffer which was specified
429 /// with <see cref="BeginForEachIndex"/>.</remarks>
430 /// <param name="size">The number of bytes to allocate.</param>
431 /// <returns>The allocation.</returns>
432 /// <exception cref="ArgumentException">Thrown if BeginForEachIndex was not called.</exception>
433 /// <exception cref="ArgumentException">Thrown when the NativeStream.Writer instance has been passed by value instead of by reference.</exception>
434 public byte* Allocate(int size)
435 {
436 CheckAllocateSize(size);
437 return m_Writer.Allocate(size);
438 }
439
440 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
441 void CheckBeginForEachIndex(int foreachIndex)
442 {
443#if ENABLE_UNITY_COLLECTIONS_CHECKS
444 AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
445#endif
446#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
447 if (m_PassByRefCheck == null)
448 {
449 m_PassByRefCheck = UnsafeUtility.AddressOf(ref this);
450 }
451 var blockData = (UnsafeStreamBlockData*)m_Writer.m_BlockData.Range.Pointer;
452 var ranges = (UnsafeStreamRange*)blockData->Ranges.Range.Pointer;
453
454#if ENABLE_UNITY_COLLECTIONS_CHECKS
455 if (foreachIndex < m_MinIndex || foreachIndex > m_MaxIndex)
456 {
457 // When the code is not running through the job system no ParallelForRange patching will occur
458 // We can't grab m_BlockStream->RangeCount on creation of the writer because the RangeCount can be initialized
459 // in a job after creation of the writer
460 if (m_MinIndex == int.MinValue && m_MaxIndex == int.MinValue)
461 {
462 m_MinIndex = 0;
463
464 m_MaxIndex = blockData->RangeCount - 1;
465 }
466
467 if (foreachIndex < m_MinIndex || foreachIndex > m_MaxIndex)
468 {
469 throw new ArgumentException($"Index {foreachIndex} is out of restricted IJobParallelFor range [{m_MinIndex}...{m_MaxIndex}] in NativeStream.");
470 }
471 }
472#endif
473
474 if (m_Writer.m_ForeachIndex != int.MinValue)
475 {
476 throw new ArgumentException($"BeginForEachIndex must always be balanced by a EndForEachIndex call");
477 }
478
479 if (0 != ranges[foreachIndex].ElementCount)
480 {
481 throw new ArgumentException($"BeginForEachIndex can only be called once for the same index ({foreachIndex}).");
482 }
483
484 Assert.IsTrue(foreachIndex >= 0 && foreachIndex < blockData->RangeCount);
485#endif
486 }
487
488 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
489 void CheckEndForEachIndex()
490 {
491#if ENABLE_UNITY_COLLECTIONS_CHECKS
492 AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
493#endif
494#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
495 if (m_Writer.m_ForeachIndex == int.MinValue)
496 {
497 throw new System.ArgumentException("EndForEachIndex must always be called balanced by a BeginForEachIndex or AppendForEachIndex call");
498 }
499#endif
500 }
501
502 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
503 void CheckAllocateSize(int size)
504 {
505#if ENABLE_UNITY_COLLECTIONS_CHECKS
506 AtomicSafetyHandle.CheckWriteAndThrow(m_Safety);
507#endif
508#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
509 if (m_PassByRefCheck != UnsafeUtility.AddressOf(ref this)
510 || m_Writer.m_ForeachIndex == int.MinValue)
511 {
512 throw new ArgumentException("BeginForEachIndex has not been called on NativeStream.Writer, or NativeStream.Writer is not passed by reference.");
513 }
514
515 if (size > UnsafeStreamBlockData.AllocationSize - sizeof(void*))
516 {
517 throw new ArgumentException("Allocation size is too large");
518 }
519#endif
520 }
521 }
522
523 /// <summary>
524 /// Reads data from a buffer of a <see cref="NativeStream"/>.
525 /// </summary>
526 /// <remarks>An individual reader can only be used for one buffer of one stream.
527 /// Do not create more than one reader for an individual buffer.</remarks>
528 [NativeContainer]
529 [NativeContainerIsReadOnly]
530 [GenerateTestsForBurstCompatibility]
531 public unsafe struct Reader
532 {
533 UnsafeStream.Reader m_Reader;
534
535#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
536 int m_RemainingBlocks;
537#endif
538#if ENABLE_UNITY_COLLECTIONS_CHECKS
539 internal AtomicSafetyHandle m_Safety;
540 internal static readonly SharedStatic<int> s_staticSafetyId = SharedStatic<int>.GetOrCreate<Reader>();
541#endif
542
543 internal Reader(ref NativeStream stream)
544 {
545 m_Reader = stream.m_Stream.AsReader();
546
547#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
548 m_RemainingBlocks = 0;
549#endif
550#if ENABLE_UNITY_COLLECTIONS_CHECKS
551 m_Safety = stream.m_Safety;
552 CollectionHelper.SetStaticSafetyId(ref m_Safety, ref s_staticSafetyId.Data, "Unity.Collections.NativeStream.Reader");
553#endif
554 }
555
556 /// <summary>
557 /// Readies this reader to read a particular buffer of the stream.
558 /// </summary>
559 /// <remarks>Must be called before using this reader. For an individual reader, call this method only once.
560 ///
561 /// When done using this reader, you must call <see cref="EndForEachIndex"/>.</remarks>
562 /// <param name="foreachIndex">The index of the buffer to read.</param>
563 /// <returns>The number of elements left to read from the buffer.</returns>
564 public int BeginForEachIndex(int foreachIndex)
565 {
566 CheckBeginForEachIndex(foreachIndex);
567
568 var remainingItemCount = m_Reader.BeginForEachIndex(foreachIndex);
569
570#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
571 var blockData = (UnsafeStreamBlockData*)m_Reader.m_BlockData.Range.Pointer;
572 var ranges = (UnsafeStreamRange*)blockData->Ranges.Range.Pointer;
573
574 m_RemainingBlocks = ranges[foreachIndex].NumberOfBlocks;
575 if (m_RemainingBlocks == 0)
576 {
577 m_Reader.m_CurrentBlockEnd = (byte*)m_Reader.m_CurrentBlock + m_Reader.m_LastBlockSize;
578 }
579#endif
580
581 return remainingItemCount;
582 }
583
584 /// <summary>
585 /// Checks if all data has been read from the buffer.
586 /// </summary>
587 /// <remarks>If you intentionally don't want to read *all* the data in the buffer, don't call this method.
588 /// Otherwise, calling this method is recommended, even though it's not strictly necessary.</remarks>
589 /// <exception cref="ArgumentException">Thrown if not all the buffer's data has been read.</exception>
590 public void EndForEachIndex()
591 {
592 m_Reader.EndForEachIndex();
593 CheckEndForEachIndex();
594 }
595
596 /// <summary>
597 /// The number of buffers in the stream of this reader.
598 /// </summary>
599 /// <value>The number of buffers in the stream of this reader.</value>
600 public int ForEachCount
601 {
602 get
603 {
604 CheckRead();
605 return m_Reader.ForEachCount;
606 }
607 }
608
609 /// <summary>
610 /// The number of items not yet read from the buffer.
611 /// </summary>
612 /// <value>The number of items not yet read from the buffer.</value>
613 public int RemainingItemCount => m_Reader.RemainingItemCount;
614
615 /// <summary>
616 /// Returns a pointer to the next position to read from the buffer. Advances the reader some number of bytes.
617 /// </summary>
618 /// <param name="size">The number of bytes to advance the reader.</param>
619 /// <returns>A pointer to the next position to read from the buffer.</returns>
620 /// <exception cref="ArgumentException">Thrown if the reader would advance past the end of the buffer.</exception>
621 public byte* ReadUnsafePtr(int size)
622 {
623 CheckReadSize(size);
624
625 m_Reader.m_RemainingItemCount--;
626
627 byte* ptr = m_Reader.m_CurrentPtr;
628 m_Reader.m_CurrentPtr += size;
629
630 if (m_Reader.m_CurrentPtr > m_Reader.m_CurrentBlockEnd)
631 {
632 /*
633 * On netfw/mono/il2cpp, doing m_CurrentBlock->Data does not throw, because it knows that it can
634 * just do pointer + 8. On netcore, doing that throws a NullReferenceException. So, first check for
635 * out of bounds accesses, and only then update m_CurrentBlock and m_CurrentPtr.
636 */
637#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
638 m_RemainingBlocks--;
639
640 CheckNotReadingOutOfBounds(size);
641#endif
642 m_Reader.m_CurrentBlock = m_Reader.m_CurrentBlock->Next;
643 m_Reader.m_CurrentPtr = m_Reader.m_CurrentBlock->Data;
644
645#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
646 if (m_RemainingBlocks <= 0)
647 {
648 m_Reader.m_CurrentBlockEnd = (byte*)m_Reader.m_CurrentBlock + m_Reader.m_LastBlockSize;
649 }
650 else
651 {
652 m_Reader.m_CurrentBlockEnd = (byte*)m_Reader.m_CurrentBlock + UnsafeStreamBlockData.AllocationSize;
653 }
654#else
655 m_Reader.m_CurrentBlockEnd = (byte*)m_Reader.m_CurrentBlock + UnsafeStreamBlockData.AllocationSize;
656#endif
657 ptr = m_Reader.m_CurrentPtr;
658 m_Reader.m_CurrentPtr += size;
659 }
660
661 return ptr;
662 }
663
664 /// <summary>
665 /// Reads the next value from the buffer.
666 /// </summary>
667 /// <remarks>Each read advances the reader to the next item in the buffer.</remarks>
668 /// <typeparam name="T">The type of value to read.</typeparam>
669 /// <returns>A reference to the next value from the buffer.</returns>
670 /// <exception cref="ArgumentException">Thrown if the reader would advance past the end of the buffer.</exception>
671 [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
672 public ref T Read<T>() where T : unmanaged
673 {
674 int size = UnsafeUtility.SizeOf<T>();
675 return ref UnsafeUtility.AsRef<T>(ReadUnsafePtr(size));
676 }
677
678 /// <summary>
679 /// Reads the next value from the buffer. Does not advance the reader.
680 /// </summary>
681 /// <typeparam name="T">The type of value to read.</typeparam>
682 /// <returns>A reference to the next value from the buffer.</returns>
683 /// <exception cref="ArgumentException">Thrown if the read would go past the end of the buffer.</exception>
684 [GenerateTestsForBurstCompatibility(GenericTypeArguments = new [] { typeof(int) })]
685 public ref T Peek<T>() where T : unmanaged
686 {
687 int size = UnsafeUtility.SizeOf<T>();
688 CheckReadSize(size);
689
690 return ref m_Reader.Peek<T>();
691 }
692
693 /// <summary>
694 /// Returns the total number of items in the buffers of the stream.
695 /// </summary>
696 /// <returns>The total number of items in the buffers of the stream.</returns>
697 public int Count()
698 {
699 CheckRead();
700 return m_Reader.Count();
701 }
702
703 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
704 void CheckNotReadingOutOfBounds(int size)
705 {
706#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
707 if (m_RemainingBlocks < 0)
708 throw new System.ArgumentException("Reading out of bounds");
709
710 if (m_RemainingBlocks == 0 && size + sizeof(void*) > m_Reader.m_LastBlockSize)
711 throw new System.ArgumentException("Reading out of bounds");
712#endif
713 }
714
715 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
716 void CheckRead()
717 {
718#if ENABLE_UNITY_COLLECTIONS_CHECKS
719 AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
720#endif
721 }
722
723 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
724 void CheckReadSize(int size)
725 {
726#if ENABLE_UNITY_COLLECTIONS_CHECKS
727 AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
728#endif
729#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
730 Assert.IsTrue(size <= UnsafeStreamBlockData.AllocationSize - (sizeof(void*)));
731 if (m_Reader.m_RemainingItemCount < 1)
732 {
733 throw new ArgumentException("There are no more items left to be read.");
734 }
735#endif
736 }
737
738 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
739 void CheckBeginForEachIndex(int forEachIndex)
740 {
741#if ENABLE_UNITY_COLLECTIONS_CHECKS
742 AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
743#endif
744#if ENABLE_UNITY_COLLECTIONS_CHECKS || UNITY_DOTS_DEBUG
745 var blockData = (UnsafeStreamBlockData*)m_Reader.m_BlockData.Range.Pointer;
746
747 if ((uint)forEachIndex >= (uint)blockData->RangeCount)
748 {
749 throw new System.ArgumentOutOfRangeException(nameof(forEachIndex), $"foreachIndex: {forEachIndex} must be between 0 and ForEachCount: {blockData->RangeCount}");
750 }
751#endif
752 }
753
754 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
755 void CheckEndForEachIndex()
756 {
757 if (m_Reader.m_RemainingItemCount != 0)
758 {
759 throw new System.ArgumentException("Not all elements (Count) have been read. If this is intentional, simply skip calling EndForEachIndex();");
760 }
761
762 if (m_Reader.m_CurrentBlockEnd != m_Reader.m_CurrentPtr)
763 {
764 throw new System.ArgumentException("Not all data (Data Size) has been read. If this is intentional, simply skip calling EndForEachIndex();");
765 }
766 }
767 }
768
769 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS"), Conditional("UNITY_DOTS_DEBUG")]
770 static void CheckForEachCountGreaterThanZero(int forEachCount)
771 {
772 if (forEachCount <= 0)
773 throw new ArgumentException("foreachCount must be > 0", "foreachCount");
774 }
775
776 [Conditional("ENABLE_UNITY_COLLECTIONS_CHECKS")]
777 readonly void CheckRead()
778 {
779#if ENABLE_UNITY_COLLECTIONS_CHECKS
780 AtomicSafetyHandle.CheckReadAndThrow(m_Safety);
781#endif
782 }
783 }
784
785 [NativeContainer]
786 [GenerateTestsForBurstCompatibility]
787 internal unsafe struct NativeStreamDispose
788 {
789 public UnsafeStream m_StreamData;
790
791#if ENABLE_UNITY_COLLECTIONS_CHECKS
792 internal AtomicSafetyHandle m_Safety;
793#endif
794
795 public void Dispose()
796 {
797 m_StreamData.Dispose();
798 }
799 }
800
801 [BurstCompile]
802 struct NativeStreamDisposeJob : IJob
803 {
804 public NativeStreamDispose Data;
805
806 public void Execute()
807 {
808 Data.Dispose();
809 }
810 }
811}