this repo has no description
at fixPythonPipStalling 309 lines 9.6 kB view raw
1/* Copyright: � Copyright 2005 Apple Computer, Inc. All rights reserved. 2 3 Disclaimer: IMPORTANT: This Apple software is supplied to you by Apple Computer, Inc. 4 ("Apple") in consideration of your agreement to the following terms, and your 5 use, installation, modification or redistribution of this Apple software 6 constitutes acceptance of these terms. If you do not agree with these terms, 7 please do not use, install, modify or redistribute this Apple software. 8 9 In consideration of your agreement to abide by the following terms, and subject 10 to these terms, Apple grants you a personal, non-exclusive license, under Apple�s 11 copyrights in this original Apple software (the "Apple Software"), to use, 12 reproduce, modify and redistribute the Apple Software, with or without 13 modifications, in source and/or binary forms; provided that if you redistribute 14 the Apple Software in its entirety and without modifications, you must retain 15 this notice and the following text and disclaimers in all such redistributions of 16 the Apple Software. Neither the name, trademarks, service marks or logos of 17 Apple Computer, Inc. may be used to endorse or promote products derived from the 18 Apple Software without specific prior written permission from Apple. Except as 19 expressly stated in this notice, no other rights or licenses, express or implied, 20 are granted by Apple herein, including but not limited to any patent rights that 21 may be infringed by your derivative works or by other works in which the Apple 22 Software may be incorporated. 23 24 The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO 25 WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED 26 WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR 27 PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN 28 COMBINATION WITH YOUR PRODUCTS. 29 30 IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR 31 CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 32 GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 33 ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR DISTRIBUTION 34 OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF CONTRACT, TORT 35 (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF APPLE HAS BEEN 36 ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 37*/ 38/*============================================================================= 39 CABufferQueue.cpp 40 41=============================================================================*/ 42 43#include "CABufferQueue.h" 44 45#if TARGET_OS_WIN32 46 #include "CAWindows.h" 47#endif 48 49// ____________________________________________________________________________ 50 51CABufferQueue::WorkThread *CABufferQueue::sWorkThread = NULL; 52 53CABufferQueue::WorkThread::WorkThread() : 54 CAPThread(ThreadEntry, this, CAPThread::kMaxThreadPriority, true), 55 mStopped(false), 56 mRunGuard("CABufferQueue::mRunGuard") 57{ 58 // prime the container to have some elements so we're not calling malloc dynamically 59 Buffer *b = NULL; 60 for (int i = 0; i < 64; ++i) 61 mWorkQueue.push_back(b); 62 mWorkQueue.clear(); 63 Start(); 64} 65 66void CABufferQueue::WorkThread::Run() 67{ 68 while (!mStopped) { 69 CAGuard::Locker lock(mRunGuard); 70 mRunGuard.Wait(); 71 72 while (!mStopped) { 73 Buffer *b, *next; 74 75 // add buffers from the other thread 76 TAtomicStack<Buffer> reversed; 77 78 b = mBuffersToAdd.pop_all(); // these are in reverse order 79 while (b != NULL) { 80 next = b->get_next(); 81 reversed.push_NA(b); 82 b = next; 83 } 84 while ((b = reversed.pop_NA()) != NULL) 85 mWorkQueue.push_back(b); 86 87 if (mWorkQueue.empty()) 88 break; 89 b = mWorkQueue.front(); 90 mWorkQueue.pop_front(); 91 92 b->Queue()->ProcessBuffer(b); 93 b->SetInProgress(false); 94 } 95 } 96} 97 98void CABufferQueue::WorkThread::Stop() 99{ 100 mStopped = true; 101 mRunGuard.Notify(); 102} 103 104void CABufferQueue::WorkThread::AddBuffer(Buffer *b) 105{ 106 b->SetInProgress(true); 107 mBuffersToAdd.push_atomic(b); 108 mRunGuard.Notify(); 109} 110 111void CABufferQueue::WorkThread::RemoveBuffers(CABufferQueue *owner) 112{ 113 CAGuard::Locker lock(mRunGuard); 114 for (WorkQueue::iterator it = mWorkQueue.begin(); it != mWorkQueue.end(); ) { 115 if ((*it)->Queue() == owner) { 116 WorkQueue::iterator next = it; ++next; 117 mWorkQueue.erase(it); 118 it = next; 119 } else 120 ++it; 121 } 122} 123 124// ____________________________________________________________________________ 125 126CABufferQueue::Buffer::Buffer(CABufferQueue *queue, const CAStreamBasicDescription &fmt, UInt32 nBytes) : 127 mQueue(queue) 128{ 129 mMemory = CABufferList::New("", fmt); 130 mMemory->AllocateBuffers(nBytes); 131 mByteSize = nBytes; 132 mInProgress = false; 133 mStartFrame = mEndFrame = 0; 134 mEndOfStream = false; 135} 136 137// return true if buffer emptied AND we're not at end-of-stream 138bool CABufferQueue::Buffer::CopyInto(AudioBufferList *destBufferList, int bytesPerFrame, UInt32 &framesProduced, UInt32 &framesRequired) 139{ 140 UInt32 framesInBuffer = mEndFrame - mStartFrame; 141 UInt32 framesToCopy = std::min(framesInBuffer, framesRequired); 142 if (framesToCopy > 0) { 143 const CABufferList *bufMemory = mMemory; 144 const AudioBufferList &srcBufferList = bufMemory->GetBufferList(); 145 const AudioBuffer *srcbuf = srcBufferList.mBuffers; 146 AudioBuffer *dstbuf = destBufferList->mBuffers; 147 for (int i = destBufferList->mNumberBuffers; --i >= 0; ++srcbuf, ++dstbuf) { 148 memcpy( 149 (Byte *)dstbuf->mData + framesProduced * bytesPerFrame, 150 (Byte *)srcbuf->mData + mStartFrame * bytesPerFrame, 151 framesToCopy * bytesPerFrame); 152 } 153 framesProduced += framesToCopy; 154 framesRequired -= framesToCopy; 155 mStartFrame += framesToCopy; 156 } 157 return (framesToCopy == framesInBuffer) && !mEndOfStream; 158} 159 160// return true if buffer filled 161bool CABufferQueue::Buffer::CopyFrom(const AudioBufferList *srcBufferList, int bytesPerFrame, UInt32 &framesProduced, UInt32 &framesRequired) 162{ 163 UInt32 framesInBuffer = mEndFrame - mStartFrame; 164 UInt32 freeFramesInBuffer = (mByteSize / bytesPerFrame) - framesInBuffer; 165 UInt32 framesToCopy = std::min(freeFramesInBuffer, framesRequired); 166 if (framesToCopy > 0) { 167 const AudioBuffer *srcbuf = srcBufferList->mBuffers; 168 const CABufferList *bufMemory = mMemory; 169 const AudioBufferList &destBufferList = bufMemory->GetBufferList(); 170 const AudioBuffer *dstbuf = destBufferList.mBuffers; 171 for (int i = srcBufferList->mNumberBuffers; --i >= 0; ++srcbuf, ++dstbuf) { 172 memcpy( 173 (Byte *)dstbuf->mData + framesInBuffer * bytesPerFrame, 174 (Byte *)srcbuf->mData + framesProduced * bytesPerFrame, 175 framesToCopy * bytesPerFrame); 176 } 177 framesProduced += framesToCopy; 178 framesRequired -= framesToCopy; 179 mEndFrame += framesToCopy; 180 } 181 return (framesToCopy == freeFramesInBuffer); 182} 183 184// ____________________________________________________________________________ 185 186CABufferQueue::CABufferQueue(int nBuffers, UInt32 bufferSizeFrames) : 187 mNumberBuffers(nBuffers), 188 mBuffers(NULL), 189 mBufferSizeFrames(bufferSizeFrames), 190 mBufferList(NULL) 191{ 192 mCurrentBuffer = 0; 193 mErrorCount = 0; 194 195 if (sWorkThread == NULL) 196 sWorkThread = new WorkThread(); 197 mWorkThread = sWorkThread; // for now 198} 199 200CABufferQueue::~CABufferQueue() 201{ 202 CancelAndDisposeBuffers(); 203} 204 205void CABufferQueue::CancelBuffers() 206{ 207 mWorkThread->RemoveBuffers(this); 208} 209 210void CABufferQueue::CancelAndDisposeBuffers() 211{ 212 CancelBuffers(); 213 if (mBuffers) { 214 for (int i = 0; i < mNumberBuffers; ++i) 215 delete mBuffers[i]; 216 delete[] mBuffers; mBuffers = NULL; 217 } 218 delete mBufferList; mBufferList = NULL; 219} 220 221void CABufferQueue::SetFormat(const CAStreamBasicDescription &fmt) 222{ 223 CancelAndDisposeBuffers(); 224 225 mBytesPerFrame = fmt.mBytesPerFrame; 226 mBuffers = new Buffer*[mNumberBuffers]; 227 for (int i = 0; i < mNumberBuffers; ++i) 228 mBuffers[i] = CreateBuffer(fmt, mBufferSizeFrames * mBytesPerFrame); 229 mBufferList = CABufferList::New("", fmt); 230} 231 232// ____________________________________________________________________________ 233 234void CAPushBufferQueue::PushBuffer(UInt32 inNumberFrames, const AudioBufferList *inBufferList) 235{ 236 UInt32 framesRequired = inNumberFrames; 237 UInt32 framesProduced = 0; 238 239 do { 240 Buffer *b = mBuffers[mCurrentBuffer]; 241 242 if (b->InProgress()) { 243 ++mErrorCount; 244 break; 245 } 246 247 if (b->CopyFrom(inBufferList, mBytesPerFrame, framesProduced, framesRequired)) { 248 // buffer was filled, we're done with it 249 sWorkThread->AddBuffer(b); 250 if (++mCurrentBuffer == mNumberBuffers) 251 mCurrentBuffer = 0; 252 } 253 } while (framesRequired > 0); 254} 255 256void CAPushBufferQueue::Flush() 257{ 258 if (mBuffers != NULL) { 259 Buffer *b = mBuffers[mCurrentBuffer]; 260 if (b->FrameCount() > 0 && !b->InProgress()) 261 ProcessBuffer(b); 262 } 263} 264 265// ____________________________________________________________________________ 266 267void CAPullBufferQueue::PullBuffer(UInt32 &ioFrames, AudioBufferList *outBufferList) 268{ 269 if (mEndOfStream) { 270 ioFrames = 0; 271 return; 272 } 273 UInt32 framesRequired = ioFrames; 274 UInt32 framesProduced = 0; 275 276 do { 277 Buffer *b = mBuffers[mCurrentBuffer]; 278 279 if (b->InProgress()) { 280 ++mErrorCount; 281 break; 282 } 283 284 if (b->CopyInto(outBufferList, mBytesPerFrame, framesProduced, framesRequired)) { 285 // buffer emptied 286 sWorkThread->AddBuffer(b); 287 288 if (++mCurrentBuffer == mNumberBuffers) 289 mCurrentBuffer = 0; 290 } 291 else if (b->ReachedEndOfStream()) { 292 mEndOfStream = true; 293 break; 294 } 295 } while (framesRequired > 0); 296 ioFrames = framesProduced; 297} 298 299void CAPullBufferQueue::Prime() 300{ 301 mEndOfStream = false; 302 for (int i = 0; i < mNumberBuffers; ++i) { 303 Buffer *b = mBuffers[i]; 304 ProcessBuffer(b); 305 b->SetInProgress(false); 306 } 307 mCurrentBuffer = 0; 308} 309