Files
pyscript/core/tests/python/tests/test_websocket.py
Nicholas Tollervey 9dad29ec17 Refactor repository. Fixes #2161 (#2192)
* Remove duplicate LICENSE.
* Remove un-used pyscript.sw directory and its content.
* Remove ReadTheDocs settings (unused).
* Remove un-used pyproject.toml
* Remove now unused CHANGELOG. Changes now tracked via release notes on GitHub.
* Updated / cleaned release page template and associated GH actions.
* Update prettierignore to remove un-needed refs.
* Move troubleshooting into correct README.
* Add reason for the index.html
* Rename the "pyscript.core" directory to "core".
* Update PR template because CHANGELOG is no longer used.
* Codespell configuration in pyproject.toml.
* Update pyscript.core -> core in .githubignore
* Remove test-results/.last-run.json. This should be ignored by git.
* Pin nodejs version.

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2024-09-30 10:29:26 +01:00

100 lines
2.6 KiB
Python

"""
Exercise the pyscript.WebSocket class.
"""
import asyncio
from pyscript import WebSocket
async def test_websocket_with_attributes():
    """
    Wire up the event handlers by assigning them as object attributes.

    A WebSocket instance should connect to a websocket server, then send and
    receive messages.

    The echo.websocket.org service is used, which means:

    1) On connection it replies with a "Request served by" message.
    2) Any message sent to it is echoed straight back.
    """
    expected_count = 2  # The server greeting plus our echoed message.
    state = {"opened": False, "closed": False}
    received = []
    finished = asyncio.Event()

    def handle_open(_event):
        state["opened"] = True
        socket.send("Hello, world!")  # The payload we expect echoed back.

    def handle_message(event):
        received.append(event.data)
        if len(received) == expected_count:
            # Nothing more is expected, so hang up.
            socket.close()

    def handle_close(_event):
        state["closed"] = True
        finished.set()  # Unblock the awaiting test body below.

    socket = WebSocket(url="wss://echo.websocket.org")
    socket.onopen = handle_open
    socket.onmessage = handle_message
    socket.onclose = handle_close

    # Block until the close handler signals the conversation is over.
    await finished.wait()

    assert state["opened"] is True
    assert len(received) == expected_count
    greeting, echo = received
    assert "request served by" in greeting.lower()
    assert echo == "Hello, world!"
    assert state["closed"] is True
async def test_websocket_with_init():
    """
    Supply the event handlers as keyword arguments to __init__.

    A WebSocket instance should connect to a websocket server, then send and
    receive messages.

    The echo.websocket.org service is used, which means:

    1) On connection it replies with a "Request served by" message.
    2) Any message sent to it is echoed straight back.
    """
    was_opened = False
    was_closed = False
    inbox = []
    all_done = asyncio.Event()

    def opened(_event):
        nonlocal was_opened
        was_opened = True
        # Kick off the exchange with something for the server to echo.
        client.send("Hello, world!")

    def message_arrived(event):
        inbox.append(event.data)
        # Two messages (greeting + echo) complete the exchange.
        if len(inbox) == 2:
            client.close()

    def closed(_event):
        nonlocal was_closed
        was_closed = True
        all_done.set()  # Release the awaiting test body below.

    client = WebSocket(
        url="wss://echo.websocket.org",
        onopen=opened,
        onmessage=message_arrived,
        onclose=closed,
    )

    # Nothing can be checked until the socket has closed.
    await all_done.wait()

    assert was_opened is True
    assert len(inbox) == 2
    first, second = inbox
    assert "request served by" in first.lower()
    assert second == "Hello, world!"
    assert was_closed is True