Bullet Collision Detection & Physics Library
btTaskScheduler.cpp
#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btQuickprof.h"
#include <stdio.h>
#include <algorithm>

#if BT_THREADSAFE

#include "btThreadSupportInterface.h"

#if defined(_WIN32)

#define WIN32_LEAN_AND_MEAN

#include <windows.h>

#endif

typedef unsigned long long btU64;
static const int kCacheLineSize = 64;

void btSpinPause()
{
#if defined(_WIN32)
    YieldProcessor();
#endif
}

struct WorkerThreadStatus
{
    enum Type
    {
        kInvalid,
        kWaitingForWork,
        kWorking,
        kSleeping,
    };
};
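// WorkerThreadStatus is reported by each worker thread about itself; the WorkerThreadDirectives
// below are issued by the main thread (go to sleep / stay awake / scan for jobs) and are packed
// one byte per thread so a single cache line covers every worker.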

ATTRIBUTE_ALIGNED64(class)
WorkerThreadDirectives
{
    static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
    // directives for all worker threads packed into a single cacheline
    char m_threadDirs[kMaxThreadCount];

public:
    enum Type
    {
        kInvalid,
        kGoToSleep,         // go to sleep
        kStayAwakeButIdle,  // stay awake but don't check the job queue
        kScanForJobs,       // actively scan job queue for jobs
    };
    WorkerThreadDirectives()
    {
        for (int i = 0; i < kMaxThreadCount; ++i)
        {
            m_threadDirs[i] = 0;
        }
    }

    Type getDirective(int threadId)
    {
        btAssert(threadId < kMaxThreadCount);
        return static_cast<Type>(m_threadDirs[threadId]);
    }

    void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
    {
        btAssert(threadBegin < threadEnd);
        btAssert(threadEnd <= kMaxThreadCount);
        char dirChar = static_cast<char>(dir);
        for (int i = threadBegin; i < threadEnd; ++i)
        {
            m_threadDirs[i] = dirChar;
        }
    }
};

class JobQueue;

ATTRIBUTE_ALIGNED64(struct)
ThreadLocalStorage
{
    int m_threadId;
    WorkerThreadStatus::Type m_status;
    int m_numJobsFinished;
    btSpinMutex m_mutex;
    btScalar m_sumResult;
    WorkerThreadDirectives* m_directive;
    JobQueue* m_queue;
    btClock* m_clock;
    unsigned int m_cooldownTime;
};
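// Each worker gets its own cache-line-aligned ThreadLocalStorage entry (ATTRIBUTE_ALIGNED64),
// so status updates, job counts, and partial sums written by different threads land on
// different cache lines and do not cause false sharing.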

struct IJob
{
    virtual void executeJob(int threadId) = 0;
};

class ParallelForJob : public IJob
{
    const btIParallelForBody* m_body;
    int m_begin;
    int m_end;

public:
    ParallelForJob(int iBegin, int iEnd, const btIParallelForBody& body)
    {
        m_body = &body;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE("executeJob");

        // call the functor body to do the work
        m_body->forLoop(m_begin, m_end);
    }
};

class ParallelSumJob : public IJob
{
    const btIParallelSumBody* m_body;
    ThreadLocalStorage* m_threadLocalStoreArray;
    int m_begin;
    int m_end;

public:
    ParallelSumJob(int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls)
    {
        m_body = &body;
        m_threadLocalStoreArray = tls;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE("executeJob");

        // call the functor body to do the work
        btScalar val = m_body->sumLoop(m_begin, m_end);
#if BT_PARALLEL_SUM_DETERMINISTISM
        // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
        const float TRUNC_SCALE = float(1 << 19);
        val = floor(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE;  // truncate some bits
#endif
        m_threadLocalStoreArray[threadId].m_sumResult += val;
    }
};
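// Each ParallelSumJob accumulates into the executing thread's own m_sumResult slot rather than a
// shared accumulator, so no atomics are needed; the scheduler adds the per-thread partial sums
// together after all jobs have finished. The optional BT_PARALLEL_SUM_DETERMINISTISM path rounds
// each partial result to a fixed precision, trading accuracy for run-to-run repeatability as
// described in the comment above.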

ATTRIBUTE_ALIGNED64(class)
JobQueue
{
    btThreadSupportInterface* m_threadSupport;
    btCriticalSection* m_queueLock;
    btSpinMutex m_mutex;

    btAlignedObjectArray<IJob*> m_jobQueue;
    char* m_jobMem;
    int m_jobMemSize;
    bool m_queueIsEmpty;
    int m_tailIndex;
    int m_headIndex;
    int m_allocSize;
    bool m_useSpinMutex;
    btAlignedObjectArray<JobQueue*> m_neighborContexts;
    char m_cachePadding[kCacheLineSize];  // prevent false sharing

    void freeJobMem()
    {
        if (m_jobMem)
        {
            // free old
            btAlignedFree(m_jobMem);
            m_jobMem = NULL;
        }
    }
    void resizeJobMem(int newSize)
    {
        if (newSize > m_jobMemSize)
        {
            freeJobMem();
            m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
            m_jobMemSize = newSize;
        }
    }

public:
    JobQueue()
    {
        m_jobMem = NULL;
        m_jobMemSize = 0;
        m_threadSupport = NULL;
        m_queueLock = NULL;
        m_headIndex = 0;
        m_tailIndex = 0;
        m_useSpinMutex = false;
    }
    ~JobQueue()
    {
        exit();
    }
    void exit()
    {
        freeJobMem();
        if (m_queueLock && m_threadSupport)
        {
            m_threadSupport->deleteCriticalSection(m_queueLock);
            m_queueLock = NULL;
            m_threadSupport = 0;
        }
    }

    void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
    {
        m_threadSupport = threadSup;
        if (threadSup)
        {
            m_queueLock = m_threadSupport->createCriticalSection();
        }
        setupJobStealing(contextArray, contextArray->size());
    }
    void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
    {
        btAlignedObjectArray<JobQueue>& contexts = *contextArray;
        int selfIndex = 0;
        for (int i = 0; i < contexts.size(); ++i)
        {
            if (this == &contexts[i])
            {
                selfIndex = i;
                break;
            }
        }
        int numNeighbors = btMin(2, contexts.size() - 1);
        int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
        int numOffsets = sizeof(neighborOffsets) / sizeof(neighborOffsets[0]);
        m_neighborContexts.reserve(numNeighbors);
        m_neighborContexts.resizeNoInitialize(0);
        for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
        {
            int neighborIndex = selfIndex + neighborOffsets[i];
            if (neighborIndex >= 0 && neighborIndex < numActiveContexts)
            {
                m_neighborContexts.push_back(&contexts[neighborIndex]);
            }
        }
    }
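    // Job stealing is limited to at most two neighboring queues (offsets -1, +1, -2, +2, -3, +3
    // from this queue's index, clamped to the active range), which keeps stealing mostly local
    // instead of having every thread scan every queue.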

    bool isQueueEmpty() const { return m_queueIsEmpty; }
    void lockQueue()
    {
        if (m_useSpinMutex)
        {
            m_mutex.lock();
        }
        else
        {
            m_queueLock->lock();
        }
    }
    void unlockQueue()
    {
        if (m_useSpinMutex)
        {
            m_mutex.unlock();
        }
        else
        {
            m_queueLock->unlock();
        }
    }
    void clearQueue(int jobCount, int jobSize)
    {
        lockQueue();
        m_headIndex = 0;
        m_tailIndex = 0;
        m_allocSize = 0;
        m_queueIsEmpty = true;
        int jobBufSize = jobSize * jobCount;
        // make sure we have enough memory allocated to store jobs
        if (jobBufSize > m_jobMemSize)
        {
            resizeJobMem(jobBufSize);
        }
        // make sure job queue is big enough
        if (jobCount > m_jobQueue.capacity())
        {
            m_jobQueue.reserve(jobCount);
        }
        unlockQueue();
        m_jobQueue.resizeNoInitialize(0);
    }
    void* allocJobMem(int jobSize)
    {
        btAssert(m_jobMemSize >= (m_allocSize + jobSize));
        void* jobMem = &m_jobMem[m_allocSize];
        m_allocSize += jobSize;
        return jobMem;
    }
    void submitJob(IJob* job)
    {
        btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
        m_jobQueue.push_back(job);
        lockQueue();
        m_tailIndex++;
        m_queueIsEmpty = false;
        unlockQueue();
    }
    IJob* consumeJobFromOwnQueue()
    {
        if (m_queueIsEmpty)
        {
            // lock free path. even if this is taken erroneously it isn't harmful
            return NULL;
        }
        IJob* job = NULL;
        lockQueue();
        if (!m_queueIsEmpty)
        {
            job = m_jobQueue[m_headIndex++];
            btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
            if (m_headIndex == m_tailIndex)
            {
                m_queueIsEmpty = true;
            }
        }
        unlockQueue();
        return job;
    }
    IJob* consumeJob()
    {
        if (IJob* job = consumeJobFromOwnQueue())
        {
            return job;
        }
        // own queue is empty, try to steal from neighbor
        for (int i = 0; i < m_neighborContexts.size(); ++i)
        {
            JobQueue* otherContext = m_neighborContexts[i];
            if (IJob* job = otherContext->consumeJobFromOwnQueue())
            {
                return job;
            }
        }
        return NULL;
    }
};
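// consumeJob() always drains the thread's own queue first and only then tries its neighbors'
// queues. The unlocked m_queueIsEmpty check in consumeJobFromOwnQueue() is a fast path that can
// race harmlessly, because the flag is re-checked under the queue lock before a job is taken.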

static void WorkerThreadFunc(void* userPtr)
{
    BT_PROFILE("WorkerThreadFunc");
    ThreadLocalStorage* localStorage = (ThreadLocalStorage*)userPtr;
    JobQueue* jobQueue = localStorage->m_queue;

    bool shouldSleep = false;
    int threadId = localStorage->m_threadId;
    while (!shouldSleep)
    {
        // do work
        localStorage->m_mutex.lock();
        while (IJob* job = jobQueue->consumeJob())
        {
            localStorage->m_status = WorkerThreadStatus::kWorking;
            job->executeJob(threadId);
            localStorage->m_numJobsFinished++;
        }
        localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
        localStorage->m_mutex.unlock();
        btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
        // while queue is empty,
        while (jobQueue->isQueueEmpty())
        {
            // todo: spin wait a bit to avoid hammering the empty queue
            btSpinPause();
            if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep)
            {
                shouldSleep = true;
                break;
            }
            // if jobs are incoming,
            if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs)
            {
                clockStart = localStorage->m_clock->getTimeMicroseconds();  // reset clock
            }
            else
            {
                for (int i = 0; i < 50; ++i)
                {
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
                    {
                        break;
                    }
                }
                // if no jobs incoming and queue has been empty for the cooldown time, sleep
                btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
                if (timeElapsed > localStorage->m_cooldownTime)
                {
                    shouldSleep = true;
                    break;
                }
            }
        }
    }
    {
        BT_PROFILE("sleep");
        // go sleep
        localStorage->m_mutex.lock();
        localStorage->m_status = WorkerThreadStatus::kSleeping;
        localStorage->m_mutex.unlock();
    }
}
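// Worker lifecycle: each worker drains jobs (its own queue plus stolen ones), then spins on the
// empty queue while watching its directive. kScanForJobs keeps resetting the idle clock,
// kStayAwakeButIdle lets the cooldown expire (m_cooldownTime, 100 microseconds by default), and
// kGoToSleep or an expired cooldown makes the thread mark itself kSleeping and return, after
// which it must be re-launched via the thread support's runTask().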

class btTaskSchedulerDefault : public btITaskScheduler
{
    btThreadSupportInterface* m_threadSupport;
    WorkerThreadDirectives* m_workerDirective;
    btAlignedObjectArray<JobQueue> m_jobQueues;
    btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
    btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
    btSpinMutex m_antiNestingLock;  // prevent nested parallel-for
    btClock m_clock;
    int m_numThreads;
    int m_numWorkerThreads;
    int m_numActiveJobQueues;
    int m_maxNumThreads;
    int m_numJobs;
    static const int kFirstWorkerThreadId = 1;

public:
    btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
    {
        m_threadSupport = NULL;
        m_workerDirective = NULL;
    }

    virtual ~btTaskSchedulerDefault()
    {
        waitForWorkersToSleep();

        for (int i = 0; i < m_jobQueues.size(); ++i)
        {
            m_jobQueues[i].exit();
        }

        if (m_threadSupport)
        {
            delete m_threadSupport;
            m_threadSupport = NULL;
        }
        if (m_workerDirective)
        {
            btAlignedFree(m_workerDirective);
            m_workerDirective = NULL;
        }
    }

    void init()
    {
        btThreadSupportInterface::ConstructionInfo constructionInfo("TaskScheduler", WorkerThreadFunc);
        m_threadSupport = btThreadSupportInterface::create(constructionInfo);
        m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));

        m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
        m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
        m_numThreads = m_maxNumThreads;
        // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
        int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
        int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
        m_jobQueues.resize(numJobQueues);
        m_numActiveJobQueues = numJobQueues;
        for (int i = 0; i < m_jobQueues.size(); ++i)
        {
            m_jobQueues[i].init(m_threadSupport, &m_jobQueues);
        }
        m_perThreadJobQueues.resize(m_numThreads);
        for (int i = 0; i < m_numThreads; i++)
        {
            JobQueue* jq = NULL;
            // only worker threads get a job queue
            if (i > 0)
            {
                if (numThreadsPerQueue == 1)
                {
                    // one queue per worker thread
                    jq = &m_jobQueues[i - kFirstWorkerThreadId];
                }
                else
                {
                    // 2 threads share each queue
                    jq = &m_jobQueues[i / numThreadsPerQueue];
                }
            }
            m_perThreadJobQueues[i] = jq;
        }
        m_threadLocalStorage.resize(m_numThreads);
        for (int i = 0; i < m_numThreads; i++)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_threadId = i;
            storage.m_directive = m_workerDirective;
            storage.m_status = WorkerThreadStatus::kSleeping;
            storage.m_cooldownTime = 100;  // 100 microseconds, threads go to sleep after this long if they have nothing to do
            storage.m_clock = &m_clock;
            storage.m_queue = m_perThreadJobQueues[i];
        }
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);  // no work for them yet
        setNumThreads(m_threadSupport->getCacheFriendlyNumThreads());
    }
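    // Queue-to-thread mapping: getLogicalToPhysicalCoreRatio() decides whether each worker gets
    // its own JobQueue or whether hyperthread siblings share one; the main thread (id 0) never
    // owns a queue and only helps drain the queues inside waitJobs().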

    void setWorkerDirectives(WorkerThreadDirectives::Type dir)
    {
        m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
    }

    virtual int getMaxNumThreads() const BT_OVERRIDE
    {
        return m_maxNumThreads;
    }

    virtual int getNumThreads() const BT_OVERRIDE
    {
        return m_numThreads;
    }

    virtual void setNumThreads(int numThreads) BT_OVERRIDE
    {
        m_numThreads = btMax(btMin(numThreads, int(m_maxNumThreads)), 1);
        m_numWorkerThreads = m_numThreads - 1;
        m_numActiveJobQueues = 0;
        // if there is at least 1 worker,
        if (m_numWorkerThreads > 0)
        {
            // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
            JobQueue* lastActiveContext = m_perThreadJobQueues[m_numThreads - 1];
            int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
            m_numActiveJobQueues = iLastActiveContext + 1;
            for (int i = 0; i < m_jobQueues.size(); ++i)
            {
                m_jobQueues[i].setupJobStealing(&m_jobQueues, m_numActiveJobQueues);
            }
        }
        m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
    }

    void waitJobs()
    {
        BT_PROFILE("waitJobs");
        // have the main thread work until the job queues are empty
        int numMainThreadJobsFinished = 0;
        for (int i = 0; i < m_numActiveJobQueues; ++i)
        {
            while (IJob* job = m_jobQueues[i].consumeJob())
            {
                job->executeJob(0);
                numMainThreadJobsFinished++;
            }
        }

        // done with jobs for now, tell workers to rest (but not sleep)
        setWorkerDirectives(WorkerThreadDirectives::kStayAwakeButIdle);

        btU64 clockStart = m_clock.getTimeMicroseconds();
        // wait for workers to finish any jobs in progress
        while (true)
        {
            int numWorkerJobsFinished = 0;
            for (int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread)
            {
                ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
                storage->m_mutex.lock();
                numWorkerJobsFinished += storage->m_numJobsFinished;
                storage->m_mutex.unlock();
            }
            if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
            {
                break;
            }
            btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
            btAssert(timeElapsed < 1000);
            if (timeElapsed > 100000)
            {
                break;
            }
            btSpinPause();
        }
    }
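    // Completion is detected by counting: every submitted job increments either the main thread's
    // local count or a worker's m_numJobsFinished, and waitJobs() spins until the combined total
    // equals m_numJobs (with a 100 ms timeout as a safety net).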

    void wakeWorkers(int numWorkersToWake)
    {
        BT_PROFILE("wakeWorkers");
        btAssert(m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs);
        int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
        int numActiveWorkers = 0;
        for (int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker)
        {
            // note this count of active workers is not necessarily totally reliable, because a worker thread could be
            // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
            ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
            if (storage.m_status != WorkerThreadStatus::kSleeping)
            {
                numActiveWorkers++;
            }
        }
        for (int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
            if (storage.m_status == WorkerThreadStatus::kSleeping)
            {
                m_threadSupport->runTask(iWorker, &storage);
                numActiveWorkers++;
            }
        }
    }

    void waitForWorkersToSleep()
    {
        BT_PROFILE("waitForWorkersToSleep");
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
        m_threadSupport->waitForAllTasks();
        for (int i = kFirstWorkerThreadId; i < m_numThreads; i++)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            btAssert(storage.m_status == WorkerThreadStatus::kSleeping);
        }
    }

    virtual void sleepWorkerThreadsHint() BT_OVERRIDE
    {
        BT_PROFILE("sleepWorkerThreadsHint");
        // hint the task scheduler that we may not be using these threads for a little while
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
    }

    void prepareWorkerThreads()
    {
        for (int i = kFirstWorkerThreadId; i < m_numThreads; ++i)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_mutex.lock();
            storage.m_numJobsFinished = 0;
            storage.m_mutex.unlock();
        }
        setWorkerDirectives(WorkerThreadDirectives::kScanForJobs);
    }

    virtual void parallelFor(int iBegin, int iEnd, int grainSize, const btIParallelForBody& body) BT_OVERRIDE
    {
        BT_PROFILE("parallelFor_ThreadSupport");
        btAssert(iEnd >= iBegin);
        btAssert(grainSize >= 1);
        int iterationCount = iEnd - iBegin;
        if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
        {
            typedef ParallelForJob JobType;
            int jobCount = (iterationCount + grainSize - 1) / grainSize;
            m_numJobs = jobCount;
            btAssert(jobCount >= 2);  // need more than one job for multithreading
            int jobSize = sizeof(JobType);

            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue(jobCount, jobSize);
            }
            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for (int i = iBegin; i < iEnd; i += grainSize)
            {
                btAssert(iJob < jobCount);
                int iE = btMin(i + grainSize, iEnd);
                JobQueue* jq = m_perThreadJobQueues[iThread];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new (jobMem) ParallelForJob(i, iE, body);  // placement new
                jq->submitJob(job);
                iJob++;
                iThread++;
                if (iThread >= m_numThreads)
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers(jobCount - 1);

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();
            m_antiNestingLock.unlock();
        }
        else
        {
            BT_PROFILE("parallelFor_mainThread");
            // just run on main thread
            body.forLoop(iBegin, iEnd);
        }
    }
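    // parallelFor splits [iBegin, iEnd) into grainSize-sized jobs, placement-news them into the
    // per-queue job memory, and deals them round-robin across the active worker queues; the
    // anti-nesting lock makes a parallelFor issued from inside another parallel section fall back
    // to the serial path on the calling thread.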
    virtual btScalar parallelSum(int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body) BT_OVERRIDE
    {
        BT_PROFILE("parallelSum_ThreadSupport");
        btAssert(iEnd >= iBegin);
        btAssert(grainSize >= 1);
        int iterationCount = iEnd - iBegin;
        if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
        {
            typedef ParallelSumJob JobType;
            int jobCount = (iterationCount + grainSize - 1) / grainSize;
            m_numJobs = jobCount;
            btAssert(jobCount >= 2);  // need more than one job for multithreading
            int jobSize = sizeof(JobType);
            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue(jobCount, jobSize);
            }

            // initialize summation
            for (int iThread = 0; iThread < m_numThreads; ++iThread)
            {
                m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
            }

            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for (int i = iBegin; i < iEnd; i += grainSize)
            {
                btAssert(iJob < jobCount);
                int iE = btMin(i + grainSize, iEnd);
                JobQueue* jq = m_perThreadJobQueues[iThread];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new (jobMem) ParallelSumJob(i, iE, body, &m_threadLocalStorage[0]);  // placement new
                jq->submitJob(job);
                iJob++;
                iThread++;
                if (iThread >= m_numThreads)
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers(jobCount - 1);

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();

            // add up all the thread sums
            btScalar sum = btScalar(0);
            for (int iThread = 0; iThread < m_numThreads; ++iThread)
            {
                sum += m_threadLocalStorage[iThread].m_sumResult;
            }
            m_antiNestingLock.unlock();
            return sum;
        }
        else
        {
            BT_PROFILE("parallelSum_mainThread");
            // just run on main thread
            return body.sumLoop(iBegin, iEnd);
        }
    }
};

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
    ts->init();
    return ts;
}

#else  // #if BT_THREADSAFE

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    return NULL;
}

#endif  // #else // #if BT_THREADSAFE
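A minimal usage sketch (not part of this file): the scheduler returned by btCreateDefaultTaskScheduler() is typically installed with btSetTaskScheduler() from btThreads.h, and work is described by subclassing btIParallelForBody. The ScaleBody type, the example() function, and the chosen grain size below are illustrative assumptions, not part of the Bullet API.

#include "LinearMath/btThreads.h"

// Hypothetical example body: scales a float array in parallel.
struct ScaleBody : public btIParallelForBody
{
    float* m_data;
    float m_scale;
    virtual void forLoop(int iBegin, int iEnd) const BT_OVERRIDE
    {
        for (int i = iBegin; i < iEnd; ++i)
            m_data[i] *= m_scale;
    }
};

void example(float* data, int count)
{
    btITaskScheduler* scheduler = btCreateDefaultTaskScheduler();  // NULL when built without BT_THREADSAFE
    if (scheduler)
        btSetTaskScheduler(scheduler);  // make it the active scheduler

    ScaleBody body;
    body.m_data = data;
    body.m_scale = 2.0f;
    if (scheduler)
        scheduler->parallelFor(0, count, 64, body);  // 64 iterations per job (illustrative grain size)
    else
        body.forLoop(0, count);  // serial fallback
}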