1/*
2 * Copyright (C) 2018 Igalia, S.L.
3 * Copyright (C) 2018 Metrological Group B.V.
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public License
16 * along with this library; see the file COPYING.LIB. If not, write to
17 * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21#pragma once
22
23#include <wtf/Condition.h>
24#include <wtf/Deque.h>
25#include <wtf/Function.h>
26#include <wtf/Lock.h>
27#include <wtf/RunLoop.h>
28#include <wtf/StdLibExtras.h>
29
30namespace WebCore {
31
32/* AbortableTaskQueue is a high-level synchronization object for cases where abortable work is done in
33 * background thread(s) that sometimes needs to post tasks to the main thread.
34 *
35 * The tasks posted by the background thread(s) to the main thread may be asynchronous, using enqueueTask(),
36 * which returns immediately; or synchronous, using enqueueTaskAndWait(), which blocks the calling
37 * background thread until the task is run by the main thread (possibly returning a value).
38 *
39 * What makes AbortableTaskQueue different from other task queueing mechanisms is that it provides a two-phase
40 * protocol for aborting the work in the background thread in presence of queued tasks without deadlocks or
41 * late notification bugs.
42 *
43 * Without a two-phase design deadlocks would occur when attempting an abort if a background thread was
44 * blocked in a synchronous task and needed to return from there for the abort to be handled. Also, without
45 * a design like this, tasks already enqueued at that point or soon thereafter until the abort is complete
46 * would still be handled by the main thread, even though we don't want to anymore.
47 *
48 * Aborting background processing with AbortableTaskQueue is a multi-step process:
49 *
50 * 1. Call abortableTaskQueue.startAborting() -- This will make any current or future (until further notice)
51 * synchronous tasks fail immediately, so that we don't deadlock in the next step. Also, tasks of any kind
52 * already enqueued will not be run.
53 *
54 * 2. Send the abort signal to the background threads. This is completely application specific. For instance,
55 * in the AppendPipeline case you would flush or reset the GStreamer pipeline here. Wait until all the
56 * background threads have finished aborting.
57 *
58 * 3. Call abortableTaskQueue.finishAborting() -- This will allow new tasks queued from this point on to be
59 * handled just as before the abort was made.
60 *
61 * 4. After this, the background thread(s) can be put to work again safely.
62 *
63 * This class is used for handling demuxer events in AppendPipeline, taking into account demuxing can be
64 * aborted at any moment if SourceBuffer.abort() is called or the SourceBuffer is destroyed. */
65class AbortableTaskQueue final {
66 WTF_MAKE_NONCOPYABLE(AbortableTaskQueue);
67public:
68 AbortableTaskQueue()
69 {
70 ASSERT(isMainThread());
71 }
72
73 ~AbortableTaskQueue()
74 {
75 ASSERT(isMainThread());
76 ASSERT(!m_mutex.isHeld());
77 ASSERT(m_channel.isEmpty());
78 }
79
80 // ===========================
81 // Methods for the main thread
82 // ===========================
83
84 // Starts an abort process.
85 //
86 // Tasks already queued will be discarded.
87 //
88 // Until finishAborting is called, all present and future calls to enqueueTaskAndWait() will immediately
89 // return an empty optional.
90 //
91 // This method is idempotent.
92 void startAborting()
93 {
94 ASSERT(isMainThread());
95
96 {
97 LockHolder lockHolder(m_mutex);
98 m_aborting = true;
99 cancelAllTasks();
100 }
101 m_abortedOrResponseSet.notifyAll();
102 }
103
104 // Declares the previous abort finished.
105 //
106 // In order to avoid race conditions the background threads must be unable to post tasks at this point.
107 void finishAborting()
108 {
109 ASSERT(isMainThread());
110
111 LockHolder lockHolder(m_mutex);
112 ASSERT(m_aborting);
113 m_aborting = false;
114 }
115
116 // ==================================
117 // Methods for the background threads
118 // ==================================
119
120 // Enqueue a task to be run on the main thread. The task may be cancelled if an abort starts before it's
121 // handled.
122 void enqueueTask(WTF::Function<void()>&& mainThreadTaskHandler)
123 {
124 ASSERT(!isMainThread());
125
126 LockHolder lockHolder(m_mutex);
127 if (m_aborting)
128 return;
129
130 postTask(WTFMove(mainThreadTaskHandler));
131 }
132
133 // Enqueue a task to be run on the main thread and wait for it to return. The return value of the task is
134 // forwarded to the background thread, wrapped in an optional.
135 //
136 // If we are aborting, the call finishes immediately, returning an empty optional.
137 //
138 // It is allowed for the main thread task handler to abort the AbortableTaskQueue. In that case, the return
139 // value is discarded and the caller receives an empty optional.
140 template<typename R>
141 Optional<R> enqueueTaskAndWait(WTF::Function<R()>&& mainThreadTaskHandler)
142 {
143 // Don't deadlock the main thread with itself.
144 ASSERT(!isMainThread());
145
146 LockHolder lockHolder(m_mutex);
147 if (m_aborting)
148 return WTF::nullopt;
149
150 Optional<R> response = WTF::nullopt;
151 postTask([this, &response, &mainThreadTaskHandler]() {
152 R responseValue = mainThreadTaskHandler();
153 LockHolder lockHolder(m_mutex);
154 if (!m_aborting)
155 response = WTFMove(responseValue);
156 m_abortedOrResponseSet.notifyAll();
157 });
158 m_abortedOrResponseSet.wait(m_mutex, [this, &response]() {
159 return m_aborting || response;
160 });
161 return response;
162 }
163
164 // This is class is provided for convenience when you want to use enqueueTaskAndWait() but
165 // you don't need any particular data from the main thread in return and just knowing that it finished
166 // running the handler function is enough.
167 class Void { };
168
169private:
170 // Protected state:
171 // Main thread: read-write. Writes must be made with the lock.
172 // Background threads: read only. Reads must be made with the lock.
173 class Task : public ThreadSafeRefCounted<Task> {
174 WTF_MAKE_NONCOPYABLE(Task);
175 WTF_MAKE_FAST_ALLOCATED(Task);
176 public:
177 static Ref<Task> create(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback)
178 {
179 return adoptRef(*new Task(taskQueue, WTFMove(taskCallback)));
180 }
181
182 bool isCancelled() const
183 {
184 return !m_taskQueue;
185 }
186
187 void cancel()
188 {
189 ASSERT(!isCancelled());
190 m_taskCallback = nullptr;
191 m_taskQueue = nullptr;
192 }
193
194 void dispatch()
195 {
196 ASSERT(isMainThread());
197 if (isCancelled())
198 return;
199
200 LockHolder lock(m_taskQueue->m_mutex);
201 ASSERT(this == m_taskQueue->m_channel.first().ptr());
202 m_taskQueue->m_channel.removeFirst();
203 lock.unlockEarly();
204 m_taskCallback();
205 }
206
207 private:
208 AbortableTaskQueue* m_taskQueue;
209 WTF::Function<void()> m_taskCallback;
210
211 Task(AbortableTaskQueue* taskQueue, WTF::Function<void()>&& taskCallback)
212 : m_taskQueue(taskQueue), m_taskCallback(WTFMove(taskCallback))
213 { }
214 };
215
216 void postTask(WTF::Function<void()>&& callback)
217 {
218 ASSERT(m_mutex.isHeld());
219 Ref<Task> task = Task::create(this, WTFMove(callback));
220 m_channel.append(task.copyRef());
221 RunLoop::main().dispatch([task = WTFMove(task)]() { task->dispatch(); });
222 }
223
224 void cancelAllTasks()
225 {
226 ASSERT(isMainThread());
227 ASSERT(m_mutex.isHeld());
228 for (Ref<Task>& task : m_channel)
229 task->cancel();
230 m_channel.clear();
231 }
232
233 bool m_aborting { false };
234 Lock m_mutex;
235 Condition m_abortedOrResponseSet;
236 WTF::Deque<Ref<Task>> m_channel;
237};
238
239} // namespace WebCore
240