1/*
2 * Copyright (C) 2016, 2017 Metrological Group B.V.
3 * Copyright (C) 2016, 2017 Igalia S.L
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Library General Public
7 * License as published by the Free Software Foundation; either
8 * version 2 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Library General Public License for more details.
14 *
15 * You should have received a copy of the GNU Library General Public License
 * along with this library; see the file COPYING.LIB. If not, write to
17 * the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 * Boston, MA 02110-1301, USA.
19 */
20
21#include "config.h"
22#include "AppendPipeline.h"
23
24#if ENABLE(VIDEO) && USE(GSTREAMER) && ENABLE(MEDIA_SOURCE)
25
26#include "AudioTrackPrivateGStreamer.h"
27#include "GStreamerCommon.h"
28#include "GStreamerEMEUtilities.h"
29#include "GStreamerMediaDescription.h"
30#include "GStreamerRegistryScannerMSE.h"
31#include "MediaSampleGStreamer.h"
32#include "InbandTextTrackPrivateGStreamer.h"
33#include "MediaDescription.h"
34#include "SourceBufferPrivateGStreamer.h"
35#include "VideoTrackPrivateGStreamer.h"
36#include <functional>
37#include <gst/app/gstappsink.h>
38#include <gst/app/gstappsrc.h>
39#include <gst/gst.h>
40#include <gst/pbutils/pbutils.h>
41#include <gst/video/video.h>
42#include <wtf/Condition.h>
43#include <wtf/glib/GLibUtilities.h>
44#include <wtf/glib/RunLoopSourcePriority.h>
45#include <wtf/text/StringConcatenateNumbers.h>
46
47GST_DEBUG_CATEGORY_EXTERN(webkit_mse_debug);
48#define GST_CAT_DEFAULT webkit_mse_debug
49
50namespace WebCore {
51
// GType of the custom meta API used to tag end-of-append marker buffers; registered in staticInitialization().
GType AppendPipeline::s_endOfAppendMetaType = 0;
// Meta implementation info returned by gst_meta_register(); attached to the marker buffer in pushNewBuffer().
const GstMetaInfo* AppendPipeline::s_webKitEndOfAppendMetaInfo = nullptr;
// Guards the one-time call to staticInitialization() made from the constructor.
std::once_flag AppendPipeline::s_staticInitializationFlag;
55
56struct EndOfAppendMeta {
57 GstMeta base;
58 static gboolean init(GstMeta*, void*, GstBuffer*) { return TRUE; }
59 static gboolean transform(GstBuffer*, GstMeta*, GstBuffer*, GQuark, void*) { g_return_val_if_reached(FALSE); }
60 static void free(GstMeta*, GstBuffer*) { }
61};
62
63void AppendPipeline::staticInitialization()
64{
65 ASSERT(isMainThread());
66
67 const char* tags[] = { nullptr };
68 s_endOfAppendMetaType = gst_meta_api_type_register("WebKitEndOfAppendMetaAPI", tags);
69 s_webKitEndOfAppendMetaInfo = gst_meta_register(s_endOfAppendMetaType, "WebKitEndOfAppendMeta", sizeof(EndOfAppendMeta), EndOfAppendMeta::init, EndOfAppendMeta::free, EndOfAppendMeta::transform);
70}
71
#if !LOG_DISABLED
// Buffer probe installed by the constructor on the demuxer and appsink sink pads when logging is enabled.
static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
#endif

#if ENABLE(ENCRYPTED_MEDIA)
// Downstream event probe installed by the constructor on the appsink sink pad.
static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo*, struct PadProbeInformation*);
#endif

// Buffer probe installed on demuxer source pads beyond the first one
// (see connectDemuxerSrcPadToAppsinkFromStreamingThread()).
static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo*, gpointer);

// Downstream event probe installed on the demuxer source pad for WebM content
// (see connectDemuxerSrcPadToAppsink()).
static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo*, void*);
83
84// Wrapper for gst_element_set_state() that emits a critical if the state change fails or is not synchronous.
85static void assertedElementSetState(GstElement* element, GstState desiredState)
86{
87 GstState oldState;
88 gst_element_get_state(element, &oldState, nullptr, 0);
89
90 GstStateChangeReturn result = gst_element_set_state(element, desiredState);
91
92 GstState newState;
93 gst_element_get_state(element, &newState, nullptr, 0);
94
95 if (desiredState != newState || result != GST_STATE_CHANGE_SUCCESS) {
96 GST_ERROR("AppendPipeline state change failed (returned %d): %" GST_PTR_FORMAT " %d -> %d (expected %d)",
97 static_cast<int>(result), element, static_cast<int>(oldState), static_cast<int>(newState), static_cast<int>(desiredState));
98 ASSERT_NOT_REACHED();
99 }
100}
101
// Builds the append pipeline: creates the pipeline and its bus handlers, the appsrc, the demuxer
// matching the SourceBuffer container type and the appsink, installs the pad probes and element
// signal handlers, adds and links appsrc and demuxer, and sets the pipeline to PLAYING.
// The appsink is created and configured here but is not added to the pipeline by this constructor.
// Must run on the main thread.
AppendPipeline::AppendPipeline(Ref<MediaSourceClientGStreamerMSE> mediaSourceClient, Ref<SourceBufferPrivateGStreamer> sourceBufferPrivate, MediaPlayerPrivateGStreamerMSE& playerPrivate)
    : m_mediaSourceClient(mediaSourceClient.get())
    , m_sourceBufferPrivate(sourceBufferPrivate.get())
    , m_playerPrivate(&playerPrivate)
    , m_id(0)
    , m_wasBusAlreadyNotifiedOfAvailableSamples(false)
    , m_streamType(Unknown)
{
    ASSERT(isMainThread());
    // Register the end-of-append meta exactly once, no matter how many pipelines get created.
    std::call_once(s_staticInitializationFlag, AppendPipeline::staticInitialization);

    GST_TRACE("Creating AppendPipeline (%p)", this);

    // FIXME: give a name to the pipeline, maybe related with the track it's managing.
    // The track name is still unknown at this time, though.
    // For now the name is built from the container type and a running counter.
    static size_t appendPipelineCount = 0;
    String pipelineName = makeString("append-pipeline-",
        m_sourceBufferPrivate->type().containerType().replace("/", "-"), '-', appendPipelineCount++);
    m_pipeline = gst_pipeline_new(pipelineName.utf8().data());

    m_bus = adoptGRef(gst_pipeline_get_bus(GST_PIPELINE(m_pipeline.get())));
    gst_bus_add_signal_watch_full(m_bus.get(), RunLoopSourcePriority::RunLoopDispatcher);
    gst_bus_enable_sync_message_emission(m_bus.get());

    // Errors and need-context requests are handled through the "sync-message" signals; state changes
    // through the regular "message" signal of the bus watch (handleStateChangeMessage() asserts main thread).
    g_signal_connect(m_bus.get(), "sync-message::error", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleErrorSyncMessage(message);
    }), this);
    g_signal_connect(m_bus.get(), "sync-message::need-context", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleNeedContextSyncMessage(message);
    }), this);
    g_signal_connect(m_bus.get(), "message::state-changed", G_CALLBACK(+[](GstBus*, GstMessage* message, AppendPipeline* appendPipeline) {
        appendPipeline->handleStateChangeMessage(message);
    }), this);

    // We assign the created instances here instead of adoptRef() because gst_bin_add_many()
    // below will already take the initial reference and we need an additional one for us.
    m_appsrc = gst_element_factory_make("appsrc", nullptr);

    // Every buffer leaving appsrc is inspected so the end-of-append marker pushed by pushNewBuffer()
    // can be detected.
    GRefPtr<GstPad> appsrcPad = adoptGRef(gst_element_get_static_pad(m_appsrc.get(), "src"));
    gst_pad_add_probe(appsrcPad.get(), GST_PAD_PROBE_TYPE_BUFFER, [](GstPad*, GstPadProbeInfo* padProbeInfo, void* userData) {
        return static_cast<AppendPipeline*>(userData)->appsrcEndOfAppendCheckerProbe(padProbeInfo);
    }, this, nullptr);

    // Pick the demuxer from the suffix of the container type. Any other type is a programming error:
    // m_demux is not assigned in that case.
    const String& type = m_sourceBufferPrivate->type().containerType();
    GST_DEBUG("SourceBuffer containerType: %s", type.utf8().data());
    if (type.endsWith("mp4") || type.endsWith("aac"))
        m_demux = gst_element_factory_make("qtdemux", nullptr);
    else if (type.endsWith("webm"))
        m_demux = gst_element_factory_make("matroskademux", nullptr);
    else
        ASSERT_NOT_REACHED();

    m_appsink = gst_element_factory_make("appsink", nullptr);

    gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), TRUE);
    gst_base_sink_set_sync(GST_BASE_SINK(m_appsink.get()), FALSE);
    gst_base_sink_set_async_enabled(GST_BASE_SINK(m_appsink.get()), FALSE); // No prerolls, no async state changes.
    gst_base_sink_set_drop_out_of_segment(GST_BASE_SINK(m_appsink.get()), FALSE);
    gst_base_sink_set_last_sample_enabled(GST_BASE_SINK(m_appsink.get()), FALSE);

    GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
    g_signal_connect(appsinkPad.get(), "notify::caps", G_CALLBACK(+[](GObject*, GParamSpec*, AppendPipeline* appendPipeline) {
        if (isMainThread()) {
            // When changing the pipeline state down to READY the demuxer is unlinked and this triggers a caps notification
            // because the appsink loses its previously negotiated caps. We are not interested in these unnegotiated caps.
#ifndef NDEBUG
            GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(appendPipeline->m_appsink.get(), "sink"));
            GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
            ASSERT(!caps);
#endif
            return;
        }

        // The streaming thread has just received a new caps and is about to let samples using the
        // new caps flow. Let's block it until the main thread has consumed the samples with the old
        // caps and has processed the caps change.
        appendPipeline->m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([appendPipeline]() {
            appendPipeline->appsinkCapsChanged();
            return AbortableTaskQueue::Void();
        });
    }), this);

#if !LOG_DISABLED
    // Debug-only buffer probes on the data entering the demuxer and the appsink. The probe ids are
    // stored so the destructor can remove them.
    GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
    m_demuxerDataEnteringPadProbeInformation.appendPipeline = this;
    m_demuxerDataEnteringPadProbeInformation.description = "demuxer data entering";
    m_demuxerDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(demuxerPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_demuxerDataEnteringPadProbeInformation, nullptr);
    m_appsinkDataEnteringPadProbeInformation.appendPipeline = this;
    m_appsinkDataEnteringPadProbeInformation.description = "appsink data entering";
    m_appsinkDataEnteringPadProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelinePadProbeDebugInformation), &m_appsinkDataEnteringPadProbeInformation, nullptr);
#endif

#if ENABLE(ENCRYPTED_MEDIA)
    // Downstream event probe on the appsink sink pad; removed again in the destructor.
    m_appsinkPadEventProbeInformation.appendPipeline = this;
    m_appsinkPadEventProbeInformation.description = "appsink event probe";
    m_appsinkPadEventProbeInformation.probeId = gst_pad_add_probe(appsinkPad.get(), GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, reinterpret_cast<GstPadProbeCallback>(appendPipelineAppsinkPadEventProbe), &m_appsinkPadEventProbeInformation, nullptr);
#endif

    // These signals won't be connected outside of the lifetime of "this".
    g_signal_connect(m_demux.get(), "pad-added", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
        appendPipeline->connectDemuxerSrcPadToAppsinkFromStreamingThread(demuxerSrcPad);
    }), this);
    g_signal_connect(m_demux.get(), "pad-removed", G_CALLBACK(+[](GstElement*, GstPad* demuxerSrcPad, AppendPipeline* appendPipeline) {
        appendPipeline->disconnectDemuxerSrcPadFromAppsinkFromAnyThread(demuxerSrcPad);
    }), this);
    g_signal_connect(m_demux.get(), "no-more-pads", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
        ASSERT(!isMainThread());
        GST_DEBUG("Posting no-more-pads task to main thread");
        // Not waited for: the task is only queued for the main thread.
        appendPipeline->m_taskQueue.enqueueTask([appendPipeline]() {
            appendPipeline->didReceiveInitializationSegment();
        });
    }), this);
    g_signal_connect(m_appsink.get(), "new-sample", G_CALLBACK(+[](GstElement* appsink, AppendPipeline* appendPipeline) -> GstFlowReturn {
        appendPipeline->handleAppsinkNewSampleFromStreamingThread(appsink);
        return GST_FLOW_OK;
    }), this);
    g_signal_connect(m_appsink.get(), "eos", G_CALLBACK(+[](GstElement*, AppendPipeline* appendPipeline) {
        // basesrc will emit an EOS after it has received a GST_FLOW_ERROR. That's the only case we are expecting.
        if (!appendPipeline->m_errorReceived) {
            GST_ERROR("Unexpected appsink EOS in AppendPipeline");
            ASSERT_NOT_REACHED();
        }
    }), this);

    // Add_many will take ownership of a reference. That's why we used an assignment before.
    // Only appsrc and the demuxer are added here; connectDemuxerSrcPadToAppsinkFromStreamingThread()
    // adds the appsink when it still has no parent.
    gst_bin_add_many(GST_BIN(m_pipeline.get()), m_appsrc.get(), m_demux.get(), nullptr);
    gst_element_link(m_appsrc.get(), m_demux.get());

    assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);
}
232
233AppendPipeline::~AppendPipeline()
234{
235 GST_DEBUG_OBJECT(m_pipeline.get(), "Destructing AppendPipeline (%p)", this);
236 ASSERT(isMainThread());
237
238 // Forget all pending tasks and unblock the streaming thread if it was blocked.
239 m_taskQueue.startAborting();
240
241 // Disconnect all synchronous event handlers and probes susceptible of firing from the main thread
242 // when changing the pipeline state.
243
244 if (m_pipeline) {
245 ASSERT(m_bus);
246 g_signal_handlers_disconnect_by_data(m_bus.get(), this);
247 gst_bus_disable_sync_message_emission(m_bus.get());
248 gst_bus_remove_signal_watch(m_bus.get());
249 }
250
251 if (m_appsrc)
252 g_signal_handlers_disconnect_by_data(m_appsrc.get(), this);
253
254 if (m_demux) {
255#if !LOG_DISABLED
256 GRefPtr<GstPad> demuxerPad = adoptGRef(gst_element_get_static_pad(m_demux.get(), "sink"));
257 gst_pad_remove_probe(demuxerPad.get(), m_demuxerDataEnteringPadProbeInformation.probeId);
258#endif
259
260 g_signal_handlers_disconnect_by_data(m_demux.get(), this);
261 }
262
263 if (m_appsink) {
264 GRefPtr<GstPad> appsinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
265 g_signal_handlers_disconnect_by_data(appsinkPad.get(), this);
266 g_signal_handlers_disconnect_by_data(m_appsink.get(), this);
267
268#if !LOG_DISABLED
269 gst_pad_remove_probe(appsinkPad.get(), m_appsinkDataEnteringPadProbeInformation.probeId);
270#endif
271
272#if ENABLE(ENCRYPTED_MEDIA)
273 gst_pad_remove_probe(appsinkPad.get(), m_appsinkPadEventProbeInformation.probeId);
274#endif
275 }
276
277 // We can tear down the pipeline safely now.
278 if (m_pipeline)
279 gst_element_set_state(m_pipeline.get(), GST_STATE_NULL);
280}
281
282void AppendPipeline::handleErrorSyncMessage(GstMessage* message)
283{
284 ASSERT(!isMainThread());
285 GST_WARNING_OBJECT(m_pipeline.get(), "Demuxing error: %" GST_PTR_FORMAT, message);
286 // Notify the main thread that the append has a decode error.
287 auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this]() {
288 m_errorReceived = true;
289 // appendParsingFailed() will cause resetParserState() to be called.
290 m_sourceBufferPrivate->appendParsingFailed();
291 return AbortableTaskQueue::Void();
292 });
293 // The streaming thread has now been unblocked because we are aborting in the main thread.
294 ASSERT(!response);
295}
296
297GstPadProbeReturn AppendPipeline::appsrcEndOfAppendCheckerProbe(GstPadProbeInfo* padProbeInfo)
298{
299 ASSERT(!isMainThread());
300 m_streamingThread = &WTF::Thread::current();
301
302 GstBuffer* buffer = GST_BUFFER(padProbeInfo->data);
303 ASSERT(GST_IS_BUFFER(buffer));
304
305 GST_TRACE_OBJECT(m_pipeline.get(), "Buffer entered appsrcEndOfAppendCheckerProbe: %" GST_PTR_FORMAT, buffer);
306
307 EndOfAppendMeta* endOfAppendMeta = reinterpret_cast<EndOfAppendMeta*>(gst_buffer_get_meta(buffer, s_endOfAppendMetaType));
308 if (!endOfAppendMeta) {
309 // Normal buffer, nothing to do.
310 return GST_PAD_PROBE_OK;
311 }
312
313 GST_TRACE_OBJECT(m_pipeline.get(), "Posting end-of-append task to the main thread");
314 m_taskQueue.enqueueTask([this]() {
315 handleEndOfAppend();
316 });
317 return GST_PAD_PROBE_DROP;
318}
319
320void AppendPipeline::handleNeedContextSyncMessage(GstMessage* message)
321{
322 const gchar* contextType = nullptr;
323 gst_message_parse_context_type(message, &contextType);
324 GST_TRACE("context type: %s", contextType);
325
326 // MediaPlayerPrivateGStreamerBase will take care of setting up encryption.
327 m_playerPrivate->handleSyncMessage(message);
328}
329
330void AppendPipeline::handleStateChangeMessage(GstMessage* message)
331{
332 ASSERT(isMainThread());
333
334 if (GST_MESSAGE_SRC(message) == reinterpret_cast<GstObject*>(m_pipeline.get())) {
335 GstState currentState, newState;
336 gst_message_parse_state_changed(message, &currentState, &newState, nullptr);
337 CString sourceBufferType = String(m_sourceBufferPrivate->type().raw())
338 .replace("/", "_").replace(" ", "_")
339 .replace("\"", "").replace("\'", "").utf8();
340 CString dotFileName = makeString("webkit-append-",
341 sourceBufferType.data(), '-',
342 gst_element_state_get_name(currentState), '_',
343 gst_element_state_get_name(newState)).utf8();
344 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.data());
345 }
346}
347
348gint AppendPipeline::id()
349{
350 ASSERT(isMainThread());
351
352 if (m_id)
353 return m_id;
354
355 static gint s_totalAudio = 0;
356 static gint s_totalVideo = 0;
357 static gint s_totalText = 0;
358
359 switch (m_streamType) {
360 case Audio:
361 m_id = ++s_totalAudio;
362 break;
363 case Video:
364 m_id = ++s_totalVideo;
365 break;
366 case Text:
367 m_id = ++s_totalText;
368 break;
369 case Unknown:
370 case Invalid:
371 GST_ERROR("Trying to get id for a pipeline of Unknown/Invalid type");
372 ASSERT_NOT_REACHED();
373 break;
374 }
375
376 GST_DEBUG("streamType=%d, id=%d", static_cast<int>(m_streamType), m_id);
377
378 return m_id;
379}
380
381void AppendPipeline::parseDemuxerSrcPadCaps(GstCaps* demuxerSrcPadCaps)
382{
383 ASSERT(isMainThread());
384
385 m_demuxerSrcPadCaps = adoptGRef(demuxerSrcPadCaps);
386 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Unknown;
387
388 const char* originalMediaType = capsMediaType(m_demuxerSrcPadCaps.get());
389 auto& gstRegistryScanner = GStreamerRegistryScannerMSE::singleton();
390 if (!gstRegistryScanner.isCodecSupported(originalMediaType)) {
391 m_presentationSize = WebCore::FloatSize();
392 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Invalid;
393 } else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
394 Optional<FloatSize> size = getVideoResolutionFromCaps(m_demuxerSrcPadCaps.get());
395 if (size.hasValue())
396 m_presentationSize = size.value();
397 else
398 m_presentationSize = WebCore::FloatSize();
399
400 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Video;
401 } else {
402 m_presentationSize = WebCore::FloatSize();
403 if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_AUDIO_CAPS_TYPE_PREFIX))
404 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Audio;
405 else if (doCapsHaveType(m_demuxerSrcPadCaps.get(), GST_TEXT_CAPS_TYPE_PREFIX))
406 m_streamType = WebCore::MediaSourceStreamTypeGStreamer::Text;
407 }
408}
409
410void AppendPipeline::appsinkCapsChanged()
411{
412 ASSERT(isMainThread());
413
414 // Consume any pending samples with the previous caps.
415 consumeAppsinkAvailableSamples();
416
417 GRefPtr<GstPad> pad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
418 GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(pad.get()));
419
420 if (!caps)
421 return;
422
423 if (doCapsHaveType(caps.get(), GST_VIDEO_CAPS_TYPE_PREFIX)) {
424 Optional<FloatSize> size = getVideoResolutionFromCaps(caps.get());
425 if (size.hasValue())
426 m_presentationSize = size.value();
427 }
428
429 // This means that we're right after a new track has appeared. Otherwise, it's a caps change inside the same track.
430 bool previousCapsWereNull = !m_appsinkCaps;
431
432 if (m_appsinkCaps != caps) {
433 m_appsinkCaps = WTFMove(caps);
434 m_playerPrivate->trackDetected(this, m_track, previousCapsWereNull);
435 }
436}
437
// Main-thread task posted by appsrcEndOfAppendCheckerProbe() once the end-of-append marker buffer
// has been seen. Delivers the samples still queued in the appsink and then tells the
// SourceBufferPrivate that the append is complete.
void AppendPipeline::handleEndOfAppend()
{
    ASSERT(isMainThread());
    // Samples still sitting in the appsink must be handed over before completion is reported.
    consumeAppsinkAvailableSamples();
    GST_TRACE_OBJECT(m_pipeline.get(), "Notifying SourceBufferPrivate the append is complete");
    sourceBufferPrivate()->didReceiveAllPendingSamples();
}
445
446void AppendPipeline::appsinkNewSample(GRefPtr<GstSample>&& sample)
447{
448 ASSERT(isMainThread());
449
450 if (UNLIKELY(!gst_sample_get_buffer(sample.get()))) {
451 GST_WARNING("Received sample without buffer from appsink.");
452 return;
453 }
454
455 auto mediaSample = WebCore::MediaSampleGStreamer::create(WTFMove(sample), m_presentationSize, trackId());
456
457 GST_TRACE("append: trackId=%s PTS=%s DTS=%s DUR=%s presentationSize=%.0fx%.0f",
458 mediaSample->trackID().string().utf8().data(),
459 mediaSample->presentationTime().toString().utf8().data(),
460 mediaSample->decodeTime().toString().utf8().data(),
461 mediaSample->duration().toString().utf8().data(),
462 mediaSample->presentationSize().width(), mediaSample->presentationSize().height());
463
464 // If we're beyond the duration, ignore this sample.
465 MediaTime duration = m_mediaSourceClient->duration();
466 if (duration.isValid() && !duration.indefiniteTime() && mediaSample->presentationTime() > duration) {
467 GST_DEBUG_OBJECT(m_pipeline.get(), "Detected sample (%s) beyond the duration (%s), discarding", mediaSample->presentationTime().toString().utf8().data(), duration.toString().utf8().data());
468 return;
469 }
470
471 // Add a gap sample if a gap is detected before the first sample.
472 if (mediaSample->decodeTime() == MediaTime::zeroTime() && mediaSample->presentationTime() > MediaTime::zeroTime() && mediaSample->presentationTime() <= MediaTime(1, 10)) {
473 GST_DEBUG("Adding gap offset");
474 mediaSample->applyPtsOffset(MediaTime::zeroTime());
475 }
476
477 m_sourceBufferPrivate->didReceiveSample(mediaSample.get());
478}
479
480void AppendPipeline::didReceiveInitializationSegment()
481{
482 ASSERT(isMainThread());
483
484 WebCore::SourceBufferPrivateClient::InitializationSegment initializationSegment;
485
486 GST_DEBUG("Notifying SourceBuffer for track %s", (m_track) ? m_track->id().string().utf8().data() : nullptr);
487 initializationSegment.duration = m_mediaSourceClient->duration();
488
489 switch (m_streamType) {
490 case Audio: {
491 WebCore::SourceBufferPrivateClient::InitializationSegment::AudioTrackInformation info;
492 info.track = static_cast<AudioTrackPrivateGStreamer*>(m_track.get());
493 info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
494 initializationSegment.audioTracks.append(info);
495 break;
496 }
497 case Video: {
498 WebCore::SourceBufferPrivateClient::InitializationSegment::VideoTrackInformation info;
499 info.track = static_cast<VideoTrackPrivateGStreamer*>(m_track.get());
500 info.description = WebCore::GStreamerMediaDescription::create(m_demuxerSrcPadCaps.get());
501 initializationSegment.videoTracks.append(info);
502 break;
503 }
504 default:
505 GST_ERROR("Unsupported stream type or codec");
506 break;
507 }
508
509 m_sourceBufferPrivate->didReceiveInitializationSegment(initializationSegment);
510}
511
512AtomicString AppendPipeline::trackId()
513{
514 ASSERT(isMainThread());
515
516 if (!m_track)
517 return AtomicString();
518
519 return m_track->id();
520}
521
522void AppendPipeline::consumeAppsinkAvailableSamples()
523{
524 ASSERT(isMainThread());
525
526 GRefPtr<GstSample> sample;
527 int batchedSampleCount = 0;
528 // In some cases each frame increases the duration of the movie.
529 // Batch duration changes so that if we pick 100 of such samples we don't have to run 100 times
530 // layout for the video controls, but only once.
531 m_playerPrivate->blockDurationChanges();
532 while ((sample = adoptGRef(gst_app_sink_try_pull_sample(GST_APP_SINK(m_appsink.get()), 0)))) {
533 appsinkNewSample(WTFMove(sample));
534 batchedSampleCount++;
535 }
536 m_playerPrivate->unblockDurationChanges();
537
538 GST_TRACE_OBJECT(m_pipeline.get(), "batchedSampleCount = %d", batchedSampleCount);
539}
540
541void AppendPipeline::resetParserState()
542{
543 ASSERT(isMainThread());
544 GST_DEBUG_OBJECT(m_pipeline.get(), "Handling resetParserState() in AppendPipeline by resetting the pipeline");
545
546 // FIXME: Implement a flush event-based resetParserState() implementation would allow the initialization segment to
547 // survive, in accordance with the spec.
548
549 // This function restores the GStreamer pipeline to the same state it was when the AppendPipeline constructor
550 // finished. All previously enqueued data is lost and the demuxer is reset, losing all pads and track data.
551
552 // Unlock the streaming thread.
553 m_taskQueue.startAborting();
554
555 // Reset the state of all elements in the pipeline.
556 assertedElementSetState(m_pipeline.get(), GST_STATE_READY);
557
558 // The parser is tear down automatically when the demuxer is reset (see disconnectDemuxerSrcPadFromAppsinkFromAnyThread()).
559 ASSERT(!m_parser);
560
561 // Set the pipeline to PLAYING so that it can be used again.
562 assertedElementSetState(m_pipeline.get(), GST_STATE_PLAYING);
563
564 // All processing related to the previous append has been aborted and the pipeline is idle.
565 // We can listen again to new requests coming from the streaming thread.
566 m_taskQueue.finishAborting();
567
568#if (!(LOG_DISABLED || defined(GST_DISABLE_GST_DEBUG)))
569 {
570 static unsigned i = 0;
571 // This is here for debugging purposes. It does not make sense to have it as class member.
572 WTF::String dotFileName = makeString("reset-pipeline-", ++i);
573 gst_debug_bin_to_dot_file(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, dotFileName.utf8().data());
574 }
575#endif
576}
577
578void AppendPipeline::pushNewBuffer(GRefPtr<GstBuffer>&& buffer)
579{
580 GST_TRACE_OBJECT(m_pipeline.get(), "pushing data buffer %" GST_PTR_FORMAT, buffer.get());
581 GstFlowReturn pushDataBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), buffer.leakRef());
582 // Pushing buffers to appsrc can only fail if the appsrc is flushing, in EOS or stopped. Neither of these should
583 // be true at this point.
584 if (pushDataBufferRet != GST_FLOW_OK) {
585 GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push data buffer into appsrc.");
586 ASSERT_NOT_REACHED();
587 }
588
589 // Push an additional empty buffer that marks the end of the append.
590 // This buffer is detected and consumed by appsrcEndOfAppendCheckerProbe(), which uses it to signal the successful
591 // completion of the append.
592 //
593 // This works based on how push mode scheduling works in GStreamer. Note there is a single streaming thread for the
594 // AppendPipeline, and within a stream (the portion of a pipeline covered by the same streaming thread, in this case
595 // the whole pipeline) a buffer is guaranteed not to be processed by downstream until processing of the previous
596 // buffer has completed.
597
598 GstBuffer* endOfAppendBuffer = gst_buffer_new();
599 gst_buffer_add_meta(endOfAppendBuffer, s_webKitEndOfAppendMetaInfo, nullptr);
600
601 GST_TRACE_OBJECT(m_pipeline.get(), "pushing end-of-append buffer %" GST_PTR_FORMAT, endOfAppendBuffer);
602 GstFlowReturn pushEndOfAppendBufferRet = gst_app_src_push_buffer(GST_APP_SRC(m_appsrc.get()), endOfAppendBuffer);
603 if (pushEndOfAppendBufferRet != GST_FLOW_OK) {
604 GST_ERROR_OBJECT(m_pipeline.get(), "Failed to push end-of-append buffer into appsrc.");
605 ASSERT_NOT_REACHED();
606 }
607}
608
609void AppendPipeline::handleAppsinkNewSampleFromStreamingThread(GstElement*)
610{
611 ASSERT(!isMainThread());
612 if (&WTF::Thread::current() != m_streamingThread) {
613 // m_streamingThreadId has been initialized in appsrcEndOfAppendCheckerProbe().
614 // For a buffer to reach the appsink, a buffer must have passed through appsrcEndOfAppendCheckerProbe() first.
615 // This error will only raise if someone modifies the pipeline to include more than one streaming thread or
616 // removes the appsrcEndOfAppendCheckerProbe(). Either way, the end-of-append detection would be broken.
617 // AppendPipeline should have only one streaming thread. Otherwise we can't detect reliably when an appends has
618 // been demuxed completely.;
619 GST_ERROR_OBJECT(m_pipeline.get(), "Appsink received a sample in a different thread than appsrcEndOfAppendCheckerProbe run.");
620 ASSERT_NOT_REACHED();
621 }
622
623 if (!m_wasBusAlreadyNotifiedOfAvailableSamples.test_and_set()) {
624 GST_TRACE("Posting appsink-new-sample task to the main thread");
625 m_taskQueue.enqueueTask([this]() {
626 m_wasBusAlreadyNotifiedOfAvailableSamples.clear();
627 consumeAppsinkAvailableSamples();
628 });
629 }
630}
631
632static GRefPtr<GstElement>
633createOptionalParserForFormat(GstPad* demuxerSrcPad)
634{
635 GRefPtr<GstCaps> padCaps = adoptGRef(gst_pad_get_current_caps(demuxerSrcPad));
636 GstStructure* structure = gst_caps_get_structure(padCaps.get(), 0);
637 const char* mediaType = gst_structure_get_name(structure);
638
639 GUniquePtr<char> demuxerPadName(gst_pad_get_name(demuxerSrcPad));
640 GUniquePtr<char> parserName(g_strdup_printf("%s_parser", demuxerPadName.get()));
641
642 if (!g_strcmp0(mediaType, "audio/x-opus")) {
643 GstElement* opusparse = gst_element_factory_make("opusparse", parserName.get());
644 ASSERT(opusparse);
645 g_return_val_if_fail(opusparse, nullptr);
646 return GRefPtr<GstElement>(opusparse);
647 }
648 if (!g_strcmp0(mediaType, "audio/x-vorbis")) {
649 GstElement* vorbisparse = gst_element_factory_make("vorbisparse", parserName.get());
650 ASSERT(vorbisparse);
651 g_return_val_if_fail(vorbisparse, nullptr);
652 return GRefPtr<GstElement>(vorbisparse);
653 }
654 if (!g_strcmp0(mediaType, "video/x-h264")) {
655 GstElement* h264parse = gst_element_factory_make("h264parse", parserName.get());
656 ASSERT(h264parse);
657 g_return_val_if_fail(h264parse, nullptr);
658 return GRefPtr<GstElement>(h264parse);
659 }
660
661 return nullptr;
662}
663
664void AppendPipeline::connectDemuxerSrcPadToAppsinkFromStreamingThread(GstPad* demuxerSrcPad)
665{
666 ASSERT(!isMainThread());
667
668 GST_DEBUG("connecting to appsink");
669
670 if (m_demux->numsrcpads > 1) {
671 GST_WARNING("Only one stream per SourceBuffer is allowed! Ignoring stream %d by adding a black hole probe.", m_demux->numsrcpads);
672 gulong probeId = gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_BUFFER, reinterpret_cast<GstPadProbeCallback>(appendPipelineDemuxerBlackHolePadProbe), nullptr, nullptr);
673 g_object_set_data(G_OBJECT(demuxerSrcPad), "blackHoleProbeId", GULONG_TO_POINTER(probeId));
674 return;
675 }
676
677 GRefPtr<GstPad> appsinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
678
679 // Only one stream per demuxer is supported.
680 ASSERT(!gst_pad_is_linked(appsinkSinkPad.get()));
681
682 gint64 timeLength = 0;
683 if (gst_element_query_duration(m_demux.get(), GST_FORMAT_TIME, &timeLength)
684 && static_cast<guint64>(timeLength) != GST_CLOCK_TIME_NONE)
685 m_initialDuration = MediaTime(GST_TIME_AS_USECONDS(timeLength), G_USEC_PER_SEC);
686 else
687 m_initialDuration = MediaTime::positiveInfiniteTime();
688
689 GST_DEBUG("Requesting demuxer-connect-to-appsink to main thread");
690 auto response = m_taskQueue.enqueueTaskAndWait<AbortableTaskQueue::Void>([this, demuxerSrcPad]() {
691 connectDemuxerSrcPadToAppsink(demuxerSrcPad);
692 return AbortableTaskQueue::Void();
693 });
694 if (!response) {
695 // The AppendPipeline has been destroyed or aborted before we received a response.
696 return;
697 }
698
699 // Must be done in the thread we were called from (usually streaming thread).
700 bool isData = (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Audio)
701 || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Video)
702 || (m_streamType == WebCore::MediaSourceStreamTypeGStreamer::Text);
703
704 if (isData) {
705 GRefPtr<GstObject> parent = adoptGRef(gst_element_get_parent(m_appsink.get()));
706 if (!parent)
707 gst_bin_add(GST_BIN(m_pipeline.get()), m_appsink.get());
708
709 // Current head of the pipeline being built.
710 GRefPtr<GstPad> currentSrcPad = demuxerSrcPad;
711
712 // Some audio files unhelpfully omit the duration of frames in the container. We need to parse
713 // the contained audio streams in order to know the duration of the frames.
714 // This is known to be an issue with YouTube WebM files containing Opus audio as of YTTV2018.
715 m_parser = createOptionalParserForFormat(currentSrcPad.get());
716 if (m_parser) {
717 gst_bin_add(GST_BIN(m_pipeline.get()), m_parser.get());
718 gst_element_sync_state_with_parent(m_parser.get());
719
720 GRefPtr<GstPad> parserSinkPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "sink"));
721 GRefPtr<GstPad> parserSrcPad = adoptGRef(gst_element_get_static_pad(m_parser.get(), "src"));
722
723 gst_pad_link(currentSrcPad.get(), parserSinkPad.get());
724 currentSrcPad = parserSrcPad;
725 }
726
727 gst_pad_link(currentSrcPad.get(), appsinkSinkPad.get());
728
729 gst_element_sync_state_with_parent(m_appsink.get());
730
731 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "webkit-after-link");
732 }
733}
734
735void AppendPipeline::connectDemuxerSrcPadToAppsink(GstPad* demuxerSrcPad)
736{
737 ASSERT(isMainThread());
738 GST_DEBUG("Connecting to appsink");
739
740 const String& type = m_sourceBufferPrivate->type().containerType();
741 if (type.endsWith("webm"))
742 gst_pad_add_probe(demuxerSrcPad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, matroskademuxForceSegmentStartToEqualZero, nullptr, nullptr);
743
744 GRefPtr<GstPad> sinkSinkPad = adoptGRef(gst_element_get_static_pad(m_appsink.get(), "sink"));
745
746 // Only one stream per demuxer is supported.
747 ASSERT(!gst_pad_is_linked(sinkSinkPad.get()));
748
749 GRefPtr<GstCaps> caps = adoptGRef(gst_pad_get_current_caps(GST_PAD(demuxerSrcPad)));
750
751#ifndef GST_DISABLE_GST_DEBUG
752 {
753 GUniquePtr<gchar> strcaps(gst_caps_to_string(caps.get()));
754 GST_DEBUG("%s", strcaps.get());
755 }
756#endif
757
758 if (m_mediaSourceClient->duration().isInvalid() && m_initialDuration > MediaTime::zeroTime())
759 m_mediaSourceClient->durationChanged(m_initialDuration);
760
761 parseDemuxerSrcPadCaps(gst_caps_ref(caps.get()));
762
763 switch (m_streamType) {
764 case WebCore::MediaSourceStreamTypeGStreamer::Audio:
765 m_track = WebCore::AudioTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
766 break;
767 case WebCore::MediaSourceStreamTypeGStreamer::Video:
768 m_track = WebCore::VideoTrackPrivateGStreamer::create(makeWeakPtr(*m_playerPrivate), id(), sinkSinkPad.get());
769 break;
770 case WebCore::MediaSourceStreamTypeGStreamer::Text:
771 m_track = WebCore::InbandTextTrackPrivateGStreamer::create(id(), sinkSinkPad.get());
772 break;
773 case WebCore::MediaSourceStreamTypeGStreamer::Invalid:
774 GST_WARNING_OBJECT(m_pipeline.get(), "Unsupported track codec: %" GST_PTR_FORMAT, caps.get());
775 // 3.5.7 Initialization Segment Received
776 // 5.1. If the initialization segment contains tracks with codecs the user agent does not support, then run the
777 // append error algorithm and abort these steps.
778
779 // appendParsingFailed() will immediately cause a resetParserState() which will stop demuxing, then the
780 // AppendPipeline will be destroyed.
781 m_sourceBufferPrivate->appendParsingFailed();
782 return;
783 default:
784 GST_WARNING_OBJECT(m_pipeline.get(), "Pad has unknown track type, ignoring: %" GST_PTR_FORMAT, caps.get());
785 break;
786 }
787
788 m_appsinkCaps = WTFMove(caps);
789 m_playerPrivate->trackDetected(this, m_track, true);
790}
791
792void AppendPipeline::disconnectDemuxerSrcPadFromAppsinkFromAnyThread(GstPad*)
793{
794 // Note: This function can be called either from the streaming thread (e.g. if a strange initialization segment with
795 // incompatible tracks is appended and the srcpad disconnected) or -- more usually -- from the main thread, when
796 // a state change is made to bring the demuxer down. (State change operations run in the main thread.)
797 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-before");
798
799 GST_DEBUG("Disconnecting appsink");
800
801 if (m_parser) {
802 assertedElementSetState(m_parser.get(), GST_STATE_NULL);
803 gst_bin_remove(GST_BIN(m_pipeline.get()), m_parser.get());
804 m_parser = nullptr;
805 }
806
807 GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS(GST_BIN(m_pipeline.get()), GST_DEBUG_GRAPH_SHOW_ALL, "pad-removed-after");
808}
809
810#if !LOG_DISABLED
811static GstPadProbeReturn appendPipelinePadProbeDebugInformation(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation* padProbeInformation)
812{
813 ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
814 GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
815 GST_TRACE("%s: buffer of size %" G_GSIZE_FORMAT " going thru", padProbeInformation->description, gst_buffer_get_size(buffer));
816 return GST_PAD_PROBE_OK;
817}
818#endif
819
820#if ENABLE(ENCRYPTED_MEDIA)
821static GstPadProbeReturn appendPipelineAppsinkPadEventProbe(GstPad*, GstPadProbeInfo* info, struct PadProbeInformation *padProbeInformation)
822{
823 ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
824 GstEvent* event = gst_pad_probe_info_get_event(info);
825 GST_DEBUG("Handling event %s on append pipeline appsinkPad", GST_EVENT_TYPE_NAME(event));
826 WebCore::AppendPipeline* appendPipeline = padProbeInformation->appendPipeline;
827
828 switch (GST_EVENT_TYPE(event)) {
829 case GST_EVENT_PROTECTION:
830 if (appendPipeline && appendPipeline->playerPrivate())
831 appendPipeline->playerPrivate()->handleProtectionEvent(event);
832 return GST_PAD_PROBE_DROP;
833 default:
834 break;
835 }
836
837 return GST_PAD_PROBE_OK;
838}
839#endif
840
841static GstPadProbeReturn appendPipelineDemuxerBlackHolePadProbe(GstPad*, GstPadProbeInfo* info, gpointer)
842{
843 ASSERT(GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_BUFFER);
844 GstBuffer* buffer = GST_PAD_PROBE_INFO_BUFFER(info);
845 GST_TRACE("buffer of size %" G_GSIZE_FORMAT " ignored", gst_buffer_get_size(buffer));
846 return GST_PAD_PROBE_DROP;
847}
848
849static GstPadProbeReturn matroskademuxForceSegmentStartToEqualZero(GstPad*, GstPadProbeInfo* info, void*)
850{
851 // matroskademux sets GstSegment.start to the PTS of the first frame.
852 //
853 // This way in the unlikely case a user made a .mkv or .webm file where a certain portion of the movie is skipped
854 // (e.g. by concatenating a MSE initialization segment with any MSE media segment other than the first) and opened
855 // it with a regular player, playback would start immediately. GstSegment.duration is not modified in any case.
856 //
857 // Leaving the usefulness of that feature aside, the fact that it uses GstSegment.start is problematic for MSE.
858 // In MSE is not unusual to process unordered MSE media segments. In this case, a frame may have
859 // PTS <<< GstSegment.start and be discarded by downstream. This happens for instance in elements derived from
860 // audiobasefilter, such as opusparse.
861 //
862 // This probe remedies the problem by setting GstSegment.start to 0 in all cases, not only when the PTS of the first
863 // frame is zero.
864
865 ASSERT(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM);
866 GstEvent* event = static_cast<GstEvent*>(info->data);
867 if (event->type == GST_EVENT_SEGMENT) {
868 GstSegment segment;
869 gst_event_copy_segment(event, &segment);
870
871 segment.start = 0;
872
873 GRefPtr<GstEvent> newEvent = adoptGRef(gst_event_new_segment(&segment));
874 gst_event_replace(reinterpret_cast<GstEvent**>(&info->data), newEvent.get());
875 }
876 return GST_PAD_PROBE_OK;
877}
878
879} // namespace WebCore.
880
881#endif // USE(GSTREAMER)
882