1/*
2 * Copyright (C) 2010-2016 Apple Inc. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * 2. Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 *
13 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
14 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
15 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
16 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
17 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
18 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
19 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
21 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
23 * THE POSSIBILITY OF SUCH DAMAGE.
24 */
25
26#include "config.h"
27#include "Connection.h"
28
29#include "Logging.h"
30#include <memory>
31#include <wtf/HashSet.h>
32#include <wtf/NeverDestroyed.h>
33#include <wtf/RunLoop.h>
34#include <wtf/text/WTFString.h>
35#include <wtf/threads/BinarySemaphore.h>
36
37#if PLATFORM(COCOA)
38#include "MachMessage.h"
39#endif
40
41#if USE(UNIX_DOMAIN_SOCKETS)
42#include "UnixMessage.h"
43#endif
44
45namespace IPC {
46
47#if PLATFORM(COCOA)
// Ceiling on the number of incoming messages that may sit in the queue waiting for the main
// thread: once 50000 are pending, the IPC connection gets killed.
constexpr size_t maxPendingIncomingMessagesKillingThreshold = 50000;
50#endif
51
52struct Connection::ReplyHandler {
53 RefPtr<FunctionDispatcher> dispatcher;
54 Function<void (std::unique_ptr<Decoder>)> handler;
55};
56
57struct Connection::WaitForMessageState {
58 WaitForMessageState(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, OptionSet<WaitForOption> waitForOptions)
59 : messageReceiverName(messageReceiverName)
60 , messageName(messageName)
61 , destinationID(destinationID)
62 , waitForOptions(waitForOptions)
63 {
64 }
65
66 StringReference messageReceiverName;
67 StringReference messageName;
68 uint64_t destinationID;
69
70 OptionSet<WaitForOption> waitForOptions;
71 bool messageWaitingInterrupted = false;
72
73 std::unique_ptr<Decoder> decoder;
74};
75
76class Connection::SyncMessageState {
77public:
78 static SyncMessageState& singleton();
79
80 SyncMessageState();
81 ~SyncMessageState() = delete;
82
83 void wakeUpClientRunLoop()
84 {
85 m_waitForSyncReplySemaphore.signal();
86 }
87
88 bool wait(TimeWithDynamicClockType absoluteTime)
89 {
90 return m_waitForSyncReplySemaphore.waitUntil(absoluteTime);
91 }
92
93 // Returns true if this message will be handled on a client thread that is currently
94 // waiting for a reply to a synchronous message.
95 bool processIncomingMessage(Connection&, std::unique_ptr<Decoder>&);
96
97 // Dispatch pending sync messages. if allowedConnection is not null, will only dispatch messages
98 // from that connection and put the other messages back in the queue.
99 void dispatchMessages(Connection* allowedConnection);
100
101private:
102 void dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection&);
103
104 BinarySemaphore m_waitForSyncReplySemaphore;
105
106 // Protects m_didScheduleDispatchMessagesWorkSet and m_messagesToDispatchWhileWaitingForSyncReply.
107 Lock m_mutex;
108
109 // The set of connections for which we've scheduled a call to dispatchMessageAndResetDidScheduleDispatchMessagesForConnection.
110 HashSet<RefPtr<Connection>> m_didScheduleDispatchMessagesWorkSet;
111
112 struct ConnectionAndIncomingMessage {
113 Ref<Connection> connection;
114 std::unique_ptr<Decoder> message;
115 };
116 Vector<ConnectionAndIncomingMessage> m_messagesToDispatchWhileWaitingForSyncReply;
117};
118
119Connection::SyncMessageState& Connection::SyncMessageState::singleton()
120{
121 static std::once_flag onceFlag;
122 static LazyNeverDestroyed<SyncMessageState> syncMessageState;
123
124 std::call_once(onceFlag, [] {
125 syncMessageState.construct();
126 });
127
128 return syncMessageState;
129}
130
131Connection::SyncMessageState::SyncMessageState()
132{
133}
134
135bool Connection::SyncMessageState::processIncomingMessage(Connection& connection, std::unique_ptr<Decoder>& message)
136{
137 if (!message->shouldDispatchMessageWhenWaitingForSyncReply())
138 return false;
139
140 ConnectionAndIncomingMessage connectionAndIncomingMessage { connection, WTFMove(message) };
141
142 {
143 std::lock_guard<Lock> lock(m_mutex);
144
145 if (m_didScheduleDispatchMessagesWorkSet.add(&connection).isNewEntry) {
146 RunLoop::main().dispatch([this, protectedConnection = Ref<Connection>(connection)]() mutable {
147 dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(protectedConnection);
148 });
149 }
150
151 m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(connectionAndIncomingMessage));
152 }
153
154 wakeUpClientRunLoop();
155
156 return true;
157}
158
159void Connection::SyncMessageState::dispatchMessages(Connection* allowedConnection)
160{
161 ASSERT(RunLoop::isMain());
162
163 Vector<ConnectionAndIncomingMessage> messagesToDispatchWhileWaitingForSyncReply;
164
165 {
166 std::lock_guard<Lock> lock(m_mutex);
167 m_messagesToDispatchWhileWaitingForSyncReply.swap(messagesToDispatchWhileWaitingForSyncReply);
168 }
169
170 Vector<ConnectionAndIncomingMessage> messagesToPutBack;
171
172 for (size_t i = 0; i < messagesToDispatchWhileWaitingForSyncReply.size(); ++i) {
173 ConnectionAndIncomingMessage& connectionAndIncomingMessage = messagesToDispatchWhileWaitingForSyncReply[i];
174
175 if (allowedConnection && allowedConnection != connectionAndIncomingMessage.connection.ptr()) {
176 // This incoming message belongs to another connection and we don't want to dispatch it now
177 // so mark it to be put back in the message queue.
178 messagesToPutBack.append(WTFMove(connectionAndIncomingMessage));
179 continue;
180 }
181
182 connectionAndIncomingMessage.connection->dispatchMessage(WTFMove(connectionAndIncomingMessage.message));
183 }
184
185 if (!messagesToPutBack.isEmpty()) {
186 std::lock_guard<Lock> lock(m_mutex);
187
188 for (auto& message : messagesToPutBack)
189 m_messagesToDispatchWhileWaitingForSyncReply.append(WTFMove(message));
190 }
191}
192
193void Connection::SyncMessageState::dispatchMessageAndResetDidScheduleDispatchMessagesForConnection(Connection& connection)
194{
195 {
196 std::lock_guard<Lock> lock(m_mutex);
197 ASSERT(m_didScheduleDispatchMessagesWorkSet.contains(&connection));
198 m_didScheduleDispatchMessagesWorkSet.remove(&connection);
199 }
200
201 dispatchMessages(&connection);
202}
203
204// Represents a sync request for which we're waiting on a reply.
205struct Connection::PendingSyncReply {
206 // The request ID.
207 uint64_t syncRequestID { 0 };
208
209 // The reply decoder, will be null if there was an error processing the sync
210 // message on the other side.
211 std::unique_ptr<Decoder> replyDecoder;
212
213 // Will be set to true once a reply has been received.
214 bool didReceiveReply { false };
215
216 PendingSyncReply() = default;
217
218 explicit PendingSyncReply(uint64_t syncRequestID)
219 : syncRequestID(syncRequestID)
220 {
221 }
222};
223
224Ref<Connection> Connection::createServerConnection(Identifier identifier, Client& client)
225{
226 return adoptRef(*new Connection(identifier, true, client));
227}
228
229Ref<Connection> Connection::createClientConnection(Identifier identifier, Client& client)
230{
231 return adoptRef(*new Connection(identifier, false, client));
232}
233
234static HashMap<IPC::Connection::UniqueID, Connection*>& allConnections()
235{
236 static NeverDestroyed<HashMap<IPC::Connection::UniqueID, Connection*>> map;
237 return map;
238}
239
240static HashMap<uintptr_t, HashMap<uint64_t, CompletionHandler<void(Decoder*)>>>& asyncReplyHandlerMap()
241{
242 static NeverDestroyed<HashMap<uintptr_t, HashMap<uint64_t, CompletionHandler<void(Decoder*)>>>> map;
243 return map.get();
244}
245
246Connection::Connection(Identifier identifier, bool isServer, Client& client)
247 : m_client(client)
248 , m_uniqueID(UniqueID::generate())
249 , m_isServer(isServer)
250 , m_syncRequestID(0)
251 , m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(false)
252 , m_shouldExitOnSyncMessageSendFailure(false)
253 , m_didCloseOnConnectionWorkQueueCallback(0)
254 , m_isConnected(false)
255 , m_connectionQueue(WorkQueue::create("com.apple.IPC.ReceiveQueue"))
256 , m_inSendSyncCount(0)
257 , m_inDispatchMessageCount(0)
258 , m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount(0)
259 , m_didReceiveInvalidMessage(false)
260 , m_waitingForMessage(nullptr)
261 , m_shouldWaitForSyncReplies(true)
262{
263 ASSERT(RunLoop::isMain());
264 allConnections().add(m_uniqueID, this);
265
266 platformInitialize(identifier);
267
268#if HAVE(QOS_CLASSES)
269 ASSERT(pthread_main_np());
270 m_mainThread = pthread_self();
271#endif
272}
273
274Connection::~Connection()
275{
276 ASSERT(RunLoop::isMain());
277 ASSERT(!isValid());
278
279 allConnections().remove(m_uniqueID);
280
281 auto map = asyncReplyHandlerMap().take(reinterpret_cast<uintptr_t>(this));
282 for (auto& handler : map.values()) {
283 if (handler)
284 handler(nullptr);
285 }
286}
287
288Connection* Connection::connection(UniqueID uniqueID)
289{
290 ASSERT(RunLoop::isMain());
291 return allConnections().get(uniqueID);
292}
293
294void Connection::setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool flag)
295{
296 ASSERT(!m_isConnected);
297
298 m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage = flag;
299}
300
301void Connection::setShouldExitOnSyncMessageSendFailure(bool shouldExitOnSyncMessageSendFailure)
302{
303 ASSERT(!m_isConnected);
304
305 m_shouldExitOnSyncMessageSendFailure = shouldExitOnSyncMessageSendFailure;
306}
307
308void Connection::addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue& workQueue, WorkQueueMessageReceiver* workQueueMessageReceiver)
309{
310 ASSERT(RunLoop::isMain());
311
312 m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), workQueue = &workQueue, workQueueMessageReceiver]() mutable {
313 ASSERT(!protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
314
315 protectedThis->m_workQueueMessageReceivers.add(messageReceiverName, std::make_pair(workQueue, workQueueMessageReceiver));
316 });
317}
318
319void Connection::removeWorkQueueMessageReceiver(StringReference messageReceiverName)
320{
321 ASSERT(RunLoop::isMain());
322
323 m_connectionQueue->dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName)]() mutable {
324 ASSERT(protectedThis->m_workQueueMessageReceivers.contains(messageReceiverName));
325 protectedThis->m_workQueueMessageReceivers.remove(messageReceiverName);
326 });
327}
328
329void Connection::dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver& workQueueMessageReceiver, Decoder& decoder)
330{
331 if (!decoder.isSyncMessage()) {
332 workQueueMessageReceiver.didReceiveMessage(*this, decoder);
333 return;
334 }
335
336 uint64_t syncRequestID = 0;
337 if (!decoder.decode(syncRequestID) || !syncRequestID) {
338 // We received an invalid sync message.
339 // FIXME: Handle this.
340 decoder.markInvalid();
341 return;
342 }
343
344 auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
345
346 // Hand off both the decoder and encoder to the work queue message receiver.
347 workQueueMessageReceiver.didReceiveSyncMessage(*this, decoder, replyEncoder);
348
349 // FIXME: If the message was invalid, we should send back a SyncMessageError.
350 ASSERT(!decoder.isInvalid());
351
352 if (replyEncoder)
353 sendSyncReply(WTFMove(replyEncoder));
354}
355
356void Connection::setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback callback)
357{
358 ASSERT(!m_isConnected);
359
360 m_didCloseOnConnectionWorkQueueCallback = callback;
361}
362
363void Connection::invalidate()
364{
365 ASSERT(RunLoop::isMain());
366
367 if (!isValid()) {
368 // Someone already called invalidate().
369 return;
370 }
371
372 m_isValid = false;
373
374 {
375 std::lock_guard<Lock> lock(m_replyHandlersLock);
376 for (auto& replyHandler : m_replyHandlers.values()) {
377 replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
378 handler(nullptr);
379 });
380 }
381
382 m_replyHandlers.clear();
383 }
384
385 m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
386 protectedThis->platformInvalidate();
387 });
388}
389
390void Connection::markCurrentlyDispatchedMessageAsInvalid()
391{
392 // This should only be called while processing a message.
393 ASSERT(m_inDispatchMessageCount > 0);
394
395 m_didReceiveInvalidMessage = true;
396}
397
398std::unique_ptr<Encoder> Connection::createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID)
399{
400 auto encoder = std::make_unique<Encoder>(messageReceiverName, messageName, destinationID);
401 encoder->setIsSyncMessage(true);
402
403 // Encode the sync request ID.
404 syncRequestID = ++m_syncRequestID;
405 *encoder << syncRequestID;
406
407 return encoder;
408}
409
410bool Connection::sendMessage(std::unique_ptr<Encoder> encoder, OptionSet<SendOption> sendOptions)
411{
412 if (!isValid())
413 return false;
414
415 if (isMainThread() && m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting && !encoder->isSyncMessage() && !(encoder->messageReceiverName() == "IPC") && !sendOptions.contains(SendOption::IgnoreFullySynchronousMode)) {
416 uint64_t syncRequestID;
417 auto wrappedMessage = createSyncMessageEncoder("IPC", "WrappedAsyncMessageForTesting", encoder->destinationID(), syncRequestID);
418 wrappedMessage->setFullySynchronousModeForTesting();
419 wrappedMessage->wrapForTesting(WTFMove(encoder));
420 return static_cast<bool>(sendSyncMessage(syncRequestID, WTFMove(wrappedMessage), Seconds::infinity(), { }));
421 }
422
423 if (sendOptions.contains(SendOption::DispatchMessageEvenWhenWaitingForSyncReply)
424 && (!m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage
425 || m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount))
426 encoder->setShouldDispatchMessageWhenWaitingForSyncReply(true);
427
428 {
429 std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
430 m_outgoingMessages.append(WTFMove(encoder));
431 }
432
433 // FIXME: We should add a boolean flag so we don't call this when work has already been scheduled.
434 m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
435 protectedThis->sendOutgoingMessages();
436 });
437 return true;
438}
439
440void Connection::sendMessageWithReply(uint64_t requestID, std::unique_ptr<Encoder> encoder, FunctionDispatcher& replyDispatcher, Function<void (std::unique_ptr<Decoder>)>&& replyHandler)
441{
442 {
443 std::lock_guard<Lock> lock(m_replyHandlersLock);
444
445 if (!isValid()) {
446 replyDispatcher.dispatch([replyHandler = WTFMove(replyHandler)] {
447 replyHandler(nullptr);
448 });
449 return;
450 }
451
452 ASSERT(!m_replyHandlers.contains(requestID));
453 m_replyHandlers.set(requestID, ReplyHandler { &replyDispatcher, WTFMove(replyHandler) });
454 }
455
456 sendMessage(WTFMove(encoder), { });
457}
458
459bool Connection::sendSyncReply(std::unique_ptr<Encoder> encoder)
460{
461 return sendMessage(WTFMove(encoder), { });
462}
463
464Seconds Connection::timeoutRespectingIgnoreTimeoutsForTesting(Seconds timeout) const
465{
466 return m_ignoreTimeoutsForTesting ? Seconds::infinity() : timeout;
467}
468
469std::unique_ptr<Decoder> Connection::waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions)
470{
471 ASSERT(RunLoop::isMain());
472
473 timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
474
475 bool hasIncomingSynchronousMessage = false;
476
477 // First, check if this message is already in the incoming messages queue.
478 {
479 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
480
481 for (auto it = m_incomingMessages.begin(), end = m_incomingMessages.end(); it != end; ++it) {
482 std::unique_ptr<Decoder>& message = *it;
483
484 if (message->messageReceiverName() == messageReceiverName && message->messageName() == messageName && message->destinationID() == destinationID) {
485 std::unique_ptr<Decoder> returnedMessage = WTFMove(message);
486
487 m_incomingMessages.remove(it);
488 return returnedMessage;
489 }
490
491 if (message->isSyncMessage())
492 hasIncomingSynchronousMessage = true;
493 }
494 }
495
496 // Don't even start waiting if we have InterruptWaitingIfSyncMessageArrives and there's a sync message already in the queue.
497 if (hasIncomingSynchronousMessage && waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives)) {
498 m_waitingForMessage = nullptr;
499 return nullptr;
500 }
501
502 WaitForMessageState waitingForMessage(messageReceiverName, messageName, destinationID, waitForOptions);
503
504 {
505 std::lock_guard<Lock> lock(m_waitForMessageMutex);
506
507 // We don't support having multiple clients waiting for messages.
508 ASSERT(!m_waitingForMessage);
509
510 m_waitingForMessage = &waitingForMessage;
511 }
512
513 MonotonicTime absoluteTimeout = MonotonicTime::now() + timeout;
514
515 // Now wait for it to be set.
516 while (true) {
517 // Handle any messages that are blocked on a response from us.
518 SyncMessageState::singleton().dispatchMessages(nullptr);
519
520 std::unique_lock<Lock> lock(m_waitForMessageMutex);
521
522 if (m_waitingForMessage->decoder) {
523 auto decoder = WTFMove(m_waitingForMessage->decoder);
524 m_waitingForMessage = nullptr;
525 return decoder;
526 }
527
528 // Now we wait.
529 bool didTimeout = !m_waitForMessageCondition.waitUntil(lock, absoluteTimeout);
530 // We timed out, lost our connection, or a sync message came in with InterruptWaitingIfSyncMessageArrives, so stop waiting.
531 if (didTimeout || m_waitingForMessage->messageWaitingInterrupted) {
532 m_waitingForMessage = nullptr;
533 break;
534 }
535 }
536
537 return nullptr;
538}
539
540std::unique_ptr<Decoder> Connection::sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder> encoder, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
541{
542 ASSERT(RunLoop::isMain());
543
544 if (!isValid()) {
545 didFailToSendSyncMessage();
546 return nullptr;
547 }
548
549 // Push the pending sync reply information on our stack.
550 {
551 LockHolder locker(m_syncReplyStateMutex);
552 if (!m_shouldWaitForSyncReplies) {
553 didFailToSendSyncMessage();
554 return nullptr;
555 }
556
557 m_pendingSyncReplies.append(PendingSyncReply(syncRequestID));
558 }
559
560 ++m_inSendSyncCount;
561
562 // First send the message.
563 sendMessage(WTFMove(encoder), IPC::SendOption::DispatchMessageEvenWhenWaitingForSyncReply);
564
565 // Then wait for a reply. Waiting for a reply could involve dispatching incoming sync messages, so
566 // keep an extra reference to the connection here in case it's invalidated.
567 Ref<Connection> protect(*this);
568 std::unique_ptr<Decoder> reply = waitForSyncReply(syncRequestID, timeout, sendSyncOptions);
569
570 --m_inSendSyncCount;
571
572 // Finally, pop the pending sync reply information.
573 {
574 LockHolder locker(m_syncReplyStateMutex);
575 ASSERT(m_pendingSyncReplies.last().syncRequestID == syncRequestID);
576 m_pendingSyncReplies.removeLast();
577 }
578
579 if (!reply)
580 didFailToSendSyncMessage();
581
582 return reply;
583}
584
585std::unique_ptr<Decoder> Connection::waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
586{
587 timeout = timeoutRespectingIgnoreTimeoutsForTesting(timeout);
588 WallTime absoluteTime = WallTime::now() + timeout;
589
590 willSendSyncMessage(sendSyncOptions);
591
592 bool timedOut = false;
593 while (!timedOut) {
594 // First, check if we have any messages that we need to process.
595 SyncMessageState::singleton().dispatchMessages(nullptr);
596
597 {
598 LockHolder locker(m_syncReplyStateMutex);
599
600 // Second, check if there is a sync reply at the top of the stack.
601 ASSERT(!m_pendingSyncReplies.isEmpty());
602
603 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies.last();
604 ASSERT_UNUSED(syncRequestID, pendingSyncReply.syncRequestID == syncRequestID);
605
606 // We found the sync reply, or the connection was closed.
607 if (pendingSyncReply.didReceiveReply || !m_shouldWaitForSyncReplies) {
608 didReceiveSyncReply(sendSyncOptions);
609 return WTFMove(pendingSyncReply.replyDecoder);
610 }
611 }
612
613 // Processing a sync message could cause the connection to be invalidated.
614 // (If the handler ends up calling Connection::invalidate).
615 // If that happens, we need to stop waiting, or we'll hang since we won't get
616 // any more incoming messages.
617 if (!isValid()) {
618 RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Connection no longer valid, id = %" PRIu64, syncRequestID);
619 didReceiveSyncReply(sendSyncOptions);
620 return nullptr;
621 }
622
623 // We didn't find a sync reply yet, keep waiting.
624 // This allows the WebProcess to still serve clients while waiting for the message to return.
625 // Notably, it can continue to process accessibility requests, which are on the main thread.
626 timedOut = !SyncMessageState::singleton().wait(absoluteTime);
627 }
628
629 RELEASE_LOG_ERROR(IPC, "Connection::waitForSyncReply: Timed-out while waiting for reply, id = %" PRIu64, syncRequestID);
630 didReceiveSyncReply(sendSyncOptions);
631
632 return nullptr;
633}
634
635void Connection::processIncomingSyncReply(std::unique_ptr<Decoder> decoder)
636{
637 {
638 LockHolder locker(m_syncReplyStateMutex);
639
640 // Go through the stack of sync requests that have pending replies and see which one
641 // this reply is for.
642 for (size_t i = m_pendingSyncReplies.size(); i > 0; --i) {
643 PendingSyncReply& pendingSyncReply = m_pendingSyncReplies[i - 1];
644
645 if (pendingSyncReply.syncRequestID != decoder->destinationID())
646 continue;
647
648 ASSERT(!pendingSyncReply.replyDecoder);
649
650 pendingSyncReply.replyDecoder = WTFMove(decoder);
651 pendingSyncReply.didReceiveReply = true;
652
653 // We got a reply to the last send message, wake up the client run loop so it can be processed.
654 if (i == m_pendingSyncReplies.size())
655 SyncMessageState::singleton().wakeUpClientRunLoop();
656
657 return;
658 }
659 }
660
661 {
662 LockHolder locker(m_replyHandlersLock);
663
664 auto replyHandler = m_replyHandlers.take(decoder->destinationID());
665 if (replyHandler.dispatcher) {
666 replyHandler.dispatcher->dispatch([protectedThis = makeRef(*this), handler = WTFMove(replyHandler.handler), decoder = WTFMove(decoder)] () mutable {
667 if (!protectedThis->isValid()) {
668 handler(nullptr);
669 return;
670 }
671
672 handler(WTFMove(decoder));
673 });
674 }
675 }
676
677 // If we get here, it means we got a reply for a message that wasn't in the sync request stack or map.
678 // This can happen if the send timed out, so it's fine to ignore.
679}
680
681void Connection::processIncomingMessage(std::unique_ptr<Decoder> message)
682{
683 ASSERT(!message->messageReceiverName().isEmpty());
684 ASSERT(!message->messageName().isEmpty());
685
686 if (message->messageReceiverName() == "IPC" && message->messageName() == "SyncMessageReply") {
687 processIncomingSyncReply(WTFMove(message));
688 return;
689 }
690
691 if (!m_workQueueMessageReceivers.isValidKey(message->messageReceiverName())) {
692 RefPtr<Connection> protectedThis(this);
693 StringReference messageReceiverNameReference = message->messageReceiverName();
694 String messageReceiverName(messageReceiverNameReference.isEmpty() ? "<unknown message receiver>" : String(messageReceiverNameReference.data(), messageReceiverNameReference.size()));
695 StringReference messageNameReference = message->messageName();
696 String messageName(messageNameReference.isEmpty() ? "<unknown message>" : String(messageNameReference.data(), messageNameReference.size()));
697
698 RunLoop::main().dispatch([protectedThis = makeRef(*this), messageReceiverName = WTFMove(messageReceiverName), messageName = WTFMove(messageName)]() mutable {
699 protectedThis->dispatchDidReceiveInvalidMessage(messageReceiverName.utf8(), messageName.utf8());
700 });
701 return;
702 }
703
704 auto it = m_workQueueMessageReceivers.find(message->messageReceiverName());
705 if (it != m_workQueueMessageReceivers.end()) {
706 it->value.first->dispatch([protectedThis = makeRef(*this), workQueueMessageReceiver = it->value.second, decoder = WTFMove(message)]() mutable {
707 protectedThis->dispatchWorkQueueMessageReceiverMessage(*workQueueMessageReceiver, *decoder);
708 });
709 return;
710 }
711
712#if HAVE(QOS_CLASSES)
713 if (message->isSyncMessage() && m_shouldBoostMainThreadOnSyncMessage) {
714 pthread_override_t override = pthread_override_qos_class_start_np(m_mainThread, Thread::adjustedQOSClass(QOS_CLASS_USER_INTERACTIVE), 0);
715 message->setQOSClassOverride(override);
716 }
717#endif
718
719 if (message->isSyncMessage()) {
720 std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
721
722 for (auto& callback : m_incomingSyncMessageCallbacks.values())
723 m_incomingSyncMessageCallbackQueue->dispatch(WTFMove(callback));
724
725 m_incomingSyncMessageCallbacks.clear();
726 }
727
728 // Check if we're waiting for this message, or if we need to interrupt waiting due to an incoming sync message.
729 {
730 std::lock_guard<Lock> lock(m_waitForMessageMutex);
731
732 if (m_waitingForMessage && !m_waitingForMessage->decoder) {
733 if (m_waitingForMessage->messageReceiverName == message->messageReceiverName() && m_waitingForMessage->messageName == message->messageName() && m_waitingForMessage->destinationID == message->destinationID()) {
734 m_waitingForMessage->decoder = WTFMove(message);
735 ASSERT(m_waitingForMessage->decoder);
736 m_waitForMessageCondition.notifyOne();
737 return;
738 }
739
740 if (m_waitingForMessage->waitForOptions.contains(WaitForOption::InterruptWaitingIfSyncMessageArrives) && message->isSyncMessage()) {
741 m_waitingForMessage->messageWaitingInterrupted = true;
742 m_waitForMessageCondition.notifyOne();
743 enqueueIncomingMessage(WTFMove(message));
744 return;
745 }
746 }
747 }
748
749 // Check if this is a sync message or if it's a message that should be dispatched even when waiting for
750 // a sync reply. If it is, and we're waiting for a sync reply this message needs to be dispatched.
751 // If we don't we'll end up with a deadlock where both sync message senders are stuck waiting for a reply.
752 if (SyncMessageState::singleton().processIncomingMessage(*this, message))
753 return;
754
755 enqueueIncomingMessage(WTFMove(message));
756}
757
758uint64_t Connection::installIncomingSyncMessageCallback(WTF::Function<void ()>&& callback)
759{
760 std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
761
762 m_nextIncomingSyncMessageCallbackID++;
763
764 if (!m_incomingSyncMessageCallbackQueue)
765 m_incomingSyncMessageCallbackQueue = WorkQueue::create("com.apple.WebKit.IPC.IncomingSyncMessageCallbackQueue");
766
767 m_incomingSyncMessageCallbacks.add(m_nextIncomingSyncMessageCallbackID, WTFMove(callback));
768
769 return m_nextIncomingSyncMessageCallbackID;
770}
771
772void Connection::uninstallIncomingSyncMessageCallback(uint64_t callbackID)
773{
774 std::lock_guard<Lock> lock(m_incomingSyncMessageCallbackMutex);
775 m_incomingSyncMessageCallbacks.remove(callbackID);
776}
777
778bool Connection::hasIncomingSyncMessage()
779{
780 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
781
782 for (auto& message : m_incomingMessages) {
783 if (message->isSyncMessage())
784 return true;
785 }
786
787 return false;
788}
789
790void Connection::enableIncomingMessagesThrottling()
791{
792 if (m_incomingMessagesThrottler)
793 return;
794
795 m_incomingMessagesThrottler = std::make_unique<MessagesThrottler>(*this, &Connection::dispatchIncomingMessages);
796}
797
798void Connection::postConnectionDidCloseOnConnectionWorkQueue()
799{
800 m_connectionQueue->dispatch([protectedThis = makeRef(*this)]() mutable {
801 protectedThis->connectionDidClose();
802 });
803}
804
805void Connection::connectionDidClose()
806{
807 // The connection is now invalid.
808 platformInvalidate();
809
810 {
811 LockHolder locker(m_replyHandlersLock);
812 for (auto& replyHandler : m_replyHandlers.values()) {
813 replyHandler.dispatcher->dispatch([handler = WTFMove(replyHandler.handler)] {
814 handler(nullptr);
815 });
816 }
817
818 m_replyHandlers.clear();
819 }
820
821 {
822 LockHolder locker(m_syncReplyStateMutex);
823
824 ASSERT(m_shouldWaitForSyncReplies);
825 m_shouldWaitForSyncReplies = false;
826
827 if (!m_pendingSyncReplies.isEmpty())
828 SyncMessageState::singleton().wakeUpClientRunLoop();
829 }
830
831 {
832 std::lock_guard<Lock> lock(m_waitForMessageMutex);
833 if (m_waitingForMessage)
834 m_waitingForMessage->messageWaitingInterrupted = true;
835 }
836 m_waitForMessageCondition.notifyAll();
837
838 if (m_didCloseOnConnectionWorkQueueCallback)
839 m_didCloseOnConnectionWorkQueueCallback(this);
840
841 RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
842 // If the connection has been explicitly invalidated before dispatchConnectionDidClose was called,
843 // then the connection will be invalid here.
844 if (!protectedThis->isValid())
845 return;
846
847 // Set m_isValid to false before calling didClose, otherwise, sendSync will try to send a message
848 // to the connection and will then wait indefinitely for a reply.
849 protectedThis->m_isValid = false;
850
851 protectedThis->m_client.didClose(protectedThis.get());
852 });
853}
854
855bool Connection::canSendOutgoingMessages() const
856{
857 return m_isConnected && platformCanSendOutgoingMessages();
858}
859
860void Connection::sendOutgoingMessages()
861{
862 if (!canSendOutgoingMessages())
863 return;
864
865 while (true) {
866 std::unique_ptr<Encoder> message;
867
868 {
869 std::lock_guard<Lock> lock(m_outgoingMessagesMutex);
870 if (m_outgoingMessages.isEmpty())
871 break;
872 message = m_outgoingMessages.takeFirst();
873 }
874
875 if (!sendOutgoingMessage(WTFMove(message)))
876 break;
877 }
878}
879
880void Connection::dispatchSyncMessage(Decoder& decoder)
881{
882 ASSERT(decoder.isSyncMessage());
883
884 uint64_t syncRequestID = 0;
885 if (!decoder.decode(syncRequestID) || !syncRequestID) {
886 // We received an invalid sync message.
887 decoder.markInvalid();
888 return;
889 }
890
891 auto replyEncoder = std::make_unique<Encoder>("IPC", "SyncMessageReply", syncRequestID);
892
893 if (decoder.messageReceiverName() == "IPC" && decoder.messageName() == "WrappedAsyncMessageForTesting") {
894 if (!m_fullySynchronousModeIsAllowedForTesting) {
895 decoder.markInvalid();
896 return;
897 }
898 std::unique_ptr<Decoder> unwrappedDecoder = Decoder::unwrapForTesting(decoder);
899 RELEASE_ASSERT(unwrappedDecoder);
900 processIncomingMessage(WTFMove(unwrappedDecoder));
901
902 SyncMessageState::singleton().dispatchMessages(nullptr);
903 } else {
904 // Hand off both the decoder and encoder to the client.
905 m_client.didReceiveSyncMessage(*this, decoder, replyEncoder);
906 }
907
908 // FIXME: If the message was invalid, we should send back a SyncMessageError.
909 ASSERT(!decoder.isInvalid());
910
911 if (replyEncoder)
912 sendSyncReply(WTFMove(replyEncoder));
913}
914
915void Connection::dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString)
916{
917 ASSERT(RunLoop::isMain());
918
919 if (!isValid())
920 return;
921
922 m_client.didReceiveInvalidMessage(*this, StringReference(messageReceiverNameString.data(), messageReceiverNameString.length()), StringReference(messageNameString.data(), messageNameString.length()));
923}
924
925void Connection::didFailToSendSyncMessage()
926{
927 if (!m_shouldExitOnSyncMessageSendFailure)
928 return;
929
930 exit(0);
931}
932
933void Connection::enqueueIncomingMessage(std::unique_ptr<Decoder> incomingMessage)
934{
935 {
936 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
937
938#if PLATFORM(COCOA)
939 if (m_wasKilled)
940 return;
941
942 if (m_incomingMessages.size() >= maxPendingIncomingMessagesKillingThreshold) {
943 if (kill()) {
944 RELEASE_LOG_ERROR(IPC, "%p - Connection::enqueueIncomingMessage: Over %zu incoming messages have been queued without the main thread processing them, killing the connection as the remote process seems to be misbehaving", this, maxPendingIncomingMessagesKillingThreshold);
945 m_incomingMessages.clear();
946 }
947 return;
948 }
949#endif
950
951 m_incomingMessages.append(WTFMove(incomingMessage));
952
953 if (m_incomingMessagesThrottler && m_incomingMessages.size() != 1)
954 return;
955 }
956
957 RunLoop::main().dispatch([protectedThis = makeRef(*this)]() mutable {
958 if (protectedThis->m_incomingMessagesThrottler)
959 protectedThis->dispatchIncomingMessages();
960 else
961 protectedThis->dispatchOneIncomingMessage();
962 });
963}
964
965void Connection::dispatchMessage(Decoder& decoder)
966{
967 RELEASE_ASSERT(isValid());
968 if (decoder.messageReceiverName() == "AsyncReply") {
969 Optional<uint64_t> listenerID;
970 decoder >> listenerID;
971 if (!listenerID) {
972 ASSERT_NOT_REACHED();
973 return;
974 }
975 auto handler = takeAsyncReplyHandler(*this, *listenerID);
976 if (!handler) {
977 ASSERT_NOT_REACHED();
978 return;
979 }
980 handler(&decoder);
981 return;
982 }
983 m_client.didReceiveMessage(*this, decoder);
984}
985
// Dispatches one owned incoming message, either as a sync message or as an
// async message, while maintaining the "in dispatch" counters and reporting
// the message to the client if it turned out to be invalid.
// Does nothing if the connection is no longer valid.
void Connection::dispatchMessage(std::unique_ptr<Decoder> message)
{
    if (!isValid())
        return;

    if (message->shouldUseFullySynchronousModeForTesting()) {
        // A message requesting fully synchronous mode is treated as invalid
        // unless that mode has been allowed for testing on this connection.
        if (!m_fullySynchronousModeIsAllowedForTesting) {
            m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());
            return;
        }
        m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting++;
    }

    m_inDispatchMessageCount++;

    if (message->shouldDispatchMessageWhenWaitingForSyncReply())
        m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount++;

    // Save the invalid-message flag and reset it for this dispatch; it is
    // restored at the end of the function.
    // NOTE(review): presumably this keeps nested dispatches from clobbering the outer flag; confirm.
    bool oldDidReceiveInvalidMessage = m_didReceiveInvalidMessage;
    m_didReceiveInvalidMessage = false;

    if (message->isSyncMessage())
        dispatchSyncMessage(*message);
    else
        dispatchMessage(*message);

    // The message counts as invalid if either the flag was set during dispatch
    // or the decoder itself was marked invalid.
    m_didReceiveInvalidMessage |= message->isInvalid();
    m_inDispatchMessageCount--;

    // FIXME: For synchronous messages, we should not decrement the counter until we send a response.
    // Otherwise, we would deadlock if processing the message results in a sync message back after we exit this function.
    if (message->shouldDispatchMessageWhenWaitingForSyncReply())
        m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount--;

    if (message->shouldUseFullySynchronousModeForTesting())
        m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting--;

    // Only notify the client if the connection is still valid after dispatch.
    if (m_didReceiveInvalidMessage && isValid())
        m_client.didReceiveInvalidMessage(*this, message->messageReceiverName(), message->messageName());

    m_didReceiveInvalidMessage = oldDidReceiveInvalidMessage;
}
1028
// Creates a throttler for |connection|. |dispatchMessages| is the Connection
// member function used to dispatch queued messages; it is both wired to a
// timer on the main run loop and stored for direct scheduling.
// Asserts that construction happens on the main run loop.
Connection::MessagesThrottler::MessagesThrottler(Connection& connection, DispatchMessagesFunction dispatchMessages)
    : m_dispatchMessagesTimer(RunLoop::main(), &connection, dispatchMessages)
    , m_connection(connection)
    , m_dispatchMessages(dispatchMessages)
{
    ASSERT(RunLoop::isMain());
}
1036
1037void Connection::MessagesThrottler::scheduleMessagesDispatch()
1038{
1039 ASSERT(RunLoop::isMain());
1040
1041 if (m_throttlingLevel) {
1042 m_dispatchMessagesTimer.startOneShot(0_s);
1043 return;
1044 }
1045 RunLoop::main().dispatch([this, protectedConnection = makeRefPtr(&m_connection)]() mutable {
1046 (protectedConnection.get()->*m_dispatchMessages)();
1047 });
1048}
1049
1050size_t Connection::MessagesThrottler::numberOfMessagesToProcess(size_t totalMessages)
1051{
1052 ASSERT(RunLoop::isMain());
1053
1054 // Never dispatch more than 600 messages without returning to the run loop, we can go as low as 60 with maximum throttling level.
1055 static const size_t maxIncomingMessagesDispatchingBatchSize { 600 };
1056 static const unsigned maxThrottlingLevel = 9;
1057
1058 size_t batchSize = maxIncomingMessagesDispatchingBatchSize / (m_throttlingLevel + 1);
1059
1060 if (totalMessages > maxIncomingMessagesDispatchingBatchSize)
1061 m_throttlingLevel = std::min(m_throttlingLevel + 1, maxThrottlingLevel);
1062 else if (m_throttlingLevel)
1063 --m_throttlingLevel;
1064
1065 return std::min(totalMessages, batchSize);
1066}
1067
1068void Connection::dispatchOneIncomingMessage()
1069{
1070 std::unique_ptr<Decoder> message;
1071 {
1072 std::lock_guard<Lock> lock(m_incomingMessagesMutex);
1073 if (m_incomingMessages.isEmpty())
1074 return;
1075
1076 message = m_incomingMessages.takeFirst();
1077 }
1078
1079 dispatchMessage(WTFMove(message));
1080}
1081
// Dispatches a bounded batch of queued incoming messages on the main run loop.
// The batch size is chosen by the messages throttler; if messages remain in
// the queue, another dispatch is scheduled before any message is dispatched.
void Connection::dispatchIncomingMessages()
{
    ASSERT(RunLoop::isMain());

    std::unique_ptr<Decoder> message;

    size_t messagesToProcess = 0;
    {
        std::lock_guard<Lock> lock(m_incomingMessagesMutex);
        if (m_incomingMessages.isEmpty())
            return;

        message = m_incomingMessages.takeFirst();

        // Incoming messages may get added to the queue by the IPC thread while we're dispatching the messages below.
        // To make sure dispatchIncomingMessages() yields, we only ever process messages that were in the queue when
        // dispatchIncomingMessages() was called. Additionally, the MessagesThrottler may further cap the number of
        // messages to process to make sure we give the main run loop a chance to process other events.
        // Note that the size passed here no longer includes the message taken above.
        messagesToProcess = m_incomingMessagesThrottler->numberOfMessagesToProcess(m_incomingMessages.size());
        if (messagesToProcess < m_incomingMessages.size()) {
            RELEASE_LOG_ERROR(IPC, "%p - Connection::dispatchIncomingMessages: IPC throttling was triggered (has %zu pending incoming messages, will only process %zu before yielding)", this, m_incomingMessages.size(), messagesToProcess);
#if PLATFORM(COCOA)
            RELEASE_LOG_ERROR(IPC, "%p - Connection::dispatchIncomingMessages: first IPC message in queue is %{public}s::%{public}s", this, message->messageReceiverName().toString().data(), message->messageName().toString().data());
#endif
        }

        // Re-schedule ourselves *before* we dispatch the messages because we want to process follow-up messages if the client
        // spins a nested run loop while we're dispatching a message. Note that this means we can re-enter this method.
        if (!m_incomingMessages.isEmpty())
            m_incomingMessagesThrottler->scheduleMessagesDispatch();
    }

    // The first message is always dispatched, outside the lock.
    dispatchMessage(WTFMove(message));

    // The loop starts at 1 because the message dispatched above counts towards the batch.
    // The queue is re-checked under the lock on every iteration; stop early if it is empty.
    for (size_t i = 1; i < messagesToProcess; ++i) {
        {
            std::lock_guard<Lock> lock(m_incomingMessagesMutex);
            if (m_incomingMessages.isEmpty())
                return;

            message = m_incomingMessages.takeFirst();
        }
        dispatchMessage(WTFMove(message));
    }
}
1127
// Returns a fresh async reply handler identifier. Identifiers start at 1 and
// increase by one on every call, so 0 is never handed out (barring wrap-around).
uint64_t nextAsyncReplyHandlerID()
{
    static uint64_t lastIssuedIdentifier = 0;
    lastIssuedIdentifier += 1;
    return lastIssuedIdentifier;
}
1133
1134void addAsyncReplyHandler(Connection& connection, uint64_t identifier, CompletionHandler<void(Decoder*)>&& completionHandler)
1135{
1136 auto result = asyncReplyHandlerMap().ensure(reinterpret_cast<uintptr_t>(&connection), [] {
1137 return HashMap<uint64_t, CompletionHandler<void(Decoder*)>>();
1138 }).iterator->value.add(identifier, WTFMove(completionHandler));
1139 ASSERT_UNUSED(result, result.isNewEntry);
1140}
1141
1142CompletionHandler<void(Decoder*)> takeAsyncReplyHandler(Connection& connection, uint64_t identifier)
1143{
1144 auto iterator = asyncReplyHandlerMap().find(reinterpret_cast<uintptr_t>(&connection));
1145 if (iterator != asyncReplyHandlerMap().end()) {
1146 if (!iterator->value.isValidKey(identifier)) {
1147 ASSERT_NOT_REACHED();
1148 connection.markCurrentlyDispatchedMessageAsInvalid();
1149 return nullptr;
1150 }
1151 ASSERT(iterator->value.contains(identifier));
1152 return iterator->value.take(identifier);
1153 }
1154 ASSERT_NOT_REACHED();
1155 return nullptr;
1156}
1157
1158void Connection::wakeUpRunLoop()
1159{
1160 RunLoop::main().wakeUp();
1161}
1162
1163} // namespace IPC
1164