1/*
2 * Copyright (C) 2010-2018 Apple Inc. All rights reserved.
3 * Copyright (C) 2010 Nokia Corporation and/or its subsidiary(-ies)
4 * Portions Copyright (c) 2010 Motorola Mobility, Inc. All rights reserved.
5 * Copyright (C) 2017 Sony Interactive Entertainment Inc.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY APPLE INC. AND ITS CONTRIBUTORS ``AS IS''
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
18 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL APPLE INC. OR ITS CONTRIBUTORS
20 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
26 * THE POSSIBILITY OF SUCH DAMAGE.
27 */
28
29#pragma once
30
31#include "Decoder.h"
32#include "Encoder.h"
33#include "HandleMessage.h"
34#include "MessageReceiver.h"
35#include <WebCore/ScriptDisallowedScope.h>
36#include <atomic>
37#include <wtf/Condition.h>
38#include <wtf/Deque.h>
39#include <wtf/Forward.h>
40#include <wtf/HashMap.h>
41#include <wtf/Lock.h>
42#include <wtf/ObjectIdentifier.h>
43#include <wtf/OptionSet.h>
44#include <wtf/RunLoop.h>
45#include <wtf/WorkQueue.h>
46#include <wtf/text/CString.h>
47
48#if OS(DARWIN) && !USE(UNIX_DOMAIN_SOCKETS)
49#include <mach/mach_port.h>
50#include <wtf/OSObjectPtr.h>
51#include <wtf/spi/darwin/XPCSPI.h>
52#endif
53
54#if USE(GLIB)
55#include "GSocketMonitor.h"
56#endif
57
58namespace IPC {
59
60enum class SendOption {
61 // Whether this message should be dispatched when waiting for a sync reply.
62 // This is the default for synchronous messages.
63 DispatchMessageEvenWhenWaitingForSyncReply = 1 << 0,
64 IgnoreFullySynchronousMode = 1 << 1,
65};
66
67enum class SendSyncOption {
68 // Use this to inform that this sync call will suspend this process until the user responds with input.
69 InformPlatformProcessWillSuspend = 1 << 0,
70 UseFullySynchronousModeForTesting = 1 << 1,
71};
72
73enum class WaitForOption {
74 // Use this to make waitForMessage be interrupted immediately by any incoming sync messages.
75 InterruptWaitingIfSyncMessageArrives = 1 << 0,
76};
77
78#define MESSAGE_CHECK_BASE(assertion, connection) do \
79 if (!(assertion)) { \
80 ASSERT(assertion); \
81 (connection)->markCurrentlyDispatchedMessageAsInvalid(); \
82 return; \
83 } \
84while (0)
85
86class MachMessage;
87class UnixMessage;
88
89class Connection : public ThreadSafeRefCounted<Connection, WTF::DestructionThread::Main> {
90public:
91 class Client : public MessageReceiver {
92 public:
93 virtual void didClose(Connection&) = 0;
94 virtual void didReceiveInvalidMessage(Connection&, StringReference messageReceiverName, StringReference messageName) = 0;
95
96 protected:
97 virtual ~Client() { }
98 };
99
100 class WorkQueueMessageReceiver : public MessageReceiver, public ThreadSafeRefCounted<WorkQueueMessageReceiver> {
101 };
102
103#if USE(UNIX_DOMAIN_SOCKETS)
104 typedef int Identifier;
105 static bool identifierIsValid(Identifier identifier) { return identifier != -1; }
106
107 struct SocketPair {
108 int client;
109 int server;
110 };
111
112 enum ConnectionOptions {
113 SetCloexecOnClient = 1 << 0,
114 SetCloexecOnServer = 1 << 1,
115 };
116
117 static Connection::SocketPair createPlatformConnection(unsigned options = SetCloexecOnClient | SetCloexecOnServer);
118#elif OS(DARWIN)
119 struct Identifier {
120 Identifier()
121 {
122 }
123
124 Identifier(mach_port_t port)
125 : port(port)
126 {
127 }
128
129 Identifier(mach_port_t port, OSObjectPtr<xpc_connection_t> xpcConnection)
130 : port(port)
131 , xpcConnection(WTFMove(xpcConnection))
132 {
133 }
134
135 mach_port_t port { MACH_PORT_NULL };
136 OSObjectPtr<xpc_connection_t> xpcConnection;
137 };
138 static bool identifierIsValid(Identifier identifier) { return MACH_PORT_VALID(identifier.port); }
139 xpc_connection_t xpcConnection() const { return m_xpcConnection.get(); }
140 Optional<audit_token_t> getAuditToken();
141 pid_t remoteProcessID() const;
142#elif OS(WINDOWS)
143 typedef HANDLE Identifier;
144 static bool createServerAndClientIdentifiers(Identifier& serverIdentifier, Identifier& clientIdentifier);
145 static bool identifierIsValid(Identifier identifier) { return !!identifier; }
146#endif
147
148 static Ref<Connection> createServerConnection(Identifier, Client&);
149 static Ref<Connection> createClientConnection(Identifier, Client&);
150 ~Connection();
151
152 Client& client() const { return m_client; }
153
154 enum UniqueIDType { };
155 using UniqueID = ObjectIdentifier<UniqueIDType>;
156
157 static Connection* connection(UniqueID);
158 UniqueID uniqueID() const { return m_uniqueID; }
159
160 void setOnlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage(bool);
161 void setShouldExitOnSyncMessageSendFailure(bool);
162
163 // The set callback will be called on the connection work queue when the connection is closed,
164 // before didCall is called on the client thread. Must be called before the connection is opened.
165 // In the future we might want a more generic way to handle sync or async messages directly
166 // on the work queue, for example if we want to handle them on some other thread we could avoid
167 // handling the message on the client thread first.
168 typedef void (*DidCloseOnConnectionWorkQueueCallback)(Connection*);
169 void setDidCloseOnConnectionWorkQueueCallback(DidCloseOnConnectionWorkQueueCallback);
170
171 void addWorkQueueMessageReceiver(StringReference messageReceiverName, WorkQueue&, WorkQueueMessageReceiver*);
172 void removeWorkQueueMessageReceiver(StringReference messageReceiverName);
173
174 bool open();
175 void invalidate();
176 void markCurrentlyDispatchedMessageAsInvalid();
177
178 void postConnectionDidCloseOnConnectionWorkQueue();
179
180 template<typename T, typename C> void sendWithAsyncReply(T&& message, C&& completionHandler, uint64_t destinationID = 0);
181 template<typename T> bool send(T&& message, uint64_t destinationID, OptionSet<SendOption> sendOptions = { });
182 template<typename T> void sendWithReply(T&& message, uint64_t destinationID, FunctionDispatcher& replyDispatcher, Function<void(Optional<typename CodingType<typename T::Reply>::Type>)>&& replyHandler);
183 template<typename T> bool sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, Seconds timeout = Seconds::infinity(), OptionSet<SendSyncOption> sendSyncOptions = { });
184 template<typename T> bool waitForAndDispatchImmediately(uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions = { });
185
186 bool sendMessage(std::unique_ptr<Encoder>, OptionSet<SendOption> sendOptions);
187 void sendMessageWithReply(uint64_t requestID, std::unique_ptr<Encoder>, FunctionDispatcher& replyDispatcher, Function<void(std::unique_ptr<Decoder>)>&& replyHandler);
188 std::unique_ptr<Encoder> createSyncMessageEncoder(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, uint64_t& syncRequestID);
189 std::unique_ptr<Decoder> sendSyncMessage(uint64_t syncRequestID, std::unique_ptr<Encoder>, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions);
190 bool sendSyncReply(std::unique_ptr<Encoder>);
191
192 void wakeUpRunLoop();
193
194 void incrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { ++m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; }
195 void decrementDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount() { --m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount; }
196
197 bool inSendSync() const { return m_inSendSyncCount; }
198
199 Identifier identifier() const;
200
201#if PLATFORM(COCOA)
202 bool kill();
203 void terminateSoon(Seconds);
204#endif
205
206 bool isValid() const { return m_isValid; }
207
208#if HAVE(QOS_CLASSES)
209 void setShouldBoostMainThreadOnSyncMessage(bool b) { m_shouldBoostMainThreadOnSyncMessage = b; }
210#endif
211
212 uint64_t installIncomingSyncMessageCallback(WTF::Function<void()>&&);
213 void uninstallIncomingSyncMessageCallback(uint64_t);
214 bool hasIncomingSyncMessage();
215
216 void allowFullySynchronousModeForTesting() { m_fullySynchronousModeIsAllowedForTesting = true; }
217
218 void ignoreTimeoutsForTesting() { m_ignoreTimeoutsForTesting = true; }
219
220 void enableIncomingMessagesThrottling();
221
222private:
223 Connection(Identifier, bool isServer, Client&);
224 void platformInitialize(Identifier);
225 void platformInvalidate();
226
227 std::unique_ptr<Decoder> waitForMessage(StringReference messageReceiverName, StringReference messageName, uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption>);
228
229 std::unique_ptr<Decoder> waitForSyncReply(uint64_t syncRequestID, Seconds timeout, OptionSet<SendSyncOption>);
230
231 // Called on the connection work queue.
232 void processIncomingMessage(std::unique_ptr<Decoder>);
233 void processIncomingSyncReply(std::unique_ptr<Decoder>);
234
235 void dispatchWorkQueueMessageReceiverMessage(WorkQueueMessageReceiver&, Decoder&);
236
237 bool canSendOutgoingMessages() const;
238 bool platformCanSendOutgoingMessages() const;
239 void sendOutgoingMessages();
240 bool sendOutgoingMessage(std::unique_ptr<Encoder>);
241 void connectionDidClose();
242
243 // Called on the listener thread.
244 void dispatchOneIncomingMessage();
245 void dispatchIncomingMessages();
246 void dispatchMessage(std::unique_ptr<Decoder>);
247 void dispatchMessage(Decoder&);
248 void dispatchSyncMessage(Decoder&);
249 void dispatchDidReceiveInvalidMessage(const CString& messageReceiverNameString, const CString& messageNameString);
250 void didFailToSendSyncMessage();
251
252 // Can be called on any thread.
253 void enqueueIncomingMessage(std::unique_ptr<Decoder>);
254 size_t incomingMessagesDispatchingBatchSize() const;
255
256 void willSendSyncMessage(OptionSet<SendSyncOption>);
257 void didReceiveSyncReply(OptionSet<SendSyncOption>);
258
259 Seconds timeoutRespectingIgnoreTimeoutsForTesting(Seconds) const;
260
261#if PLATFORM(COCOA)
262 bool sendMessage(std::unique_ptr<MachMessage>);
263#endif
264
265 class MessagesThrottler {
266 WTF_MAKE_FAST_ALLOCATED;
267 public:
268 typedef void (Connection::*DispatchMessagesFunction)();
269 MessagesThrottler(Connection&, DispatchMessagesFunction);
270
271 size_t numberOfMessagesToProcess(size_t totalMessages);
272 void scheduleMessagesDispatch();
273
274 private:
275 RunLoop::Timer<Connection> m_dispatchMessagesTimer;
276 Connection& m_connection;
277 DispatchMessagesFunction m_dispatchMessages;
278 unsigned m_throttlingLevel { 0 };
279 };
280
281 Client& m_client;
282 UniqueID m_uniqueID;
283 bool m_isServer;
284 std::atomic<bool> m_isValid { true };
285 std::atomic<uint64_t> m_syncRequestID;
286
287 bool m_onlySendMessagesAsDispatchWhenWaitingForSyncReplyWhenProcessingSuchAMessage;
288 bool m_shouldExitOnSyncMessageSendFailure;
289 DidCloseOnConnectionWorkQueueCallback m_didCloseOnConnectionWorkQueueCallback;
290
291 bool m_isConnected;
292 Ref<WorkQueue> m_connectionQueue;
293
294 HashMap<StringReference, std::pair<RefPtr<WorkQueue>, RefPtr<WorkQueueMessageReceiver>>> m_workQueueMessageReceivers;
295
296 unsigned m_inSendSyncCount;
297 unsigned m_inDispatchMessageCount;
298 unsigned m_inDispatchMessageMarkedDispatchWhenWaitingForSyncReplyCount;
299 unsigned m_inDispatchMessageMarkedToUseFullySynchronousModeForTesting { 0 };
300 bool m_fullySynchronousModeIsAllowedForTesting { false };
301 bool m_ignoreTimeoutsForTesting { false };
302 bool m_didReceiveInvalidMessage;
303
304 // Incoming messages.
305 Lock m_incomingMessagesMutex;
306 Deque<std::unique_ptr<Decoder>> m_incomingMessages;
307 std::unique_ptr<MessagesThrottler> m_incomingMessagesThrottler;
308
309 // Outgoing messages.
310 Lock m_outgoingMessagesMutex;
311 Deque<std::unique_ptr<Encoder>> m_outgoingMessages;
312
313 Condition m_waitForMessageCondition;
314 Lock m_waitForMessageMutex;
315
316 struct ReplyHandler;
317
318 Lock m_replyHandlersLock;
319 HashMap<uint64_t, ReplyHandler> m_replyHandlers;
320
321 struct WaitForMessageState;
322 WaitForMessageState* m_waitingForMessage;
323
324 class SyncMessageState;
325
326 Lock m_syncReplyStateMutex;
327 bool m_shouldWaitForSyncReplies;
328 struct PendingSyncReply;
329 Vector<PendingSyncReply> m_pendingSyncReplies;
330
331 Lock m_incomingSyncMessageCallbackMutex;
332 HashMap<uint64_t, WTF::Function<void()>> m_incomingSyncMessageCallbacks;
333 RefPtr<WorkQueue> m_incomingSyncMessageCallbackQueue;
334 uint64_t m_nextIncomingSyncMessageCallbackID { 0 };
335
336#if HAVE(QOS_CLASSES)
337 pthread_t m_mainThread { 0 };
338 bool m_shouldBoostMainThreadOnSyncMessage { false };
339#endif
340
341#if USE(UNIX_DOMAIN_SOCKETS)
342 // Called on the connection queue.
343 void readyReadHandler();
344 bool processMessage();
345 bool sendOutputMessage(UnixMessage&);
346
347 Vector<uint8_t> m_readBuffer;
348 Vector<int> m_fileDescriptors;
349 int m_socketDescriptor;
350 std::unique_ptr<UnixMessage> m_pendingOutputMessage;
351#if USE(GLIB)
352 GRefPtr<GSocket> m_socket;
353 GSocketMonitor m_readSocketMonitor;
354 GSocketMonitor m_writeSocketMonitor;
355#endif
356#elif OS(DARWIN)
357 // Called on the connection queue.
358 void receiveSourceEventHandler();
359 void initializeSendSource();
360 void resumeSendSource();
361 void cancelReceiveSource();
362
363 mach_port_t m_sendPort { MACH_PORT_NULL };
364 dispatch_source_t m_sendSource { nullptr };
365
366 mach_port_t m_receivePort { MACH_PORT_NULL };
367 dispatch_source_t m_receiveSource { nullptr };
368
369 std::unique_ptr<MachMessage> m_pendingOutgoingMachMessage;
370 bool m_isInitializingSendSource { false };
371
372 OSObjectPtr<xpc_connection_t> m_xpcConnection;
373 bool m_wasKilled { false };
374#elif OS(WINDOWS)
375 // Called on the connection queue.
376 void readEventHandler();
377 void writeEventHandler();
378 void invokeReadEventHandler();
379 void invokeWriteEventHandler();
380
381 class EventListener {
382 public:
383 void open(Function<void()>&&);
384 void close();
385
386 OVERLAPPED& state() { return m_state; }
387
388 private:
389 static void callback(void*, BOOLEAN);
390
391 OVERLAPPED m_state;
392 HANDLE m_waitHandle { INVALID_HANDLE_VALUE };
393 Function<void()> m_handler;
394 };
395
396 Vector<uint8_t> m_readBuffer;
397 EventListener m_readListener;
398 std::unique_ptr<Encoder> m_pendingWriteEncoder;
399 EventListener m_writeListener;
400 HANDLE m_connectionPipe { INVALID_HANDLE_VALUE };
401#endif
402};
403
404template<typename T>
405bool Connection::send(T&& message, uint64_t destinationID, OptionSet<SendOption> sendOptions)
406{
407 COMPILE_ASSERT(!T::isSync, AsyncMessageExpected);
408
409 auto encoder = std::make_unique<Encoder>(T::receiverName(), T::name(), destinationID);
410 encoder->encode(message.arguments());
411
412 return sendMessage(WTFMove(encoder), sendOptions);
413}
414
415uint64_t nextAsyncReplyHandlerID();
416void addAsyncReplyHandler(Connection&, uint64_t, CompletionHandler<void(Decoder*)>&&);
417CompletionHandler<void(Decoder*)> takeAsyncReplyHandler(Connection&, uint64_t);
418
419template<typename T, typename C>
420void Connection::sendWithAsyncReply(T&& message, C&& completionHandler, uint64_t destinationID)
421{
422 COMPILE_ASSERT(!T::isSync, AsyncMessageExpected);
423
424 auto encoder = std::make_unique<Encoder>(T::receiverName(), T::name(), destinationID);
425 uint64_t listenerID = nextAsyncReplyHandlerID();
426 encoder->encode(listenerID);
427 encoder->encode(message.arguments());
428 sendMessage(WTFMove(encoder), { });
429 addAsyncReplyHandler(*this, listenerID, [completionHandler = WTFMove(completionHandler)] (Decoder* decoder) mutable {
430 if (decoder && !decoder->isInvalid())
431 T::callReply(*decoder, WTFMove(completionHandler));
432 else
433 T::cancelReply(WTFMove(completionHandler));
434 });
435}
436
437template<typename T>
438void Connection::sendWithReply(T&& message, uint64_t destinationID, FunctionDispatcher& replyDispatcher, Function<void(Optional<typename CodingType<typename T::Reply>::Type>)>&& replyHandler)
439{
440 uint64_t requestID = 0;
441 std::unique_ptr<Encoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, requestID);
442
443 encoder->encode(message.arguments());
444
445 sendMessageWithReply(requestID, WTFMove(encoder), replyDispatcher, [replyHandler = WTFMove(replyHandler)](std::unique_ptr<Decoder> decoder) {
446 if (decoder) {
447 Optional<typename CodingType<typename T::Reply>::Type> reply;
448 *decoder >> reply;
449 if (reply) {
450 replyHandler(WTFMove(*reply));
451 return;
452 }
453 }
454
455 replyHandler(WTF::nullopt);
456 });
457}
458
459template<size_t i, typename A, typename B> struct TupleMover {
460 static void move(A&& a, B& b)
461 {
462 std::get<i - 1>(b) = WTFMove(std::get<i - 1>(a));
463 TupleMover<i - 1, A, B>::move(WTFMove(a), b);
464 }
465};
466
467template<typename A, typename B> struct TupleMover<0, A, B> {
468 static void move(A&&, B&) { }
469};
470
471template<typename... A, typename... B>
472void moveTuple(std::tuple<A...>&& a, std::tuple<B...>& b)
473{
474 static_assert(sizeof...(A) == sizeof...(B), "Should be used with two tuples of same size");
475 TupleMover<sizeof...(A), std::tuple<A...>, std::tuple<B...>>::move(WTFMove(a), b);
476}
477
478template<typename T> bool Connection::sendSync(T&& message, typename T::Reply&& reply, uint64_t destinationID, Seconds timeout, OptionSet<SendSyncOption> sendSyncOptions)
479{
480 COMPILE_ASSERT(T::isSync, SyncMessageExpected);
481
482 uint64_t syncRequestID = 0;
483 std::unique_ptr<Encoder> encoder = createSyncMessageEncoder(T::receiverName(), T::name(), destinationID, syncRequestID);
484
485 if (sendSyncOptions.contains(SendSyncOption::UseFullySynchronousModeForTesting)) {
486 encoder->setFullySynchronousModeForTesting();
487 m_fullySynchronousModeIsAllowedForTesting = true;
488 }
489
490 // Encode the rest of the input arguments.
491 encoder->encode(message.arguments());
492
493 // Now send the message and wait for a reply.
494 std::unique_ptr<Decoder> replyDecoder = sendSyncMessage(syncRequestID, WTFMove(encoder), timeout, sendSyncOptions);
495 if (!replyDecoder)
496 return false;
497
498 // Decode the reply.
499 Optional<typename T::ReplyArguments> replyArguments;
500 *replyDecoder >> replyArguments;
501 if (!replyArguments)
502 return false;
503 moveTuple(WTFMove(*replyArguments), reply);
504 return true;
505}
506
507template<typename T> bool Connection::waitForAndDispatchImmediately(uint64_t destinationID, Seconds timeout, OptionSet<WaitForOption> waitForOptions)
508{
509 std::unique_ptr<Decoder> decoder = waitForMessage(T::receiverName(), T::name(), destinationID, timeout, waitForOptions);
510 if (!decoder)
511 return false;
512
513 ASSERT(decoder->destinationID() == destinationID);
514 m_client.didReceiveMessage(*this, *decoder);
515 return true;
516}
517
518} // namespace IPC
519