| 1 | /* |
| 2 | * Copyright (C) 2010 Apple Inc. All rights reserved. |
| 3 | * Copyright (C) 2017 Sony Interactive Entertainment Inc. |
| 4 | * |
| 5 | * Redistribution and use in source and binary forms, with or without |
| 6 | * modification, are permitted provided that the following conditions |
| 7 | * are met: |
| 8 | * 1. Redistributions of source code must retain the above copyright |
| 9 | * notice, this list of conditions and the following disclaimer. |
| 10 | * 2. Redistributions in binary form must reproduce the above copyright |
| 11 | * notice, this list of conditions and the following disclaimer in the |
| 12 | * documentation and/or other materials provided with the distribution. |
| 13 | * |
| 14 | * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS'' |
| 15 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, |
| 16 | * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| 17 | * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS |
| 18 | * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| 19 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| 20 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| 21 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| 22 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| 23 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF |
| 24 | * THE POSSIBILITY OF SUCH DAMAGE. |
| 25 | */ |
| 26 | |
| 27 | #include "config.h" |
| 28 | #include <wtf/WorkQueue.h> |
| 29 | |
| 30 | #include <mutex> |
| 31 | #include <wtf/Condition.h> |
| 32 | #include <wtf/Deque.h> |
| 33 | #include <wtf/Function.h> |
| 34 | #include <wtf/Lock.h> |
| 35 | #include <wtf/NeverDestroyed.h> |
| 36 | #include <wtf/NumberOfCores.h> |
| 37 | #include <wtf/Ref.h> |
| 38 | #include <wtf/Threading.h> |
| 39 | #include <wtf/text/StringConcatenateNumbers.h> |
| 40 | |
| 41 | namespace WTF { |
| 42 | |
| 43 | Ref<WorkQueue> WorkQueue::create(const char* name, Type type, QOS qos) |
| 44 | { |
| 45 | return adoptRef(*new WorkQueue(name, type, qos)); |
| 46 | } |
| 47 | |
| 48 | WorkQueue::WorkQueue(const char* name, Type type, QOS qos) |
| 49 | { |
| 50 | platformInitialize(name, type, qos); |
| 51 | } |
| 52 | |
| 53 | WorkQueue::~WorkQueue() |
| 54 | { |
| 55 | platformInvalidate(); |
| 56 | } |
| 57 | |
| 58 | #if !PLATFORM(COCOA) |
| 59 | void WorkQueue::concurrentApply(size_t iterations, WTF::Function<void (size_t index)>&& function) |
| 60 | { |
| 61 | if (!iterations) |
| 62 | return; |
| 63 | |
| 64 | if (iterations == 1) { |
| 65 | function(0); |
| 66 | return; |
| 67 | } |
| 68 | |
| 69 | class ThreadPool { |
| 70 | public: |
| 71 | ThreadPool() |
| 72 | { |
| 73 | // We don't need a thread for the current core. |
| 74 | unsigned threadCount = numberOfProcessorCores() - 1; |
| 75 | |
| 76 | m_workers.reserveInitialCapacity(threadCount); |
| 77 | for (unsigned i = 0; i < threadCount; ++i) { |
| 78 | m_workers.append(Thread::create("ThreadPool Worker" , [this] { |
| 79 | threadBody(); |
| 80 | })); |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | size_t workerCount() const { return m_workers.size(); } |
| 85 | |
| 86 | void dispatch(const WTF::Function<void ()>* function) |
| 87 | { |
| 88 | LockHolder holder(m_lock); |
| 89 | |
| 90 | m_queue.append(function); |
| 91 | m_condition.notifyOne(); |
| 92 | } |
| 93 | |
| 94 | private: |
| 95 | NO_RETURN void threadBody() |
| 96 | { |
| 97 | while (true) { |
| 98 | const WTF::Function<void ()>* function; |
| 99 | |
| 100 | { |
| 101 | LockHolder holder(m_lock); |
| 102 | |
| 103 | m_condition.wait(m_lock, [this] { |
| 104 | return !m_queue.isEmpty(); |
| 105 | }); |
| 106 | |
| 107 | function = m_queue.takeFirst(); |
| 108 | } |
| 109 | |
| 110 | (*function)(); |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | Lock m_lock; |
| 115 | Condition m_condition; |
| 116 | Deque<const WTF::Function<void ()>*> m_queue; |
| 117 | |
| 118 | Vector<Ref<Thread>> m_workers; |
| 119 | }; |
| 120 | |
| 121 | static LazyNeverDestroyed<ThreadPool> threadPool; |
| 122 | static std::once_flag onceFlag; |
| 123 | std::call_once(onceFlag, [] { |
| 124 | threadPool.construct(); |
| 125 | }); |
| 126 | |
| 127 | // Cap the worker count to the number of iterations (excluding this thread) |
| 128 | const size_t workerCount = std::min(iterations - 1, threadPool->workerCount()); |
| 129 | |
| 130 | std::atomic<size_t> currentIndex(0); |
| 131 | std::atomic<size_t> activeThreads(workerCount + 1); |
| 132 | |
| 133 | Condition condition; |
| 134 | Lock lock; |
| 135 | |
| 136 | WTF::Function<void ()> applier = [&, function = WTFMove(function)] { |
| 137 | size_t index; |
| 138 | |
| 139 | // Call the function for as long as there are iterations left. |
| 140 | while ((index = currentIndex++) < iterations) |
| 141 | function(index); |
| 142 | |
| 143 | // If there are no active threads left, signal the caller. |
| 144 | if (!--activeThreads) { |
| 145 | LockHolder holder(lock); |
| 146 | condition.notifyOne(); |
| 147 | } |
| 148 | }; |
| 149 | |
| 150 | for (size_t i = 0; i < workerCount; ++i) |
| 151 | threadPool->dispatch(&applier); |
| 152 | applier(); |
| 153 | |
| 154 | LockHolder holder(lock); |
| 155 | condition.wait(lock, [&] { return !activeThreads; }); |
| 156 | } |
| 157 | #endif |
| 158 | |
| 159 | } |
| 160 | |