1 | /* |
2 | * Copyright (C) 2018 Igalia, S.L. |
3 | * Copyright (C) 2018 Metrological Group B.V. |
4 | * |
5 | * This library is free software; you can redistribute it and/or |
6 | * modify it under the terms of the GNU Library General Public |
7 | * License as published by the Free Software Foundation; either |
8 | * version 2 of the License, or (at your option) any later version. |
9 | * |
10 | * This library is distributed in the hope that it will be useful, |
11 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
13 | * Library General Public License for more details. |
14 | * |
15 | * You should have received a copy of the GNU Library General Public License |
16 | * along with this library; see the file COPYING.LIB. If not, write to |
17 | * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, |
18 | * Boston, MA 02110-1301, USA. |
19 | */ |
20 | |
21 | #pragma once |
22 | |
23 | #include <wtf/Condition.h> |
24 | #include <wtf/Deque.h> |
25 | #include <wtf/Function.h> |
26 | #include <wtf/Lock.h> |
27 | #include <wtf/RunLoop.h> |
28 | #include <wtf/StdLibExtras.h> |
29 | |
30 | namespace WebCore { |
31 | |
32 | /* AbortableTaskQueue is a high-level synchronization object for cases where abortable work is done in |
33 | * background thread(s) that sometimes need to post tasks to the main thread. |
34 | * |
35 | * The tasks posted by the background thread(s) to the main thread may be asynchronous, using enqueueTask(), |
36 | * which returns immediately; or synchronous, using enqueueTaskAndWait(), which blocks the calling |
37 | * background thread until the task is run by the main thread (possibly returning a value). |
38 | * |
39 | * What makes AbortableTaskQueue different from other task queueing mechanisms is that it provides a two-phase |
40 | * protocol for aborting the work in the background thread in presence of queued tasks without deadlocks or |
41 | * late notification bugs. |
42 | * |
43 | * Without a two-phase design deadlocks would occur when attempting an abort if a background thread was |
44 | * blocked in a synchronous task and needed to return from there for the abort to be handled. Also, without |
45 | * a design like this, tasks already enqueued at that point or soon thereafter until the abort is complete |
46 | * would still be handled by the main thread, even though we don't want to anymore. |
47 | * |
48 | * Aborting background processing with AbortableTaskQueue is a several step process: |
49 | * |
50 | * 1. Call abortableTaskQueue.startAborting() -- This will make any current or future (until further notice) |
51 | * synchronous tasks fail immediately, so that we don't deadlock in the next step. Also, tasks of any kind |
52 | * already enqueued will not be run. |
53 | * |
54 | * 2. Send the abort signal to the background threads. This is completely application specific. For instance, |
55 | * in the AppendPipeline case you would flush or reset the GStreamer pipeline here. Wait until all the |
56 | * background threads have finished aborting. |
57 | * |
58 | * 3. Call abortableTaskQueue.finishAborting() -- This will allow new tasks queued from this point on to be |
59 | * handled just as before the abort was made. |
60 | * |
61 | * 4. After this, the background thread(s) can be put to work again safely. |
62 | * |
63 | * This class is used for handling demuxer events in AppendPipeline, taking into account demuxing can be |
64 | * aborted at any moment if SourceBuffer.abort() is called or the SourceBuffer is destroyed. */ |
65 | class AbortableTaskQueue final { |
66 | WTF_MAKE_NONCOPYABLE(AbortableTaskQueue); |
67 | public: |
68 | AbortableTaskQueue() |
69 | { |
70 | ASSERT(isMainThread()); |
71 | } |
72 | |
73 | ~AbortableTaskQueue() |
74 | { |
75 | ASSERT(isMainThread()); |
76 | ASSERT(!m_mutex.isHeld()); |
77 | ASSERT(m_channel.isEmpty()); |
78 | } |
79 | |
80 | // =========================== |
81 | // Methods for the main thread |
82 | // =========================== |
83 | |
84 | // Starts an abort process. |
85 | // |
86 | // Tasks already queued will be discarded. |
87 | // |
88 | // Until finishAborting is called, all present and future calls to enqueueTaskAndWait() will immediately |
89 | // return an empty optional. |
90 | // |
91 | // This method is idempotent. |
92 | void startAborting() |
93 | { |
94 | ASSERT(isMainThread()); |
95 | |
96 | { |
97 | LockHolder lockHolder(m_mutex); |
98 | m_aborting = true; |
99 | cancelAllTasks(); |
100 | } |
101 | m_abortedOrResponseSet.notifyAll(); |
102 | } |
103 | |
104 | // Declares the previous abort finished. |
105 | // |
106 | // In order to avoid race conditions the background threads must be unable to post tasks at this point. |
107 | void finishAborting() |
108 | { |
109 | ASSERT(isMainThread()); |
110 | |
111 | LockHolder lockHolder(m_mutex); |
112 | ASSERT(m_aborting); |
113 | m_aborting = false; |
114 | } |
115 | |
116 | // ================================== |
117 | // Methods for the background threads |
118 | // ================================== |
119 | |
120 | // Enqueue a task to be run on the main thread. The task may be cancelled if an abort starts before it's |
121 | // handled. |
122 | void enqueueTask(WTF::Function<void()>&& mainThreadTaskHandler) |
123 | { |
124 | ASSERT(!isMainThread()); |
125 | |
126 | LockHolder lockHolder(m_mutex); |
127 | if (m_aborting) |
128 | return; |
129 | |
130 | postTask(WTFMove(mainThreadTaskHandler)); |
131 | } |
132 | |
133 | // Enqueue a task to be run on the main thread and wait for it to return. The return value of the task is |
134 | // forwarded to the background thread, wrapped in an optional. |
135 | // |
136 | // If we are aborting, the call finishes immediately, returning an empty optional. |
137 | // |
138 | // It is allowed for the main thread task handler to abort the AbortableTaskQueue. In that case, the return |
139 | // value is discarded and the caller receives an empty optional. |
140 | template<typename R> |
141 | Optional<R> enqueueTaskAndWait(WTF::Function<R()>&& mainThreadTaskHandler) |
142 | { |
143 | // Don't deadlock the main thread with itself. |
144 | ASSERT(!isMainThread()); |
145 | |
146 | LockHolder lockHolder(m_mutex); |
147 | if (m_aborting) |
148 | return WTF::nullopt; |
149 | |
150 | Optional<R> response = WTF::nullopt; |
151 | postTask([this, &response, &mainThreadTaskHandler]() { |
152 | R responseValue = mainThreadTaskHandler(); |
153 | LockHolder lockHolder(m_mutex); |
154 | if (!m_aborting) |
155 | response = WTFMove(responseValue); |
156 | m_abortedOrResponseSet.notifyAll(); |
157 | }); |
158 | m_abortedOrResponseSet.wait(m_mutex, [this, &response]() { |
159 | return m_aborting || response; |
160 | }); |
161 | return response; |
162 | } |
163 | |
164 | // This is class is provided for convenience when you want to use enqueueTaskAndWait() but |
165 | // you don't need any particular data from the main thread in return and just knowing that it finished |
166 | // running the handler function is enough. |
167 | class Void { }; |
168 | |
169 | private: |
170 | // Protected state: |
171 | // Main thread: read-write. Writes must be made with the lock. |
172 | // Background threads: read only. Reads must be made with the lock. |
173 | class Task : public ThreadSafeRefCounted<Task> { |
174 | WTF_MAKE_NONCOPYABLE(Task); |
175 | WTF_MAKE_FAST_ALLOCATED(Task); |
176 | public: |
177 | static Ref<Task> create(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback) |
178 | { |
179 | return adoptRef(*new Task(taskQueue, WTFMove(taskCallback))); |
180 | } |
181 | |
182 | bool isCancelled() const |
183 | { |
184 | return !m_taskQueue; |
185 | } |
186 | |
187 | void cancel() |
188 | { |
189 | ASSERT(!isCancelled()); |
190 | m_taskCallback = nullptr; |
191 | m_taskQueue = nullptr; |
192 | } |
193 | |
194 | void dispatch() |
195 | { |
196 | ASSERT(isMainThread()); |
197 | if (isCancelled()) |
198 | return; |
199 | |
200 | LockHolder lock(m_taskQueue->m_mutex); |
201 | ASSERT(this == m_taskQueue->m_channel.first().ptr()); |
202 | m_taskQueue->m_channel.removeFirst(); |
203 | lock.unlockEarly(); |
204 | m_taskCallback(); |
205 | } |
206 | |
207 | private: |
208 | AbortableTaskQueue* m_taskQueue; |
209 | WTF::Function<void()> m_taskCallback; |
210 | |
211 | Task(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback) |
212 | : m_taskQueue(taskQueue), m_taskCallback(WTFMove(taskCallback)) |
213 | { } |
214 | }; |
215 | |
216 | void postTask(WTF::Function<void()>&& callback) |
217 | { |
218 | ASSERT(m_mutex.isHeld()); |
219 | Ref<Task> task = Task::create(this, WTFMove(callback)); |
220 | m_channel.append(task.copyRef()); |
221 | RunLoop::main().dispatch([task = WTFMove(task)]() { task->dispatch(); }); |
222 | } |
223 | |
224 | void cancelAllTasks() |
225 | { |
226 | ASSERT(isMainThread()); |
227 | ASSERT(m_mutex.isHeld()); |
228 | for (Ref<Task>& task : m_channel) |
229 | task->cancel(); |
230 | m_channel.clear(); |
231 | } |
232 | |
233 | bool m_aborting { false }; |
234 | Lock m_mutex; |
235 | Condition m_abortedOrResponseSet; |
236 | WTF::Deque<Ref<Task>> m_channel; |
237 | }; |
238 | |
239 | } // namespace WebCore |
240 | |