#define WIN32_LEAN_AND_MEAN

typedef unsigned long long btU64;
static const int kCacheLineSize = 64;
struct WorkerThreadStatus
{
	// ... enum Type: kWaitingForWork, kWorking, kSleeping (used below) ...
};

// Directives the main thread uses to signal the workers; one byte per possible thread.
class WorkerThreadDirectives
{
	char m_threadDirs[kMaxThreadCount];
	// ... enum Type: kGoToSleep, kStayAwakeButIdle, kScanForJobs ...
public:
	WorkerThreadDirectives()
	{
		for (int i = 0; i < kMaxThreadCount; ++i)
		{
			m_threadDirs[i] = 0;
		}
	}

	Type getDirective(int threadId)
	{
		btAssert(threadId < kMaxThreadCount);
		return static_cast<Type>(m_threadDirs[threadId]);
	}

	void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
	{
		btAssert(threadEnd <= kMaxThreadCount);
		char dirChar = static_cast<char>(dir);
		for (int i = threadBegin; i < threadEnd; ++i)
		{
			m_threadDirs[i] = dirChar;
		}
	}
};
// ThreadLocalStorage (per-worker-thread state); the fields used below are:
	WorkerThreadStatus::Type m_status;
	int m_numJobsFinished;
	WorkerThreadDirectives* m_directive;
	unsigned int m_cooldownTime;  // microseconds of idle time before the worker goes to sleep

// IJob: base interface for queued work items.
class IJob
{
public:
	virtual void executeJob(int threadId) = 0;
};
// ParallelForJob: one grain-sized chunk of a parallelFor call.
class ParallelForJob : public IJob
{
	// ...
	virtual void executeJob(int threadId)
	{
		// run the user-supplied loop body over this job's sub-range
		m_body->forLoop(m_begin, m_end);
	}
};
// ParallelSumJob: one grain-sized chunk of a parallelSum call; each job adds its partial
// result into a per-thread slot so the reduction needs no locking.
class ParallelSumJob : public IJob
{
	// ...
	ThreadLocalStorage* m_threadLocalStoreArray;
	// ...
public:
	ParallelSumJob(int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls)
	{
		// ...
		m_threadLocalStoreArray = tls;
		// ...
	}
	virtual void executeJob(int threadId)
	{
		btScalar val = m_body->sumLoop(m_begin, m_end);
#if BT_PARALLEL_SUM_DETERMINISTISM
		// truncate low-order bits of the partial result to reduce run-to-run variation
		// caused by floating-point addition order
		const float TRUNC_SCALE = float(1 << 19);
		val = floor(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE;
#endif
		m_threadLocalStoreArray[threadId].m_sumResult += val;  // accumulate into this thread's slot
	}
};
// JobQueue: per-worker job storage and FIFO indices, padded out to a full cache line so
// adjacent queues in the array do not false-share.
class JobQueue
{
	// ...
	char m_cachePadding[kCacheLineSize];
	// ...
	void resizeJobMem(int newSize)
	{
		if (newSize > m_jobMemSize)
		{
			// ... release any previous buffer, then allocate a cache-line-aligned replacement
			m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
			m_jobMemSize = newSize;
		}
	}
public:
	JobQueue()
	{
		// ...
		m_threadSupport = NULL;
		// ...
		m_useSpinMutex = false;
	}
	// ...
	void exit()
	{
		// ...
		if (m_queueLock && m_threadSupport)
		{
			// ... release the queue's critical section via m_threadSupport
		}
	}

	void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
	{
		m_threadSupport = threadSup;
		// ... create the queue lock, then wire up work stealing against the other queues:
		setupJobStealing(contextArray, contextArray->size());
	}

	void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
	{
		btAlignedObjectArray<JobQueue>& contexts = *contextArray;
		// find our own index in the array of queues
		int selfIndex = 0;
		for (int i = 0; i < contexts.size(); ++i)
		{
			if (this == &contexts[i])
			{
				selfIndex = i;
				break;
			}
		}
		// steal from at most two of the nearest active neighbors
		int numNeighbors = btMin(2, contexts.size() - 1);
		int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
		int numOffsets = sizeof(neighborOffsets) / sizeof(neighborOffsets[0]);
		m_neighborContexts.reserve(numNeighbors);
		// ...
		for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
		{
			int neighborIndex = selfIndex + neighborOffsets[i];
			if (neighborIndex >= 0 && neighborIndex < numActiveContexts)
			{
				m_neighborContexts.push_back(&contexts[neighborIndex]);
			}
		}
	}
	bool isQueueEmpty() const { return m_queueIsEmpty; }
	// ...
	// clearQueue() is called before a batch of jobs is submitted; it resets the queue and
	// makes sure the job memory block and the queue array are large enough.
	void clearQueue(int jobCount, int jobSize)
	{
		// ... reset the head/tail indices and the allocation cursor
		m_queueIsEmpty = true;
		int jobBufSize = jobSize * jobCount;
		// make sure we have enough memory allocated to store the jobs
		if (jobBufSize > m_jobMemSize)
		{
			resizeJobMem(jobBufSize);
		}
		// make sure the job queue itself is big enough
		if (jobCount > m_jobQueue.capacity())
		{
			// ... grow m_jobQueue to hold jobCount entries
		}
	}
	void* allocJobMem(int jobSize)
	{
		btAssert(m_jobMemSize >= (m_allocSize + jobSize));
		void* jobMem = &m_jobMem[m_allocSize];
		m_allocSize += jobSize;
		return jobMem;
	}

	void submitJob(IJob* job)
	{
		// the job must have been placement-constructed inside this queue's job memory
		btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
		// ... lock the queue and append the job at the tail, then:
		m_queueIsEmpty = false;
		// ...
	}
	IJob* consumeJobFromOwnQueue()
	{
		// ... early-out if the queue looks empty; otherwise lock it and pop from the head:
		job = m_jobQueue[m_headIndex++];
		btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
		if (m_headIndex == m_tailIndex)
		{
			m_queueIsEmpty = true;
		}
		// ... unlock and return the job (or NULL)
	}

	IJob* consumeJob()
	{
		if (IJob* job = consumeJobFromOwnQueue())
		{
			return job;
		}
		// own queue is empty: try to steal a job from a neighboring queue
		for (int i = 0; i < m_neighborContexts.size(); ++i)
		{
			JobQueue* otherContext = m_neighborContexts[i];
			if (IJob* job = otherContext->consumeJobFromOwnQueue())
			{
				return job;
			}
		}
		// ...
	}
};
// Worker thread entry point: alternate between draining the queue, spinning while waiting for
// work, and going to sleep when directed or after the cooldown expires.
static void WorkerThreadFunc(void* userPtr)
{
	ThreadLocalStorage* localStorage = (ThreadLocalStorage*)userPtr;
	JobQueue* jobQueue = localStorage->m_queue;

	bool shouldSleep = false;
	int threadId = localStorage->m_threadId;
	while (!shouldSleep)
	{
		// drain the queue (including jobs stolen from neighboring queues)
		localStorage->m_mutex.lock();
		while (IJob* job = jobQueue->consumeJob())
		{
			localStorage->m_status = WorkerThreadStatus::kWorking;
			job->executeJob(threadId);
			localStorage->m_numJobsFinished++;
		}
		localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
		localStorage->m_mutex.unlock();
		btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
		while (jobQueue->isQueueEmpty())
		{
			if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep)
			{
				shouldSleep = true;
				break;
			}
			if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs)
			{
				// more jobs may be coming: keep resetting the cooldown clock
				clockStart = localStorage->m_clock->getTimeMicroseconds();
			}
			else
			{
				// busy-wait briefly, bailing out as soon as work may be available
				for (int i = 0; i < 50; ++i)
				{
					if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
					{
						break;
					}
				}
				// queue has stayed empty past the cooldown time: go to sleep
				btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
				if (timeElapsed > localStorage->m_cooldownTime)
				{
					shouldSleep = true;
					break;
				}
			}
		}
	}
	// mark this worker as sleeping before the thread-support layer suspends it
	localStorage->m_mutex.lock();
	localStorage->m_status = WorkerThreadStatus::kSleeping;
	localStorage->m_mutex.unlock();
}
// The default task scheduler: owns the worker threads, the job queues, and per-thread storage.
class btTaskSchedulerDefault : public btITaskScheduler
{
	// ...
	WorkerThreadDirectives* m_workerDirective;
	// ...
	int m_numWorkerThreads;
	int m_numActiveJobQueues;
	// ...
	static const int kFirstWorkerThreadId = 1;  // thread 0 is the calling (main) thread

public:
	btTaskSchedulerDefault()  // (base-class initialization elided)
	{
		m_threadSupport = NULL;
		m_workerDirective = NULL;
	}
	virtual ~btTaskSchedulerDefault()
	{
		waitForWorkersToSleep();

		for (int i = 0; i < m_jobQueues.size(); ++i)
		{
			m_jobQueues[i].exit();
		}

		if (m_threadSupport)
		{
			delete m_threadSupport;
			m_threadSupport = NULL;
		}
		if (m_workerDirective)
		{
			// ... release the aligned allocation made in init()
			m_workerDirective = NULL;
		}
	}
	void init()
	{
		// ... create the platform thread-support object that will run WorkerThreadFunc, then:
		m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));
		// ...
		m_numThreads = m_maxNumThreads;
		// numThreadsPerQueue (computed above from the thread-support layer) decides whether each
		// worker gets its own queue or whether hyperthread pairs share one
		int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
		m_jobQueues.resize(numJobQueues);
		m_numActiveJobQueues = numJobQueues;
		for (int i = 0; i < m_jobQueues.size(); ++i)
		{
			m_jobQueues[i].init(m_threadSupport, &m_jobQueues);
		}
		// map each thread to its queue (the main thread, index 0, gets none)
		m_perThreadJobQueues.resize(m_numThreads);
		for (int i = 0; i < m_numThreads; i++)
		{
			JobQueue* jq = NULL;
			// ...
			if (numThreadsPerQueue == 1)
			{
				jq = &m_jobQueues[i - kFirstWorkerThreadId];
			}
			else
			{
				jq = &m_jobQueues[i / numThreadsPerQueue];
			}
			// ...
			m_perThreadJobQueues[i] = jq;
		}
		// set up the per-thread storage handed to each worker
		m_threadLocalStorage.resize(m_numThreads);
		for (int i = 0; i < m_numThreads; i++)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[i];
			storage.m_threadId = i;
			storage.m_directive = m_workerDirective;
			storage.m_status = WorkerThreadStatus::kSleeping;
			storage.m_cooldownTime = 100;  // microseconds of idle time before a worker sleeps
			storage.m_clock = &m_clock;
			storage.m_queue = m_perThreadJobQueues[i];
		}
		setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);  // no work for them yet
		// ...
	}

	void setWorkerDirectives(WorkerThreadDirectives::Type dir)
	{
		m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
	}
	virtual int getMaxNumThreads() const
	{
		return m_maxNumThreads;
	}
	// ...
	virtual void setNumThreads(int numThreads)
	{
		m_numThreads = btMax(btMin(numThreads, int(m_maxNumThreads)), 1);
		m_numWorkerThreads = m_numThreads - 1;
		m_numActiveJobQueues = 0;
		// if there is at least one worker, re-wire job stealing so that no queue tries to
		// steal from a queue that is no longer active
		if (m_numWorkerThreads > 0)
		{
			JobQueue* lastActiveContext = m_perThreadJobQueues[m_numThreads - 1];
			int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
			m_numActiveJobQueues = iLastActiveContext + 1;
			for (int i = 0; i < m_jobQueues.size(); ++i)
			{
				m_jobQueues[i].setupJobStealing(&m_jobQueues, m_numActiveJobQueues);
			}
		}
		// threads beyond the active count are told to sleep
		m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
	}
	// waitJobs(): the main thread drains the job queues itself, then polls the per-worker
	// counters until every submitted job has been accounted for.
	void waitJobs()
	{
		int numMainThreadJobsFinished = 0;
		for (int i = 0; i < m_numActiveJobQueues; ++i)
		{
			while (IJob* job = m_jobQueues[i].consumeJob())
			{
				job->executeJob(0);  // the main thread is thread id 0
				numMainThreadJobsFinished++;
			}
		}
		// done with the queues; let the workers idle without rescanning them
		setWorkerDirectives(WorkerThreadDirectives::kStayAwakeButIdle);
		btU64 clockStart = m_clock.getTimeMicroseconds();
		while (true)
		{
			int numWorkerJobsFinished = 0;
			for (int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread)
			{
				ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
				storage->m_mutex.lock();
				numWorkerJobsFinished += storage->m_numJobsFinished;
				storage->m_mutex.unlock();
			}
			if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
			{
				break;  // every job has run
			}
			btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
			if (timeElapsed > 100000)
			{
				break;  // safety net: stop waiting after 100 ms
			}
		}
	}
	void wakeWorkers(int numWorkersToWake)
	{
		btAssert(m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs);
		int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
		int numActiveWorkers = 0;
		for (int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker)
		{
			// count workers that are already awake; the count can be slightly stale, so on
			// rare occasions fewer workers than requested get woken, which is harmless
			ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
			if (storage.m_status != WorkerThreadStatus::kSleeping)
			{
				numActiveWorkers++;
			}
		}
		for (int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
			if (storage.m_status == WorkerThreadStatus::kSleeping)
			{
				// launch the sleeping worker through the thread-support layer
				m_threadSupport->runTask(iWorker, &storage);
				numActiveWorkers++;
			}
		}
	}
	void waitForWorkersToSleep()
	{
		setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
		// ... wait for the thread-support layer to finish all tasks, then verify:
		for (int i = kFirstWorkerThreadId; i < m_numThreads; i++)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[i];
			btAssert(storage.m_status == WorkerThreadStatus::kSleeping);
		}
	}

	virtual void sleepWorkerThreadsHint()
	{
		// hint that no parallel work is coming for a while, so the workers may as well sleep now
		setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
	}
	void prepareWorkerThreads()
	{
		for (int i = kFirstWorkerThreadId; i < m_numThreads; ++i)
		{
			ThreadLocalStorage& storage = m_threadLocalStorage[i];
			storage.m_mutex.lock();
			storage.m_numJobsFinished = 0;
			storage.m_mutex.unlock();
		}
		setWorkerDirectives(WorkerThreadDirectives::kScanForJobs);
	}
	virtual void parallelFor(int iBegin, int iEnd, int grainSize, const btIParallelForBody& body)
	{
		int iterationCount = iEnd - iBegin;
		// only go parallel if the range is big enough, workers exist, and this is not a nested call
		if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
		{
			typedef ParallelForJob JobType;
			int jobCount = (iterationCount + grainSize - 1) / grainSize;
			m_numJobs = jobCount;
			int jobSize = sizeof(JobType);
			for (int i = 0; i < m_numActiveJobQueues; ++i)
			{
				m_jobQueues[i].clearQueue(jobCount, jobSize);
			}
			// tell the workers to start scanning for jobs
			prepareWorkerThreads();
			// construct one job per grain-sized chunk, round-robin across the worker queues
			int iThread = kFirstWorkerThreadId;
			for (int i = iBegin; i < iEnd; i += grainSize)
			{
				int iE = btMin(i + grainSize, iEnd);
				JobQueue* jq = m_perThreadJobQueues[iThread];
				btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
				void* jobMem = jq->allocJobMem(jobSize);
				JobType* job = new (jobMem) ParallelForJob(i, iE, body);  // placement new into queue-owned memory
				jq->submitJob(job);
				iThread++;
				if (iThread >= m_numThreads)
				{
					iThread = kFirstWorkerThreadId;
				}
			}
			wakeWorkers(jobCount - 1);
			// ... waitJobs(): the main thread helps drain the queues and waits for the workers
			m_antiNestingLock.unlock();
		}
		else
		{
			// range too small or nested call: just run serially on the calling thread
			body.forLoop(iBegin, iEnd);
		}
	}
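	// Usage sketch (not part of this file), assuming the public wrapper btParallelFor()
	// declared in LinearMath/btThreads.h, which forwards to the active scheduler's
	// parallelFor() above. The body type and variable names are illustrative only.
	//
	//   struct ScaleBody : public btIParallelForBody
	//   {
	//       btScalar* m_data;
	//       btScalar m_factor;
	//       virtual void forLoop(int iBegin, int iEnd) const
	//       {
	//           for (int i = iBegin; i < iEnd; ++i)
	//               m_data[i] *= m_factor;  // each job runs this over one grain-sized sub-range
	//       }
	//   };
	//
	//   ScaleBody body;
	//   body.m_data = myArray;        // hypothetical caller-owned array
	//   body.m_factor = btScalar(2);
	//   btParallelFor(0, arrayLength, 64, body);  // grainSize 64: smallest chunk worth one job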
	virtual btScalar parallelSum(int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body)
	{
		int iterationCount = iEnd - iBegin;
		if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
		{
			typedef ParallelSumJob JobType;
			int jobCount = (iterationCount + grainSize - 1) / grainSize;
			m_numJobs = jobCount;
			int jobSize = sizeof(JobType);
			for (int i = 0; i < m_numActiveJobQueues; ++i)
			{
				m_jobQueues[i].clearQueue(jobCount, jobSize);
			}
			// zero each thread's partial-sum slot before any job runs
			for (int iThread = 0; iThread < m_numThreads; ++iThread)
			{
				m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
			}
			prepareWorkerThreads();
			// construct one job per grain-sized chunk, round-robin across the worker queues
			int iThread = kFirstWorkerThreadId;
			for (int i = iBegin; i < iEnd; i += grainSize)
			{
				int iE = btMin(i + grainSize, iEnd);
				JobQueue* jq = m_perThreadJobQueues[iThread];
				btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
				void* jobMem = jq->allocJobMem(jobSize);
				JobType* job = new (jobMem) ParallelSumJob(i, iE, body, &m_threadLocalStorage[0]);  // placement new
				jq->submitJob(job);
				iThread++;
				if (iThread >= m_numThreads)
				{
					iThread = kFirstWorkerThreadId;
				}
			}
			wakeWorkers(jobCount - 1);
			// ... waitJobs(): the main thread helps drain the queues and waits for the workers,
			// then the per-thread partial sums are combined
			btScalar sum = btScalar(0);
			for (int iThread = 0; iThread < m_numThreads; ++iThread)
			{
				sum += m_threadLocalStorage[iThread].m_sumResult;
			}
			m_antiNestingLock.unlock();
			return sum;
		}
		else
		{
			// range too small or nested call: just run serially on the calling thread
			return body.sumLoop(iBegin, iEnd);
		}
	}
};
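// Usage sketch (not part of this file), assuming the public wrapper btParallelSum() declared in
// LinearMath/btThreads.h, which forwards to parallelSum() above. 'DotBody' and its members are
// illustrative names only.
//
//   struct DotBody : public btIParallelSumBody
//   {
//       const btScalar* m_a;
//       const btScalar* m_b;
//       virtual btScalar sumLoop(int iBegin, int iEnd) const
//       {
//           btScalar partial = btScalar(0);
//           for (int i = iBegin; i < iEnd; ++i)
//               partial += m_a[i] * m_b[i];  // partial dot product for this chunk
//           return partial;  // the scheduler adds the per-thread slots to form the final sum
//       }
//   };
//
//   DotBody body;
//   body.m_a = vecA;  body.m_b = vecB;         // hypothetical caller-owned arrays
//   btScalar dot = btParallelSum(0, n, 128, body);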
btITaskScheduler* btCreateDefaultTaskScheduler()
{
	btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
	ts->init();
	return ts;
}
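// Usage sketch (not part of this file): installing the scheduler. btSetTaskScheduler(), declared
// in LinearMath/btThreads.h, makes the returned scheduler the one used by btParallelFor() and
// btParallelSum().
//
//   btITaskScheduler* scheduler = btCreateDefaultTaskScheduler();
//   scheduler->setNumThreads(scheduler->getMaxNumThreads());
//   btSetTaskScheduler(scheduler);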