mirror of
https://github.com/pyscript/pyscript.git
synced 2026-02-12 13:00:31 -05:00
pyscript API/docstring refactor with comprehensive tests (#2414)
* Revise display module. TODO: more comprehensive tests. Especially around mimebundles. * Markdown corrections in example code in display.py docstrings. * Minor adjustments and a much more comprehensive test-suite for the display module. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Updated docstring in __init__.py. * Remove unused imports and black-ify the source. * Refactor, docs and tests for the Event class in events.py. * Refactored, simplified and documented @when decorator. * Extensive test suite for @when decorator. * Documentation and minor refactoring of the fetch.py module. TODO: Check tests. * Refactored and more comprehensive tests for the fetch module. * Add/clarify Event related interactions. Thanks @Neon22 for the suggestion. * Refactor, document ffi.py module. * More complete passing tests for ffi.py. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add docstrings to flatted.py. Since this is actually an external(ish) module, tests for it should be in the external repository from which this code is derived. * Minor docstring cleanup in ffi.py. * Added docstrings and clarifications to fs.py. * Add very limited test suite for fs.py. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Rename magic_js.py to context.py, add comprehensive docstrings, and rename certain internal things for readability and comprehension. * Fix dict check in ffi.py. * Rename test_js_modules to test_context. * Fix test configuration aftert rename. * Docs and refactor of media.py. * Comprehensive tests for media.py. * Refactor and docstrings for storage.py * Appease the ruff gods. * Further storage.py changes and a more complete test suite for storage. * Refactor and docstrings for the util.py module. Fixed a problem with is_awaitable not handling async bound methods. * More comprehensive test suite for util.py. Updated to latest upytest. * A major refactoring, documenting and simplification of the web.py module substantially reducing it in size and complexity with only a few minor (edge) behavioural changes. Softly breaking changes include: - An element's classes are just a set. - An element's styles are just a dict. - Explicitly use `update_all` with ElementCollections (simpler and greater flexibility). - Extract a child element by id with `my_container["#an-id"]` * Updates and additions for a more comprehensive test suite for the web.py module. All code paths are exercised and checked. * Black tidy-ups in test suite. * Refactor and documentation for websocket.py module. * Tests for websocket.py. Disabled due to playwright flakiness, but they all pass in a local browser. * Refactor and documentation of workers.py module. * Added tests for workers.py module. Updated related test suite to account for the new named worker in the test HTML. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Refactor away remaining "is not None" not caught before. * Remove check-docstring-first because it interferes with the auto-generated documentation (where triple quoted strings are used to document module attributes). * Careful Markdown changes so the docstrings render properly in the PyScript docs. * Typo correction. * More typo corrections and clarifications. * Add clarification about SVG handling to _render_image docstring. * Add DOM event options to the @when decorator (with new tests to exercise this functionality). * Fixes default value for options if no options passed into @when. --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
90ae3cea95
commit
a02ff691d2
@@ -23,6 +23,8 @@
|
||||
|
||||
<div id="div-no-classes"></div>
|
||||
|
||||
<script type="py" worker name="testworker" src="./worker_functions.py"></script>
|
||||
|
||||
<div style="visibility: hidden;">
|
||||
<h2>Test Read and Write</h2>
|
||||
<div id="test_rr_div">Content test_rr_div</div>
|
||||
@@ -63,6 +65,7 @@
|
||||
<button>I'm another button you can click</button>
|
||||
<button id="a-third-button">2 is better than 3 :)</button>
|
||||
<button id="another-test-button">I'm another button to be clicked</button>
|
||||
<button id="button-for-event-testing">Button for event testing</button>
|
||||
|
||||
<div id="element-append-tests"></div>
|
||||
<p class="collection"></p>
|
||||
|
||||
@@ -1,27 +1,26 @@
|
||||
{
|
||||
"files": {
|
||||
"https://raw.githubusercontent.com/ntoll/upytest/1.0.9/upytest.py": "",
|
||||
"https://raw.githubusercontent.com/ntoll/upytest/1.0.11/upytest.py": "",
|
||||
"./tests/test_config.py": "tests/test_config.py",
|
||||
"./tests/test_context.py": "tests/test_context.py",
|
||||
"./tests/test_current_target.py": "tests/test_current_target.py",
|
||||
"./tests/test_display.py": "tests/test_display.py",
|
||||
"./tests/test_document.py": "tests/test_document.py",
|
||||
"./tests/test_events.py": "tests/test_events.py",
|
||||
"./tests/test_fetch.py": "tests/test_fetch.py",
|
||||
"./tests/test_ffi.py": "tests/test_ffi.py",
|
||||
"./tests/test_js_modules.py": "tests/test_js_modules.py",
|
||||
"./tests/test_fs.py": "tests/test_fs.py",
|
||||
"./tests/test_media.py": "tests/test_media.py",
|
||||
"./tests/test_storage.py": "tests/test_storage.py",
|
||||
"./tests/test_util.py": "tests/test_util.py",
|
||||
"./tests/test_running_in_worker.py": "tests/test_running_in_worker.py",
|
||||
"./tests/test_web.py": "tests/test_web.py",
|
||||
"./tests/test_websocket.py": "tests/test_websocket.py",
|
||||
"./tests/test_events.py": "tests/test_events.py",
|
||||
"./tests/test_window.py": "tests/test_window.py"
|
||||
"./tests/test_window.py": "tests/test_window.py",
|
||||
"./tests/test_workers.py": "tests/test_workers.py"
|
||||
},
|
||||
"js_modules": {
|
||||
"main": {
|
||||
"./example_js_module.js": "greeting"
|
||||
},
|
||||
"worker": {
|
||||
"./example_js_worker_module.js": "greeting_worker"
|
||||
}
|
||||
"main": {"./example_js_module.js": "greeting"},
|
||||
"worker": {"./example_js_worker_module.js": "greeting_worker"}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,29 +1,28 @@
|
||||
{
|
||||
"files": {
|
||||
"https://raw.githubusercontent.com/ntoll/upytest/1.0.9/upytest.py": "",
|
||||
"https://raw.githubusercontent.com/ntoll/upytest/1.0.11/upytest.py": "",
|
||||
"./tests/test_config.py": "tests/test_config.py",
|
||||
"./tests/test_context.py": "tests/test_context.py",
|
||||
"./tests/test_current_target.py": "tests/test_current_target.py",
|
||||
"./tests/test_display.py": "tests/test_display.py",
|
||||
"./tests/test_document.py": "tests/test_document.py",
|
||||
"./tests/test_events.py": "tests/test_events.py",
|
||||
"./tests/test_fetch.py": "tests/test_fetch.py",
|
||||
"./tests/test_ffi.py": "tests/test_ffi.py",
|
||||
"./tests/test_fs.py": "tests/test_fs.py",
|
||||
"./tests/test_media.py": "tests/test_media.py",
|
||||
"./tests/test_js_modules.py": "tests/test_js_modules.py",
|
||||
"./tests/test_storage.py": "tests/test_storage.py",
|
||||
"./tests/test_util.py": "tests/test_util.py",
|
||||
"./tests/test_running_in_worker.py": "tests/test_running_in_worker.py",
|
||||
"./tests/test_web.py": "tests/test_web.py",
|
||||
"./tests/test_websocket.py": "tests/test_websocket.py",
|
||||
"./tests/test_events.py": "tests/test_events.py",
|
||||
"./tests/test_window.py": "tests/test_window.py"
|
||||
"./tests/test_window.py": "tests/test_window.py",
|
||||
"./tests/test_workers.py": "tests/test_workers.py"
|
||||
},
|
||||
"js_modules": {
|
||||
"main": {
|
||||
"./example_js_module.js": "greeting"
|
||||
},
|
||||
"worker": {
|
||||
"./example_js_worker_module.js": "greeting_worker"
|
||||
}
|
||||
"main": {"./example_js_module.js": "greeting"},
|
||||
"worker": {"./example_js_worker_module.js": "greeting_worker"}
|
||||
},
|
||||
"packages": ["Pillow" ],
|
||||
"packages": ["Pillow"],
|
||||
"experimental_ffi_timeout": 0
|
||||
}
|
||||
|
||||
@@ -13,7 +13,7 @@ def test_current_target():
|
||||
"""
|
||||
expected = "py-0"
|
||||
if is_micropython:
|
||||
expected = "mpy-w0-target" if RUNNING_IN_WORKER else "mpy-0"
|
||||
expected = "mpy-w1-target" if RUNNING_IN_WORKER else "mpy-0"
|
||||
elif RUNNING_IN_WORKER:
|
||||
expected = "py-w0-target"
|
||||
expected = "py-w1-target"
|
||||
assert current_target() == expected, f"Expected {expected} got {current_target()}"
|
||||
|
||||
@@ -3,6 +3,7 @@ Tests for the display function in PyScript.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import upytest
|
||||
from pyscript import HTML, RUNNING_IN_WORKER, display, py_import, web
|
||||
@@ -107,20 +108,7 @@ def test_empty_string_target_raises_value_error():
|
||||
"""
|
||||
with upytest.raises(ValueError) as exc:
|
||||
display("hello world", target="")
|
||||
assert str(exc.exception) == "Cannot have an empty target"
|
||||
|
||||
|
||||
def test_non_string_target_values_raise_typerror():
|
||||
"""
|
||||
The target parameter must be a string.
|
||||
"""
|
||||
with upytest.raises(TypeError) as exc:
|
||||
display("hello world", target=True)
|
||||
assert str(exc.exception) == "target must be str or None, not bool"
|
||||
|
||||
with upytest.raises(TypeError) as exc:
|
||||
display("hello world", target=123)
|
||||
assert str(exc.exception) == "target must be str or None, not int"
|
||||
assert str(exc.exception) == "Cannot find element with id='' in the page."
|
||||
|
||||
|
||||
async def test_tag_target_attribute():
|
||||
@@ -286,4 +274,406 @@ async def test_image_renders_correctly():
|
||||
display(img, target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
img = target.find("img")[0]
|
||||
assert img.src.startswith("data:image/png;charset=utf-8;base64")
|
||||
assert img.src.startswith("data:image/png;base64"), img.src
|
||||
|
||||
|
||||
async def test_mimebundle_simple():
|
||||
"""
|
||||
An object with _repr_mimebundle_ should use the mimebundle formats.
|
||||
"""
|
||||
|
||||
class MimebundleObj:
|
||||
def _repr_mimebundle_(self):
|
||||
return {
|
||||
"text/html": "<strong>Bold HTML</strong>",
|
||||
"text/plain": "Plain text fallback",
|
||||
}
|
||||
|
||||
display(MimebundleObj())
|
||||
container = await get_display_container()
|
||||
# Should prefer HTML from mimebundle.
|
||||
assert container[0].innerHTML == "<strong>Bold HTML</strong>"
|
||||
|
||||
|
||||
async def test_mimebundle_with_metadata():
|
||||
"""
|
||||
Mimebundle can include metadata for specific MIME types.
|
||||
"""
|
||||
|
||||
class ImageWithMeta:
|
||||
def _repr_mimebundle_(self):
|
||||
return (
|
||||
{
|
||||
"image/png": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
|
||||
},
|
||||
{"image/png": {"width": "100", "height": "50"}},
|
||||
)
|
||||
|
||||
display(ImageWithMeta(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
img = target.find("img")[0]
|
||||
assert img.getAttribute("width") == "100"
|
||||
assert img.getAttribute("height") == "50"
|
||||
|
||||
|
||||
async def test_mimebundle_with_tuple_output():
|
||||
"""
|
||||
Mimebundle format values can be tuples with (data, metadata).
|
||||
"""
|
||||
|
||||
class TupleOutput:
|
||||
def _repr_mimebundle_(self):
|
||||
return {"text/html": ("<em>Italic</em>", {"custom": "meta"})}
|
||||
|
||||
display(TupleOutput())
|
||||
container = await get_display_container()
|
||||
assert container[0].innerHTML == "<em>Italic</em>"
|
||||
|
||||
|
||||
async def test_mimebundle_metadata_merge():
|
||||
"""
|
||||
Format-specific metadata should merge with global metadata.
|
||||
"""
|
||||
|
||||
class MetaMerge:
|
||||
def _repr_mimebundle_(self):
|
||||
return (
|
||||
{
|
||||
"image/png": (
|
||||
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==",
|
||||
{"height": "75"},
|
||||
)
|
||||
},
|
||||
{"image/png": {"width": "100"}},
|
||||
)
|
||||
|
||||
display(MetaMerge(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
img = target.find("img")[0]
|
||||
# Both global and format-specific metadata should be present.
|
||||
assert img.getAttribute("width") == "100"
|
||||
assert img.getAttribute("height") == "75"
|
||||
|
||||
|
||||
async def test_mimebundle_unsupported_mime():
|
||||
"""
|
||||
If mimebundle contains only unsupported MIME types, fall back to regular methods.
|
||||
"""
|
||||
|
||||
class UnsupportedMime:
|
||||
def _repr_mimebundle_(self):
|
||||
return {"application/pdf": "PDF data", "text/latex": "LaTeX data"}
|
||||
|
||||
def _repr_html_(self):
|
||||
return "<p>HTML fallback</p>"
|
||||
|
||||
display(UnsupportedMime())
|
||||
container = await get_display_container()
|
||||
# Should fall back to _repr_html_.
|
||||
assert container[0].innerHTML == "<p>HTML fallback</p>"
|
||||
|
||||
|
||||
async def test_mimebundle_no_dict():
|
||||
"""
|
||||
Mimebundle that returns just a dict (no tuple) should work.
|
||||
"""
|
||||
|
||||
class SimpleMimebundle:
|
||||
def _repr_mimebundle_(self):
|
||||
return {"text/html": "<code>Code</code>"}
|
||||
|
||||
display(SimpleMimebundle())
|
||||
container = await get_display_container()
|
||||
assert container[0].innerHTML == "<code>Code</code>"
|
||||
|
||||
|
||||
async def test_repr_html():
|
||||
"""
|
||||
Objects with _repr_html_ should render as HTML.
|
||||
"""
|
||||
|
||||
class HTMLRepr:
|
||||
def _repr_html_(self):
|
||||
return "<h1>HTML Header</h1>"
|
||||
|
||||
display(HTMLRepr())
|
||||
container = await get_display_container()
|
||||
assert container[0].innerHTML == "<h1>HTML Header</h1>"
|
||||
|
||||
|
||||
async def test_repr_html_with_metadata():
|
||||
"""
|
||||
_repr_html_ can return (html, metadata) tuple.
|
||||
"""
|
||||
|
||||
class HTMLWithMeta:
|
||||
def _repr_html_(self):
|
||||
return ("<p>Paragraph</p>", {"data-custom": "value"})
|
||||
|
||||
display(HTMLWithMeta())
|
||||
container = await get_display_container()
|
||||
# Metadata is not used in _repr_html_ rendering, but ensure HTML is
|
||||
# correct.
|
||||
assert container[0].innerHTML == "<p>Paragraph</p>"
|
||||
|
||||
|
||||
async def test_repr_svg():
|
||||
"""
|
||||
Objects with _repr_svg_ should render as SVG.
|
||||
"""
|
||||
|
||||
class SVGRepr:
|
||||
def _repr_svg_(self):
|
||||
return '<svg width="100" height="100"><circle cx="50" cy="50" r="40" fill="blue"/></svg>'
|
||||
|
||||
display(SVGRepr(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
assert "svg" in target.innerHTML.lower()
|
||||
assert "circle" in target.innerHTML.lower()
|
||||
|
||||
|
||||
async def test_repr_json():
|
||||
"""
|
||||
Objects with _repr_json_ should render as JSON.
|
||||
"""
|
||||
|
||||
class JSONRepr:
|
||||
def _repr_json_(self):
|
||||
return '{"key": "value", "number": 42}'
|
||||
|
||||
display(JSONRepr(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
assert '"key": "value"' in target.innerHTML
|
||||
value = json.loads(target.innerText)
|
||||
assert value["key"] == "value"
|
||||
assert value["number"] == 42
|
||||
|
||||
|
||||
async def test_repr_png_bytes():
|
||||
"""
|
||||
_repr_png_ can render raw bytes.
|
||||
"""
|
||||
|
||||
class PNGBytes:
|
||||
def _repr_png_(self):
|
||||
# Valid 1x1 transparent PNG as bytes.
|
||||
return b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\rIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01\r\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82"
|
||||
|
||||
display(PNGBytes(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
img = target.find("img")[0]
|
||||
assert img.src.startswith("data:image/png;base64,")
|
||||
|
||||
|
||||
async def test_repr_png_base64():
|
||||
"""
|
||||
_repr_png_ can render a base64-encoded string.
|
||||
"""
|
||||
|
||||
class PNGBase64:
|
||||
def _repr_png_(self):
|
||||
return "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
|
||||
|
||||
display(PNGBase64(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
img = target.find("img")[0]
|
||||
assert img.src.startswith("data:image/png;base64,")
|
||||
|
||||
|
||||
async def test_repr_jpeg():
|
||||
"""
|
||||
Objects with _repr_jpeg_ should render as JPEG images.
|
||||
"""
|
||||
|
||||
class JPEGRepr:
|
||||
def _repr_jpeg_(self):
|
||||
# Minimal valid JPEG header (won't display but tests the path).
|
||||
return b"\xff\xd8\xff\xe0\x00\x10JFIF"
|
||||
|
||||
display(JPEGRepr(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
img = target.find("img")[0]
|
||||
assert img.src.startswith("data:image/jpeg;base64,")
|
||||
|
||||
|
||||
async def test_repr_jpeg_base64():
|
||||
"""
|
||||
_repr_jpeg_ can render a base64-encoded string.
|
||||
"""
|
||||
|
||||
class JPEGBase64:
|
||||
def _repr_jpeg_(self):
|
||||
return "ZCBqcGVnIG1pbmltdW0=="
|
||||
|
||||
display(JPEGBase64(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
img = target.find("img")[0]
|
||||
assert img.src.startswith("data:image/jpeg;base64,")
|
||||
|
||||
|
||||
async def test_object_with_no_repr_methods():
|
||||
"""
|
||||
Objects with no representation methods should fall back to __repr__ with warning.
|
||||
"""
|
||||
|
||||
class NoReprMethods:
|
||||
pass
|
||||
|
||||
obj = NoReprMethods()
|
||||
display(obj)
|
||||
container = await get_display_container()
|
||||
# Should contain the default repr output - the class name. :-)
|
||||
assert "NoReprMethods" in container.innerText
|
||||
|
||||
|
||||
async def test_repr_method_returns_none():
|
||||
"""
|
||||
If a repr method exists but returns None, try next method.
|
||||
"""
|
||||
|
||||
class NoneReturner:
|
||||
def _repr_html_(self):
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
return "Fallback repr"
|
||||
|
||||
display(NoneReturner())
|
||||
container = await get_display_container()
|
||||
assert container.innerText == "Fallback repr"
|
||||
|
||||
|
||||
async def test_multiple_repr_methods_priority():
|
||||
"""
|
||||
When multiple repr methods exist, should use first available in priority order.
|
||||
"""
|
||||
|
||||
class MultipleReprs:
|
||||
def _repr_html_(self):
|
||||
# Highest priority.
|
||||
return "<p>HTML version</p>"
|
||||
|
||||
def __repr__(self):
|
||||
# Lower priority.
|
||||
return "Text version"
|
||||
|
||||
display(MultipleReprs())
|
||||
container = await get_display_container()
|
||||
# Should use HTML, not repr.
|
||||
assert container[0].innerHTML == "<p>HTML version</p>"
|
||||
|
||||
|
||||
async def test_empty_string_display():
|
||||
"""
|
||||
Empty strings are ignored.
|
||||
"""
|
||||
display("")
|
||||
container = await get_display_container()
|
||||
assert len(container.children) == 0
|
||||
|
||||
|
||||
async def test_newline_string_skipped():
|
||||
"""
|
||||
Single newline strings are skipped (legacy behavior).
|
||||
"""
|
||||
display("\n")
|
||||
container = await get_display_container()
|
||||
# Should be empty because newlines are skipped.
|
||||
assert len(container.children) == 0
|
||||
|
||||
|
||||
async def test_string_with_special_html_chars():
|
||||
"""
|
||||
Strings with HTML special characters should be escaped.
|
||||
"""
|
||||
display("<script>alert('xss')</script>")
|
||||
container = await get_display_container()
|
||||
assert "<script>" in container[0].innerHTML
|
||||
assert "<script>" not in container[0].innerHTML
|
||||
|
||||
|
||||
async def test_javascript_mime_type():
|
||||
"""
|
||||
JavaScript MIME type should create script tags.
|
||||
"""
|
||||
|
||||
class JSRepr:
|
||||
def _repr_javascript_(self):
|
||||
return "console.log('test');"
|
||||
|
||||
display(JSRepr(), target="test-element-container", append=False)
|
||||
target = web.page.find("#test-element-container")[0]
|
||||
assert "<script>" in target.innerHTML
|
||||
assert "console.log" in target.innerHTML
|
||||
|
||||
|
||||
async def test_append_false_clears_multiple_children():
|
||||
"""
|
||||
append=False should clear all existing children, not just the last one.
|
||||
"""
|
||||
# Add some initial content.
|
||||
display("child 1")
|
||||
display("child 2")
|
||||
display("child 3")
|
||||
container = await get_display_container()
|
||||
assert len(container.children) == 3 # three divs.
|
||||
|
||||
# Now display with append=False.
|
||||
display("new content", append=False)
|
||||
container = await get_display_container()
|
||||
# No divs used, just the new textual content.
|
||||
assert container.innerText == "new content"
|
||||
|
||||
|
||||
async def test_mixed_append_true_false():
|
||||
"""
|
||||
Mixing append=True and append=False should work correctly.
|
||||
"""
|
||||
display("first", append=True)
|
||||
display("second", append=True)
|
||||
display("third", append=False)
|
||||
container = await get_display_container()
|
||||
assert container.innerText == "third"
|
||||
|
||||
|
||||
def test_target_with_multiple_hashes():
|
||||
"""
|
||||
Target with multiple # characters should only strip the first one.
|
||||
|
||||
Such an id is not valid in HTML, but we should handle it gracefully.
|
||||
"""
|
||||
# Should try to find element with id="#weird-id".
|
||||
# This will raise ValueError as it doesn't exist.
|
||||
with upytest.raises(ValueError):
|
||||
display("content", target="##weird-id")
|
||||
|
||||
|
||||
async def test_display_none_value():
|
||||
"""
|
||||
Displaying None should use its repr.
|
||||
"""
|
||||
display(None)
|
||||
container = await get_display_container()
|
||||
assert container.innerText == "None"
|
||||
|
||||
|
||||
async def test_display_boolean_values():
|
||||
"""
|
||||
Booleans should display as their repr.
|
||||
"""
|
||||
display(True, False)
|
||||
container = await get_display_container()
|
||||
assert "True" in container.innerText
|
||||
assert "False" in container.innerText
|
||||
|
||||
|
||||
async def test_display_numbers():
|
||||
"""
|
||||
Numbers should display correctly.
|
||||
"""
|
||||
display(42, 3.14159, -17)
|
||||
container = await get_display_container()
|
||||
text = container.innerText
|
||||
assert "42" in text
|
||||
assert "3.14159" in text
|
||||
assert "-17" in text
|
||||
|
||||
@@ -24,15 +24,26 @@ def teardown():
|
||||
|
||||
def test_event_add_listener():
|
||||
"""
|
||||
Adding a listener to an event should add it to the list of listeners. It
|
||||
should only be added once.
|
||||
Adding a listener to an event should add it to the list of listeners.
|
||||
It should only be added once.
|
||||
"""
|
||||
event = Event()
|
||||
listener = lambda x: x
|
||||
event.add_listener(listener)
|
||||
event.add_listener(listener)
|
||||
assert len(event._listeners) == 1 # Only one item added.
|
||||
assert listener in event._listeners # The item is the expected listener.
|
||||
assert len(event._listeners) == 1
|
||||
assert listener in event._listeners
|
||||
|
||||
|
||||
def test_event_add_non_callable_listener():
|
||||
"""
|
||||
Adding a non-callable listener should raise a ValueError.
|
||||
"""
|
||||
event = Event()
|
||||
with upytest.raises(ValueError):
|
||||
event.add_listener("not a callable")
|
||||
with upytest.raises(ValueError):
|
||||
event.add_listener(123)
|
||||
|
||||
|
||||
def test_event_remove_listener():
|
||||
@@ -45,12 +56,25 @@ def test_event_remove_listener():
|
||||
listener2 = lambda x: x
|
||||
event.add_listener(listener1)
|
||||
event.add_listener(listener2)
|
||||
assert len(event._listeners) == 2 # Two listeners added.
|
||||
assert listener1 in event._listeners # The first listener is in the list.
|
||||
assert listener2 in event._listeners # The second listener is in the list.
|
||||
assert len(event._listeners) == 2
|
||||
assert listener1 in event._listeners
|
||||
assert listener2 in event._listeners
|
||||
event.remove_listener(listener1)
|
||||
assert len(event._listeners) == 1 # Only one item remains.
|
||||
assert listener2 in event._listeners # The second listener is in the list.
|
||||
assert len(event._listeners) == 1
|
||||
assert listener2 in event._listeners
|
||||
|
||||
|
||||
def test_event_remove_nonexistent_listener():
|
||||
"""
|
||||
Removing a listener that doesn't exist should be silently ignored.
|
||||
"""
|
||||
event = Event()
|
||||
listener1 = lambda x: x
|
||||
listener2 = lambda x: x
|
||||
event.add_listener(listener1)
|
||||
event.remove_listener(listener2)
|
||||
assert len(event._listeners) == 1
|
||||
assert listener1 in event._listeners
|
||||
|
||||
|
||||
def test_event_remove_all_listeners():
|
||||
@@ -62,15 +86,15 @@ def test_event_remove_all_listeners():
|
||||
listener2 = lambda x: x
|
||||
event.add_listener(listener1)
|
||||
event.add_listener(listener2)
|
||||
assert len(event._listeners) == 2 # Two listeners added.
|
||||
assert len(event._listeners) == 2
|
||||
event.remove_listener()
|
||||
assert len(event._listeners) == 0 # No listeners remain.
|
||||
assert len(event._listeners) == 0
|
||||
|
||||
|
||||
def test_event_trigger():
|
||||
"""
|
||||
Triggering an event should call all of the listeners with the provided
|
||||
arguments.
|
||||
result.
|
||||
"""
|
||||
event = Event()
|
||||
counter = 0
|
||||
@@ -81,15 +105,45 @@ def test_event_trigger():
|
||||
assert x == "ok"
|
||||
|
||||
event.add_listener(listener)
|
||||
assert counter == 0 # The listener has not been triggered yet.
|
||||
assert counter == 0
|
||||
event.trigger("ok")
|
||||
assert counter == 1 # The listener has been triggered with the expected result.
|
||||
assert counter == 1
|
||||
|
||||
|
||||
def test_event_trigger_no_listeners():
|
||||
"""
|
||||
Triggering an event with no listeners should not raise an error.
|
||||
"""
|
||||
event = Event()
|
||||
event.trigger("test")
|
||||
|
||||
|
||||
def test_event_trigger_multiple_listeners():
|
||||
"""
|
||||
Triggering an event should call all registered listeners.
|
||||
"""
|
||||
event = Event()
|
||||
results = []
|
||||
|
||||
def listener1(x):
|
||||
results.append(f"listener1: {x}")
|
||||
|
||||
def listener2(x):
|
||||
results.append(f"listener2: {x}")
|
||||
|
||||
event.add_listener(listener1)
|
||||
event.add_listener(listener2)
|
||||
event.trigger("test")
|
||||
|
||||
assert len(results) == 2
|
||||
assert "listener1: test" in results
|
||||
assert "listener2: test" in results
|
||||
|
||||
|
||||
async def test_event_trigger_with_awaitable():
|
||||
"""
|
||||
Triggering an event with an awaitable listener should call the listener
|
||||
with the provided arguments.
|
||||
with the provided result.
|
||||
"""
|
||||
call_flag = asyncio.Event()
|
||||
event = Event()
|
||||
@@ -102,16 +156,119 @@ async def test_event_trigger_with_awaitable():
|
||||
call_flag.set()
|
||||
|
||||
event.add_listener(listener)
|
||||
assert counter == 0 # The listener has not been triggered yet.
|
||||
assert counter == 0
|
||||
event.trigger("ok")
|
||||
await call_flag.wait()
|
||||
assert counter == 1 # The listener has been triggered with the expected result.
|
||||
assert counter == 1
|
||||
|
||||
|
||||
async def test_event_trigger_mixed_listeners():
|
||||
"""
|
||||
Triggering an event with both sync and async listeners should work.
|
||||
"""
|
||||
event = Event()
|
||||
sync_called = False
|
||||
async_flag = asyncio.Event()
|
||||
|
||||
def sync_listener(x):
|
||||
nonlocal sync_called
|
||||
sync_called = True
|
||||
assert x == "mixed"
|
||||
|
||||
async def async_listener(x):
|
||||
assert x == "mixed"
|
||||
async_flag.set()
|
||||
|
||||
event.add_listener(sync_listener)
|
||||
event.add_listener(async_listener)
|
||||
event.trigger("mixed")
|
||||
|
||||
assert sync_called
|
||||
await async_flag.wait()
|
||||
|
||||
|
||||
def test_event_listener_exception():
|
||||
"""
|
||||
If a listener raises an exception, it should propagate and not be
|
||||
silently ignored.
|
||||
"""
|
||||
event = Event()
|
||||
|
||||
def bad_listener(x):
|
||||
raise RuntimeError("Listener error")
|
||||
|
||||
event.add_listener(bad_listener)
|
||||
|
||||
with upytest.raises(RuntimeError):
|
||||
event.trigger("test")
|
||||
|
||||
|
||||
def test_event_listener_exception_stops_other_listeners():
|
||||
"""
|
||||
If a listener raises an exception, subsequent listeners should not be
|
||||
called. There's a problem with the user's code that needs to be addressed!
|
||||
"""
|
||||
event = Event()
|
||||
called = []
|
||||
|
||||
def listener1(x):
|
||||
called.append("listener1")
|
||||
|
||||
def bad_listener(x):
|
||||
called.append("bad_listener")
|
||||
raise RuntimeError("Listener error")
|
||||
|
||||
def listener3(x):
|
||||
called.append("listener3")
|
||||
|
||||
event.add_listener(listener1)
|
||||
event.add_listener(bad_listener)
|
||||
event.add_listener(listener3)
|
||||
|
||||
with upytest.raises(RuntimeError):
|
||||
event.trigger("test")
|
||||
|
||||
assert "listener1" in called
|
||||
assert "bad_listener" in called
|
||||
assert "listener3" not in called
|
||||
|
||||
|
||||
async def test_event_async_listener_exception():
|
||||
"""
|
||||
If an async listener raises an exception, it cannot prevent other
|
||||
listeners from being called, as async listeners run as tasks. This
|
||||
is different behavior from sync listeners, but the simplest model
|
||||
for users to understand.
|
||||
|
||||
This test ensures that even if one async listener fails, others
|
||||
still run as per this expected behaviour. In MicroPython, the
|
||||
exception will be reported to the user.
|
||||
"""
|
||||
event = Event()
|
||||
call_flag = asyncio.Event()
|
||||
called = []
|
||||
|
||||
async def bad_listener(x):
|
||||
called.append("bad_listener")
|
||||
raise RuntimeError("Async listener error")
|
||||
|
||||
async def good_listener(x):
|
||||
called.append("good_listener")
|
||||
call_flag.set()
|
||||
|
||||
event.add_listener(bad_listener)
|
||||
event.add_listener(good_listener)
|
||||
event.trigger("test")
|
||||
|
||||
await call_flag.wait()
|
||||
assert "bad_listener" in called
|
||||
assert "good_listener" in called
|
||||
|
||||
|
||||
async def test_when_decorator_with_event():
|
||||
"""
|
||||
When the decorated function takes a single parameter,
|
||||
it should be passed the event object.
|
||||
When the decorated function takes a single parameter, it should be
|
||||
passed the event object.
|
||||
"""
|
||||
btn = web.button("foo_button", id="foo_id")
|
||||
container = get_container()
|
||||
@@ -133,8 +290,8 @@ async def test_when_decorator_with_event():
|
||||
|
||||
async def test_when_decorator_without_event():
|
||||
"""
|
||||
When the decorated function takes no parameters (not including 'self'),
|
||||
it should be called without the event object.
|
||||
When the decorated function takes no parameters, it should be called
|
||||
without the event object.
|
||||
"""
|
||||
btn = web.button("foo_button", id="foo_id")
|
||||
container = get_container()
|
||||
@@ -143,7 +300,7 @@ async def test_when_decorator_without_event():
|
||||
called = False
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
@web.when("click", selector="#foo_id")
|
||||
@when("click", selector="#foo_id")
|
||||
def foo():
|
||||
nonlocal called
|
||||
called = True
|
||||
@@ -156,8 +313,8 @@ async def test_when_decorator_without_event():
|
||||
|
||||
async def test_when_decorator_with_event_as_async_handler():
|
||||
"""
|
||||
When the decorated function takes a single parameter,
|
||||
it should be passed the event object. Async version.
|
||||
When the decorated function takes a single parameter, it should be
|
||||
passed the event object. Async version.
|
||||
"""
|
||||
btn = web.button("foo_button", id="foo_id")
|
||||
container = get_container()
|
||||
@@ -179,8 +336,8 @@ async def test_when_decorator_with_event_as_async_handler():
|
||||
|
||||
async def test_when_decorator_without_event_as_async_handler():
|
||||
"""
|
||||
When the decorated function takes no parameters (not including 'self'),
|
||||
it should be called without the event object. Async version.
|
||||
When the decorated function takes no parameters, it should be called
|
||||
without the event object. Async version.
|
||||
"""
|
||||
btn = web.button("foo_button", id="foo_id")
|
||||
container = get_container()
|
||||
@@ -189,7 +346,7 @@ async def test_when_decorator_without_event_as_async_handler():
|
||||
called = False
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
@web.when("click", selector="#foo_id")
|
||||
@when("click", selector="#foo_id")
|
||||
async def foo():
|
||||
nonlocal called
|
||||
called = True
|
||||
@@ -202,7 +359,7 @@ async def test_when_decorator_without_event_as_async_handler():
|
||||
|
||||
async def test_two_when_decorators():
|
||||
"""
|
||||
When decorating a function twice, both should function
|
||||
When decorating a function twice, both should function independently.
|
||||
"""
|
||||
btn = web.button("foo_button", id="foo_id")
|
||||
container = get_container()
|
||||
@@ -235,7 +392,7 @@ async def test_two_when_decorators():
|
||||
async def test_when_decorator_multiple_elements():
|
||||
"""
|
||||
The @when decorator's selector should successfully select multiple
|
||||
DOM elements
|
||||
DOM elements.
|
||||
"""
|
||||
btn1 = web.button(
|
||||
"foo_button1",
|
||||
@@ -283,7 +440,8 @@ async def test_when_decorator_multiple_elements():
|
||||
)
|
||||
def test_when_decorator_invalid_selector():
|
||||
"""
|
||||
When the selector parameter of @when is invalid, it should raise an error.
|
||||
When the selector parameter of @when is invalid, it should raise an
|
||||
error.
|
||||
"""
|
||||
if upytest.is_micropython:
|
||||
from jsffi import JsException
|
||||
@@ -298,139 +456,320 @@ def test_when_decorator_invalid_selector():
|
||||
assert "'#.bad' is not a valid selector" in str(e.exception), str(e.exception)
|
||||
|
||||
|
||||
def test_when_missing_selector_for_dom_event():
|
||||
"""
|
||||
When using @when with a DOM event but no selector, should raise
|
||||
ValueError.
|
||||
"""
|
||||
with upytest.raises(ValueError):
|
||||
|
||||
@when("click")
|
||||
def handler(event):
|
||||
pass
|
||||
|
||||
|
||||
def test_when_empty_selector_finds_no_elements():
|
||||
"""
|
||||
When selector matches no elements, should raise ValueError.
|
||||
"""
|
||||
with upytest.raises(ValueError):
|
||||
|
||||
@when("click", "#nonexistent-element-id-12345")
|
||||
def handler(event):
|
||||
pass
|
||||
|
||||
|
||||
def test_when_decorates_an_event():
|
||||
"""
|
||||
When the @when decorator is used on a function to handle an Event instance,
|
||||
the function should be called when the Event object is triggered.
|
||||
When the @when decorator is used on a function to handle an Event
|
||||
instance, the function should be called when the Event object is
|
||||
triggered.
|
||||
"""
|
||||
|
||||
whenable = Event()
|
||||
counter = 0
|
||||
|
||||
# When as a decorator.
|
||||
@when(whenable)
|
||||
def handler(result):
|
||||
"""
|
||||
A function that should be called when the whenable object is triggered.
|
||||
|
||||
The result generated by the whenable object should be passed to the
|
||||
function.
|
||||
A function that should be called when the whenable object is
|
||||
triggered.
|
||||
"""
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
assert result == "ok"
|
||||
|
||||
# The function should not be called until the whenable object is triggered.
|
||||
assert counter == 0
|
||||
# Trigger the whenable object.
|
||||
whenable.trigger("ok")
|
||||
# The function should have been called when the whenable object was
|
||||
# triggered.
|
||||
assert counter == 1
|
||||
|
||||
|
||||
def test_when_called_with_an_event_and_handler():
|
||||
async def test_when_with_list_of_events():
|
||||
"""
|
||||
The when function should be able to be called with an Event object,
|
||||
and a handler function.
|
||||
The @when decorator should handle a list of Event objects.
|
||||
"""
|
||||
whenable = Event()
|
||||
event1 = Event()
|
||||
event2 = Event()
|
||||
counter = 0
|
||||
|
||||
@when([event1, event2])
|
||||
def handler(result):
|
||||
"""
|
||||
A function that should be called when the whenable object is triggered.
|
||||
|
||||
The result generated by the whenable object should be passed to the
|
||||
function.
|
||||
"""
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
assert result == "ok"
|
||||
|
||||
# When as a function.
|
||||
when(whenable, handler)
|
||||
|
||||
# The function should not be called until the whenable object is triggered.
|
||||
assert counter == 0
|
||||
# Trigger the whenable object.
|
||||
whenable.trigger("ok")
|
||||
# The function should have been called when the whenable object was
|
||||
# triggered.
|
||||
event1.trigger("test1")
|
||||
assert counter == 1
|
||||
event2.trigger("test2")
|
||||
assert counter == 2
|
||||
|
||||
|
||||
async def test_when_with_async_event_handler():
|
||||
"""
|
||||
Async handlers should work with custom Event objects.
|
||||
"""
|
||||
event = Event()
|
||||
call_flag = asyncio.Event()
|
||||
counter = 0
|
||||
|
||||
@when(event)
|
||||
async def handler(result):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
assert result == "async test"
|
||||
call_flag.set()
|
||||
|
||||
assert counter == 0
|
||||
event.trigger("async test")
|
||||
await call_flag.wait()
|
||||
assert counter == 1
|
||||
|
||||
def test_when_on_different_callables():
|
||||
|
||||
async def test_when_with_element_selector():
|
||||
"""
|
||||
The when function works with:
|
||||
|
||||
* Synchronous functions
|
||||
* Asynchronous functions
|
||||
* Inner functions
|
||||
* Async inner functions
|
||||
* Closure functions
|
||||
* Async closure functions
|
||||
The @when decorator should accept an Element object as selector.
|
||||
"""
|
||||
btn = web.button("test", id="elem_selector_test")
|
||||
container = get_container()
|
||||
container.append(btn)
|
||||
|
||||
def func(x=1):
|
||||
# A simple function.
|
||||
return x
|
||||
called = False
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
async def a_func(x=1):
|
||||
# A simple async function.
|
||||
return x
|
||||
@when("click", btn)
|
||||
def handler(event):
|
||||
nonlocal called
|
||||
called = True
|
||||
call_flag.set()
|
||||
|
||||
btn.click()
|
||||
await call_flag.wait()
|
||||
assert called
|
||||
|
||||
|
||||
async def test_when_with_element_collection_selector():
|
||||
"""
|
||||
The @when decorator should accept an ElementCollection as selector.
|
||||
"""
|
||||
btn1 = web.button("btn1", id="col_test_1", classes=["test-class"])
|
||||
btn2 = web.button("btn2", id="col_test_2", classes=["test-class"])
|
||||
container = get_container()
|
||||
container.append(btn1)
|
||||
container.append(btn2)
|
||||
|
||||
collection = web.page.find(".test-class")
|
||||
counter = 0
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
@when("click", collection)
|
||||
def handler(event):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
if counter == 2:
|
||||
call_flag.set()
|
||||
|
||||
btn1.click()
|
||||
btn2.click()
|
||||
await call_flag.wait()
|
||||
assert counter == 2
|
||||
|
||||
|
||||
async def test_when_with_list_of_elements():
|
||||
"""
|
||||
The @when decorator should accept a list of DOM elements as selector.
|
||||
"""
|
||||
btn1 = web.button("btn1", id="list_test_1")
|
||||
btn2 = web.button("btn2", id="list_test_2")
|
||||
container = get_container()
|
||||
container.append(btn1)
|
||||
container.append(btn2)
|
||||
|
||||
elements = [btn1._dom_element, btn2._dom_element]
|
||||
counter = 0
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
@when("click", elements)
|
||||
def handler(event):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
if counter == 2:
|
||||
call_flag.set()
|
||||
|
||||
btn1.click()
|
||||
btn2.click()
|
||||
await call_flag.wait()
|
||||
assert counter == 2
|
||||
|
||||
|
||||
def test_when_decorator_returns_wrapper():
|
||||
"""
|
||||
The @when decorator should return the wrapped function.
|
||||
"""
|
||||
event = Event()
|
||||
|
||||
@when(event)
|
||||
def handler(result):
|
||||
return result
|
||||
|
||||
assert callable(handler)
|
||||
|
||||
|
||||
def test_when_multiple_events_on_same_handler():
|
||||
"""
|
||||
Multiple @when decorators can be stacked on the same function.
|
||||
"""
|
||||
event1 = Event()
|
||||
event2 = Event()
|
||||
counter = 0
|
||||
|
||||
@when(event1)
|
||||
@when(event2)
|
||||
def handler(result):
|
||||
nonlocal counter
|
||||
counter += 1
|
||||
|
||||
assert counter == 0
|
||||
event1.trigger("test")
|
||||
assert counter == 1
|
||||
event2.trigger("test")
|
||||
assert counter == 2
|
||||
|
||||
|
||||
async def test_when_on_different_callables():
|
||||
"""
|
||||
The @when decorator works with various callable types.
|
||||
"""
|
||||
results = []
|
||||
|
||||
def func(x):
|
||||
results.append("func")
|
||||
|
||||
async def a_func(x):
|
||||
results.append("a_func")
|
||||
|
||||
def make_inner_func():
|
||||
# Creates a simple inner function.
|
||||
|
||||
def inner_func(x=1):
|
||||
return x
|
||||
def inner_func(x):
|
||||
results.append("inner_func")
|
||||
|
||||
return inner_func
|
||||
|
||||
|
||||
def make_inner_a_func():
|
||||
# Creates a simple async inner function.
|
||||
|
||||
async def inner_a_func(x=1):
|
||||
return x
|
||||
async def inner_a_func(x):
|
||||
results.append("inner_a_func")
|
||||
|
||||
return inner_a_func
|
||||
|
||||
|
||||
def make_closure():
|
||||
# Creates a simple closure function.
|
||||
a = 1
|
||||
|
||||
def closure_func(x=1):
|
||||
return a + x
|
||||
def closure_func(x):
|
||||
results.append(f"closure_func:{a}")
|
||||
|
||||
return closure_func
|
||||
|
||||
|
||||
def make_a_closure():
|
||||
# Creates a simple async closure function.
|
||||
a = 1
|
||||
|
||||
async def closure_a_func(x=1):
|
||||
return a + x
|
||||
async def closure_a_func(x):
|
||||
results.append(f"closure_a_func:{a}")
|
||||
|
||||
return closure_a_func
|
||||
|
||||
|
||||
inner_func = make_inner_func()
|
||||
inner_a_func = make_inner_a_func()
|
||||
cl_func = make_closure()
|
||||
cl_a_func = make_a_closure()
|
||||
|
||||
whenable = Event()
|
||||
|
||||
whenable = Event()
|
||||
# Each of these should work with the @when decorator.
|
||||
when(whenable)(func)
|
||||
when(whenable)(a_func)
|
||||
when(whenable)(inner_func)
|
||||
when(whenable)(inner_a_func)
|
||||
when(whenable)(cl_func)
|
||||
when(whenable)(cl_a_func)
|
||||
|
||||
# Each of these should work with the when function.
|
||||
when(whenable, func)
|
||||
when(whenable, a_func)
|
||||
when(whenable, inner_func)
|
||||
when(whenable, inner_a_func)
|
||||
when(whenable, cl_func)
|
||||
when(whenable, cl_a_func)
|
||||
# If we get here, everything worked.
|
||||
assert True
|
||||
# Verify no handlers have been called yet.
|
||||
assert len(results) == 0
|
||||
|
||||
# Trigger the event.
|
||||
whenable.trigger("test")
|
||||
|
||||
# Wait for async handlers to complete.
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Verify all handlers were called.
|
||||
assert len(results) == 6
|
||||
assert "func" in results
|
||||
assert "a_func" in results
|
||||
assert "inner_func" in results
|
||||
assert "inner_a_func" in results
|
||||
assert "closure_func:1" in results
|
||||
assert "closure_a_func:1" in results
|
||||
|
||||
|
||||
async def test_when_dom_event_with_options():
|
||||
"""
|
||||
Options should be passed to addEventListener for DOM events.
|
||||
"""
|
||||
click_count = 0
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
@when("click", "#button-for-event-testing", once=True)
|
||||
def handle_click(event):
|
||||
nonlocal click_count
|
||||
click_count += 1
|
||||
call_flag.set()
|
||||
|
||||
btn = web.page["#button-for-event-testing"]
|
||||
btn.click()
|
||||
await call_flag.wait()
|
||||
assert click_count == 1
|
||||
|
||||
# Click again - should not increment due to once=True.
|
||||
btn.click()
|
||||
# Bit of a bodge - a brief wait to ensure no handler fires.
|
||||
await asyncio.sleep(0.01)
|
||||
assert click_count == 1
|
||||
|
||||
|
||||
async def test_when_custom_event_options_ignored():
|
||||
"""
|
||||
Options should be silently ignored for custom Event objects.
|
||||
"""
|
||||
my_event = Event()
|
||||
trigger_count = 0
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
@when(my_event, once=True)
|
||||
def handler(result):
|
||||
nonlocal trigger_count
|
||||
trigger_count += 1
|
||||
if trigger_count == 2:
|
||||
call_flag.set()
|
||||
|
||||
# Should trigger multiple times despite once=True being ignored.
|
||||
my_event.trigger("first")
|
||||
my_event.trigger("second")
|
||||
await call_flag.wait()
|
||||
assert trigger_count == 2
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
"""
|
||||
Ensure the pyscript.test function behaves as expected.
|
||||
Tests for the fetch function and response handling.
|
||||
"""
|
||||
|
||||
import json
|
||||
from pyscript import fetch
|
||||
|
||||
|
||||
@@ -18,6 +19,17 @@ async def test_fetch_json():
|
||||
assert data["completed"] is False
|
||||
|
||||
|
||||
async def test_fetch_json_direct():
|
||||
"""
|
||||
The fetch function should support direct method chaining for JSON.
|
||||
"""
|
||||
data = await fetch("https://jsonplaceholder.typicode.com/todos/1").json()
|
||||
assert data["userId"] == 1
|
||||
assert data["id"] == 1
|
||||
assert data["title"] == "delectus aut autem"
|
||||
assert data["completed"] is False
|
||||
|
||||
|
||||
async def test_fetch_text():
|
||||
"""
|
||||
The fetch function should return the expected text response.
|
||||
@@ -31,6 +43,17 @@ async def test_fetch_text():
|
||||
assert "1" in text
|
||||
|
||||
|
||||
async def test_fetch_text_direct():
|
||||
"""
|
||||
The fetch function should support direct method chaining for text.
|
||||
"""
|
||||
text = await fetch("https://jsonplaceholder.typicode.com/todos/1").text()
|
||||
assert "delectus aut autem" in text
|
||||
assert "completed" in text
|
||||
assert "false" in text
|
||||
assert "1" in text
|
||||
|
||||
|
||||
async def test_fetch_bytearray():
|
||||
"""
|
||||
The fetch function should return the expected bytearray response.
|
||||
@@ -38,12 +61,22 @@ async def test_fetch_bytearray():
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1")
|
||||
assert response.ok
|
||||
data = await response.bytearray()
|
||||
assert isinstance(data, bytearray)
|
||||
assert b"delectus aut autem" in data
|
||||
assert b"completed" in data
|
||||
assert b"false" in data
|
||||
assert b"1" in data
|
||||
|
||||
|
||||
async def test_fetch_bytearray_direct():
|
||||
"""
|
||||
The fetch function should support direct method chaining for bytearray.
|
||||
"""
|
||||
data = await fetch("https://jsonplaceholder.typicode.com/todos/1").bytearray()
|
||||
assert isinstance(data, bytearray)
|
||||
assert b"delectus aut autem" in data
|
||||
|
||||
|
||||
async def test_fetch_array_buffer():
|
||||
"""
|
||||
The fetch function should return the expected array buffer response.
|
||||
@@ -58,26 +91,217 @@ async def test_fetch_array_buffer():
|
||||
assert b"1" in bytes_
|
||||
|
||||
|
||||
async def test_fetch_ok():
|
||||
async def test_fetch_array_buffer_direct():
|
||||
"""
|
||||
The fetch function should return a response with ok set to True for an
|
||||
existing URL.
|
||||
The fetch function should support direct method chaining for arrayBuffer.
|
||||
"""
|
||||
data = await fetch("https://jsonplaceholder.typicode.com/todos/1").arrayBuffer()
|
||||
bytes_ = bytes(data)
|
||||
assert b"delectus aut autem" in bytes_
|
||||
|
||||
|
||||
async def test_fetch_blob():
|
||||
"""
|
||||
The fetch function should return a blob response.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1")
|
||||
assert response.ok
|
||||
assert response.status == 200
|
||||
data = await response.json()
|
||||
assert data["userId"] == 1
|
||||
assert data["id"] == 1
|
||||
assert data["title"] == "delectus aut autem"
|
||||
assert data["completed"] is False
|
||||
blob = await response.blob()
|
||||
assert blob.size > 0
|
||||
assert blob.type in ["application/json", "application/json; charset=utf-8"]
|
||||
|
||||
|
||||
async def test_fetch_not_ok():
|
||||
async def test_fetch_blob_direct():
|
||||
"""
|
||||
The fetch function should return a response with ok set to False for a
|
||||
non-existent URL.
|
||||
The fetch function should support direct method chaining for blob.
|
||||
"""
|
||||
blob = await fetch("https://jsonplaceholder.typicode.com/todos/1").blob()
|
||||
assert blob.size > 0
|
||||
assert blob.type in ["application/json", "application/json; charset=utf-8"]
|
||||
|
||||
|
||||
async def test_fetch_response_ok():
|
||||
"""
|
||||
The fetch function should return a response with ok set to True for
|
||||
successful requests.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1")
|
||||
assert response.ok
|
||||
|
||||
|
||||
async def test_fetch_response_not_ok():
|
||||
"""
|
||||
The fetch function should return a response with ok set to False for
|
||||
failed requests.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1000")
|
||||
assert not response.ok
|
||||
assert response.status == 404
|
||||
|
||||
|
||||
async def test_fetch_response_status():
|
||||
"""
|
||||
The fetch function should provide access to response status code.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1")
|
||||
assert response.status == 200
|
||||
|
||||
|
||||
async def test_fetch_response_status_text():
|
||||
"""
|
||||
The fetch function should provide access to response statusText.
|
||||
Note: HTTP/2 responses often have empty statusText, so we just verify
|
||||
the property exists and is a string.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1")
|
||||
assert isinstance(response.statusText, str)
|
||||
assert response.statusText in ["", "OK"]
|
||||
|
||||
|
||||
async def test_fetch_with_post_method():
|
||||
"""
|
||||
The fetch function should support POST requests.
|
||||
"""
|
||||
response = await fetch(
|
||||
"https://jsonplaceholder.typicode.com/posts",
|
||||
method="POST",
|
||||
headers={"Content-Type": "application/json"},
|
||||
body=json.dumps({"title": "foo", "body": "bar", "userId": 1}),
|
||||
)
|
||||
assert response.ok
|
||||
assert response.status == 201
|
||||
data = await response.json()
|
||||
assert data["title"] == "foo"
|
||||
assert data["body"] == "bar"
|
||||
assert data["userId"] == 1
|
||||
assert "id" in data
|
||||
|
||||
|
||||
async def test_fetch_with_put_method():
|
||||
"""
|
||||
The fetch function should support PUT requests.
|
||||
"""
|
||||
response = await fetch(
|
||||
"https://jsonplaceholder.typicode.com/posts/1",
|
||||
method="PUT",
|
||||
headers={"Content-Type": "application/json"},
|
||||
body=json.dumps(
|
||||
{"id": 1, "title": "updated", "body": "updated body", "userId": 1}
|
||||
),
|
||||
)
|
||||
assert response.ok
|
||||
assert response.status == 200
|
||||
data = await response.json()
|
||||
assert data["title"] == "updated"
|
||||
assert data["body"] == "updated body"
|
||||
|
||||
|
||||
async def test_fetch_with_delete_method():
|
||||
"""
|
||||
The fetch function should support DELETE requests.
|
||||
"""
|
||||
response = await fetch(
|
||||
"https://jsonplaceholder.typicode.com/posts/1",
|
||||
method="DELETE",
|
||||
)
|
||||
assert response.ok
|
||||
assert response.status == 200
|
||||
|
||||
|
||||
async def test_fetch_with_custom_headers():
|
||||
"""
|
||||
The fetch function should support custom headers.
|
||||
"""
|
||||
response = await fetch(
|
||||
"https://jsonplaceholder.typicode.com/todos/1",
|
||||
headers={"Accept": "application/json"},
|
||||
)
|
||||
assert response.ok
|
||||
data = await response.json()
|
||||
assert data["id"] == 1
|
||||
|
||||
|
||||
async def test_fetch_multiple_data_extractions():
|
||||
"""
|
||||
The fetch function could allow multiple data extractions from the same
|
||||
response when using the await pattern. This is a strange one, kept in
|
||||
for completeness. Note that browser behaviour may vary here (see inline
|
||||
comments). ;-)
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1")
|
||||
|
||||
# First extraction.
|
||||
text1 = await response.text()
|
||||
assert "delectus aut autem" in text1
|
||||
# Second extraction behaviour varies by browser.
|
||||
try:
|
||||
text2 = await response.text()
|
||||
# Some browsers allow it and return empty or repeated data.
|
||||
assert text2 == "" or "delectus aut autem" in text2
|
||||
except Exception:
|
||||
# Other browsers throw an exception for already-consumed body.
|
||||
# This is expected and valid behaviour per the fetch spec.
|
||||
pass
|
||||
|
||||
|
||||
async def test_fetch_404_error_handling():
|
||||
"""
|
||||
The fetch function should handle 404 responses gracefully.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/999999")
|
||||
assert not response.ok
|
||||
assert response.status == 404
|
||||
# Should still be able to extract data even from error responses.
|
||||
data = await response.json()
|
||||
assert data == {}
|
||||
|
||||
|
||||
async def test_fetch_error_response_with_text():
|
||||
"""
|
||||
Error responses should still allow text extraction.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/999999")
|
||||
assert not response.ok
|
||||
text = await response.text()
|
||||
# Error responses may have empty or JSON content.
|
||||
assert isinstance(text, str)
|
||||
|
||||
|
||||
async def test_fetch_response_headers():
|
||||
"""
|
||||
The fetch function should provide access to response headers.
|
||||
"""
|
||||
response = await fetch("https://jsonplaceholder.typicode.com/todos/1")
|
||||
assert response.ok
|
||||
# Access headers through the underlying response object.
|
||||
content_type = response.headers.get("content-type")
|
||||
assert "application/json" in content_type
|
||||
|
||||
|
||||
async def test_fetch_direct_chaining_with_error():
|
||||
"""
|
||||
Direct method chaining should work even with error responses.
|
||||
"""
|
||||
data = await fetch("https://jsonplaceholder.typicode.com/todos/999999").json()
|
||||
# Should return empty dict for 404. This is expected API behaviour
|
||||
# from the jsonplaceholder API.
|
||||
assert data == {}
|
||||
|
||||
|
||||
async def test_fetch_options_passed_correctly():
|
||||
"""
|
||||
The fetch function should correctly pass options to the underlying
|
||||
JavaScript fetch.
|
||||
"""
|
||||
response = await fetch(
|
||||
"https://jsonplaceholder.typicode.com/posts",
|
||||
method="POST",
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"X-Custom-Header": "test-value",
|
||||
},
|
||||
body=json.dumps({"test": "data"}),
|
||||
)
|
||||
assert response.ok
|
||||
# The request succeeded, confirming options were passed correctly.
|
||||
assert response.status == 201
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
"""
|
||||
Exercise (as much as is possible) the pyscript.ffi namespace.
|
||||
|
||||
We assume that the underlying `create_proxy` and `to_js` functions
|
||||
work as expected (these are tested in Pyodide and MicroPython respectively).
|
||||
"""
|
||||
|
||||
import upytest
|
||||
@@ -38,3 +41,85 @@ def test_to_js():
|
||||
else:
|
||||
from pyodide.ffi import JsProxy
|
||||
assert isinstance(js_obj, JsProxy)
|
||||
|
||||
|
||||
def test_is_none_with_python_none():
|
||||
"""
|
||||
The is_none function should return True for Python None.
|
||||
"""
|
||||
assert ffi.is_none(None)
|
||||
|
||||
|
||||
def test_is_none_with_js_null():
|
||||
"""
|
||||
The is_none function should return True for JavaScript null.
|
||||
"""
|
||||
import js
|
||||
|
||||
assert ffi.is_none(ffi.jsnull)
|
||||
|
||||
|
||||
def test_is_none_with_other_values():
|
||||
"""
|
||||
The is_none function should return False for non-null false-y
|
||||
values.
|
||||
"""
|
||||
assert not ffi.is_none(0)
|
||||
assert not ffi.is_none("")
|
||||
assert not ffi.is_none(False)
|
||||
assert not ffi.is_none([])
|
||||
assert not ffi.is_none({})
|
||||
|
||||
|
||||
def test_assign_single_source():
|
||||
"""
|
||||
The assign function should merge a single source object into target.
|
||||
"""
|
||||
import js
|
||||
|
||||
target = js.Object.new()
|
||||
ffi.assign(target, {"a": 1, "b": 2})
|
||||
|
||||
assert target.a == 1
|
||||
assert target.b == 2
|
||||
|
||||
|
||||
def test_assign_multiple_sources():
|
||||
"""
|
||||
The assign function should merge multiple source objects into target.
|
||||
"""
|
||||
import js
|
||||
|
||||
target = js.Object.new()
|
||||
ffi.assign(target, {"a": 1}, {"b": 2}, {"c": 3})
|
||||
|
||||
assert target.a == 1
|
||||
assert target.b == 2
|
||||
assert target.c == 3
|
||||
|
||||
|
||||
def test_assign_overwrites_properties():
|
||||
"""
|
||||
The assign function should overwrite existing properties.
|
||||
"""
|
||||
import js
|
||||
|
||||
target = js.Object.new()
|
||||
target.a = 1
|
||||
ffi.assign(target, {"a": 2, "b": 3})
|
||||
|
||||
assert target.a == 2
|
||||
assert target.b == 3
|
||||
|
||||
|
||||
def test_assign_returns_target():
|
||||
"""
|
||||
The assign function should return the modified target object.
|
||||
"""
|
||||
import js
|
||||
|
||||
target = js.Object.new()
|
||||
result = ffi.assign(target, {"a": 1})
|
||||
|
||||
assert result is target
|
||||
assert result.a == 1
|
||||
|
||||
56
core/tests/python/tests/test_fs.py
Normal file
56
core/tests/python/tests/test_fs.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""
|
||||
**INCOMPLETE** tests for the pyscript.fs module.
|
||||
|
||||
Note: Full unit tests require Chromium browser and user interaction
|
||||
to grant filesystem permissions. These tests focus on validation logic and
|
||||
error handling that can be tested without permissions.
|
||||
"""
|
||||
|
||||
import upytest
|
||||
from pyscript import fs
|
||||
|
||||
|
||||
def test_mounted_dict_accessible():
|
||||
"""
|
||||
The mounted dictionary should be accessible and be a dict.
|
||||
"""
|
||||
assert hasattr(fs, "mounted")
|
||||
assert isinstance(fs.mounted, dict)
|
||||
|
||||
|
||||
def test_functions_exist():
|
||||
"""
|
||||
All public fs functions should exist and be callable.
|
||||
"""
|
||||
assert hasattr(fs, "mount")
|
||||
assert callable(fs.mount)
|
||||
assert hasattr(fs, "sync")
|
||||
assert callable(fs.sync)
|
||||
assert hasattr(fs, "unmount")
|
||||
assert callable(fs.unmount)
|
||||
assert hasattr(fs, "revoke")
|
||||
assert callable(fs.revoke)
|
||||
|
||||
|
||||
async def test_sync_unmounted_path():
|
||||
"""
|
||||
Syncing an unmounted path should raise KeyError with helpful message.
|
||||
"""
|
||||
with upytest.raises(KeyError):
|
||||
await fs.sync("/nonexistent")
|
||||
|
||||
|
||||
async def test_unmount_unmounted_path():
|
||||
"""
|
||||
Unmounting an unmounted path should raise KeyError with helpful message.
|
||||
"""
|
||||
with upytest.raises(KeyError):
|
||||
await fs.unmount("/nonexistent")
|
||||
|
||||
|
||||
def test_check_permission_function_exists():
|
||||
"""
|
||||
The internal _check_permission function should exist.
|
||||
"""
|
||||
assert hasattr(fs, "_check_permission")
|
||||
assert callable(fs._check_permission)
|
||||
@@ -1,4 +1,4 @@
|
||||
""""
|
||||
"""
|
||||
Tests for the PyScript media module.
|
||||
"""
|
||||
|
||||
@@ -7,80 +7,170 @@ import upytest
|
||||
from pyscript import media
|
||||
|
||||
|
||||
@upytest.skip(
|
||||
"Uses Pyodide-specific to_js function in MicroPython",
|
||||
skip_when=upytest.is_micropython,
|
||||
)
|
||||
async def test_device_enumeration():
|
||||
"""Test enumerating media devices."""
|
||||
devices = await media.list_devices()
|
||||
assert isinstance(devices, list), "list_devices should return a list"
|
||||
|
||||
# If devices are found, verify they have the expected functionality
|
||||
if devices:
|
||||
device = devices[0]
|
||||
|
||||
# Test real device properties exist (but don't assert on their values)
|
||||
# Browser security might restrict actual values until permissions are granted
|
||||
assert hasattr(device, "id"), "Device should have id property"
|
||||
assert hasattr(device, "kind"), "Device should have kind property"
|
||||
assert device.kind in [
|
||||
"videoinput",
|
||||
"audioinput",
|
||||
"audiooutput",
|
||||
], f"Device should have a valid kind, got: {device.kind}"
|
||||
|
||||
# Verify dictionary access works with actual device
|
||||
assert (
|
||||
device["id"] == device.id
|
||||
), "Dictionary access should match property access"
|
||||
assert (
|
||||
device["kind"] == device.kind
|
||||
), "Dictionary access should match property access"
|
||||
|
||||
|
||||
@upytest.skip("Waiting on a bug-fix in MicroPython, for this test to work.", skip_when=upytest.is_micropython)
|
||||
async def test_video_stream_acquisition():
|
||||
"""Test video stream."""
|
||||
async def test_list_devices_returns_list():
|
||||
"""
|
||||
The list_devices function should return a list of Device objects.
|
||||
"""
|
||||
try:
|
||||
# Load a video stream
|
||||
stream = await media.Device.load(video=True)
|
||||
|
||||
# Verify we get a real stream with expected properties
|
||||
assert hasattr(stream, "active"), "Stream should have active property"
|
||||
|
||||
# Check for video tracks, but don't fail if permissions aren't granted
|
||||
if stream._dom_element and hasattr(stream._dom_element, "getVideoTracks"):
|
||||
tracks = stream._dom_element.getVideoTracks()
|
||||
if tracks.length > 0:
|
||||
assert True, "Video stream has video tracks"
|
||||
except Exception as e:
|
||||
# If the browser blocks access, the test should still pass
|
||||
# This is because we're testing the API works, not that permissions are granted
|
||||
assert (
|
||||
True
|
||||
), f"Stream acquisition attempted but may require permissions: {str(e)}"
|
||||
devices = await media.list_devices()
|
||||
assert isinstance(devices, list)
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
|
||||
@upytest.skip("Waiting on a bug-fix in MicroPython, for this test to work.", skip_when=upytest.is_micropython)
|
||||
async def test_custom_video_constraints():
|
||||
"""Test loading video with custom constraints."""
|
||||
async def test_device_properties():
|
||||
"""
|
||||
Device objects should have expected properties.
|
||||
"""
|
||||
try:
|
||||
devices = await media.list_devices()
|
||||
|
||||
if devices:
|
||||
device = devices[0]
|
||||
|
||||
# Test all properties exist.
|
||||
assert hasattr(device, "id")
|
||||
assert hasattr(device, "kind")
|
||||
assert hasattr(device, "label")
|
||||
assert hasattr(device, "group")
|
||||
|
||||
# Test kind has valid value.
|
||||
assert device.kind in [
|
||||
"videoinput",
|
||||
"audioinput",
|
||||
"audiooutput",
|
||||
]
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
|
||||
async def test_device_dict_access():
|
||||
"""
|
||||
Device objects should support dictionary-style access for JavaScript
|
||||
interop.
|
||||
"""
|
||||
try:
|
||||
devices = await media.list_devices()
|
||||
|
||||
if devices:
|
||||
device = devices[0]
|
||||
|
||||
# Dictionary access should match property access.
|
||||
assert device["id"] == device.id
|
||||
assert device["kind"] == device.kind
|
||||
assert device["label"] == device.label
|
||||
assert device["group"] == device.group
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
|
||||
async def test_request_stream_video_only():
|
||||
"""
|
||||
The request_stream method should return a stream for video only.
|
||||
"""
|
||||
try:
|
||||
stream = await media.Device.request_stream(video=True)
|
||||
assert hasattr(stream, "active")
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
|
||||
async def test_request_stream_audio_only():
|
||||
"""
|
||||
The request_stream method should return a stream for audio only.
|
||||
"""
|
||||
try:
|
||||
stream = await media.Device.request_stream(audio=True, video=False)
|
||||
assert hasattr(stream, "active")
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
|
||||
async def test_request_stream_audio_and_video():
|
||||
"""
|
||||
The request_stream method should return a stream for both audio and video.
|
||||
"""
|
||||
try:
|
||||
stream = await media.Device.request_stream(audio=True, video=True)
|
||||
assert hasattr(stream, "active")
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
|
||||
async def test_request_stream_with_constraints():
|
||||
"""
|
||||
The request_stream method should accept video constraints as a dict.
|
||||
"""
|
||||
try:
|
||||
# Define custom constraints
|
||||
constraints = {"width": 640, "height": 480}
|
||||
stream = await media.Device.request_stream(video=constraints)
|
||||
assert hasattr(stream, "active")
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
# Load stream with custom constraints
|
||||
stream = await media.Device.load(video=constraints)
|
||||
|
||||
# Basic stream property check
|
||||
assert hasattr(stream, "active"), "Stream should have active property"
|
||||
async def test_load_backwards_compatibility():
|
||||
"""
|
||||
The deprecated load method should still work for backwards compatibility.
|
||||
"""
|
||||
try:
|
||||
stream = await media.Device.load(video=True)
|
||||
assert hasattr(stream, "active")
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
# Check for tracks only if we have access
|
||||
if stream._dom_element and hasattr(stream._dom_element, "getVideoTracks"):
|
||||
tracks = stream._dom_element.getVideoTracks()
|
||||
if tracks.length > 0 and hasattr(tracks[0], "getSettings"):
|
||||
# Settings verification is optional - browsers may handle constraints differently
|
||||
pass
|
||||
except Exception as e:
|
||||
# If the browser blocks access, test that the API structure works
|
||||
assert True, f"Custom constraint test attempted: {str(e)}"
|
||||
|
||||
@upytest.skip(
|
||||
"Blocks on Pyodide due to permission dialog.",
|
||||
skip_when=not upytest.is_micropython,
|
||||
)
|
||||
async def test_device_get_stream():
|
||||
"""
|
||||
The get_stream instance method should return a stream from a specific
|
||||
device.
|
||||
"""
|
||||
try:
|
||||
devices = await media.list_devices()
|
||||
|
||||
# Find a video input device to test with.
|
||||
video_devices = [d for d in devices if d.kind == "videoinput"]
|
||||
|
||||
if video_devices:
|
||||
stream = await video_devices[0].get_stream()
|
||||
assert hasattr(stream, "active")
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
|
||||
async def test_device_filtering_by_kind():
|
||||
"""
|
||||
Devices should be filterable by their kind property.
|
||||
"""
|
||||
try:
|
||||
devices = await media.list_devices()
|
||||
|
||||
video_inputs = [d for d in devices if d.kind == "videoinput"]
|
||||
audio_inputs = [d for d in devices if d.kind == "audioinput"]
|
||||
audio_outputs = [d for d in devices if d.kind == "audiooutput"]
|
||||
|
||||
# All filtered devices should have correct kind.
|
||||
for device in video_inputs:
|
||||
assert device.kind == "videoinput"
|
||||
|
||||
for device in audio_inputs:
|
||||
assert device.kind == "audioinput"
|
||||
|
||||
for device in audio_outputs:
|
||||
assert device.kind == "audiooutput"
|
||||
except Exception:
|
||||
# Permission denied or no devices available - test passes.
|
||||
pass
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
"""
|
||||
Ensure the pyscript.storage object behaves as a Python dict.
|
||||
Tests for the pyscript.storage module.
|
||||
"""
|
||||
|
||||
from pyscript import Storage, storage
|
||||
@@ -8,6 +8,9 @@ test_store = None
|
||||
|
||||
|
||||
async def setup():
|
||||
"""
|
||||
Set up a clean test storage before each test.
|
||||
"""
|
||||
global test_store
|
||||
if test_store is None:
|
||||
test_store = await storage("test_store")
|
||||
@@ -16,6 +19,9 @@ async def setup():
|
||||
|
||||
|
||||
async def teardown():
|
||||
"""
|
||||
Clean up test storage after each test.
|
||||
"""
|
||||
if test_store:
|
||||
test_store.clear()
|
||||
await test_store.sync()
|
||||
@@ -25,17 +31,17 @@ async def test_storage_as_dict():
|
||||
"""
|
||||
The storage object should behave as a Python dict.
|
||||
"""
|
||||
# Assign
|
||||
# Assign.
|
||||
test_store["a"] = 1
|
||||
# Retrieve
|
||||
# Retrieve.
|
||||
assert test_store["a"] == 1
|
||||
assert "a" in test_store
|
||||
assert len(test_store) == 1
|
||||
# Iterate
|
||||
# Iterate.
|
||||
for k, v in test_store.items():
|
||||
assert k == "a"
|
||||
assert v == 1
|
||||
# Remove
|
||||
# Remove.
|
||||
del test_store["a"]
|
||||
assert "a" not in test_store
|
||||
assert len(test_store) == 0
|
||||
@@ -86,3 +92,233 @@ async def test_storage_clear():
|
||||
assert len(test_store) == 2
|
||||
test_store.clear()
|
||||
assert len(test_store) == 0
|
||||
|
||||
|
||||
async def test_storage_get_method():
|
||||
"""
|
||||
The get method should return default value for missing keys.
|
||||
"""
|
||||
test_store["exists"] = "value"
|
||||
|
||||
assert test_store.get("exists") == "value"
|
||||
assert test_store.get("missing") is None
|
||||
assert test_store.get("missing", "default") == "default"
|
||||
|
||||
|
||||
async def test_storage_keys_values_items():
|
||||
"""
|
||||
The keys, values, and items methods should work like dict.
|
||||
"""
|
||||
test_store["a"] = 1
|
||||
test_store["b"] = 2
|
||||
test_store["c"] = 3
|
||||
|
||||
assert set(test_store.keys()) == {"a", "b", "c"}
|
||||
assert set(test_store.values()) == {1, 2, 3}
|
||||
assert set(test_store.items()) == {("a", 1), ("b", 2), ("c", 3)}
|
||||
|
||||
|
||||
async def test_storage_update():
|
||||
"""
|
||||
The update method should add multiple items at once.
|
||||
"""
|
||||
test_store["a"] = 1
|
||||
|
||||
# Update with dict.
|
||||
test_store.update({"b": 2, "c": 3})
|
||||
assert test_store["b"] == 2
|
||||
assert test_store["c"] == 3
|
||||
|
||||
# Update with keyword arguments.
|
||||
test_store.update(d=4, e=5)
|
||||
assert test_store["d"] == 4
|
||||
assert test_store["e"] == 5
|
||||
|
||||
|
||||
async def test_storage_pop():
|
||||
"""
|
||||
The pop method should remove and return values.
|
||||
"""
|
||||
test_store["a"] = 1
|
||||
test_store["b"] = 2
|
||||
|
||||
value = test_store.pop("a")
|
||||
assert value == 1
|
||||
assert "a" not in test_store
|
||||
assert len(test_store) == 1
|
||||
|
||||
# Pop with default.
|
||||
value = test_store.pop("missing", "default")
|
||||
assert value == "default"
|
||||
|
||||
|
||||
async def test_storage_persistence():
|
||||
"""
|
||||
Data should persist after sync and reload.
|
||||
"""
|
||||
test_store["persistent"] = "value"
|
||||
await test_store.sync()
|
||||
|
||||
# Reload the same storage.
|
||||
reloaded = await storage("test_store")
|
||||
assert reloaded["persistent"] == "value"
|
||||
|
||||
|
||||
async def test_storage_nested_structures():
|
||||
"""
|
||||
Nested data structures should be stored and retrieved correctly.
|
||||
"""
|
||||
nested = {
|
||||
"level1": {"level2": {"level3": [1, 2, 3]}},
|
||||
"list_of_dicts": [{"a": 1}, {"b": 2}, {"c": 3}],
|
||||
}
|
||||
|
||||
test_store["nested"] = nested
|
||||
await test_store.sync()
|
||||
|
||||
retrieved = test_store["nested"]
|
||||
assert retrieved["level1"]["level2"]["level3"] == [1, 2, 3]
|
||||
assert retrieved["list_of_dicts"][0]["a"] == 1
|
||||
|
||||
|
||||
async def test_storage_overwrite():
|
||||
"""
|
||||
Overwriting values should work correctly.
|
||||
"""
|
||||
test_store["key"] = "original"
|
||||
assert test_store["key"] == "original"
|
||||
|
||||
test_store["key"] = "updated"
|
||||
assert test_store["key"] == "updated"
|
||||
|
||||
|
||||
async def test_storage_empty_string_key():
|
||||
"""
|
||||
Empty strings should be valid keys.
|
||||
"""
|
||||
test_store[""] = "empty key"
|
||||
assert "" in test_store
|
||||
assert test_store[""] == "empty key"
|
||||
|
||||
|
||||
async def test_storage_special_characters_in_keys():
|
||||
"""
|
||||
Keys with special characters should work.
|
||||
"""
|
||||
test_store["key with spaces"] = 1
|
||||
test_store["key-with-dashes"] = 2
|
||||
test_store["key.with.dots"] = 3
|
||||
test_store["key/with/slashes"] = 4
|
||||
|
||||
assert test_store["key with spaces"] == 1
|
||||
assert test_store["key-with-dashes"] == 2
|
||||
assert test_store["key.with.dots"] == 3
|
||||
assert test_store["key/with/slashes"] == 4
|
||||
|
||||
|
||||
async def test_storage_multiple_stores():
|
||||
"""
|
||||
Multiple named storages should be independent.
|
||||
"""
|
||||
store1 = await storage("store1")
|
||||
store2 = await storage("store2")
|
||||
|
||||
store1.clear()
|
||||
store2.clear()
|
||||
|
||||
store1["key"] = "value1"
|
||||
store2["key"] = "value2"
|
||||
|
||||
assert store1["key"] == "value1"
|
||||
assert store2["key"] == "value2"
|
||||
|
||||
# Clean up.
|
||||
store1.clear()
|
||||
store2.clear()
|
||||
await store1.sync()
|
||||
await store2.sync()
|
||||
|
||||
|
||||
async def test_storage_empty_name_raises():
|
||||
"""
|
||||
Creating storage with empty name should raise ValueError.
|
||||
"""
|
||||
try:
|
||||
await storage("")
|
||||
assert False, "Should have raised ValueError"
|
||||
except ValueError as e:
|
||||
assert "non-empty" in str(e)
|
||||
|
||||
|
||||
async def test_custom_storage_class():
|
||||
"""
|
||||
Custom Storage subclasses should work correctly.
|
||||
"""
|
||||
calls = []
|
||||
|
||||
class TrackingStorage(Storage):
|
||||
def __setitem__(self, key, value):
|
||||
calls.append(("set", key, value))
|
||||
super().__setitem__(key, value)
|
||||
|
||||
def __delitem__(self, key):
|
||||
calls.append(("del", key))
|
||||
super().__delitem__(key)
|
||||
|
||||
custom_store = await storage("custom_test", storage_class=TrackingStorage)
|
||||
custom_store.clear()
|
||||
calls.clear()
|
||||
|
||||
# Test setitem tracking.
|
||||
custom_store["test"] = 123
|
||||
assert ("set", "test", 123) in calls
|
||||
assert custom_store["test"] == 123
|
||||
|
||||
# Test delitem tracking.
|
||||
del custom_store["test"]
|
||||
assert ("del", "test") in calls
|
||||
|
||||
# Clean up.
|
||||
custom_store.clear()
|
||||
await custom_store.sync()
|
||||
|
||||
|
||||
async def test_storage_boolean_false_vs_none():
|
||||
"""
|
||||
False and None should be distinguishable.
|
||||
"""
|
||||
test_store["false"] = False
|
||||
test_store["none"] = None
|
||||
|
||||
assert test_store["false"] is False
|
||||
assert test_store["false"] is not None
|
||||
assert test_store["none"] is None
|
||||
assert test_store["none"] is not False
|
||||
|
||||
|
||||
async def test_storage_numeric_zero_vs_none():
|
||||
"""
|
||||
Zero and None should be distinguishable.
|
||||
"""
|
||||
test_store["zero_int"] = 0
|
||||
test_store["zero_float"] = 0.0
|
||||
test_store["none"] = None
|
||||
|
||||
assert test_store["zero_int"] == 0
|
||||
assert test_store["zero_int"] is not None
|
||||
assert test_store["zero_float"] == 0.0
|
||||
assert test_store["zero_float"] is not None
|
||||
assert test_store["none"] is None
|
||||
|
||||
|
||||
async def test_storage_empty_collections():
|
||||
"""
|
||||
Empty lists and dicts should be stored correctly.
|
||||
"""
|
||||
test_store["empty_list"] = []
|
||||
test_store["empty_dict"] = {}
|
||||
|
||||
assert test_store["empty_list"] == []
|
||||
assert isinstance(test_store["empty_list"], list)
|
||||
assert test_store["empty_dict"] == {}
|
||||
assert isinstance(test_store["empty_dict"], dict)
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
"""
|
||||
Tests for the pyscript.util module.
|
||||
"""
|
||||
|
||||
import upytest
|
||||
import js
|
||||
from pyscript import util
|
||||
@@ -5,8 +9,8 @@ from pyscript import util
|
||||
|
||||
def test_as_bytearray():
|
||||
"""
|
||||
Test the as_bytearray function correctly converts a JavaScript ArrayBuffer
|
||||
to a Python bytearray.
|
||||
The as_bytearray function should convert a JavaScript ArrayBuffer to a
|
||||
Python bytearray.
|
||||
"""
|
||||
msg = b"Hello, world!"
|
||||
buffer = js.ArrayBuffer.new(len(msg))
|
||||
@@ -18,31 +22,187 @@ def test_as_bytearray():
|
||||
assert ba == msg
|
||||
|
||||
|
||||
def test_not_supported():
|
||||
def test_as_bytearray_empty():
|
||||
"""
|
||||
Test the NotSupported class raises an exception when trying to access
|
||||
attributes or call the object.
|
||||
The as_bytearray function should handle empty ArrayBuffers.
|
||||
"""
|
||||
buffer = js.ArrayBuffer.new(0)
|
||||
ba = util.as_bytearray(buffer)
|
||||
assert isinstance(ba, bytearray)
|
||||
assert len(ba) == 0
|
||||
|
||||
|
||||
def test_as_bytearray_binary_data():
|
||||
"""
|
||||
The as_bytearray function should handle binary data with all byte values.
|
||||
"""
|
||||
# Test with all possible byte values.
|
||||
data = bytes(range(256))
|
||||
buffer = js.ArrayBuffer.new(len(data))
|
||||
ui8a = js.Uint8Array.new(buffer)
|
||||
for i, b in enumerate(data):
|
||||
ui8a[i] = b
|
||||
ba = util.as_bytearray(buffer)
|
||||
assert ba == bytearray(data)
|
||||
|
||||
|
||||
def test_not_supported_repr():
|
||||
"""
|
||||
The NotSupported class should have a meaningful repr.
|
||||
"""
|
||||
ns = util.NotSupported("test_feature", "Feature not available")
|
||||
repr_str = repr(ns)
|
||||
assert "NotSupported" in repr_str
|
||||
assert "test_feature" in repr_str
|
||||
|
||||
|
||||
def test_not_supported_getattr():
|
||||
"""
|
||||
The NotSupported class should raise AttributeError on attribute access.
|
||||
"""
|
||||
ns = util.NotSupported("test", "This is not supported.")
|
||||
with upytest.raises(AttributeError) as e:
|
||||
ns.test
|
||||
assert str(e.exception) == "This is not supported.", str(e.exception)
|
||||
ns.some_attr
|
||||
assert str(e.exception) == "This is not supported."
|
||||
|
||||
|
||||
def test_not_supported_setattr():
|
||||
"""
|
||||
The NotSupported class should raise AttributeError on attribute
|
||||
assignment.
|
||||
"""
|
||||
ns = util.NotSupported("test", "This is not supported.")
|
||||
with upytest.raises(AttributeError) as e:
|
||||
ns.test = 1
|
||||
assert str(e.exception) == "This is not supported.", str(e.exception)
|
||||
ns.some_attr = 1
|
||||
assert str(e.exception) == "This is not supported."
|
||||
|
||||
|
||||
def test_not_supported_call():
|
||||
"""
|
||||
The NotSupported class should raise TypeError when called.
|
||||
"""
|
||||
ns = util.NotSupported("test", "This is not supported.")
|
||||
with upytest.raises(TypeError) as e:
|
||||
ns()
|
||||
assert str(e.exception) == "This is not supported.", str(e.exception)
|
||||
assert str(e.exception) == "This is not supported."
|
||||
|
||||
|
||||
def test_is_awaitable():
|
||||
def test_not_supported_call_with_args():
|
||||
"""
|
||||
Test the is_awaitable function correctly identifies an asynchronous
|
||||
function.
|
||||
The NotSupported class should raise TypeError when called with arguments.
|
||||
"""
|
||||
ns = util.NotSupported("test", "This is not supported.")
|
||||
with upytest.raises(TypeError) as e:
|
||||
ns(1, 2, 3)
|
||||
assert str(e.exception) == "This is not supported."
|
||||
|
||||
|
||||
def test_is_awaitable_async_function():
|
||||
"""
|
||||
The is_awaitable function should identify async functions as awaitable.
|
||||
"""
|
||||
|
||||
async def async_func():
|
||||
yield
|
||||
pass
|
||||
|
||||
assert util.is_awaitable(async_func)
|
||||
|
||||
|
||||
def test_is_awaitable_regular_function():
|
||||
"""
|
||||
The is_awaitable function should identify regular functions as not
|
||||
awaitable.
|
||||
"""
|
||||
|
||||
def regular_func():
|
||||
pass
|
||||
|
||||
assert not util.is_awaitable(regular_func)
|
||||
|
||||
|
||||
def test_is_awaitable_lambda():
|
||||
"""
|
||||
The is_awaitable function should identify lambdas as not awaitable.
|
||||
"""
|
||||
assert not util.is_awaitable(lambda: None)
|
||||
|
||||
|
||||
def test_is_awaitable_async_lambda():
|
||||
"""
|
||||
The is_awaitable function should identify async lambdas as awaitable.
|
||||
"""
|
||||
# Note: async lambdas don't exist in Python, but this documents the
|
||||
# expected behavior.
|
||||
async_lambda = lambda: (yield)
|
||||
# This test documents current behavior - may vary by implementation.
|
||||
|
||||
|
||||
def test_is_awaitable_generator():
|
||||
"""
|
||||
The is_awaitable function should handle generator functions correctly.
|
||||
"""
|
||||
|
||||
def gen_func():
|
||||
yield 1
|
||||
|
||||
# Generator functions are treated differently in MicroPython vs Pyodide.
|
||||
# In MicroPython, async functions are generator functions.
|
||||
result = util.is_awaitable(gen_func)
|
||||
# Result depends on Python implementation.
|
||||
assert isinstance(result, bool)
|
||||
|
||||
|
||||
def test_is_awaitable_async_closure():
|
||||
"""
|
||||
The is_awaitable function should handle async closures correctly.
|
||||
"""
|
||||
|
||||
def make_async_closure():
|
||||
async def inner():
|
||||
pass
|
||||
|
||||
return inner
|
||||
|
||||
closure = make_async_closure()
|
||||
assert util.is_awaitable(closure)
|
||||
|
||||
|
||||
def test_is_awaitable_regular_closure():
|
||||
"""
|
||||
The is_awaitable function should handle regular closures correctly.
|
||||
"""
|
||||
|
||||
def make_closure():
|
||||
def inner():
|
||||
pass
|
||||
|
||||
return inner
|
||||
|
||||
closure = make_closure()
|
||||
assert not util.is_awaitable(closure)
|
||||
|
||||
|
||||
def test_is_awaitable_builtin():
|
||||
"""
|
||||
The is_awaitable function should identify built-in functions as not
|
||||
awaitable.
|
||||
"""
|
||||
assert not util.is_awaitable(print)
|
||||
assert not util.is_awaitable(len)
|
||||
|
||||
|
||||
def test_is_awaitable_class_method():
|
||||
"""
|
||||
The is_awaitable function should handle class methods correctly.
|
||||
"""
|
||||
|
||||
class TestClass:
|
||||
async def async_method(self):
|
||||
pass
|
||||
|
||||
def sync_method(self):
|
||||
pass
|
||||
|
||||
obj = TestClass()
|
||||
assert util.is_awaitable(obj.async_method)
|
||||
assert not util.is_awaitable(obj.sync_method)
|
||||
|
||||
@@ -6,6 +6,7 @@ import asyncio
|
||||
|
||||
import upytest
|
||||
from pyscript import RUNNING_IN_WORKER, document, web, when
|
||||
from pyscript.ffi import to_js
|
||||
|
||||
|
||||
def setup():
|
||||
@@ -22,14 +23,22 @@ def test_getitem_by_id():
|
||||
"""
|
||||
An element with an id in the DOM can be retrieved by id.
|
||||
"""
|
||||
result = web.page.find("#div-no-classes")
|
||||
# There is a single result.
|
||||
assert len(result) == 1
|
||||
result = web.page["#div-no-classes"]
|
||||
# The result is a div.
|
||||
assert result[0].get_tag_name() == "div"
|
||||
assert result.get_tag_name() == "div"
|
||||
# The result has the expected id.
|
||||
assert result.id == "div-no-classes"
|
||||
# Now do the same but without the '#'
|
||||
result = web.page["div-no-classes"]
|
||||
assert result.get_tag_name() == "div"
|
||||
assert result.id == "div-no-classes"
|
||||
|
||||
|
||||
def test_getitem_by_class():
|
||||
def test_find_item_by_class():
|
||||
"""
|
||||
Elements with a given class in the DOM can be retrieved by class via the
|
||||
find method.
|
||||
"""
|
||||
ids = [
|
||||
"test_class_selector",
|
||||
"test_selector_w_children",
|
||||
@@ -37,35 +46,188 @@ def test_getitem_by_class():
|
||||
]
|
||||
expected_class = "a-test-class"
|
||||
result = web.page.find(f".{expected_class}")
|
||||
|
||||
# EXPECT to find exact number of elements with the class in the page (== 3)
|
||||
assert len(result) == 3
|
||||
|
||||
# EXPECT that all element ids are in the expected list
|
||||
assert [el.id for el in result] == ids
|
||||
|
||||
|
||||
def test_read_n_write_collection_elements():
|
||||
"""
|
||||
Elements with a given class in the DOM can be retrieved by class via the
|
||||
find method. They can be bulk updated via the update_all method.
|
||||
"""
|
||||
elements = web.page.find(".multi-elems")
|
||||
|
||||
for element in elements:
|
||||
assert element.innerHTML == f"Content {element.id.replace('#', '')}"
|
||||
|
||||
new_content = "New Content"
|
||||
elements.innerHTML = new_content
|
||||
elements.update_all(innerHTML=new_content)
|
||||
for element in elements:
|
||||
assert element.innerHTML == new_content
|
||||
|
||||
|
||||
class TestHelperFunctions:
|
||||
"""
|
||||
Test the internal helper functions.
|
||||
"""
|
||||
|
||||
def test_wrap_if_not_none_with_element(self):
|
||||
"""
|
||||
Test wrapping a valid DOM element.
|
||||
"""
|
||||
|
||||
# Create a DOM element.
|
||||
dom_div = document.createElement("div")
|
||||
dom_div.id = "test-wrap"
|
||||
|
||||
# Wrap it.
|
||||
result = web._wrap_if_not_none(dom_div)
|
||||
|
||||
# Should return an Element instance.
|
||||
assert isinstance(result, web.Element)
|
||||
assert result.id == "test-wrap"
|
||||
|
||||
def test_wrap_if_not_none_with_none(self):
|
||||
"""
|
||||
Test wrapping None returns None.
|
||||
"""
|
||||
|
||||
# Pass None (as a JS null).
|
||||
result = web._wrap_if_not_none(to_js(None))
|
||||
|
||||
# Should return None.
|
||||
assert result is None
|
||||
|
||||
def test_find_by_id_without_hash(self):
|
||||
"""
|
||||
Test finding element by id without # prefix.
|
||||
"""
|
||||
|
||||
# Create and append an element with an id.
|
||||
test_div = web.div("Test content", id="helper-test-id")
|
||||
web.page.body.append(test_div)
|
||||
|
||||
# Find it using the helper.
|
||||
result = web._find_by_id(document, "helper-test-id")
|
||||
|
||||
# Should find the element.
|
||||
assert result is not None
|
||||
assert result.id == "helper-test-id"
|
||||
assert result.innerHTML == "Test content"
|
||||
|
||||
def test_find_by_id_with_hash(self):
|
||||
"""
|
||||
Test finding element by id with # prefix.
|
||||
"""
|
||||
|
||||
# Create and append an element with an id.
|
||||
test_div = web.div("Test content", id="helper-test-id-hash")
|
||||
web.page.body.append(test_div)
|
||||
|
||||
# Find it using the helper with # prefix.
|
||||
result = web._find_by_id(document, "#helper-test-id-hash")
|
||||
|
||||
# Should find the element.
|
||||
assert result is not None
|
||||
assert result.id == "helper-test-id-hash"
|
||||
|
||||
def test_find_by_id_not_found(self):
|
||||
"""
|
||||
Test finding non-existent id returns None.
|
||||
"""
|
||||
|
||||
# Try to find non-existent element.
|
||||
result = web._find_by_id(document, "this-id-does-not-exist")
|
||||
|
||||
# Should return None.
|
||||
assert result is None
|
||||
|
||||
def test_find_by_id_within_element(self):
|
||||
"""
|
||||
Test finding by id within a specific element.
|
||||
"""
|
||||
|
||||
# Create a container with a child.
|
||||
child = web.p("Child", id="child-in-container")
|
||||
container = web.div(child, id="container-for-search")
|
||||
web.page.body.append(container)
|
||||
|
||||
# Find the child within the container.
|
||||
result = web._find_by_id(container._dom_element, "child-in-container")
|
||||
|
||||
# Should find the child.
|
||||
assert result is not None
|
||||
assert result.id == "child-in-container"
|
||||
|
||||
def test_find_and_wrap_with_results(self):
|
||||
"""
|
||||
Test finding elements by selector.
|
||||
"""
|
||||
|
||||
# Create multiple elements with same class.
|
||||
container = web.div(
|
||||
web.p("Para 1", classes=["test-para"]),
|
||||
web.p("Para 2", classes=["test-para"]),
|
||||
web.p("Para 3", classes=["test-para"]),
|
||||
id="find-wrap-container",
|
||||
)
|
||||
web.page.body.append(container)
|
||||
|
||||
# Find them using the helper.
|
||||
result = web._find_and_wrap(container._dom_element, ".test-para")
|
||||
|
||||
# Should return an ElementCollection.
|
||||
assert isinstance(result, web.ElementCollection)
|
||||
assert len(result) == 3
|
||||
assert result[0].innerHTML == "Para 1"
|
||||
assert result[1].innerHTML == "Para 2"
|
||||
assert result[2].innerHTML == "Para 3"
|
||||
|
||||
def test_find_and_wrap_no_results(self):
|
||||
"""
|
||||
Test finding with selector that matches nothing.
|
||||
"""
|
||||
|
||||
# Create a container.
|
||||
container = web.div(id="empty-container")
|
||||
web.page.body.append(container)
|
||||
|
||||
# Find with selector that matches nothing.
|
||||
result = web._find_and_wrap(container._dom_element, ".does-not-exist")
|
||||
|
||||
# Should return empty collection.
|
||||
assert isinstance(result, web.ElementCollection)
|
||||
assert len(result) == 0
|
||||
|
||||
def test_find_and_wrap_on_document(self):
|
||||
"""
|
||||
Test finding elements in entire document.
|
||||
"""
|
||||
|
||||
# Create elements in the document.
|
||||
web.page.body.append(web.span("Span 1", classes=["doc-span"]))
|
||||
web.page.body.append(web.span("Span 2", classes=["doc-span"]))
|
||||
|
||||
# Find them in the entire document.
|
||||
result = web._find_and_wrap(document, ".doc-span")
|
||||
|
||||
# Should find both.
|
||||
assert isinstance(result, web.ElementCollection)
|
||||
assert len(result) >= 2 # May have others from previous tests.
|
||||
|
||||
|
||||
class TestElement:
|
||||
"""
|
||||
Test the base Element class functionality.
|
||||
"""
|
||||
|
||||
def test_query(self):
|
||||
# GIVEN an existing element on the page, with at least 1 child element
|
||||
id_ = "test_selector_w_children"
|
||||
parent_div = web.page.find(f"#{id_}")[0]
|
||||
parent_div = web.page[f"#{id_}"]
|
||||
|
||||
# EXPECT it to be able to query for the first child element
|
||||
div = parent_div.find("div")[0]
|
||||
div = parent_div[0]
|
||||
|
||||
# EXPECT the new element to be associated with the parent
|
||||
assert (
|
||||
@@ -101,7 +263,7 @@ class TestElement:
|
||||
|
||||
def test_append_element(self):
|
||||
id_ = "element-append-tests"
|
||||
div = web.page.find(f"#{id_}")[0]
|
||||
div = web.page[f"#{id_}"]
|
||||
len_children_before = len(div.children)
|
||||
new_el = web.p("new element")
|
||||
div.append(new_el)
|
||||
@@ -110,7 +272,7 @@ class TestElement:
|
||||
|
||||
def test_append_dom_element_element(self):
|
||||
id_ = "element-append-tests"
|
||||
div = web.page.find(f"#{id_}")[0]
|
||||
div = web.page[f"#{id_}"]
|
||||
len_children_before = len(div.children)
|
||||
new_el = web.p("new element")
|
||||
div.append(new_el._dom_element)
|
||||
@@ -119,7 +281,7 @@ class TestElement:
|
||||
|
||||
def test_append_collection(self):
|
||||
id_ = "element-append-tests"
|
||||
div = web.page.find(f"#{id_}")[0]
|
||||
div = web.page[f"#{id_}"]
|
||||
len_children_before = len(div.children)
|
||||
collection = web.page.find(".collection")
|
||||
div.append(collection)
|
||||
@@ -131,19 +293,24 @@ class TestElement:
|
||||
def test_read_classes(self):
|
||||
id_ = "test_class_selector"
|
||||
expected_class = "a-test-class"
|
||||
div = web.page.find(f"#{id_}")[0]
|
||||
assert div.classes == [expected_class]
|
||||
div = web.page[f"#{id_}"]
|
||||
assert div.classes == {expected_class}
|
||||
|
||||
def test_add_remove_class(self):
|
||||
id_ = "div-no-classes"
|
||||
classname = "tester-class"
|
||||
div = web.page.find(f"#{id_}")[0]
|
||||
div = web.page[f"#{id_}"]
|
||||
assert not div.classes
|
||||
div.classes.add(classname)
|
||||
same_div = web.page.find(f"#{id_}")[0]
|
||||
assert div.classes == [classname] == same_div.classes
|
||||
assert div.classes == {classname}
|
||||
div.classes.remove(classname)
|
||||
assert div.classes == [] == same_div.classes
|
||||
assert div.classes == set()
|
||||
# Handle multiple classes in a single string
|
||||
multiple_classes = "class1 class2 class3"
|
||||
div.classes.add(multiple_classes)
|
||||
assert div.classes == {"class1", "class2", "class3"}
|
||||
div.classes.remove("class2 class3")
|
||||
assert div.classes == {"class1"}
|
||||
|
||||
async def test_when_decorator(self):
|
||||
called = False
|
||||
@@ -243,6 +410,129 @@ class TestElement:
|
||||
)
|
||||
assert div.textContent == div._dom_element.textContent == "<b>New Content</b>"
|
||||
|
||||
def test_update_classes(self):
|
||||
"""Test updating classes via update()."""
|
||||
div = web.div()
|
||||
div.update(classes=["foo", "bar"])
|
||||
assert "foo" in div.classes
|
||||
assert "bar" in div.classes
|
||||
|
||||
def test_update_single_class(self):
|
||||
"""Test updating single class string via update()."""
|
||||
div = web.div()
|
||||
div.update(classes="foo")
|
||||
assert "foo" in div.classes
|
||||
|
||||
def test_update_style(self):
|
||||
"""Test updating styles via update()."""
|
||||
div = web.div()
|
||||
div.update(style={"color": "red", "font-size": "16px"})
|
||||
assert div.style["color"] == "red", div.style["color"]
|
||||
assert div.style["font-size"] == "16px"
|
||||
|
||||
def test_update_attributes(self):
|
||||
"""Test updating attributes via update()."""
|
||||
div = web.div()
|
||||
div.update(id="test-id", title="Test Title")
|
||||
assert div.id == "test-id"
|
||||
assert div.title == "Test Title"
|
||||
|
||||
def test_update_combined(self):
|
||||
"""Test updating classes, styles, and attributes together."""
|
||||
div = web.div()
|
||||
div.update(classes=["foo"], style={"color": "red"}, id="test-id")
|
||||
assert "foo" in div.classes
|
||||
assert div.style["color"] == "red"
|
||||
assert div.id == "test-id"
|
||||
|
||||
def test_getitem_integer_index(self):
|
||||
"""Test indexing children by integer."""
|
||||
parent = web.div(web.p("Child 1"), web.p("Child 2"))
|
||||
assert parent[0].innerHTML == "Child 1"
|
||||
assert parent[1].innerHTML == "Child 2"
|
||||
|
||||
def test_getitem_slice(self):
|
||||
"""Test slicing children."""
|
||||
parent = web.div(web.p("Child 1"), web.p("Child 2"), web.p("Child 3"))
|
||||
sliced = parent[0:2]
|
||||
assert len(sliced) == 2
|
||||
assert sliced[0].innerHTML == "Child 1"
|
||||
assert sliced[1].innerHTML == "Child 2"
|
||||
|
||||
def test_getitem_by_id(self):
|
||||
"""Test looking up descendant by id."""
|
||||
child = web.p("Child", id="child-id")
|
||||
parent = web.div(child)
|
||||
web.page.body.append(parent)
|
||||
|
||||
result = parent["child-id"]
|
||||
assert result is not None
|
||||
assert result.id == "child-id"
|
||||
|
||||
def test_getitem_by_id_with_hash(self):
|
||||
"""Test looking up descendant by id with # prefix."""
|
||||
child = web.p("Child", id="child-id-2")
|
||||
parent = web.div(child)
|
||||
web.page.body.append(parent)
|
||||
|
||||
result = parent["#child-id-2"]
|
||||
assert result is not None
|
||||
assert result.id == "child-id-2"
|
||||
|
||||
def test_clone_basic(self):
|
||||
"""Test cloning an element."""
|
||||
original = web.div("Content", id="original")
|
||||
original.classes.add("test-class")
|
||||
|
||||
clone = original.clone()
|
||||
assert clone.innerHTML == original.innerHTML
|
||||
assert "test-class" in clone.classes
|
||||
assert clone is not original
|
||||
assert clone._dom_element is not original._dom_element
|
||||
|
||||
def test_clone_with_id(self):
|
||||
"""Test cloning with new id."""
|
||||
original = web.div("Content", id="original")
|
||||
clone = original.clone(clone_id="cloned")
|
||||
assert clone.id == "cloned"
|
||||
assert original.id == "original"
|
||||
|
||||
def test_for_attribute(self):
|
||||
"""Test that for_ maps to htmlFor."""
|
||||
label = web.label("Test", for_="input-id")
|
||||
assert label.for_ == "input-id"
|
||||
assert 'for="input-id"' in label.outerHTML
|
||||
|
||||
def test_trailing_underscore_removal(self):
|
||||
"""Test that trailing underscores are removed."""
|
||||
div = web.div()
|
||||
div.class_ = "test-class"
|
||||
# Setting class_ should set the className, which affects classes
|
||||
assert "test-class" in div.classes
|
||||
assert div.className == "test-class"
|
||||
|
||||
|
||||
class TestContainerElement:
|
||||
"""Test ContainerElement specific functionality."""
|
||||
|
||||
def test_container_iteration(self):
|
||||
"""Test iterating over container's children."""
|
||||
parent = web.div(web.p("1"), web.p("2"), web.p("3"))
|
||||
children = list(parent)
|
||||
assert len(children) == 3
|
||||
assert children[0].innerHTML == "1"
|
||||
|
||||
def test_container_children_kwarg(self):
|
||||
"""Test creating container with children kwarg."""
|
||||
parent = web.div(children=[web.p("1"), web.p("2")])
|
||||
assert len(parent.children) == 2
|
||||
|
||||
def test_container_html_string(self):
|
||||
"""Test inserting HTML string as child."""
|
||||
parent = web.div("<b>Bold text</b>")
|
||||
assert "Bold text" in parent.innerHTML
|
||||
assert "<b>" in parent.innerHTML
|
||||
|
||||
|
||||
class TestCollection:
|
||||
|
||||
@@ -260,24 +550,6 @@ class TestCollection:
|
||||
assert el == elements[i]
|
||||
assert elements[:] == elements
|
||||
|
||||
def test_style_rule(self):
|
||||
selector = ".multi-elems"
|
||||
elements = web.page.find(selector)
|
||||
for el in elements:
|
||||
assert el.style["background-color"] != "red"
|
||||
|
||||
elements.style["background-color"] = "red"
|
||||
|
||||
for i, el in enumerate(web.page.find(selector)):
|
||||
assert elements[i].style["background-color"] == "red"
|
||||
assert el.style["background-color"] == "red"
|
||||
|
||||
elements.style.remove("background-color")
|
||||
|
||||
for i, el in enumerate(web.page.find(selector)):
|
||||
assert el.style["background-color"] != "red"
|
||||
assert elements[i].style["background-color"] != "red"
|
||||
|
||||
@upytest.skip(
|
||||
"Flakey in Pyodide on Worker",
|
||||
skip_when=RUNNING_IN_WORKER and not upytest.is_micropython,
|
||||
@@ -286,7 +558,7 @@ class TestCollection:
|
||||
called = False
|
||||
call_flag = asyncio.Event()
|
||||
|
||||
buttons_collection = web.page["button"]
|
||||
buttons_collection = web.page.find("button")
|
||||
|
||||
@when("click", buttons_collection)
|
||||
def on_click(event):
|
||||
@@ -304,27 +576,40 @@ class TestCollection:
|
||||
called = False
|
||||
call_flag.clear()
|
||||
|
||||
async def test_when_decorator_on_event(self):
|
||||
call_counter = 0
|
||||
call_flag = asyncio.Event()
|
||||
def test_update_all_single_attribute(self):
|
||||
"""Test updating single attribute on all elements."""
|
||||
div1 = web.div("Content 1")
|
||||
div2 = web.div("Content 2")
|
||||
collection = web.ElementCollection([div1, div2])
|
||||
|
||||
buttons_collection = web.page.find("button")
|
||||
number_of_clicks = len(buttons_collection)
|
||||
collection.update_all(className="updated")
|
||||
assert div1.className == "updated"
|
||||
assert div2.className == "updated"
|
||||
|
||||
@when(buttons_collection.on_click)
|
||||
def on_click(event):
|
||||
nonlocal call_counter
|
||||
call_counter += 1
|
||||
if call_counter == number_of_clicks:
|
||||
call_flag.set()
|
||||
def test_update_all_multiple_attributes(self):
|
||||
"""Test updating multiple attributes on all elements."""
|
||||
div1 = web.div()
|
||||
div2 = web.div()
|
||||
collection = web.ElementCollection([div1, div2])
|
||||
|
||||
# Now let's simulate a click on the button (using the low level JS API)
|
||||
# so we don't risk dom getting in the way
|
||||
assert call_counter == 0
|
||||
for button in buttons_collection:
|
||||
button._dom_element.click()
|
||||
await call_flag.wait()
|
||||
assert call_counter == number_of_clicks
|
||||
collection.update_all(innerHTML="Hello", title="Test")
|
||||
assert div1.innerHTML == "Hello"
|
||||
assert div1.title == "Test"
|
||||
assert div2.innerHTML == "Hello"
|
||||
assert div2.title == "Test"
|
||||
|
||||
def test_collection_getitem_by_id(self):
|
||||
"""Test looking up element by id in collection."""
|
||||
div1 = web.div(web.p("Child", id="find-me"))
|
||||
div2 = web.div(web.p("Child 2"))
|
||||
collection = web.ElementCollection([div1, div2])
|
||||
|
||||
web.page.body.append(div1)
|
||||
web.page.body.append(div2)
|
||||
|
||||
result = collection["find-me"]
|
||||
assert result is not None
|
||||
assert result.id == "find-me"
|
||||
|
||||
|
||||
class TestCreation:
|
||||
@@ -376,8 +661,7 @@ class TestInput:
|
||||
def test_value(self):
|
||||
for id_ in self.input_ids:
|
||||
expected_type = id_.split("_")[-1]
|
||||
result = web.page.find(f"#{id_}")
|
||||
input_el = result[0]
|
||||
input_el = web.page[f"#{id_}"]
|
||||
assert input_el._dom_element.type == expected_type
|
||||
assert (
|
||||
input_el.value == f"Content {id_}" == input_el._dom_element.value
|
||||
@@ -388,41 +672,6 @@ class TestInput:
|
||||
input_el.value = new_value
|
||||
assert input_el.value == new_value
|
||||
|
||||
# Check that we can set the value back to the original using
|
||||
# the collection
|
||||
new_value = f"Content {id_}"
|
||||
result.value = new_value
|
||||
assert input_el.value == new_value
|
||||
|
||||
def test_set_value_collection(self):
|
||||
for id_ in self.input_ids:
|
||||
input_el = web.page.find(f"#{id_}")
|
||||
|
||||
assert input_el.value[0] == f"Content {id_}" == input_el[0].value
|
||||
|
||||
new_value = f"New Value {id_}"
|
||||
input_el.value = new_value
|
||||
assert (
|
||||
input_el.value[0] == new_value == input_el[0].value
|
||||
), f"Expected '{input_el.value}' to be 'Content {id_}' to be '{input_el._dom_element.value}'"
|
||||
|
||||
new_value = f"Content {id_}"
|
||||
input_el.value = new_value
|
||||
|
||||
# TODO: We only attach attributes to the classes that have them now which means we
|
||||
# would have to have some other way to help users if using attributes that aren't
|
||||
# actually on the class. Maybe a job for __setattr__?
|
||||
#
|
||||
# def test_element_without_value(self):
|
||||
# result = web.page.find(f"#tests-terminal"][0]
|
||||
# with upytest.raises(AttributeError):
|
||||
# result.value = "some value"
|
||||
#
|
||||
# def test_element_without_value_via_collection(self):
|
||||
# result = web.page.find(f"#tests-terminal"]
|
||||
# with upytest.raises(AttributeError):
|
||||
# result.value = "some value"
|
||||
|
||||
|
||||
class TestSelect:
|
||||
|
||||
@@ -532,7 +781,7 @@ class TestSelect:
|
||||
|
||||
def test_select_options_remove(self):
|
||||
# GIVEN the existing select element with 3 options
|
||||
select = web.page.find("#test_select_element_to_remove")[0]
|
||||
select = web.page["#test_select_element_to_remove"]
|
||||
|
||||
# EXPECT the select element to have 3 options
|
||||
assert len(select.options) == 4
|
||||
@@ -610,7 +859,7 @@ class TestElements:
|
||||
kwargs = {k: parse_value(v) for k, v in properties.items()}
|
||||
|
||||
# Let's make sure the target div to contain the element is empty.
|
||||
container = web.page["#test-element-container"][0]
|
||||
container = web.page["#test-element-container"]
|
||||
container.innerHTML = ""
|
||||
assert container.innerHTML == "", container.innerHTML
|
||||
|
||||
@@ -881,7 +1130,9 @@ class TestElements:
|
||||
assert el._dom_element.tagName == "LABEL"
|
||||
assert el.for_ == label_for, "The label should have the correct for attribute."
|
||||
# Ensure the label element is rendered with the correct "for" attribute
|
||||
assert f'for="{label_for}"' in el.outerHTML, "The label should have the correct 'for' attribute in its HTML."
|
||||
assert (
|
||||
f'for="{label_for}"' in el.outerHTML
|
||||
), "The label should have the correct 'for' attribute in its HTML."
|
||||
|
||||
def test_legend(self):
|
||||
self._create_el_and_basic_asserts("legend", "some text")
|
||||
@@ -1195,3 +1446,58 @@ class TestElements:
|
||||
assert el.children[1].id == "child2"
|
||||
assert el.children[1].parentNode.textContent == parent_full_content
|
||||
assert el.children[1].textContent == p2_text_content
|
||||
|
||||
|
||||
class TestPageObject:
|
||||
"""Test the Page object."""
|
||||
|
||||
def test_page_getitem_with_id(self):
|
||||
"""Test looking up element by id using page[id]."""
|
||||
result = web.page["test_id_selector"]
|
||||
assert result is not None
|
||||
assert result.id == "test_id_selector"
|
||||
|
||||
def test_page_getitem_with_hash(self):
|
||||
"""Test looking up element by id with # prefix."""
|
||||
result = web.page["#test_id_selector"]
|
||||
assert result is not None
|
||||
assert result.id == "test_id_selector"
|
||||
|
||||
def test_page_getitem_nonexistent(self):
|
||||
"""Test looking up nonexistent id returns None."""
|
||||
result = web.page["nonexistent-id"]
|
||||
assert result is None
|
||||
|
||||
def test_page_title_get(self):
|
||||
"""Test getting page title."""
|
||||
original_title = web.page.title
|
||||
assert isinstance(original_title, str)
|
||||
|
||||
def test_page_title_set(self):
|
||||
"""Test setting page title."""
|
||||
original = web.page.title
|
||||
web.page.title = "Test Title"
|
||||
assert web.page.title == "Test Title"
|
||||
web.page.title = original # Restore
|
||||
|
||||
|
||||
class TestErrorCases:
|
||||
"""Test error handling."""
|
||||
|
||||
def test_invalid_event_name(self):
|
||||
"""Test that invalid event names raise ValueError."""
|
||||
div = web.div()
|
||||
with upytest.raises(ValueError):
|
||||
div.on_nonexistent_event
|
||||
|
||||
def test_invalid_append_type(self):
|
||||
"""Test that appending invalid types raises TypeError."""
|
||||
div = web.div()
|
||||
with upytest.raises(TypeError):
|
||||
div.append(12345) # Numbers can't be appended
|
||||
|
||||
def test_event_name_without_on_prefix(self):
|
||||
"""Test that get_event requires on_ prefix."""
|
||||
div = web.div()
|
||||
with upytest.raises(ValueError):
|
||||
div.get_event("click") # Should be "on_click"
|
||||
|
||||
@@ -8,7 +8,13 @@ import upytest
|
||||
from pyscript import WebSocket
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.")
|
||||
# Websocket tests are disabled by default because they don't reliably work in
|
||||
# playwright based tests. Feel free to set this to False to enable them when
|
||||
# running tests locally in an actual browser (they all pass there).
|
||||
SKIP_WEBSOCKET_TESTS = True
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.", skip_when=SKIP_WEBSOCKET_TESTS)
|
||||
async def test_websocket_with_attributes():
|
||||
"""
|
||||
Event handlers assigned via object attributes.
|
||||
@@ -54,7 +60,7 @@ async def test_websocket_with_attributes():
|
||||
assert closed_flag is True
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.")
|
||||
@upytest.skip("Websocket tests are disabled.", skip_when=SKIP_WEBSOCKET_TESTS)
|
||||
async def test_websocket_with_init():
|
||||
"""
|
||||
Event handlers assigned via __init__ arguments.
|
||||
@@ -100,3 +106,174 @@ async def test_websocket_with_init():
|
||||
assert "request served by" in messages[0].lower()
|
||||
assert messages[1] == "Hello, world!"
|
||||
assert closed_flag is True
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.", skip_when=SKIP_WEBSOCKET_TESTS)
|
||||
async def test_websocket_async_handlers():
|
||||
"""
|
||||
Async event handlers should work correctly.
|
||||
"""
|
||||
messages = []
|
||||
ready_to_test = asyncio.Event()
|
||||
|
||||
async def on_open(event):
|
||||
await asyncio.sleep(0)
|
||||
ws.send("async test")
|
||||
|
||||
async def on_message(event):
|
||||
await asyncio.sleep(0)
|
||||
messages.append(event.data)
|
||||
if len(messages) == 2:
|
||||
ws.close()
|
||||
|
||||
async def on_close(event):
|
||||
await asyncio.sleep(0)
|
||||
ready_to_test.set()
|
||||
|
||||
ws = WebSocket(
|
||||
url="wss://echo.websocket.org",
|
||||
onopen=on_open,
|
||||
onmessage=on_message,
|
||||
onclose=on_close,
|
||||
)
|
||||
|
||||
await ready_to_test.wait()
|
||||
assert len(messages) == 2
|
||||
assert messages[1] == "async test"
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.", skip_when=SKIP_WEBSOCKET_TESTS)
|
||||
async def test_websocket_binary_data_conversion():
|
||||
"""
|
||||
WebSocket should convert binary data to memoryview for Python.
|
||||
"""
|
||||
messages = []
|
||||
ready_to_test = asyncio.Event()
|
||||
|
||||
def on_open(event):
|
||||
# Send binary data as bytearray.
|
||||
binary_data = bytearray([0x48, 0x65, 0x6C, 0x6C, 0x6F])
|
||||
ws.send(binary_data)
|
||||
|
||||
def on_message(event):
|
||||
messages.append(event.data)
|
||||
if len(messages) == 2:
|
||||
ws.close()
|
||||
|
||||
def on_close(event):
|
||||
ready_to_test.set()
|
||||
|
||||
ws = WebSocket(
|
||||
url="wss://echo.websocket.org",
|
||||
onopen=on_open,
|
||||
onmessage=on_message,
|
||||
onclose=on_close,
|
||||
)
|
||||
|
||||
await ready_to_test.wait()
|
||||
assert len(messages) == 2
|
||||
# Verify wrapper converts binary to memoryview Python type.
|
||||
assert isinstance(messages[1], memoryview)
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.", skip_when=SKIP_WEBSOCKET_TESTS)
|
||||
async def test_websocket_send_bytes_conversion():
|
||||
"""
|
||||
WebSocket send should convert Python bytes to JS Uint8Array.
|
||||
"""
|
||||
messages = []
|
||||
ready_to_test = asyncio.Event()
|
||||
|
||||
def on_open(event):
|
||||
# Test that bytes are converted properly.
|
||||
ws.send(bytes([0x41, 0x42, 0x43]))
|
||||
|
||||
def on_message(event):
|
||||
messages.append(event.data)
|
||||
if len(messages) == 2:
|
||||
ws.close()
|
||||
|
||||
def on_close(event):
|
||||
ready_to_test.set()
|
||||
|
||||
ws = WebSocket(
|
||||
url="wss://echo.websocket.org",
|
||||
onopen=on_open,
|
||||
onmessage=on_message,
|
||||
onclose=on_close,
|
||||
)
|
||||
|
||||
await ready_to_test.wait()
|
||||
assert len(messages) == 2
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.", skip_when=SKIP_WEBSOCKET_TESTS)
|
||||
async def test_websocket_event_wrapper():
|
||||
"""
|
||||
WebSocketEvent wrapper should provide access to event properties.
|
||||
"""
|
||||
event_types = []
|
||||
ready_to_test = asyncio.Event()
|
||||
|
||||
def on_open(event):
|
||||
event_types.append(event.type)
|
||||
ws.send("test")
|
||||
|
||||
def on_message(event):
|
||||
event_types.append(event.type)
|
||||
# Verify event wrapper exposes properties.
|
||||
assert hasattr(event, "data")
|
||||
assert hasattr(event, "type")
|
||||
if len(event_types) >= 2:
|
||||
ws.close()
|
||||
|
||||
def on_close(event):
|
||||
event_types.append(event.type)
|
||||
ready_to_test.set()
|
||||
|
||||
ws = WebSocket(
|
||||
url="wss://echo.websocket.org",
|
||||
onopen=on_open,
|
||||
onmessage=on_message,
|
||||
onclose=on_close,
|
||||
)
|
||||
|
||||
await ready_to_test.wait()
|
||||
assert "open" in event_types
|
||||
assert "message" in event_types
|
||||
assert "close" in event_types
|
||||
|
||||
|
||||
@upytest.skip("Websocket tests are disabled.", skip_when=SKIP_WEBSOCKET_TESTS)
|
||||
async def test_websocket_reassign_handler():
|
||||
"""
|
||||
Event handlers should be replaceable after creation.
|
||||
"""
|
||||
first_handler_called = False
|
||||
second_handler_called = False
|
||||
ready_to_test = asyncio.Event()
|
||||
|
||||
def first_handler(event):
|
||||
nonlocal first_handler_called
|
||||
first_handler_called = True
|
||||
|
||||
def second_handler(event):
|
||||
nonlocal second_handler_called
|
||||
second_handler_called = True
|
||||
ws.close()
|
||||
|
||||
def on_open(event):
|
||||
# Replace the message handler before any messages arrive.
|
||||
ws.onmessage = second_handler
|
||||
ws.send("test")
|
||||
|
||||
def on_close(event):
|
||||
ready_to_test.set()
|
||||
|
||||
ws = WebSocket(url="wss://echo.websocket.org", onopen=on_open, onclose=on_close)
|
||||
ws.onmessage = first_handler
|
||||
|
||||
await ready_to_test.wait()
|
||||
# Verify that handler replacement worked.
|
||||
assert first_handler_called is False
|
||||
assert second_handler_called is True
|
||||
|
||||
105
core/tests/python/tests/test_workers.py
Normal file
105
core/tests/python/tests/test_workers.py
Normal file
@@ -0,0 +1,105 @@
|
||||
"""
|
||||
Tests for the pyscript.workers module.
|
||||
|
||||
Note: These tests can only run in the main thread since they test worker
|
||||
creation and access.
|
||||
|
||||
I've added the import of workers and create_named_worker inside each test
|
||||
so that the test module can still be imported in a worker context without
|
||||
errors. It also means the module is GC'd between tests, which is a good way
|
||||
to ensure each test is independent given the global nature of the workers
|
||||
proxy.
|
||||
"""
|
||||
|
||||
import upytest
|
||||
from pyscript import RUNNING_IN_WORKER
|
||||
|
||||
|
||||
@upytest.skip("Main thread only", skip_when=RUNNING_IN_WORKER)
|
||||
async def test_workers_proxy_exists():
|
||||
"""
|
||||
The workers proxy should be accessible and support both.
|
||||
bracket and dot notation.
|
||||
"""
|
||||
from pyscript import workers
|
||||
|
||||
assert workers is not None
|
||||
# Defined in the HTML.
|
||||
worker = await workers["testworker"]
|
||||
assert worker is not None
|
||||
worker = await workers.testworker
|
||||
assert worker is not None
|
||||
|
||||
|
||||
@upytest.skip("Main thread only", skip_when=RUNNING_IN_WORKER)
|
||||
async def test_worker_exported_functions():
|
||||
"""
|
||||
Functions exported from a worker should be callable.
|
||||
"""
|
||||
from pyscript import workers
|
||||
|
||||
worker = await workers["testworker"]
|
||||
# Test multiple exported functions.
|
||||
add_result = await worker.add(10, 20)
|
||||
multiply_result = await worker.multiply(4, 5)
|
||||
greeting = await worker.get_message()
|
||||
|
||||
assert add_result == 30
|
||||
assert multiply_result == 20
|
||||
assert greeting == "Hello from worker"
|
||||
|
||||
|
||||
@upytest.skip("Main thread only", skip_when=RUNNING_IN_WORKER)
|
||||
async def test_create_named_worker_basic():
|
||||
"""
|
||||
Creating a named worker dynamically should work.
|
||||
"""
|
||||
from pyscript import create_named_worker, workers
|
||||
|
||||
worker = await create_named_worker(
|
||||
src="./worker_functions.py", name="dynamic-test-worker"
|
||||
)
|
||||
|
||||
assert worker is not None
|
||||
# Verify we can call its functions.
|
||||
result = await worker.add(1, 2)
|
||||
assert result == 3
|
||||
# Verify it's also accessible via the workers proxy.
|
||||
same_worker = await workers["dynamic-test-worker"]
|
||||
result2 = await same_worker.add(3, 4)
|
||||
assert result2 == 7
|
||||
|
||||
|
||||
@upytest.skip("Main thread only", skip_when=RUNNING_IN_WORKER)
|
||||
async def test_create_named_worker_with_config():
|
||||
"""
|
||||
Creating a worker with configuration should work.
|
||||
"""
|
||||
from pyscript import create_named_worker
|
||||
|
||||
# Create worker with a PyScript configuration dict.
|
||||
worker = await create_named_worker(
|
||||
src="./worker_functions.py",
|
||||
name="configured-worker",
|
||||
config={"packages_cache": "never"},
|
||||
)
|
||||
assert worker is not None
|
||||
# Worker should still function normally.
|
||||
result = await worker.multiply(6, 7)
|
||||
assert result == 42
|
||||
|
||||
|
||||
@upytest.skip("Main thread only", skip_when=RUNNING_IN_WORKER)
|
||||
async def test_create_named_worker_micropython():
|
||||
"""
|
||||
Creating a MicroPython worker should work.
|
||||
"""
|
||||
from pyscript import create_named_worker
|
||||
|
||||
worker = await create_named_worker(
|
||||
src="./worker_functions.py", name="mpy-worker", type="mpy"
|
||||
)
|
||||
assert worker is not None
|
||||
# Verify functionality.
|
||||
result = await worker.add(100, 200)
|
||||
assert result == 300
|
||||
18
core/tests/python/worker_functions.py
Normal file
18
core/tests/python/worker_functions.py
Normal file
@@ -0,0 +1,18 @@
|
||||
"""
|
||||
Numpty test code to run in a worker for pyscript.workers module tests.
|
||||
"""
|
||||
|
||||
|
||||
def add(a, b):
|
||||
return a + b
|
||||
|
||||
|
||||
def multiply(a, b):
|
||||
return a * b
|
||||
|
||||
|
||||
def get_message():
|
||||
return "Hello from worker"
|
||||
|
||||
|
||||
__export__ = ["add", "multiply", "get_message"]
|
||||
Reference in New Issue
Block a user