1 | /* |
2 | * Copyright (C) 2011, 2012 Google Inc. All rights reserved. |
3 | * |
4 | * Redistribution and use in source and binary forms, with or without |
5 | * modification, are permitted provided that the following conditions are |
6 | * met: |
7 | * |
8 | * * Redistributions of source code must retain the above copyright |
9 | * notice, this list of conditions and the following disclaimer. |
10 | * * Redistributions in binary form must reproduce the above |
11 | * copyright notice, this list of conditions and the following disclaimer |
12 | * in the documentation and/or other materials provided with the |
13 | * distribution. |
14 | * * Neither the name of Google Inc. nor the names of its |
15 | * contributors may be used to endorse or promote products derived from |
16 | * this software without specific prior written permission. |
17 | * |
18 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
19 | * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
20 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
21 | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
22 | * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
23 | * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
24 | * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
25 | * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
26 | * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
27 | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
28 | * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
29 | */ |
30 | |
31 | #include "config.h" |
32 | #include "WorkerThreadableWebSocketChannel.h" |
33 | |
34 | #include "Blob.h" |
35 | #include "Document.h" |
36 | #include "ScriptExecutionContext.h" |
37 | #include "SocketProvider.h" |
38 | #include "ThreadableWebSocketChannelClientWrapper.h" |
39 | #include "WebSocketChannel.h" |
40 | #include "WebSocketChannelClient.h" |
41 | #include "WorkerGlobalScope.h" |
42 | #include "WorkerLoaderProxy.h" |
43 | #include "WorkerRunLoop.h" |
44 | #include "WorkerThread.h" |
45 | #include <JavaScriptCore/ArrayBuffer.h> |
46 | #include <wtf/MainThread.h> |
47 | #include <wtf/text/WTFString.h> |
48 | |
49 | namespace WebCore { |
50 | |
51 | WorkerThreadableWebSocketChannel::WorkerThreadableWebSocketChannel(WorkerGlobalScope& context, WebSocketChannelClient& client, const String& taskMode, SocketProvider& provider) |
52 | : m_workerGlobalScope(context) |
53 | , m_workerClientWrapper(ThreadableWebSocketChannelClientWrapper::create(context, client)) |
54 | , m_bridge(Bridge::create(m_workerClientWrapper.copyRef(), m_workerGlobalScope.copyRef(), taskMode, provider)) |
55 | , m_socketProvider(provider) |
56 | { |
57 | m_bridge->initialize(); |
58 | } |
59 | |
60 | WorkerThreadableWebSocketChannel::~WorkerThreadableWebSocketChannel() |
61 | { |
62 | if (m_bridge) |
63 | m_bridge->disconnect(); |
64 | } |
65 | |
66 | void WorkerThreadableWebSocketChannel::connect(const URL& url, const String& protocol) |
67 | { |
68 | if (m_bridge) |
69 | m_bridge->connect(url, protocol); |
70 | } |
71 | |
72 | String WorkerThreadableWebSocketChannel::subprotocol() |
73 | { |
74 | return m_workerClientWrapper->subprotocol(); |
75 | } |
76 | |
77 | String WorkerThreadableWebSocketChannel::extensions() |
78 | { |
79 | return m_workerClientWrapper->extensions(); |
80 | } |
81 | |
82 | ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const String& message) |
83 | { |
84 | if (!m_bridge) |
85 | return ThreadableWebSocketChannel::SendFail; |
86 | return m_bridge->send(message); |
87 | } |
88 | |
89 | ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) |
90 | { |
91 | if (!m_bridge) |
92 | return ThreadableWebSocketChannel::SendFail; |
93 | return m_bridge->send(binaryData, byteOffset, byteLength); |
94 | } |
95 | |
96 | ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::send(Blob& binaryData) |
97 | { |
98 | if (!m_bridge) |
99 | return ThreadableWebSocketChannel::SendFail; |
100 | return m_bridge->send(binaryData); |
101 | } |
102 | |
103 | unsigned WorkerThreadableWebSocketChannel::bufferedAmount() const |
104 | { |
105 | if (!m_bridge) |
106 | return 0; |
107 | return m_bridge->bufferedAmount(); |
108 | } |
109 | |
110 | void WorkerThreadableWebSocketChannel::close(int code, const String& reason) |
111 | { |
112 | if (m_bridge) |
113 | m_bridge->close(code, reason); |
114 | } |
115 | |
116 | void WorkerThreadableWebSocketChannel::fail(const String& reason) |
117 | { |
118 | if (m_bridge) |
119 | m_bridge->fail(reason); |
120 | } |
121 | |
122 | void WorkerThreadableWebSocketChannel::disconnect() |
123 | { |
124 | m_bridge->disconnect(); |
125 | m_bridge = nullptr; |
126 | } |
127 | |
128 | void WorkerThreadableWebSocketChannel::suspend() |
129 | { |
130 | m_workerClientWrapper->suspend(); |
131 | if (m_bridge) |
132 | m_bridge->suspend(); |
133 | } |
134 | |
135 | void WorkerThreadableWebSocketChannel::resume() |
136 | { |
137 | m_workerClientWrapper->resume(); |
138 | if (m_bridge) |
139 | m_bridge->resume(); |
140 | } |
141 | |
142 | WorkerThreadableWebSocketChannel::Peer::Peer(Ref<ThreadableWebSocketChannelClientWrapper>&& clientWrapper, WorkerLoaderProxy& loaderProxy, ScriptExecutionContext& context, const String& taskMode, SocketProvider& provider) |
143 | : m_workerClientWrapper(WTFMove(clientWrapper)) |
144 | , m_loaderProxy(loaderProxy) |
145 | , m_mainWebSocketChannel(WebSocketChannel::create(downcast<Document>(context), *this, provider)) |
146 | , m_taskMode(taskMode) |
147 | { |
148 | ASSERT(isMainThread()); |
149 | } |
150 | |
151 | WorkerThreadableWebSocketChannel::Peer::~Peer() |
152 | { |
153 | ASSERT(isMainThread()); |
154 | if (m_mainWebSocketChannel) |
155 | m_mainWebSocketChannel->disconnect(); |
156 | } |
157 | |
158 | void WorkerThreadableWebSocketChannel::Peer::connect(const URL& url, const String& protocol) |
159 | { |
160 | ASSERT(isMainThread()); |
161 | if (!m_mainWebSocketChannel) |
162 | return; |
163 | m_mainWebSocketChannel->connect(url, protocol); |
164 | } |
165 | |
166 | void WorkerThreadableWebSocketChannel::Peer::send(const String& message) |
167 | { |
168 | ASSERT(isMainThread()); |
169 | if (!m_mainWebSocketChannel) |
170 | return; |
171 | |
172 | ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(message); |
173 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable { |
174 | workerClientWrapper->setSendRequestResult(sendRequestResult); |
175 | }, m_taskMode); |
176 | } |
177 | |
178 | void WorkerThreadableWebSocketChannel::Peer::send(const ArrayBuffer& binaryData) |
179 | { |
180 | ASSERT(isMainThread()); |
181 | if (!m_mainWebSocketChannel) |
182 | return; |
183 | |
184 | ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData, 0, binaryData.byteLength()); |
185 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable { |
186 | workerClientWrapper->setSendRequestResult(sendRequestResult); |
187 | }, m_taskMode); |
188 | } |
189 | |
190 | void WorkerThreadableWebSocketChannel::Peer::send(Blob& binaryData) |
191 | { |
192 | ASSERT(isMainThread()); |
193 | if (!m_mainWebSocketChannel) |
194 | return; |
195 | |
196 | ThreadableWebSocketChannel::SendResult sendRequestResult = m_mainWebSocketChannel->send(binaryData); |
197 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), sendRequestResult](ScriptExecutionContext&) mutable { |
198 | workerClientWrapper->setSendRequestResult(sendRequestResult); |
199 | }, m_taskMode); |
200 | } |
201 | |
202 | void WorkerThreadableWebSocketChannel::Peer::bufferedAmount() |
203 | { |
204 | ASSERT(isMainThread()); |
205 | if (!m_mainWebSocketChannel) |
206 | return; |
207 | |
208 | unsigned bufferedAmount = m_mainWebSocketChannel->bufferedAmount(); |
209 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), bufferedAmount](ScriptExecutionContext& context) mutable { |
210 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
211 | workerClientWrapper->setBufferedAmount(bufferedAmount); |
212 | }, m_taskMode); |
213 | } |
214 | |
215 | void WorkerThreadableWebSocketChannel::Peer::close(int code, const String& reason) |
216 | { |
217 | ASSERT(isMainThread()); |
218 | if (!m_mainWebSocketChannel) |
219 | return; |
220 | m_mainWebSocketChannel->close(code, reason); |
221 | } |
222 | |
223 | void WorkerThreadableWebSocketChannel::Peer::fail(const String& reason) |
224 | { |
225 | ASSERT(isMainThread()); |
226 | if (!m_mainWebSocketChannel) |
227 | return; |
228 | m_mainWebSocketChannel->fail(reason); |
229 | } |
230 | |
231 | void WorkerThreadableWebSocketChannel::Peer::disconnect() |
232 | { |
233 | ASSERT(isMainThread()); |
234 | if (!m_mainWebSocketChannel) |
235 | return; |
236 | m_mainWebSocketChannel->disconnect(); |
237 | m_mainWebSocketChannel = nullptr; |
238 | } |
239 | |
240 | void WorkerThreadableWebSocketChannel::Peer::suspend() |
241 | { |
242 | ASSERT(isMainThread()); |
243 | if (!m_mainWebSocketChannel) |
244 | return; |
245 | m_mainWebSocketChannel->suspend(); |
246 | } |
247 | |
248 | void WorkerThreadableWebSocketChannel::Peer::resume() |
249 | { |
250 | ASSERT(isMainThread()); |
251 | if (!m_mainWebSocketChannel) |
252 | return; |
253 | m_mainWebSocketChannel->resume(); |
254 | } |
255 | |
256 | void WorkerThreadableWebSocketChannel::Peer::didConnect() |
257 | { |
258 | ASSERT(isMainThread()); |
259 | |
260 | String subprotocol = m_mainWebSocketChannel->subprotocol(); |
261 | String extensions = m_mainWebSocketChannel->extensions(); |
262 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), subprotocol = subprotocol.isolatedCopy(), extensions = extensions.isolatedCopy()](ScriptExecutionContext& context) mutable { |
263 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
264 | workerClientWrapper->setSubprotocol(subprotocol); |
265 | workerClientWrapper->setExtensions(extensions); |
266 | workerClientWrapper->didConnect(); |
267 | }, m_taskMode); |
268 | } |
269 | |
270 | void WorkerThreadableWebSocketChannel::Peer::didReceiveMessage(const String& message) |
271 | { |
272 | ASSERT(isMainThread()); |
273 | |
274 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), message = message.isolatedCopy()](ScriptExecutionContext& context) mutable { |
275 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
276 | workerClientWrapper->didReceiveMessage(message); |
277 | }, m_taskMode); |
278 | } |
279 | |
280 | void WorkerThreadableWebSocketChannel::Peer::didReceiveBinaryData(Vector<uint8_t>&& binaryData) |
281 | { |
282 | ASSERT(isMainThread()); |
283 | |
284 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), binaryData = WTFMove(binaryData)](ScriptExecutionContext& context) mutable { |
285 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
286 | workerClientWrapper->didReceiveBinaryData(WTFMove(binaryData)); |
287 | }, m_taskMode); |
288 | } |
289 | |
290 | void WorkerThreadableWebSocketChannel::Peer::didUpdateBufferedAmount(unsigned bufferedAmount) |
291 | { |
292 | ASSERT(isMainThread()); |
293 | |
294 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), bufferedAmount](ScriptExecutionContext& context) mutable { |
295 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
296 | workerClientWrapper->didUpdateBufferedAmount(bufferedAmount); |
297 | }, m_taskMode); |
298 | } |
299 | |
300 | void WorkerThreadableWebSocketChannel::Peer::didStartClosingHandshake() |
301 | { |
302 | ASSERT(isMainThread()); |
303 | |
304 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable { |
305 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
306 | workerClientWrapper->didStartClosingHandshake(); |
307 | }, m_taskMode); |
308 | } |
309 | |
310 | void WorkerThreadableWebSocketChannel::Peer::didClose(unsigned unhandledBufferedAmount, ClosingHandshakeCompletionStatus closingHandshakeCompletion, unsigned short code, const String& reason) |
311 | { |
312 | ASSERT(isMainThread()); |
313 | m_mainWebSocketChannel = nullptr; |
314 | |
315 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef(), unhandledBufferedAmount, closingHandshakeCompletion, code, reason = reason.isolatedCopy()](ScriptExecutionContext& context) mutable { |
316 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
317 | workerClientWrapper->didClose(unhandledBufferedAmount, closingHandshakeCompletion, code, reason); |
318 | }, m_taskMode); |
319 | } |
320 | |
321 | void WorkerThreadableWebSocketChannel::Peer::didReceiveMessageError() |
322 | { |
323 | ASSERT(isMainThread()); |
324 | |
325 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable { |
326 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
327 | workerClientWrapper->didReceiveMessageError(); |
328 | }, m_taskMode); |
329 | } |
330 | |
331 | void WorkerThreadableWebSocketChannel::Peer::didUpgradeURL() |
332 | { |
333 | ASSERT(isMainThread()); |
334 | |
335 | m_loaderProxy.postTaskForModeToWorkerGlobalScope([workerClientWrapper = m_workerClientWrapper.copyRef()](ScriptExecutionContext& context) mutable { |
336 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
337 | workerClientWrapper->didUpgradeURL(); |
338 | }, m_taskMode); |
339 | } |
340 | |
341 | WorkerThreadableWebSocketChannel::Bridge::Bridge(Ref<ThreadableWebSocketChannelClientWrapper>&& workerClientWrapper, Ref<WorkerGlobalScope>&& workerGlobalScope, const String& taskMode, Ref<SocketProvider>&& socketProvider) |
342 | : m_workerClientWrapper(WTFMove(workerClientWrapper)) |
343 | , m_workerGlobalScope(WTFMove(workerGlobalScope)) |
344 | , m_loaderProxy(m_workerGlobalScope->thread().workerLoaderProxy()) |
345 | , m_taskMode(taskMode) |
346 | , m_socketProvider(WTFMove(socketProvider)) |
347 | { |
348 | } |
349 | |
350 | WorkerThreadableWebSocketChannel::Bridge::~Bridge() |
351 | { |
352 | disconnect(); |
353 | } |
354 | |
355 | void WorkerThreadableWebSocketChannel::Bridge::mainThreadInitialize(ScriptExecutionContext& context, WorkerLoaderProxy& loaderProxy, Ref<ThreadableWebSocketChannelClientWrapper>&& clientWrapper, const String& taskMode, Ref<SocketProvider>&& provider) |
356 | { |
357 | ASSERT(isMainThread()); |
358 | ASSERT(context.isDocument()); |
359 | |
360 | bool sent = loaderProxy.postTaskForModeToWorkerGlobalScope({ |
361 | ScriptExecutionContext::Task::CleanupTask, |
362 | [clientWrapper = clientWrapper.copyRef(), &loaderProxy, peer = std::make_unique<Peer>(clientWrapper.copyRef(), loaderProxy, context, taskMode, WTFMove(provider))](ScriptExecutionContext& context) mutable { |
363 | ASSERT_UNUSED(context, context.isWorkerGlobalScope()); |
364 | if (clientWrapper->failedWebSocketChannelCreation()) { |
365 | // If Bridge::initialize() quitted earlier, we need to kick mainThreadDestroy() to delete the peer. |
366 | loaderProxy.postTaskToLoader([peer = WTFMove(peer)](ScriptExecutionContext& context) { |
367 | ASSERT(isMainThread()); |
368 | ASSERT_UNUSED(context, context.isDocument()); |
369 | }); |
370 | } else |
371 | clientWrapper->didCreateWebSocketChannel(peer.release()); |
372 | } |
373 | }, taskMode); |
374 | |
375 | if (!sent) |
376 | clientWrapper->clearPeer(); |
377 | } |
378 | |
379 | void WorkerThreadableWebSocketChannel::Bridge::initialize() |
380 | { |
381 | ASSERT(!m_peer); |
382 | setMethodNotCompleted(); |
383 | Ref<Bridge> protectedThis(*this); |
384 | |
385 | m_loaderProxy.postTaskToLoader([&loaderProxy = m_loaderProxy, workerClientWrapper = m_workerClientWrapper.copyRef(), taskMode = m_taskMode.isolatedCopy(), provider = m_socketProvider.copyRef()](ScriptExecutionContext& context) mutable { |
386 | mainThreadInitialize(context, loaderProxy, WTFMove(workerClientWrapper), taskMode, WTFMove(provider)); |
387 | }); |
388 | waitForMethodCompletion(); |
389 | |
390 | // m_peer may be null when the nested runloop exited before a peer has created. |
391 | m_peer = m_workerClientWrapper->peer(); |
392 | if (!m_peer) |
393 | m_workerClientWrapper->setFailedWebSocketChannelCreation(); |
394 | } |
395 | |
396 | void WorkerThreadableWebSocketChannel::Bridge::connect(const URL& url, const String& protocol) |
397 | { |
398 | if (!m_peer) |
399 | return; |
400 | |
401 | m_loaderProxy.postTaskToLoader([peer = m_peer, url = url.isolatedCopy(), protocol = protocol.isolatedCopy()](ScriptExecutionContext& context) { |
402 | ASSERT(isMainThread()); |
403 | ASSERT_UNUSED(context, context.isDocument()); |
404 | ASSERT(peer); |
405 | |
406 | peer->connect(url, protocol); |
407 | }); |
408 | } |
409 | |
410 | ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const String& message) |
411 | { |
412 | if (!m_peer) |
413 | return ThreadableWebSocketChannel::SendFail; |
414 | setMethodNotCompleted(); |
415 | |
416 | m_loaderProxy.postTaskToLoader([peer = m_peer, message = message.isolatedCopy()](ScriptExecutionContext& context) { |
417 | ASSERT(isMainThread()); |
418 | ASSERT_UNUSED(context, context.isDocument()); |
419 | ASSERT(peer); |
420 | |
421 | peer->send(message); |
422 | }); |
423 | |
424 | Ref<Bridge> protectedThis(*this); |
425 | waitForMethodCompletion(); |
426 | return m_workerClientWrapper->sendRequestResult(); |
427 | } |
428 | |
429 | ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(const ArrayBuffer& binaryData, unsigned byteOffset, unsigned byteLength) |
430 | { |
431 | if (!m_peer) |
432 | return ThreadableWebSocketChannel::SendFail; |
433 | |
434 | // ArrayBuffer isn't thread-safe, hence the content of ArrayBuffer is copied into Vector<char>. |
435 | Vector<char> data(byteLength); |
436 | if (binaryData.byteLength()) |
437 | memcpy(data.data(), static_cast<const char*>(binaryData.data()) + byteOffset, byteLength); |
438 | setMethodNotCompleted(); |
439 | |
440 | m_loaderProxy.postTaskToLoader([peer = m_peer, data = WTFMove(data)](ScriptExecutionContext& context) { |
441 | ASSERT(isMainThread()); |
442 | ASSERT_UNUSED(context, context.isDocument()); |
443 | ASSERT(peer); |
444 | |
445 | auto arrayBuffer = ArrayBuffer::create(data.data(), data.size()); |
446 | peer->send(arrayBuffer); |
447 | }); |
448 | |
449 | Ref<Bridge> protectedThis(*this); |
450 | waitForMethodCompletion(); |
451 | return m_workerClientWrapper->sendRequestResult(); |
452 | } |
453 | |
454 | ThreadableWebSocketChannel::SendResult WorkerThreadableWebSocketChannel::Bridge::send(Blob& binaryData) |
455 | { |
456 | if (!m_peer) |
457 | return ThreadableWebSocketChannel::SendFail; |
458 | setMethodNotCompleted(); |
459 | |
460 | m_loaderProxy.postTaskToLoader([peer = m_peer, url = binaryData.url().isolatedCopy(), type = binaryData.type().isolatedCopy(), size = binaryData.size()](ScriptExecutionContext& context) { |
461 | ASSERT(isMainThread()); |
462 | ASSERT_UNUSED(context, context.isDocument()); |
463 | ASSERT(peer); |
464 | |
465 | peer->send(Blob::deserialize(url, type, size, { })); |
466 | }); |
467 | |
468 | Ref<Bridge> protectedThis(*this); |
469 | waitForMethodCompletion(); |
470 | return m_workerClientWrapper->sendRequestResult(); |
471 | } |
472 | |
473 | unsigned WorkerThreadableWebSocketChannel::Bridge::bufferedAmount() |
474 | { |
475 | if (!m_peer) |
476 | return 0; |
477 | setMethodNotCompleted(); |
478 | |
479 | m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) { |
480 | ASSERT(isMainThread()); |
481 | ASSERT_UNUSED(context, context.isDocument()); |
482 | ASSERT(peer); |
483 | |
484 | peer->bufferedAmount(); |
485 | }); |
486 | |
487 | Ref<Bridge> protectedThis(*this); |
488 | waitForMethodCompletion(); |
489 | return m_workerClientWrapper->bufferedAmount(); |
490 | } |
491 | |
492 | void WorkerThreadableWebSocketChannel::Bridge::close(int code, const String& reason) |
493 | { |
494 | if (!m_peer) |
495 | return; |
496 | |
497 | m_loaderProxy.postTaskToLoader([peer = m_peer, code, reason = reason.isolatedCopy()](ScriptExecutionContext& context) { |
498 | ASSERT(isMainThread()); |
499 | ASSERT_UNUSED(context, context.isDocument()); |
500 | ASSERT(peer); |
501 | |
502 | peer->close(code, reason); |
503 | }); |
504 | } |
505 | |
506 | void WorkerThreadableWebSocketChannel::Bridge::fail(const String& reason) |
507 | { |
508 | if (!m_peer) |
509 | return; |
510 | |
511 | m_loaderProxy.postTaskToLoader([peer = m_peer, reason = reason.isolatedCopy()](ScriptExecutionContext& context) { |
512 | ASSERT(isMainThread()); |
513 | ASSERT_UNUSED(context, context.isDocument()); |
514 | ASSERT(peer); |
515 | |
516 | peer->fail(reason); |
517 | }); |
518 | } |
519 | |
520 | void WorkerThreadableWebSocketChannel::Bridge::disconnect() |
521 | { |
522 | clearClientWrapper(); |
523 | if (m_peer) { |
524 | m_loaderProxy.postTaskToLoader([peer = std::unique_ptr<Peer>(m_peer)](ScriptExecutionContext& context) { |
525 | ASSERT(isMainThread()); |
526 | ASSERT_UNUSED(context, context.isDocument()); |
527 | }); |
528 | m_peer = nullptr; |
529 | } |
530 | m_workerGlobalScope = nullptr; |
531 | } |
532 | |
533 | void WorkerThreadableWebSocketChannel::Bridge::suspend() |
534 | { |
535 | if (!m_peer) |
536 | return; |
537 | |
538 | m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) { |
539 | ASSERT(isMainThread()); |
540 | ASSERT_UNUSED(context, context.isDocument()); |
541 | ASSERT(peer); |
542 | |
543 | peer->suspend(); |
544 | }); |
545 | } |
546 | |
547 | void WorkerThreadableWebSocketChannel::Bridge::resume() |
548 | { |
549 | if (!m_peer) |
550 | return; |
551 | |
552 | m_loaderProxy.postTaskToLoader([peer = m_peer](ScriptExecutionContext& context) { |
553 | ASSERT(isMainThread()); |
554 | ASSERT_UNUSED(context, context.isDocument()); |
555 | ASSERT(peer); |
556 | |
557 | peer->resume(); |
558 | }); |
559 | } |
560 | |
561 | void WorkerThreadableWebSocketChannel::Bridge::clearClientWrapper() |
562 | { |
563 | m_workerClientWrapper->clearClient(); |
564 | } |
565 | |
566 | void WorkerThreadableWebSocketChannel::Bridge::setMethodNotCompleted() |
567 | { |
568 | m_workerClientWrapper->clearSyncMethodDone(); |
569 | } |
570 | |
571 | // Caller of this function should hold a reference to the bridge, because this function may call WebSocket::didClose() in the end, |
572 | // which causes the bridge to get disconnected from the WebSocket and deleted if there is no other reference. |
573 | void WorkerThreadableWebSocketChannel::Bridge::waitForMethodCompletion() |
574 | { |
575 | if (!m_workerGlobalScope) |
576 | return; |
577 | WorkerRunLoop& runLoop = m_workerGlobalScope->thread().runLoop(); |
578 | MessageQueueWaitResult result = MessageQueueMessageReceived; |
579 | ThreadableWebSocketChannelClientWrapper* clientWrapper = m_workerClientWrapper.ptr(); |
580 | while (m_workerGlobalScope && clientWrapper && !clientWrapper->syncMethodDone() && result != MessageQueueTerminated) { |
581 | result = runLoop.runInMode(m_workerGlobalScope.get(), m_taskMode); // May cause this bridge to get disconnected, which makes m_workerGlobalScope become null. |
582 | clientWrapper = m_workerClientWrapper.ptr(); |
583 | } |
584 | } |
585 | |
586 | } // namespace WebCore |
587 | |