| 1 | /* |
| 2 | * Copyright (C) 2018 Igalia, S.L. |
| 3 | * Copyright (C) 2018 Metrological Group B.V. |
| 4 | * |
| 5 | * This library is free software; you can redistribute it and/or |
| 6 | * modify it under the terms of the GNU Library General Public |
| 7 | * License as published by the Free Software Foundation; either |
| 8 | * version 2 of the License, or (at your option) any later version. |
| 9 | * |
| 10 | * This library is distributed in the hope that it will be useful, |
| 11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 13 | * Library General Public License for more details. |
| 14 | * |
| 15 | * You should have received a copy of the GNU Library General Public License |
| 16 | * aint with this library; see the file COPYING.LIB. If not, write to |
| 17 | * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, |
| 18 | * Boston, MA 02110-1301, USA. |
| 19 | */ |
| 20 | |
| 21 | #pragma once |
| 22 | |
| 23 | #include <wtf/Condition.h> |
| 24 | #include <wtf/Deque.h> |
| 25 | #include <wtf/Function.h> |
| 26 | #include <wtf/Lock.h> |
| 27 | #include <wtf/RunLoop.h> |
| 28 | #include <wtf/StdLibExtras.h> |
| 29 | |
| 30 | namespace WebCore { |
| 31 | |
| 32 | /* AbortableTaskQueue is a high-level synchronization object for cases where abortable work is done in |
| 33 | * background thread(s) that sometimes needs to post tasks to the main thread. |
| 34 | * |
| 35 | * The tasks posted by the background thread(s) to the main thread may be asynchronous, using enqueueTask(), |
| 36 | * which returns immediately; or synchronous, using enqueueTaskAndWait(), which blocks the calling |
| 37 | * background thread until the task is run by the main thread (possibly returning a value). |
| 38 | * |
| 39 | * What makes AbortableTaskQueue different from other task queueing mechanisms is that it provides a two-phase |
| 40 | * protocol for aborting the work in the background thread in presence of queued tasks without deadlocks or |
| 41 | * late notification bugs. |
| 42 | * |
| 43 | * Without a two-phase design deadlocks would occur when attempting an abort if a background thread was |
| 44 | * blocked in a synchronous task and needed to return from there for the abort to be handled. Also, without |
| 45 | * a design like this, tasks already enqueued at that point or soon thereafter until the abort is complete |
| 46 | * would still be handled by the main thread, even though we don't want to anymore. |
| 47 | * |
| 48 | * Aborting background processing with AbortableTaskQueue is a several step process: |
| 49 | * |
| 50 | * 1. Call abortableTaskQueue.startAborting() -- This will make any current or future (until further notice) |
| 51 | * synchronous tasks fail immediately, so that we don't deadlock in the next step. Also, tasks of any kind |
| 52 | * already enqueued will not be run. |
| 53 | * |
| 54 | * 2. Send the abort signal to the background threads. This is completely application specific. For instance, |
| 55 | * in the AppendPipeline case you would flush or reset the GStreamer pipeline here. Wait until all the |
| 56 | * background threads have finished aborting. |
| 57 | * |
| 58 | * 3. Call abortableTaskQueue.finishAborting() -- This will allow new tasks queued from this point on to be |
| 59 | * handled just as before the abort was made. |
| 60 | * |
| 61 | * 4. After this, the background thread(s) can be put to work again safely. |
| 62 | * |
| 63 | * This class is used for handling demuxer events in AppendPipeline, taking into account demuxing can be |
| 64 | * aborted at any moment if SourceBuffer.abort() is called or the SourceBuffer is destroyed. */ |
| 65 | class AbortableTaskQueue final { |
| 66 | WTF_MAKE_NONCOPYABLE(AbortableTaskQueue); |
| 67 | public: |
| 68 | AbortableTaskQueue() |
| 69 | { |
| 70 | ASSERT(isMainThread()); |
| 71 | } |
| 72 | |
| 73 | ~AbortableTaskQueue() |
| 74 | { |
| 75 | ASSERT(isMainThread()); |
| 76 | ASSERT(!m_mutex.isHeld()); |
| 77 | ASSERT(m_channel.isEmpty()); |
| 78 | } |
| 79 | |
| 80 | // =========================== |
| 81 | // Methods for the main thread |
| 82 | // =========================== |
| 83 | |
| 84 | // Starts an abort process. |
| 85 | // |
| 86 | // Tasks already queued will be discarded. |
| 87 | // |
| 88 | // Until finishAborting is called, all present and future calls to enqueueTaskAndWait() will immediately |
| 89 | // return an empty optional. |
| 90 | // |
| 91 | // This method is idempotent. |
| 92 | void startAborting() |
| 93 | { |
| 94 | ASSERT(isMainThread()); |
| 95 | |
| 96 | { |
| 97 | LockHolder lockHolder(m_mutex); |
| 98 | m_aborting = true; |
| 99 | cancelAllTasks(); |
| 100 | } |
| 101 | m_abortedOrResponseSet.notifyAll(); |
| 102 | } |
| 103 | |
| 104 | // Declares the previous abort finished. |
| 105 | // |
| 106 | // In order to avoid race conditions the background threads must be unable to post tasks at this point. |
| 107 | void finishAborting() |
| 108 | { |
| 109 | ASSERT(isMainThread()); |
| 110 | |
| 111 | LockHolder lockHolder(m_mutex); |
| 112 | ASSERT(m_aborting); |
| 113 | m_aborting = false; |
| 114 | } |
| 115 | |
| 116 | // ================================== |
| 117 | // Methods for the background threads |
| 118 | // ================================== |
| 119 | |
| 120 | // Enqueue a task to be run on the main thread. The task may be cancelled if an abort starts before it's |
| 121 | // handled. |
| 122 | void enqueueTask(WTF::Function<void()>&& mainThreadTaskHandler) |
| 123 | { |
| 124 | ASSERT(!isMainThread()); |
| 125 | |
| 126 | LockHolder lockHolder(m_mutex); |
| 127 | if (m_aborting) |
| 128 | return; |
| 129 | |
| 130 | postTask(WTFMove(mainThreadTaskHandler)); |
| 131 | } |
| 132 | |
| 133 | // Enqueue a task to be run on the main thread and wait for it to return. The return value of the task is |
| 134 | // forwarded to the background thread, wrapped in an optional. |
| 135 | // |
| 136 | // If we are aborting, the call finishes immediately, returning an empty optional. |
| 137 | // |
| 138 | // It is allowed for the main thread task handler to abort the AbortableTaskQueue. In that case, the return |
| 139 | // value is discarded and the caller receives an empty optional. |
| 140 | template<typename R> |
| 141 | Optional<R> enqueueTaskAndWait(WTF::Function<R()>&& mainThreadTaskHandler) |
| 142 | { |
| 143 | // Don't deadlock the main thread with itself. |
| 144 | ASSERT(!isMainThread()); |
| 145 | |
| 146 | LockHolder lockHolder(m_mutex); |
| 147 | if (m_aborting) |
| 148 | return WTF::nullopt; |
| 149 | |
| 150 | Optional<R> response = WTF::nullopt; |
| 151 | postTask([this, &response, &mainThreadTaskHandler]() { |
| 152 | R responseValue = mainThreadTaskHandler(); |
| 153 | LockHolder lockHolder(m_mutex); |
| 154 | if (!m_aborting) |
| 155 | response = WTFMove(responseValue); |
| 156 | m_abortedOrResponseSet.notifyAll(); |
| 157 | }); |
| 158 | m_abortedOrResponseSet.wait(m_mutex, [this, &response]() { |
| 159 | return m_aborting || response; |
| 160 | }); |
| 161 | return response; |
| 162 | } |
| 163 | |
| 164 | // This is class is provided for convenience when you want to use enqueueTaskAndWait() but |
| 165 | // you don't need any particular data from the main thread in return and just knowing that it finished |
| 166 | // running the handler function is enough. |
| 167 | class Void { }; |
| 168 | |
| 169 | private: |
| 170 | // Protected state: |
| 171 | // Main thread: read-write. Writes must be made with the lock. |
| 172 | // Background threads: read only. Reads must be made with the lock. |
| 173 | class Task : public ThreadSafeRefCounted<Task> { |
| 174 | WTF_MAKE_NONCOPYABLE(Task); |
| 175 | WTF_MAKE_FAST_ALLOCATED(Task); |
| 176 | public: |
| 177 | static Ref<Task> create(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback) |
| 178 | { |
| 179 | return adoptRef(*new Task(taskQueue, WTFMove(taskCallback))); |
| 180 | } |
| 181 | |
| 182 | bool isCancelled() const |
| 183 | { |
| 184 | return !m_taskQueue; |
| 185 | } |
| 186 | |
| 187 | void cancel() |
| 188 | { |
| 189 | ASSERT(!isCancelled()); |
| 190 | m_taskCallback = nullptr; |
| 191 | m_taskQueue = nullptr; |
| 192 | } |
| 193 | |
| 194 | void dispatch() |
| 195 | { |
| 196 | ASSERT(isMainThread()); |
| 197 | if (isCancelled()) |
| 198 | return; |
| 199 | |
| 200 | LockHolder lock(m_taskQueue->m_mutex); |
| 201 | ASSERT(this == m_taskQueue->m_channel.first().ptr()); |
| 202 | m_taskQueue->m_channel.removeFirst(); |
| 203 | lock.unlockEarly(); |
| 204 | m_taskCallback(); |
| 205 | } |
| 206 | |
| 207 | private: |
| 208 | AbortableTaskQueue* m_taskQueue; |
| 209 | WTF::Function<void()> m_taskCallback; |
| 210 | |
| 211 | Task(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback) |
| 212 | : m_taskQueue(taskQueue), m_taskCallback(WTFMove(taskCallback)) |
| 213 | { } |
| 214 | }; |
| 215 | |
| 216 | void postTask(WTF::Function<void()>&& callback) |
| 217 | { |
| 218 | ASSERT(m_mutex.isHeld()); |
| 219 | Ref<Task> task = Task::create(this, WTFMove(callback)); |
| 220 | m_channel.append(task.copyRef()); |
| 221 | RunLoop::main().dispatch([task = WTFMove(task)]() { task->dispatch(); }); |
| 222 | } |
| 223 | |
| 224 | void cancelAllTasks() |
| 225 | { |
| 226 | ASSERT(isMainThread()); |
| 227 | ASSERT(m_mutex.isHeld()); |
| 228 | for (Ref<Task>& task : m_channel) |
| 229 | task->cancel(); |
| 230 | m_channel.clear(); |
| 231 | } |
| 232 | |
| 233 | bool m_aborting { false }; |
| 234 | Lock m_mutex; |
| 235 | Condition m_abortedOrResponseSet; |
| 236 | WTF::Deque<Ref<Task>> m_channel; |
| 237 | }; |
| 238 | |
| 239 | } // namespace WebCore |
| 240 | |