mirror of
https://github.com/pyscript/pyscript.git
synced 2025-12-20 10:47:35 -05:00
* pre-commit: Add ruff to replace bandit, flake8, isort, and pyupgrade * Upgrade ruff * Update .pre-commit-config.yaml * Update .pre-commit-config.yaml * Update .pre-commit-config.yaml
776 lines
26 KiB
Python
776 lines
26 KiB
Python
import dataclasses
|
|
import math
|
|
import os
|
|
import pdb
|
|
import re
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import urllib
|
|
from dataclasses import dataclass
|
|
|
|
import py
|
|
import pytest
|
|
from playwright.sync_api import Error as PlaywrightError
|
|
|
|
ROOT = py.path.local(__file__).dirpath("..", "..", "..")
|
|
BUILD = ROOT.join("pyscriptjs", "build")
|
|
|
|
|
|
@pytest.mark.usefixtures("init")
|
|
class PyScriptTest:
|
|
"""
|
|
Base class to write PyScript integration tests, based on playwright.
|
|
|
|
It provides a simple API to generate HTML files and load them in
|
|
playwright.
|
|
|
|
It also provides a Pythonic API on top of playwright for the most
|
|
common tasks; in particular:
|
|
|
|
- self.console collects all the JS console.* messages. Look at the doc
|
|
of ConsoleMessageCollection for more details.
|
|
|
|
- self.check_js_errors() checks that no JS errors have been thrown
|
|
|
|
- after each test, self.check_js_errors() is automatically run to ensure
|
|
that no JS error passes uncaught.
|
|
|
|
- self.wait_for_console waits until the specified message appears in the
|
|
console
|
|
|
|
- self.wait_for_pyscript waits until all the PyScript tags have been
|
|
evaluated
|
|
|
|
- self.pyscript_run is the main entry point for pyscript tests: it
|
|
creates an HTML page to run the specified snippet.
|
|
"""
|
|
|
|
# Pyodide always print()s this message upon initialization. Make it
|
|
# available to all tests so that it's easiert to check.
|
|
PY_COMPLETE = "Python initialization complete"
|
|
|
|
@pytest.fixture()
|
|
def init(self, request, tmpdir, logger, page):
|
|
"""
|
|
Fixture to automatically initialize all the tests in this class and its
|
|
subclasses.
|
|
|
|
The magic is done by the decorator @pytest.mark.usefixtures("init"),
|
|
which tells pytest to automatically use this fixture for all the test
|
|
method of this class.
|
|
|
|
Using the standard pytest behavior, we can request more fixtures:
|
|
tmpdir, and page; 'page' is a fixture provided by pytest-playwright.
|
|
|
|
Then, we save these fixtures on the self and proceed with more
|
|
initialization. The end result is that the requested fixtures are
|
|
automatically made available as self.xxx in all methods.
|
|
"""
|
|
self.testname = request.function.__name__.replace("test_", "")
|
|
self.tmpdir = tmpdir
|
|
# create a symlink to BUILD inside tmpdir
|
|
tmpdir.join("build").mksymlinkto(BUILD)
|
|
self.tmpdir.chdir()
|
|
self.logger = logger
|
|
|
|
if request.config.option.no_fake_server:
|
|
# use a real HTTP server. Note that as soon as we request the
|
|
# fixture, the server automatically starts in its own thread.
|
|
self.http_server = request.getfixturevalue("http_server")
|
|
self.router = None
|
|
self.is_fake_server = False
|
|
else:
|
|
# use the internal playwright routing
|
|
self.http_server = "http://fake_server"
|
|
self.router = SmartRouter(
|
|
"fake_server",
|
|
cache=request.config.cache,
|
|
logger=logger,
|
|
usepdb=request.config.option.usepdb,
|
|
)
|
|
self.router.install(page)
|
|
self.is_fake_server = True
|
|
#
|
|
self.init_page(page)
|
|
#
|
|
# this extra print is useful when using pytest -s, else we start printing
|
|
# in the middle of the line
|
|
print()
|
|
#
|
|
# if you use pytest --headed you can see the browser page while
|
|
# playwright executes the tests, but the page is closed very quickly
|
|
# as soon as the test finishes. To avoid that, we automatically start
|
|
# a pdb so that we can wait as long as we want.
|
|
yield
|
|
if request.config.option.headed:
|
|
pdb.Pdb.intro = (
|
|
"\n"
|
|
"This (Pdb) was started automatically because you passed --headed:\n"
|
|
"the execution of the test pauses here to give you the time to inspect\n"
|
|
"the browser. When you are done, type one of the following commands:\n"
|
|
" (Pdb) continue\n"
|
|
" (Pdb) cont\n"
|
|
" (Pdb) c\n"
|
|
)
|
|
pdb.set_trace()
|
|
|
|
def init_page(self, page):
|
|
self.page = page
|
|
|
|
# set default timeout to 60000 millliseconds from 30000
|
|
page.set_default_timeout(60000)
|
|
|
|
self.console = ConsoleMessageCollection(self.logger)
|
|
self._js_errors = []
|
|
page.on("console", self._on_console)
|
|
page.on("pageerror", self._on_pageerror)
|
|
|
|
def teardown_method(self):
|
|
# we call check_js_errors on teardown: this means that if there are still
|
|
# non-cleared errors, the test will fail. If you expect errors in your
|
|
# page and they should not cause the test to fail, you should call
|
|
# self.check_js_errors() in the test itself.
|
|
self.check_js_errors()
|
|
|
|
def _on_console(self, msg):
|
|
self.console.add_message(msg.type, msg.text)
|
|
|
|
def _on_pageerror(self, error):
|
|
self.console.add_message("js_error", error.stack)
|
|
self._js_errors.append(error)
|
|
|
|
def check_js_errors(self, *expected_messages):
|
|
"""
|
|
Check whether JS errors were reported.
|
|
|
|
expected_messages is a list of strings of errors that you expect they
|
|
were raised in the page. They are checked using a simple 'in' check,
|
|
equivalent to this:
|
|
if expected_message in actual_error_message:
|
|
...
|
|
|
|
If an error was expected but not found, it raises
|
|
DidNotRaiseJsError().
|
|
|
|
If there are MORE errors other than the expected ones, it raises JsErrors.
|
|
|
|
Upon return, all the errors are cleared, so a subsequent call to
|
|
check_js_errors will not raise, unless NEW JS errors have been reported
|
|
in the meantime.
|
|
"""
|
|
expected_messages = list(expected_messages)
|
|
js_errors = self._js_errors[:]
|
|
|
|
for i, msg in enumerate(expected_messages):
|
|
for j, error in enumerate(js_errors):
|
|
if msg is not None and error is not None and msg in error.message:
|
|
# we matched one expected message with an error, remove both
|
|
expected_messages[i] = None
|
|
js_errors[j] = None
|
|
|
|
# if everything is find, now expected_messages and js_errors contains
|
|
# only Nones. If they contain non-None elements, it means that we
|
|
# either have messages which are expected-but-not-found or errors
|
|
# which are found-but-not-expected.
|
|
expected_messages = [msg for msg in expected_messages if msg is not None]
|
|
js_errors = [err for err in js_errors if err is not None]
|
|
self.clear_js_errors()
|
|
|
|
if expected_messages:
|
|
# expected-but-not-found
|
|
raise JsErrorsDidNotRaise(expected_messages, js_errors)
|
|
|
|
if js_errors:
|
|
# found-but-not-expected
|
|
raise JsErrors(js_errors)
|
|
|
|
def clear_js_errors(self):
|
|
"""
|
|
Clear all JS errors.
|
|
"""
|
|
self._js_errors = []
|
|
|
|
def writefile(self, filename, content):
|
|
"""
|
|
Very thin helper to write a file in the tmpdir
|
|
"""
|
|
f = self.tmpdir.join(filename)
|
|
f.dirpath().ensure(dir=True)
|
|
f.write(content)
|
|
|
|
def goto(self, path):
|
|
self.logger.reset()
|
|
self.logger.log("page.goto", path, color="yellow")
|
|
url = f"{self.http_server}/{path}"
|
|
self.page.goto(url, timeout=0)
|
|
|
|
def wait_for_console(self, text, *, timeout=None, check_js_errors=True):
|
|
"""
|
|
Wait until the given message appear in the console.
|
|
|
|
Note: it must be the *exact* string as printed by e.g. console.log.
|
|
If you need more control on the predicate (e.g. if you want to match a
|
|
substring), use self.page.expect_console_message directly.
|
|
|
|
timeout is expressed in milliseconds. If it's None, it will use
|
|
playwright's own default value, which is 30 seconds).
|
|
|
|
If check_js_errors is True (the default), it also checks that no JS
|
|
errors were raised during the waiting.
|
|
"""
|
|
|
|
def pred(msg):
|
|
return msg.text == text
|
|
|
|
try:
|
|
with self.page.expect_console_message(pred, timeout=timeout):
|
|
pass
|
|
finally:
|
|
# raise JsError if there were any javascript exception. Note that
|
|
# this might happen also in case of a TimeoutError. In that case,
|
|
# the JsError will shadow the TimeoutError but this is correct,
|
|
# because it's very likely that the console message never appeared
|
|
# precisely because of the exception in JS.
|
|
if check_js_errors:
|
|
self.check_js_errors()
|
|
|
|
def wait_for_pyscript(self, *, timeout=None, check_js_errors=True):
|
|
"""
|
|
Wait until pyscript has been fully loaded.
|
|
|
|
Timeout is expressed in milliseconds. If it's None, it will use
|
|
playwright's own default value, which is 30 seconds).
|
|
|
|
If check_js_errors is True (the default), it also checks that no JS
|
|
errors were raised during the waiting.
|
|
"""
|
|
# this is printed by interpreter.ts:Interpreter.initialize
|
|
self.wait_for_console(
|
|
"[pyscript/main] PyScript page fully initialized",
|
|
timeout=timeout,
|
|
check_js_errors=check_js_errors,
|
|
)
|
|
# We still don't know why this wait is necessary, but without it
|
|
# events aren't being triggered in the tests.
|
|
self.page.wait_for_timeout(100)
|
|
|
|
def pyscript_run(self, snippet, *, extra_head="", wait_for_pyscript=True):
|
|
"""
|
|
Main entry point for pyscript tests.
|
|
|
|
snippet contains a fragment of HTML which will be put inside a full
|
|
HTML document. In particular, the <head> automatically contains the
|
|
correct <script> and <link> tags which are necessary to load pyscript
|
|
correctly.
|
|
|
|
This method does the following:
|
|
- write a full HTML file containing the snippet
|
|
- open a playwright page for it
|
|
- wait until pyscript has been fully loaded
|
|
"""
|
|
doc = f"""
|
|
<html>
|
|
<head>
|
|
<link rel="stylesheet" href="{self.http_server}/build/pyscript.css" />
|
|
<script defer src="{self.http_server}/build/pyscript.js"></script>
|
|
{extra_head}
|
|
</head>
|
|
<body>
|
|
{snippet}
|
|
</body>
|
|
</html>
|
|
"""
|
|
filename = f"{self.testname}.html"
|
|
self.writefile(filename, doc)
|
|
self.goto(filename)
|
|
if wait_for_pyscript:
|
|
self.wait_for_pyscript()
|
|
|
|
def iter_locator(self, loc):
|
|
"""
|
|
Helper method to iterate over all the elements which are matched by a
|
|
locator, since playwright does not seem to support it natively.
|
|
"""
|
|
n = loc.count()
|
|
elems = [loc.nth(i) for i in range(n)]
|
|
return iter(elems)
|
|
|
|
def assert_no_banners(self):
|
|
"""
|
|
Ensure that there are no alert banners on the page, which are used for
|
|
errors and warnings. Raise AssertionError if any if found.
|
|
"""
|
|
loc = self.page.locator(".alert-banner")
|
|
n = loc.count()
|
|
if n > 0:
|
|
text = "\n".join(loc.all_inner_texts())
|
|
raise AssertionError(f"Found {n} alert banners:\n" + text)
|
|
|
|
def assert_banner_message(self, expected_message):
|
|
"""
|
|
Ensure that there is an alert banner on the page with the given message.
|
|
Currently it only handles a single.
|
|
"""
|
|
banner = self.page.wait_for_selector(".alert-banner")
|
|
banner_text = banner.inner_text()
|
|
|
|
if expected_message not in banner_text:
|
|
raise AssertionError(
|
|
f"Expected message '{expected_message}' does not "
|
|
f"match banner text '{banner_text}'"
|
|
)
|
|
return True
|
|
|
|
def check_tutor_generated_code(self, modules_to_check=None):
|
|
"""
|
|
Ensure that the source code viewer injected by the PyTutor plugin
|
|
is presend. Raise AssertionError if not found.
|
|
|
|
Args:
|
|
|
|
modules_to_check(str): iterable with names of the python modules
|
|
that have been included in the tutor config
|
|
and needs to be checked (if they are included
|
|
in the displayed source code)
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
# Given: a page that has a <py-tutor> tag
|
|
assert self.page.locator("py-tutor").count()
|
|
|
|
# EXPECT that"
|
|
#
|
|
# the page has the "view-code-button"
|
|
view_code_button = self.page.locator("#view-code-button")
|
|
vcb_count = view_code_button.count()
|
|
if vcb_count != 1:
|
|
raise AssertionError(
|
|
f"Found {vcb_count} code view button. Should have been 1!"
|
|
)
|
|
|
|
# the page has the code-section element
|
|
code_section = self.page.locator("#code-section")
|
|
code_section_count = code_section.count()
|
|
code_msg = (
|
|
f"One (and only one) code section should exist. Found: {code_section_count}"
|
|
)
|
|
assert code_section_count == 1, code_msg
|
|
|
|
pyconfig_tag = self.page.locator("py-config")
|
|
code_section_inner_html = code_section.inner_html()
|
|
|
|
# the code_section has the index.html section
|
|
assert "<p>index.html</p>" in code_section_inner_html
|
|
|
|
# the section has the tags highlighting the HTML code
|
|
assert (
|
|
'<pre class="prism-code language-html" tabindex="0">'
|
|
' <code class="language-html">' in code_section_inner_html
|
|
)
|
|
|
|
# if modules were included, these are also presented in the code section
|
|
if modules_to_check:
|
|
for module in modules_to_check:
|
|
assert f"{module}" in code_section_inner_html
|
|
|
|
# the section also includes the config
|
|
assert "<</span>py-config</span>" in code_section_inner_html
|
|
|
|
# the contents of the py-config tag are included in the code section
|
|
assert pyconfig_tag.inner_html() in code_section_inner_html
|
|
|
|
# the code section to be invisible by default (by having the hidden class)
|
|
assert "code-section-hidden" in code_section.get_attribute("class")
|
|
|
|
# once the view_code_button is pressed, the code section becomes visible
|
|
view_code_button.click()
|
|
assert "code-section-visible" in code_section.get_attribute("class")
|
|
|
|
|
|
# ============== Helpers and utility functions ==============
|
|
|
|
MAX_TEST_TIME = 30 # Number of seconds allowed for checking a testing condition
|
|
TEST_TIME_INCREMENT = 0.25 # 1/4 second, the length of each iteration
|
|
TEST_ITERATIONS = math.ceil(
|
|
MAX_TEST_TIME / TEST_TIME_INCREMENT
|
|
) # 120 iters of 1/4 second
|
|
|
|
|
|
def wait_for_render(page, selector, pattern):
|
|
"""
|
|
Assert that rendering inserts data into the page as expected: search the
|
|
DOM from within the timing loop for a string that is not present in the
|
|
initial markup but should appear by way of rendering
|
|
"""
|
|
re_sub_content = re.compile(pattern)
|
|
py_rendered = False # Flag to be set to True when condition met
|
|
|
|
for _ in range(TEST_ITERATIONS):
|
|
content = page.inner_html(selector)
|
|
if re_sub_content.search(content):
|
|
py_rendered = True
|
|
break
|
|
time.sleep(TEST_TIME_INCREMENT)
|
|
|
|
assert py_rendered # nosec
|
|
|
|
|
|
class JsErrors(Exception):
|
|
"""
|
|
Represent one or more exceptions which happened in JS.
|
|
|
|
It's a thin wrapper around playwright.sync_api.Error, with two important
|
|
differences:
|
|
|
|
1. it has a better name: if you see JsError in a traceback, it's
|
|
immediately obvious that it's a JS exception.
|
|
|
|
2. Show also the JS stacktrace by default, contrarily to
|
|
playwright.sync_api.Error
|
|
"""
|
|
|
|
def __init__(self, errors):
|
|
n = len(errors)
|
|
assert n != 0
|
|
lines = [f"JS errors found: {n}"]
|
|
for err in errors:
|
|
lines.append(self.format_playwright_error(err))
|
|
msg = "\n".join(lines)
|
|
super().__init__(msg)
|
|
self.errors = errors
|
|
|
|
@staticmethod
|
|
def format_playwright_error(error):
|
|
# apparently, playwright Error.stack contains all the info that we
|
|
# want: exception name, message and stacktrace. The docs say that
|
|
# error.stack is optional, so fallback to the standard repr if it's
|
|
# unavailable.
|
|
return error.stack or str(error)
|
|
|
|
|
|
class JsErrorsDidNotRaise(Exception):
|
|
"""
|
|
Exception raised by check_js_errors when the expected JS error messages
|
|
are not found.
|
|
"""
|
|
|
|
def __init__(self, expected_messages, errors):
|
|
lines = ["The following JS errors were expected but could not be found:"]
|
|
for msg in expected_messages:
|
|
lines.append(" - " + msg)
|
|
if errors:
|
|
lines.append("---")
|
|
lines.append("The following JS errors were raised but not expected:")
|
|
for err in errors:
|
|
lines.append(JsErrors.format_playwright_error(err))
|
|
msg = "\n".join(lines)
|
|
super().__init__(msg)
|
|
self.expected_messages = expected_messages
|
|
self.errors = errors
|
|
|
|
|
|
class ConsoleMessageCollection:
|
|
"""
|
|
Helper class to collect and expose ConsoleMessage in a Pythonic way.
|
|
|
|
Usage:
|
|
|
|
console.log.messages: list of ConsoleMessage with type=='log'
|
|
console.log.lines: list of strings
|
|
console.log.text: the whole text as single string
|
|
|
|
console.debug.* same as above, but with different types
|
|
console.info.*
|
|
console.error.*
|
|
console.warning.*
|
|
|
|
console.js_error.* this is a special category which does not exist in the
|
|
browser: it prints uncaught JS exceptions
|
|
|
|
console.all.* same as the individual categories but considering
|
|
all messages which were sent to the console
|
|
"""
|
|
|
|
@dataclass
|
|
class Message:
|
|
type: str # 'log', 'info', 'debug', etc.
|
|
text: str
|
|
|
|
class View:
|
|
"""
|
|
Filter console messages by the given msg_type
|
|
"""
|
|
|
|
def __init__(self, console, msg_type):
|
|
self.console = console
|
|
self.msg_type = msg_type
|
|
|
|
@property
|
|
def messages(self):
|
|
if self.msg_type is None:
|
|
return self.console._messages
|
|
else:
|
|
return [
|
|
msg for msg in self.console._messages if msg.type == self.msg_type
|
|
]
|
|
|
|
@property
|
|
def lines(self):
|
|
return [msg.text for msg in self.messages]
|
|
|
|
@property
|
|
def text(self):
|
|
return "\n".join(self.lines)
|
|
|
|
_COLORS = {
|
|
"warning": "brown",
|
|
"error": "darkred",
|
|
"js_error": "red",
|
|
}
|
|
|
|
def __init__(self, logger):
|
|
self.logger = logger
|
|
self._messages = []
|
|
self.all = self.View(self, None)
|
|
self.log = self.View(self, "log")
|
|
self.debug = self.View(self, "debug")
|
|
self.info = self.View(self, "info")
|
|
self.error = self.View(self, "error")
|
|
self.warning = self.View(self, "warning")
|
|
self.js_error = self.View(self, "js_error")
|
|
|
|
def add_message(self, type, text):
|
|
# log the message: pytest will capture the output and display the
|
|
# messages if the test fails.
|
|
msg = self.Message(type=type, text=text)
|
|
category = f"console.{msg.type}"
|
|
color = self._COLORS.get(msg.type)
|
|
self.logger.log(category, msg.text, color=color)
|
|
self._messages.append(msg)
|
|
|
|
|
|
class Logger:
|
|
"""
|
|
Helper class to log messages to stdout.
|
|
|
|
Features:
|
|
- nice formatted category
|
|
- keep track of time passed since the last reset
|
|
- support colors
|
|
|
|
NOTE: the (lowercase) logger fixture is defined in conftest.py
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.reset()
|
|
# capture things like [pyscript/main]
|
|
self.prefix_regexp = re.compile(r"(\[.+?\])")
|
|
|
|
def reset(self):
|
|
self.start_time = time.time()
|
|
|
|
def colorize_prefix(self, text, *, color):
|
|
# find the first occurrence of something like [pyscript/main] and
|
|
# colorize it
|
|
start, end = Color.escape_pair(color)
|
|
return self.prefix_regexp.sub(rf"{start}\1{end}", text, 1)
|
|
|
|
def log(self, category, text, *, color=None):
|
|
delta = time.time() - self.start_time
|
|
text = self.colorize_prefix(text, color="teal")
|
|
line = f"[{delta:6.2f} {category:16}] {text}"
|
|
if color:
|
|
line = Color.set(color, line)
|
|
print(line)
|
|
|
|
|
|
class Color:
|
|
"""
|
|
Helper method to print colored output using ANSI escape codes.
|
|
"""
|
|
|
|
black = "30"
|
|
darkred = "31"
|
|
darkgreen = "32"
|
|
brown = "33"
|
|
darkblue = "34"
|
|
purple = "35"
|
|
teal = "36"
|
|
lightgray = "37"
|
|
darkgray = "30;01"
|
|
red = "31;01"
|
|
green = "32;01"
|
|
yellow = "33;01"
|
|
blue = "34;01"
|
|
fuchsia = "35;01"
|
|
turquoise = "36;01"
|
|
white = "37;01"
|
|
|
|
@classmethod
|
|
def set(cls, color, string):
|
|
start, end = cls.escape_pair(color)
|
|
return f"{start}{string}{end}"
|
|
|
|
@classmethod
|
|
def escape_pair(cls, color):
|
|
try:
|
|
color = getattr(cls, color)
|
|
except AttributeError:
|
|
pass
|
|
start = f"\x1b[{color}m"
|
|
end = "\x1b[00m"
|
|
return start, end
|
|
|
|
|
|
class SmartRouter:
|
|
"""
|
|
A smart router to be used in conjunction with playwright.Page.route.
|
|
|
|
Main features:
|
|
|
|
- it intercepts the requests to a local "fake server" and serve them
|
|
statically from disk
|
|
|
|
- it intercepts the requests to the network and cache the results
|
|
locally
|
|
"""
|
|
|
|
@dataclass
|
|
class CachedResponse:
|
|
"""
|
|
We cannot put playwright's APIResponse instances inside _cache, because
|
|
they are valid only in the context of the same page. As a workaround,
|
|
we manually save status, headers and body of each cached response.
|
|
"""
|
|
|
|
status: int
|
|
headers: dict
|
|
body: str
|
|
|
|
def asdict(self):
|
|
return dataclasses.asdict(self)
|
|
|
|
@classmethod
|
|
def fromdict(cls, d):
|
|
return cls(**d)
|
|
|
|
def __init__(self, fake_server, *, cache, logger, usepdb=False):
|
|
"""
|
|
fake_server: the domain name of the fake server
|
|
"""
|
|
self.fake_server = fake_server
|
|
self.cache = cache # this is pytest-cache, it survives across sessions
|
|
self.logger = logger
|
|
self.usepdb = usepdb
|
|
self.page = None
|
|
self.requests = [] # (status, kind, url)
|
|
|
|
def install(self, page):
|
|
"""
|
|
Install the smart router on a page
|
|
"""
|
|
self.page = page
|
|
self.page.route("**", self.router)
|
|
|
|
def router(self, route):
|
|
"""
|
|
Intercept and fulfill playwright requests.
|
|
|
|
NOTE!
|
|
If we raise an exception inside router, playwright just hangs and the
|
|
exception seems not to be propagated outside. It's very likely a
|
|
playwright bug.
|
|
|
|
This means that for example pytest doesn't have any chance to
|
|
intercept the exception and fail in a meaningful way.
|
|
|
|
As a workaround, we try to intercept exceptions by ourselves, print
|
|
something reasonable on the console and abort the request (hoping that
|
|
the test will fail cleaninly, that's the best we can do). We also try
|
|
to respect pytest --pdb, for what it's possible.
|
|
"""
|
|
try:
|
|
return self._router(route)
|
|
except Exception:
|
|
print("***** Error inside Fake_Server.router *****")
|
|
info = sys.exc_info()
|
|
print(traceback.format_exc())
|
|
if self.usepdb:
|
|
pdb.post_mortem(info[2])
|
|
route.abort()
|
|
|
|
def log_request(self, status, kind, url):
|
|
self.requests.append((status, kind, url))
|
|
color = "blue" if status == 200 else "red"
|
|
self.logger.log("request", f"{status} - {kind} - {url}", color=color)
|
|
|
|
def _router(self, route):
|
|
full_url = route.request.url
|
|
url = urllib.parse.urlparse(full_url)
|
|
assert url.scheme in ("http", "https")
|
|
|
|
# requests to http://fake_server/ are served from the current dir and
|
|
# never cached
|
|
if url.netloc == self.fake_server:
|
|
self.log_request(200, "fake_server", full_url)
|
|
assert url.path[0] == "/"
|
|
relative_path = url.path[1:]
|
|
if os.path.exists(relative_path):
|
|
route.fulfill(status=200, path=relative_path)
|
|
else:
|
|
route.fulfill(status=404)
|
|
return
|
|
|
|
# network requests might be cached
|
|
resp = self.fetch_from_cache(full_url)
|
|
if resp is not None:
|
|
kind = "CACHED"
|
|
else:
|
|
kind = "NETWORK"
|
|
resp = self.fetch_from_network(route.request)
|
|
self.save_resp_to_cache(full_url, resp)
|
|
|
|
self.log_request(resp.status, kind, full_url)
|
|
route.fulfill(status=resp.status, headers=resp.headers, body=resp.body)
|
|
|
|
def clear_cache(self, url):
|
|
key = "pyscript/" + url
|
|
self.cache.set(key, None)
|
|
|
|
def save_resp_to_cache(self, url, resp):
|
|
key = "pyscript/" + url
|
|
data = resp.asdict()
|
|
# cache.set encodes it as JSON, and "bytes" are not supported: let's
|
|
# encode them as latin-1
|
|
data["body"] = data["body"].decode("latin-1")
|
|
self.cache.set(key, data)
|
|
|
|
def fetch_from_cache(self, url):
|
|
key = "pyscript/" + url
|
|
data = self.cache.get(key, None)
|
|
if data is None:
|
|
return None
|
|
# see the corresponding comment in save_resp_to_cache
|
|
data["body"] = data["body"].encode("latin-1")
|
|
return self.CachedResponse(**data)
|
|
|
|
def fetch_from_network(self, request):
|
|
# sometimes the network is flaky and if the first request doesn't
|
|
# work, a subsequent one works. Instead of giving up immediately,
|
|
# let's try twice
|
|
try:
|
|
api_response = self.page.request.fetch(request)
|
|
except PlaywrightError:
|
|
# sleep a bit and try again
|
|
time.sleep(0.5)
|
|
api_response = self.page.request.fetch(request)
|
|
|
|
cached_response = self.CachedResponse(
|
|
status=api_response.status,
|
|
headers=api_response.headers,
|
|
body=api_response.body(),
|
|
)
|
|
return cached_response
|