Bullet Collision Detection & Physics Library
btTaskScheduler.cpp

#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btQuickprof.h"
#include <stdio.h>
#include <algorithm>

#if BT_THREADSAFE

#include "btThreadSupportInterface.h"

#if defined(_WIN32)

#define WIN32_LEAN_AND_MEAN

#include <windows.h>

#endif

typedef unsigned long long btU64;
static const int kCacheLineSize = 64;

void btSpinPause()
{
#if defined(_WIN32)
    YieldProcessor();
#endif
}

struct WorkerThreadStatus
{
    enum Type
    {
        kInvalid,
        kWaitingForWork,
        kWorking,
        kSleeping,
    };
};

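// Directive block shared between the scheduler and all worker threads: one byte per
// thread telling that worker whether to sleep, idle, or actively scan for jobs.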
ATTRIBUTE_ALIGNED64(class)
WorkerThreadDirectives
{
    static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
    // directives for all worker threads packed into a single cacheline
    char m_threadDirs[kMaxThreadCount];

public:
    enum Type
    {
        kInvalid,
        kGoToSleep,         // go to sleep
        kStayAwakeButIdle,  // stay awake, but don't check the job queue
        kScanForJobs,       // actively scan the job queue for jobs
    };
    WorkerThreadDirectives()
    {
        for (int i = 0; i < kMaxThreadCount; ++i)
        {
            m_threadDirs[i] = 0;
        }
    }

    Type getDirective(int threadId)
    {
        btAssert(threadId < kMaxThreadCount);
        return static_cast<Type>(m_threadDirs[threadId]);
    }

    void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
    {
        btAssert(threadBegin < threadEnd);
        btAssert(threadEnd <= kMaxThreadCount);
        char dirChar = static_cast<char>(dir);
        for (int i = threadBegin; i < threadEnd; ++i)
        {
            m_threadDirs[i] = dirChar;
        }
    }
};

class JobQueue;

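// Per-thread bookkeeping used by the scheduler and the worker loop: thread status,
// finished-job count, partial sum result, and the job queue this thread drains.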
ATTRIBUTE_ALIGNED64(struct)
ThreadLocalStorage
{
    int m_threadId;
    WorkerThreadStatus::Type m_status;
    int m_numJobsFinished;
    btSpinMutex m_mutex;
    btScalar m_sumResult;
    WorkerThreadDirectives* m_directive;
    JobQueue* m_queue;
    btClock* m_clock;
    unsigned int m_cooldownTime;
};

struct IJob
{
    virtual void executeJob(int threadId) = 0;
};

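// Jobs created by parallelFor/parallelSum: each job runs the user-supplied body over
// one sub-range [m_begin, m_end) of the full iteration range.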
class ParallelForJob : public IJob
{
    const btIParallelForBody* m_body;
    int m_begin;
    int m_end;

public:
    ParallelForJob(int iBegin, int iEnd, const btIParallelForBody& body)
    {
        m_body = &body;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE("executeJob");

        // call the functor body to do the work
        m_body->forLoop(m_begin, m_end);
    }
};

class ParallelSumJob : public IJob
{
    const btIParallelSumBody* m_body;
    ThreadLocalStorage* m_threadLocalStoreArray;
    int m_begin;
    int m_end;

public:
    ParallelSumJob(int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls)
    {
        m_body = &body;
        m_threadLocalStoreArray = tls;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE("executeJob");

        // call the functor body to do the work
        btScalar val = m_body->sumLoop(m_begin, m_end);
#if BT_PARALLEL_SUM_DETERMINISTISM
        // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
        const float TRUNC_SCALE = float(1 << 19);
        val = floor(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE;  // truncate some bits
#endif
        m_threadLocalStoreArray[threadId].m_sumResult += val;
    }
};

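// A queue of jobs serviced by one (or a few) worker threads. Job objects live in one
// contiguous memory block, and each queue keeps pointers to neighboring queues so an
// idle worker can steal jobs from them.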
ATTRIBUTE_ALIGNED64(class)
JobQueue
{
    btThreadSupportInterface* m_threadSupport;
    btCriticalSection* m_queueLock;
    btSpinMutex m_mutex;

    btAlignedObjectArray<IJob*> m_jobQueue;
    char* m_jobMem;
    int m_jobMemSize;
    bool m_queueIsEmpty;
    int m_tailIndex;
    int m_headIndex;
    int m_allocSize;
    bool m_useSpinMutex;
    btAlignedObjectArray<JobQueue*> m_neighborContexts;
    char m_cachePadding[kCacheLineSize];  // prevent false sharing

    void freeJobMem()
    {
        if (m_jobMem)
        {
            // free old
            btAlignedFree(m_jobMem);
            m_jobMem = NULL;
        }
    }
    void resizeJobMem(int newSize)
    {
        if (newSize > m_jobMemSize)
        {
            freeJobMem();
            m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
            m_jobMemSize = newSize;
        }
    }

public:
    JobQueue()
    {
        m_jobMem = NULL;
        m_jobMemSize = 0;
        m_threadSupport = NULL;
        m_queueLock = NULL;
        m_headIndex = 0;
        m_tailIndex = 0;
        m_useSpinMutex = false;
    }
    ~JobQueue()
    {
        exit();
    }
    void exit()
    {
        freeJobMem();
        if (m_queueLock && m_threadSupport)
        {
            m_threadSupport->deleteCriticalSection(m_queueLock);
            m_queueLock = NULL;
            m_threadSupport = 0;
        }
    }

    void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
    {
        m_threadSupport = threadSup;
        if (threadSup)
        {
            m_queueLock = m_threadSupport->createCriticalSection();
        }
        setupJobStealing(contextArray, contextArray->size());
    }
    void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
    {
        btAlignedObjectArray<JobQueue>& contexts = *contextArray;
        int selfIndex = 0;
        for (int i = 0; i < contexts.size(); ++i)
        {
            if (this == &contexts[i])
            {
                selfIndex = i;
                break;
            }
        }
        int numNeighbors = btMin(2, contexts.size() - 1);
        int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
        int numOffsets = sizeof(neighborOffsets) / sizeof(neighborOffsets[0]);
        m_neighborContexts.reserve(numNeighbors);
        m_neighborContexts.resizeNoInitialize(0);
        for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
        {
            int neighborIndex = selfIndex + neighborOffsets[i];
            if (neighborIndex >= 0 && neighborIndex < numActiveContexts)
            {
                m_neighborContexts.push_back(&contexts[neighborIndex]);
            }
        }
    }

    bool isQueueEmpty() const { return m_queueIsEmpty; }
    void lockQueue()
    {
        if (m_useSpinMutex)
        {
            m_mutex.lock();
        }
        else
        {
            m_queueLock->lock();
        }
    }
    void unlockQueue()
    {
        if (m_useSpinMutex)
        {
            m_mutex.unlock();
        }
        else
        {
            m_queueLock->unlock();
        }
    }
    void clearQueue(int jobCount, int jobSize)
    {
        lockQueue();
        m_headIndex = 0;
        m_tailIndex = 0;
        m_allocSize = 0;
        m_queueIsEmpty = true;
        int jobBufSize = jobSize * jobCount;
        // make sure we have enough memory allocated to store jobs
        if (jobBufSize > m_jobMemSize)
        {
            resizeJobMem(jobBufSize);
        }
        // make sure job queue is big enough
        if (jobCount > m_jobQueue.capacity())
        {
            m_jobQueue.reserve(jobCount);
        }
        unlockQueue();
        m_jobQueue.resizeNoInitialize(0);
    }
    void* allocJobMem(int jobSize)
    {
        btAssert(m_jobMemSize >= (m_allocSize + jobSize));
        void* jobMem = &m_jobMem[m_allocSize];
        m_allocSize += jobSize;
        return jobMem;
    }
    void submitJob(IJob* job)
    {
        btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
        m_jobQueue.push_back(job);
        lockQueue();
        m_tailIndex++;
        m_queueIsEmpty = false;
        unlockQueue();
    }
    IJob* consumeJobFromOwnQueue()
    {
        if (m_queueIsEmpty)
        {
            // lock-free path. even if this is taken erroneously it isn't harmful
            return NULL;
        }
        IJob* job = NULL;
        lockQueue();
        if (!m_queueIsEmpty)
        {
            job = m_jobQueue[m_headIndex++];
            btAssert(reinterpret_cast<char*>(job) >= &m_jobMem[0] && reinterpret_cast<char*>(job) < &m_jobMem[0] + m_allocSize);
            if (m_headIndex == m_tailIndex)
            {
                m_queueIsEmpty = true;
            }
        }
        unlockQueue();
        return job;
    }
    IJob* consumeJob()
    {
        if (IJob* job = consumeJobFromOwnQueue())
        {
            return job;
        }
        // own queue is empty, try to steal from a neighbor
        for (int i = 0; i < m_neighborContexts.size(); ++i)
        {
            JobQueue* otherContext = m_neighborContexts[i];
            if (IJob* job = otherContext->consumeJobFromOwnQueue())
            {
                return job;
            }
        }
        return NULL;
    }
};

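// Entry point for each worker thread: drain the job queue, then spin-wait for new work,
// and finally go to sleep if no work arrives within the cooldown period or the scheduler
// directs the thread to sleep.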
static void WorkerThreadFunc(void* userPtr)
{
    BT_PROFILE("WorkerThreadFunc");
    ThreadLocalStorage* localStorage = (ThreadLocalStorage*)userPtr;
    JobQueue* jobQueue = localStorage->m_queue;

    bool shouldSleep = false;
    int threadId = localStorage->m_threadId;
    while (!shouldSleep)
    {
        // do work
        localStorage->m_mutex.lock();
        while (IJob* job = jobQueue->consumeJob())
        {
            localStorage->m_status = WorkerThreadStatus::kWorking;
            job->executeJob(threadId);
            localStorage->m_numJobsFinished++;
        }
        localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
        localStorage->m_mutex.unlock();
        btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
        // while queue is empty,
        while (jobQueue->isQueueEmpty())
        {
            // todo: spin wait a bit to avoid hammering the empty queue
            btSpinPause();
            if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep)
            {
                shouldSleep = true;
                break;
            }
            // if jobs are incoming,
            if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs)
            {
                clockStart = localStorage->m_clock->getTimeMicroseconds();  // reset clock
            }
            else
            {
                for (int i = 0; i < 50; ++i)
                {
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    if (localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
                    {
                        break;
                    }
                }
                // if no jobs incoming and queue has been empty for the cooldown time, sleep
                btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
                if (timeElapsed > localStorage->m_cooldownTime)
                {
                    shouldSleep = true;
                    break;
                }
            }
        }
    }
    {
        BT_PROFILE("sleep");
        // go sleep
        localStorage->m_mutex.lock();
        localStorage->m_status = WorkerThreadStatus::kSleeping;
        localStorage->m_mutex.unlock();
    }
}

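// The default task scheduler: distributes ParallelForJob/ParallelSumJob objects across
// per-worker job queues; the calling (main) thread also consumes jobs while it waits.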
class btTaskSchedulerDefault : public btITaskScheduler
{
    btThreadSupportInterface* m_threadSupport;
    WorkerThreadDirectives* m_workerDirective;
    btAlignedObjectArray<JobQueue> m_jobQueues;
    btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
    btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
    btSpinMutex m_antiNestingLock;  // prevent nested parallel-for
    btClock m_clock;
    int m_numThreads;
    int m_numWorkerThreads;
    int m_numActiveJobQueues;
    int m_maxNumThreads;
    int m_numJobs;
    static const int kFirstWorkerThreadId = 1;

public:
    btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
    {
        m_threadSupport = NULL;
        m_workerDirective = NULL;
    }

    virtual ~btTaskSchedulerDefault()
    {
        waitForWorkersToSleep();

        for (int i = 0; i < m_jobQueues.size(); ++i)
        {
            m_jobQueues[i].exit();
        }

        if (m_threadSupport)
        {
            delete m_threadSupport;
            m_threadSupport = NULL;
        }
        if (m_workerDirective)
        {
            btAlignedFree(m_workerDirective);
            m_workerDirective = NULL;
        }
    }

    void init()
    {
        btThreadSupportInterface::ConstructionInfo constructionInfo("TaskScheduler", WorkerThreadFunc);
        m_threadSupport = btThreadSupportInterface::create(constructionInfo);
        m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));

        m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
        m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
        m_numThreads = m_maxNumThreads;
        // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
        int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
        int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
        m_jobQueues.resize(numJobQueues);
        m_numActiveJobQueues = numJobQueues;
        for (int i = 0; i < m_jobQueues.size(); ++i)
        {
            m_jobQueues[i].init(m_threadSupport, &m_jobQueues);
        }
        m_perThreadJobQueues.resize(m_numThreads);
        for (int i = 0; i < m_numThreads; i++)
        {
            JobQueue* jq = NULL;
            // only worker threads get a job queue
            if (i > 0)
            {
                if (numThreadsPerQueue == 1)
                {
                    // one queue per worker thread
                    jq = &m_jobQueues[i - kFirstWorkerThreadId];
                }
                else
                {
                    // 2 threads share each queue
                    jq = &m_jobQueues[i / numThreadsPerQueue];
                }
            }
            m_perThreadJobQueues[i] = jq;
        }
        m_threadLocalStorage.resize(m_numThreads);
        for (int i = 0; i < m_numThreads; i++)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_threadId = i;
            storage.m_directive = m_workerDirective;
            storage.m_status = WorkerThreadStatus::kSleeping;
            storage.m_cooldownTime = 100;  // 100 microseconds, threads go to sleep after this long if they have nothing to do
            storage.m_clock = &m_clock;
            storage.m_queue = m_perThreadJobQueues[i];
        }
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);  // no work for them yet
        setNumThreads(m_threadSupport->getCacheFriendlyNumThreads());
    }

    void setWorkerDirectives(WorkerThreadDirectives::Type dir)
    {
        m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
    }

    virtual int getMaxNumThreads() const BT_OVERRIDE
    {
        return m_maxNumThreads;
    }

    virtual int getNumThreads() const BT_OVERRIDE
    {
        return m_numThreads;
    }

    virtual void setNumThreads(int numThreads) BT_OVERRIDE
    {
        m_numThreads = btMax(btMin(numThreads, int(m_maxNumThreads)), 1);
        m_numWorkerThreads = m_numThreads - 1;
        m_numActiveJobQueues = 0;
        // if there is at least 1 worker,
        if (m_numWorkerThreads > 0)
        {
            // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
            JobQueue* lastActiveContext = m_perThreadJobQueues[m_numThreads - 1];
            int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
            m_numActiveJobQueues = iLastActiveContext + 1;
            for (int i = 0; i < m_jobQueues.size(); ++i)
            {
                m_jobQueues[i].setupJobStealing(&m_jobQueues, m_numActiveJobQueues);
            }
        }
        m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
    }

    void waitJobs()
    {
        BT_PROFILE("waitJobs");
        // have the main thread work until the job queues are empty
        int numMainThreadJobsFinished = 0;
        for (int i = 0; i < m_numActiveJobQueues; ++i)
        {
            while (IJob* job = m_jobQueues[i].consumeJob())
            {
                job->executeJob(0);
                numMainThreadJobsFinished++;
            }
        }

        // done with jobs for now, tell workers to rest (but not sleep)
        setWorkerDirectives(WorkerThreadDirectives::kStayAwakeButIdle);

        btU64 clockStart = m_clock.getTimeMicroseconds();
        // wait for workers to finish any jobs in progress
        while (true)
        {
            int numWorkerJobsFinished = 0;
            for (int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread)
            {
                ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
                storage->m_mutex.lock();
                numWorkerJobsFinished += storage->m_numJobsFinished;
                storage->m_mutex.unlock();
            }
            if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
            {
                break;
            }
            btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
            btAssert(timeElapsed < 1000);
            if (timeElapsed > 100000)
            {
                break;
            }
            btSpinPause();
        }
    }

    void wakeWorkers(int numWorkersToWake)
    {
        BT_PROFILE("wakeWorkers");
        btAssert(m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs);
        int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
        int numActiveWorkers = 0;
        for (int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker)
        {
            // note this count of active workers is not necessarily totally reliable, because a worker thread could be
            // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
            ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
            if (storage.m_status != WorkerThreadStatus::kSleeping)
            {
                numActiveWorkers++;
            }
        }
        for (int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[kFirstWorkerThreadId + iWorker];
            if (storage.m_status == WorkerThreadStatus::kSleeping)
            {
                m_threadSupport->runTask(iWorker, &storage);
                numActiveWorkers++;
            }
        }
    }

    void waitForWorkersToSleep()
    {
        BT_PROFILE("waitForWorkersToSleep");
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
        m_threadSupport->waitForAllTasks();
        for (int i = kFirstWorkerThreadId; i < m_numThreads; i++)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            btAssert(storage.m_status == WorkerThreadStatus::kSleeping);
        }
    }

    virtual void sleepWorkerThreadsHint() BT_OVERRIDE
    {
        BT_PROFILE("sleepWorkerThreadsHint");
        // hint the task scheduler that we may not be using these threads for a little while
        setWorkerDirectives(WorkerThreadDirectives::kGoToSleep);
    }

    void prepareWorkerThreads()
    {
        for (int i = kFirstWorkerThreadId; i < m_numThreads; ++i)
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_mutex.lock();
            storage.m_numJobsFinished = 0;
            storage.m_mutex.unlock();
        }
        setWorkerDirectives(WorkerThreadDirectives::kScanForJobs);
    }

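    // Split [iBegin, iEnd) into jobs of roughly grainSize iterations, hand them out
    // round-robin to the per-thread job queues, wake the workers, then have the main
    // thread help drain the queues until every job has finished.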
    virtual void parallelFor(int iBegin, int iEnd, int grainSize, const btIParallelForBody& body) BT_OVERRIDE
    {
        BT_PROFILE("parallelFor_ThreadSupport");
        btAssert(iEnd >= iBegin);
        btAssert(grainSize >= 1);
        int iterationCount = iEnd - iBegin;
        if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
        {
            typedef ParallelForJob JobType;
            int jobCount = (iterationCount + grainSize - 1) / grainSize;
            m_numJobs = jobCount;
            btAssert(jobCount >= 2);  // need more than one job for multithreading
            int jobSize = sizeof(JobType);

            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue(jobCount, jobSize);
            }
            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for (int i = iBegin; i < iEnd; i += grainSize)
            {
                btAssert(iJob < jobCount);
                int iE = btMin(i + grainSize, iEnd);
                JobQueue* jq = m_perThreadJobQueues[iThread];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new (jobMem) ParallelForJob(i, iE, body);  // placement new
                jq->submitJob(job);
                iJob++;
                iThread++;
                if (iThread >= m_numThreads)
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers(jobCount - 1);

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();
            m_antiNestingLock.unlock();
        }
        else
        {
            BT_PROFILE("parallelFor_mainThread");
            // just run on main thread
            body.forLoop(iBegin, iEnd);
        }
    }
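    // Same job distribution as parallelFor, but each job accumulates a partial sum into
    // its thread's ThreadLocalStorage; the partial sums are added together at the end.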
    virtual btScalar parallelSum(int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body) BT_OVERRIDE
    {
        BT_PROFILE("parallelSum_ThreadSupport");
        btAssert(iEnd >= iBegin);
        btAssert(grainSize >= 1);
        int iterationCount = iEnd - iBegin;
        if (iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock())
        {
            typedef ParallelSumJob JobType;
            int jobCount = (iterationCount + grainSize - 1) / grainSize;
            m_numJobs = jobCount;
            btAssert(jobCount >= 2);  // need more than one job for multithreading
            int jobSize = sizeof(JobType);
            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue(jobCount, jobSize);
            }

            // initialize summation
            for (int iThread = 0; iThread < m_numThreads; ++iThread)
            {
                m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
            }

            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for (int i = iBegin; i < iEnd; i += grainSize)
            {
                btAssert(iJob < jobCount);
                int iE = btMin(i + grainSize, iEnd);
                JobQueue* jq = m_perThreadJobQueues[iThread];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new (jobMem) ParallelSumJob(i, iE, body, &m_threadLocalStorage[0]);  // placement new
                jq->submitJob(job);
                iJob++;
                iThread++;
                if (iThread >= m_numThreads)
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers(jobCount - 1);

            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();

            // add up all the thread sums
            btScalar sum = btScalar(0);
            for (int iThread = 0; iThread < m_numThreads; ++iThread)
            {
                sum += m_threadLocalStorage[iThread].m_sumResult;
            }
            m_antiNestingLock.unlock();
            return sum;
        }
        else
        {
            BT_PROFILE("parallelSum_mainThread");
            // just run on main thread
            return body.sumLoop(iBegin, iEnd);
        }
    }
};

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
    ts->init();
    return ts;
}

#else  // #if BT_THREADSAFE

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    return NULL;
}

#endif  // #else // #if BT_THREADSAFE
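
Usage sketch (not part of the file above): a minimal example of driving the default
scheduler through the btITaskScheduler interface. It assumes the btIParallelForBody
interface and the btSetTaskScheduler() helper declared in "LinearMath/btThreads.h";
the body class and data here are hypothetical.

#include "LinearMath/btThreads.h"

// Doubles every element of an array; the scheduler calls forLoop() on sub-ranges.
struct ScaleBody : public btIParallelForBody
{
    float* m_data;
    virtual void forLoop(int iBegin, int iEnd) const
    {
        for (int i = iBegin; i < iEnd; ++i)
        {
            m_data[i] *= 2.0f;
        }
    }
};

void runExample(float* data, int count)
{
    btITaskScheduler* scheduler = btCreateDefaultTaskScheduler();  // returns NULL when built without BT_THREADSAFE
    if (scheduler)
    {
        btSetTaskScheduler(scheduler);  // assumed helper from btThreads.h that installs the scheduler globally
        ScaleBody body;
        body.m_data = data;
        // split [0, count) into jobs of at least 64 iterations and run them across the worker threads
        scheduler->parallelFor(0, count, 64, body);
    }
}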