Files
pyscript/core/src/stdlib/pyscript/fetch.py
Nicholas Tollervey 9dad29ec17 Refactor repository. Fixes #2161 (#2192)
* Remove duplicate LICENSE.
* Remove un-userd pyscript.sw directory and its content.
* Remove ReadTheDocs settings (unused).
* Remove un-used pyproject.toml
* Remove now unused CHANGELOG. Changes now tracked via release notes on GitHub.
* Updated / cleaned release page template and associated GH actions.
* Update prettierignore to remove un-needed refs.
* Move troubleshooting into correct README.
* Add reason for the index.html
* Rename the "pyscript.core" directory to "core".
* Update PR template because CHANGELOG is no longer used.
* Codespell configuration in pyproject.toml.
* Update pyscript.core -> core in .githubignore
* Remove test-results/.last-run.json. This should be ignored by git.
* Pin nodejs version.

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2024-09-30 10:29:26 +01:00

88 lines
2.4 KiB
Python

import json
import js
from pyscript.util import as_bytearray
### wrap the response to grant Pythonic results
class _Response:
def __init__(self, response):
self._response = response
# grant access to response.ok and other fields
def __getattr__(self, attr):
return getattr(self._response, attr)
# exposed methods with Pythonic results
async def arrayBuffer(self):
buffer = await self._response.arrayBuffer()
# works in Pyodide
if hasattr(buffer, "to_py"):
return buffer.to_py()
# shims in MicroPython
return memoryview(as_bytearray(buffer))
async def blob(self):
return await self._response.blob()
async def bytearray(self):
buffer = await self._response.arrayBuffer()
return as_bytearray(buffer)
async def json(self):
return json.loads(await self.text())
async def text(self):
return await self._response.text()
### allow direct await to _Response methods
class _DirectResponse:
@staticmethod
def setup(promise, response):
promise._response = _Response(response)
return promise._response
def __init__(self, promise):
self._promise = promise
promise._response = None
promise.arrayBuffer = self.arrayBuffer
promise.blob = self.blob
promise.bytearray = self.bytearray
promise.json = self.json
promise.text = self.text
async def _response(self):
if not self._promise._response:
await self._promise
return self._promise._response
async def arrayBuffer(self):
response = await self._response()
return await response.arrayBuffer()
async def blob(self):
response = await self._response()
return await response.blob()
async def bytearray(self):
response = await self._response()
return await response.bytearray()
async def json(self):
response = await self._response()
return await response.json()
async def text(self):
response = await self._response()
return await response.text()
def fetch(url, **kw):
# workaround Pyodide / MicroPython dict <-> js conversion
options = js.JSON.parse(json.dumps(kw))
awaited = lambda response, *args: _DirectResponse.setup(promise, response)
promise = js.fetch(url, options).then(awaited)
_DirectResponse(promise)
return promise