mirror of
https://github.com/pyscript/pyscript.git
synced 2025-12-19 10:17:23 -05:00
Remove superfluous code now MicroPython supports inspect API for function signature inspection. (#2387)
* Remove superfluous code now MicroPython supports inspect API for function signature inspection. * Added test to ensure all callables are covered.
This commit is contained in:
committed by
GitHub
parent
b609b605f5
commit
ffc78ab6a2
@@ -3,7 +3,7 @@ export default {
|
||||
"pyscript": {
|
||||
"__init__.py": "from polyscript import lazy_py_modules as py_import\nfrom pyscript.magic_js import RUNNING_IN_WORKER,PyWorker,config,current_target,document,js_import,js_modules,sync,window\nfrom pyscript.display import HTML,display\nfrom pyscript.fetch import fetch\nfrom pyscript.storage import Storage,storage\nfrom pyscript.websocket import WebSocket\nfrom pyscript.events import when,Event\nif not RUNNING_IN_WORKER:from pyscript.workers import create_named_worker,workers",
|
||||
"display.py": "_K='_repr_mimebundle_'\n_J='image/svg+xml'\n_I='application/json'\n_H='__repr__'\n_G='savefig'\n_F='text/html'\n_E='image/jpeg'\n_D='application/javascript'\n_C='utf-8'\n_B='text/plain'\n_A='image/png'\nimport base64,html,io,re\nfrom pyscript.magic_js import current_target,document,window\nfrom pyscript.ffi import is_none\n_MIME_METHODS={_G:_A,'_repr_javascript_':_D,'_repr_json_':_I,'_repr_latex':'text/latex','_repr_png_':_A,'_repr_jpeg_':_E,'_repr_pdf_':'application/pdf','_repr_svg_':_J,'_repr_markdown_':'text/markdown','_repr_html_':_F,_H:_B}\ndef _render_image(mime,value,meta):\n\tA=value\n\tif isinstance(A,bytes):A=base64.b64encode(A).decode(_C)\n\tB=re.compile('^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$')\n\tif len(A)>0 and not B.match(A):A=base64.b64encode(A.encode(_C)).decode(_C)\n\tC=f\"data:{mime};charset=utf-8;base64,{A}\";D=' '.join(['{k}=\"{v}\"'for(A,B)in meta.items()]);return f'<img src=\"{C}\" {D}></img>'\ndef _identity(value,meta):return value\n_MIME_RENDERERS={_B:html.escape,_F:_identity,_A:lambda value,meta:_render_image(_A,value,meta),_E:lambda value,meta:_render_image(_E,value,meta),_J:_identity,_I:_identity,_D:lambda value,meta:f\"<script>{value}<\\\\/script>\"}\nclass HTML:\n\tdef __init__(A,html):A._html=html\n\tdef _repr_html_(A):return A._html\ndef _eval_formatter(obj,print_method):\n\tB=obj;A=print_method\n\tif A==_H:return repr(B)\n\tif hasattr(B,A):\n\t\tif A==_G:C=io.BytesIO();B.savefig(C,format='png');C.seek(0);return base64.b64encode(C.read()).decode(_C)\n\t\treturn getattr(B,A)()\n\tif A==_K:return{},{}\ndef _format_mime(obj):\n\tC=obj\n\tif isinstance(C,str):return html.escape(C),_B\n\tD=_eval_formatter(C,_K)\n\tif isinstance(D,tuple):E,I=D\n\telse:E=D\n\tA,F=None,[]\n\tfor(H,B)in _MIME_METHODS.items():\n\t\tif B in E:A=E[B]\n\t\telse:A=_eval_formatter(C,H)\n\t\tif is_none(A):continue\n\t\tif B not in _MIME_RENDERERS:F.append(B);continue\n\t\tbreak\n\tif is_none(A):\n\t\tif F:window.console.warn(f\"Rendered object requested unavailable MIME renderers: {F}\")\n\t\tA=repr(A);B=_B\n\telif isinstance(A,tuple):A,G=A\n\telse:G={}\n\treturn _MIME_RENDERERS[B](A,G),B\ndef _write(element,value,append=False):\n\tB=element;C,D=_format_mime(value)\n\tif C=='\\\\n':return\n\tif append:A=document.createElement('div');B.append(A)\n\telse:\n\t\tA=B.lastElementChild\n\t\tif is_none(A):A=B\n\tif D in(_D,_F):E=document.createRange().createContextualFragment(C);A.append(E)\n\telse:A.innerHTML=C\ndef display(*E,target=None,append=True):\n\tD=append;A=target\n\tif is_none(A):A=current_target()\n\telif not isinstance(A,str):C=f\"target must be str or None, not {A.__class__.__name__}\";raise TypeError(C)\n\telif A=='':C='Cannot have an empty target';raise ValueError(C)\n\telif A.startswith('#'):A=A[1:]\n\tB=document.getElementById(A)\n\tif is_none(B):C=f\"Invalid selector with id={A}. Cannot be found in the page.\";raise ValueError(C)\n\tif B.tagName=='SCRIPT'and hasattr(B,'target'):B=B.target\n\tfor F in E:\n\t\tif not D:B.replaceChildren()\n\t\t_write(B,F,append=D)",
|
||||
"events.py": "import asyncio,inspect,sys\nfrom functools import wraps\nfrom pyscript.magic_js import document\nfrom pyscript.ffi import create_proxy\nfrom pyscript.util import is_awaitable\nfrom pyscript import config\nclass Event:\n\tdef __init__(A):A._listeners=[]\n\tdef trigger(C,result):\n\t\tB=result\n\t\tfor A in C._listeners:\n\t\t\tif is_awaitable(A):asyncio.create_task(A(B))\n\t\t\telse:A(B)\n\tdef add_listener(B,listener):\n\t\tA=listener\n\t\tif is_awaitable(A)or callable(A):\n\t\t\tif A not in B._listeners:B._listeners.append(A)\n\t\telse:C='Listener must be callable or awaitable.';raise ValueError(C)\n\tdef remove_listener(A,*B):\n\t\tif B:\n\t\t\tfor C in B:A._listeners.remove(C)\n\t\telse:A._listeners=[]\ndef when(target,*B,**D):\n\tG='handler';C=target;E=None\n\tif B and(callable(B[0])or is_awaitable(B[0])):E=B[0]\n\telif callable(D.get(G))or is_awaitable(D.get(G)):E=D.pop(G)\n\tif isinstance(C,str):\n\t\tA=B[0]if B else D.pop('selector')\n\t\tif not A:I='No selector provided.';raise ValueError(I)\n\t\tfrom pyscript.web import Element as J,ElementCollection as K\n\t\tif isinstance(A,str):F=document.querySelectorAll(A)\n\t\telif isinstance(A,J):F=[A._dom_element]\n\t\telif isinstance(A,K):F=[A._dom_element for A in A]\n\t\telse:F=A if isinstance(A,list)else[A]\n\tdef H(func):\n\t\tE='positional arguments';D='takes';A=func\n\t\tif config['type']=='mpy':\n\t\t\tif is_awaitable(A):\n\t\t\t\tasync def B(*C,**F):\n\t\t\t\t\ttry:return await A(*C,**F)\n\t\t\t\t\texcept TypeError as B:\n\t\t\t\t\t\tif D in str(B)and E in str(B):return await A()\n\t\t\t\t\t\traise\n\t\t\telse:\n\t\t\t\tdef B(*C,**F):\n\t\t\t\t\ttry:return A(*C,**F)\n\t\t\t\t\texcept TypeError as B:\n\t\t\t\t\t\tif D in str(B)and E in str(B):return A()\n\t\t\t\t\t\traise\n\t\telse:\n\t\t\tG=inspect.signature(A)\n\t\t\tif G.parameters:\n\t\t\t\tif is_awaitable(A):\n\t\t\t\t\tasync def B(event):return await A(event)\n\t\t\t\telse:B=A\n\t\t\telif is_awaitable(A):\n\t\t\t\tasync def B(*B,**C):return await A()\n\t\t\telse:\n\t\t\t\tdef B(*B,**C):return A()\n\t\tB=wraps(A)(B)\n\t\tif isinstance(C,Event):C.add_listener(B)\n\t\telif isinstance(C,list)and all(isinstance(A,Event)for A in C):\n\t\t\tfor H in C:H.add_listener(B)\n\t\telse:\n\t\t\tfor I in F:I.addEventListener(C,create_proxy(B))\n\t\treturn B\n\treturn H(E)if E else H",
|
||||
"events.py": "import asyncio,inspect,sys\nfrom functools import wraps\nfrom pyscript.magic_js import document\nfrom pyscript.ffi import create_proxy\nfrom pyscript.util import is_awaitable\nfrom pyscript import config\nclass Event:\n\tdef __init__(A):A._listeners=[]\n\tdef trigger(C,result):\n\t\tB=result\n\t\tfor A in C._listeners:\n\t\t\tif is_awaitable(A):asyncio.create_task(A(B))\n\t\t\telse:A(B)\n\tdef add_listener(B,listener):\n\t\tA=listener\n\t\tif is_awaitable(A)or callable(A):\n\t\t\tif A not in B._listeners:B._listeners.append(A)\n\t\telse:C='Listener must be callable or awaitable.';raise ValueError(C)\n\tdef remove_listener(A,*B):\n\t\tif B:\n\t\t\tfor C in B:A._listeners.remove(C)\n\t\telse:A._listeners=[]\ndef when(target,*B,**D):\n\tG='handler';C=target;E=None\n\tif B and(callable(B[0])or is_awaitable(B[0])):E=B[0]\n\telif callable(D.get(G))or is_awaitable(D.get(G)):E=D.pop(G)\n\tif isinstance(C,str):\n\t\tA=B[0]if B else D.pop('selector')\n\t\tif not A:I='No selector provided.';raise ValueError(I)\n\t\tfrom pyscript.web import Element as J,ElementCollection as K\n\t\tif isinstance(A,str):F=document.querySelectorAll(A)\n\t\telif isinstance(A,J):F=[A._dom_element]\n\t\telif isinstance(A,K):F=[A._dom_element for A in A]\n\t\telse:F=A if isinstance(A,list)else[A]\n\tdef H(func):\n\t\tB=func;D=inspect.signature(B)\n\t\tif D.parameters:\n\t\t\tif is_awaitable(B):\n\t\t\t\tasync def A(event):return await B(event)\n\t\t\telse:A=B\n\t\telif is_awaitable(B):\n\t\t\tasync def A(*A,**C):return await B()\n\t\telse:\n\t\t\tdef A(*A,**C):return B()\n\t\tA=wraps(B)(A)\n\t\tif isinstance(C,Event):C.add_listener(A)\n\t\telif isinstance(C,list)and all(isinstance(A,Event)for A in C):\n\t\t\tfor E in C:E.add_listener(A)\n\t\telse:\n\t\t\tfor G in F:G.addEventListener(C,create_proxy(A))\n\t\treturn A\n\treturn H(E)if E else H",
|
||||
"fetch.py": "import json,js\nfrom pyscript.util import as_bytearray\nclass _Response:\n\tdef __init__(A,response):A._response=response\n\tdef __getattr__(A,attr):return getattr(A._response,attr)\n\tasync def arrayBuffer(B):\n\t\tA=await B._response.arrayBuffer()\n\t\tif hasattr(A,'to_py'):return A.to_py()\n\t\treturn memoryview(as_bytearray(A))\n\tasync def blob(A):return await A._response.blob()\n\tasync def bytearray(A):B=await A._response.arrayBuffer();return as_bytearray(B)\n\tasync def json(A):return json.loads(await A.text())\n\tasync def text(A):return await A._response.text()\nclass _DirectResponse:\n\t@staticmethod\n\tdef setup(promise,response):A=promise;A._response=_Response(response);return A._response\n\tdef __init__(B,promise):A=promise;B._promise=A;A._response=None;A.arrayBuffer=B.arrayBuffer;A.blob=B.blob;A.bytearray=B.bytearray;A.json=B.json;A.text=B.text\n\tasync def _response(A):\n\t\tif not A._promise._response:await A._promise\n\t\treturn A._promise._response\n\tasync def arrayBuffer(A):B=await A._response();return await B.arrayBuffer()\n\tasync def blob(A):B=await A._response();return await B.blob()\n\tasync def bytearray(A):B=await A._response();return await B.bytearray()\n\tasync def json(A):B=await A._response();return await B.json()\n\tasync def text(A):B=await A._response();return await B.text()\ndef fetch(url,**B):C=js.JSON.parse(json.dumps(B));D=lambda response,*B:_DirectResponse.setup(A,response);A=js.fetch(url,C).then(D);_DirectResponse(A);return A",
|
||||
"ffi.py": "try:\n\timport js;from pyodide.ffi import create_proxy as _cp,to_js as _py_tjs,jsnull;from_entries=js.Object.fromEntries;is_none=lambda value:value is None or value is jsnull\n\tdef _tjs(value,**A):\n\t\tB='dict_converter'\n\t\tif not hasattr(A,B):A[B]=from_entries\n\t\treturn _py_tjs(value,**A)\nexcept:from jsffi import create_proxy as _cp;from jsffi import to_js as _tjs;import js;jsnull=js.Object.getPrototypeOf(js.Object.prototype);is_none=lambda value:value is None or value is jsnull\ncreate_proxy=_cp\nto_js=_tjs\ntry:\n\tfrom polyscript import ffi as _ffi;direct=_ffi.direct;gather=_ffi.gather;query=_ffi.query\n\tdef assign(source,*B):\n\t\tA=source\n\t\tfor C in B:_ffi.assign(A,to_js(C))\n\t\treturn A\nexcept:\n\timport js;_assign=js.Object.assign;direct=lambda source:source\n\tdef assign(source,*B):\n\t\tA=source\n\t\tfor C in B:_assign(A,to_js(C))\n\t\treturn A",
|
||||
"flatted.py": "import json as _json\nclass _Known:\n\tdef __init__(A):A.key=[];A.value=[]\nclass _String:\n\tdef __init__(A,value):A.value=value\ndef _array_keys(value):\n\tA=[];B=0\n\tfor C in value:A.append(B);B+=1\n\treturn A\ndef _object_keys(value):\n\tA=[]\n\tfor B in value:A.append(B)\n\treturn A\ndef _is_array(value):return isinstance(value,(list,tuple))\ndef _is_object(value):return isinstance(value,dict)\ndef _is_string(value):return isinstance(value,str)\ndef _index(known,input,value):B=value;A=known;input.append(B);C=str(len(input)-1);A.key.append(B);A.value.append(C);return C\ndef _loop(keys,input,known,output):\n\tA=output\n\tfor B in keys:\n\t\tC=A[B]\n\t\tif isinstance(C,_String):_ref(B,input[int(C.value)],input,known,A)\n\treturn A\ndef _ref(key,value,input,known,output):\n\tB=known;A=value\n\tif _is_array(A)and A not in B:B.append(A);A=_loop(_array_keys(A),input,B,A)\n\telif _is_object(A)and A not in B:B.append(A);A=_loop(_object_keys(A),input,B,A)\n\toutput[key]=A\ndef _relate(known,input,value):\n\tB=known;A=value\n\tif _is_string(A)or _is_array(A)or _is_object(A):\n\t\ttry:return B.value[B.key.index(A)]\n\t\texcept:return _index(B,input,A)\n\treturn A\ndef _transform(known,input,value):\n\tB=known;A=value\n\tif _is_array(A):\n\t\tC=[]\n\t\tfor F in A:C.append(_relate(B,input,F))\n\t\treturn C\n\tif _is_object(A):\n\t\tD={}\n\t\tfor E in A:D[E]=_relate(B,input,A[E])\n\t\treturn D\n\treturn A\ndef _wrap(value):\n\tA=value\n\tif _is_string(A):return _String(A)\n\tif _is_array(A):\n\t\tB=0\n\t\tfor D in A:A[B]=_wrap(D);B+=1\n\telif _is_object(A):\n\t\tfor C in A:A[C]=_wrap(A[C])\n\treturn A\ndef parse(value,*C,**D):\n\tA=value;E=_json.loads(A,*C,**D);B=[]\n\tfor A in E:B.append(_wrap(A))\n\tinput=[]\n\tfor A in B:\n\t\tif isinstance(A,_String):input.append(A.value)\n\t\telse:input.append(A)\n\tA=input[0]\n\tif _is_array(A):return _loop(_array_keys(A),input,[A],A)\n\tif _is_object(A):return _loop(_object_keys(A),input,[A],A)\n\treturn A\ndef stringify(value,*D,**E):\n\tB=_Known();input=[];C=[];A=int(_index(B,input,value))\n\twhile A<len(input):C.append(_transform(B,input,input[A]));A+=1\n\treturn _json.dumps(C,*D,**E)",
|
||||
|
||||
@@ -92,60 +92,26 @@ def when(target, *args, **kwargs):
|
||||
elements = selector if isinstance(selector, list) else [selector]
|
||||
|
||||
def decorator(func):
|
||||
if config["type"] == "mpy": # Is MicroPython?
|
||||
sig = inspect.signature(func)
|
||||
if sig.parameters:
|
||||
if is_awaitable(func):
|
||||
|
||||
async def wrapper(event):
|
||||
return await func(event)
|
||||
|
||||
else:
|
||||
wrapper = func
|
||||
else:
|
||||
# Function doesn't receive events.
|
||||
if is_awaitable(func):
|
||||
|
||||
async def wrapper(*args, **kwargs):
|
||||
"""
|
||||
This is a very ugly hack to get micropython working because
|
||||
`inspect.signature` doesn't exist. It may be actually better
|
||||
to not try any magic for now and raise the error.
|
||||
"""
|
||||
try:
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
except TypeError as e:
|
||||
if "takes" in str(e) and "positional arguments" in str(e):
|
||||
return await func()
|
||||
raise
|
||||
return await func()
|
||||
|
||||
else:
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
"""
|
||||
This is a very ugly hack to get micropython working because
|
||||
`inspect.signature` doesn't exist. It may be actually better
|
||||
to not try any magic for now and raise the error.
|
||||
"""
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
|
||||
except TypeError as e:
|
||||
if "takes" in str(e) and "positional arguments" in str(e):
|
||||
return func()
|
||||
raise
|
||||
|
||||
else:
|
||||
sig = inspect.signature(func)
|
||||
if sig.parameters:
|
||||
if is_awaitable(func):
|
||||
|
||||
async def wrapper(event):
|
||||
return await func(event)
|
||||
|
||||
else:
|
||||
wrapper = func
|
||||
else:
|
||||
# Function doesn't receive events.
|
||||
if is_awaitable(func):
|
||||
|
||||
async def wrapper(*args, **kwargs):
|
||||
return await func()
|
||||
|
||||
else:
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
return func()
|
||||
return func()
|
||||
|
||||
wrapper = wraps(func)(wrapper)
|
||||
if isinstance(target, Event):
|
||||
|
||||
@@ -358,3 +358,79 @@ def test_when_called_with_an_event_and_handler():
|
||||
# The function should have been called when the whenable object was
|
||||
# triggered.
|
||||
assert counter == 1
|
||||
|
||||
def test_when_on_different_callables():
|
||||
"""
|
||||
The when function works with:
|
||||
|
||||
* Synchronous functions
|
||||
* Asynchronous functions
|
||||
* Inner functions
|
||||
* Async inner functions
|
||||
* Closure functions
|
||||
* Async closure functions
|
||||
"""
|
||||
|
||||
def func(x=1):
|
||||
# A simple function.
|
||||
return x
|
||||
|
||||
async def a_func(x=1):
|
||||
# A simple async function.
|
||||
return x
|
||||
|
||||
def make_inner_func():
|
||||
# Creates a simple inner function.
|
||||
|
||||
def inner_func(x=1):
|
||||
return x
|
||||
|
||||
return inner_func
|
||||
|
||||
|
||||
def make_inner_a_func():
|
||||
# Creates a simple async inner function.
|
||||
|
||||
async def inner_a_func(x=1):
|
||||
return x
|
||||
|
||||
return inner_a_func
|
||||
|
||||
|
||||
def make_closure():
|
||||
# Creates a simple closure function.
|
||||
a = 1
|
||||
|
||||
def closure_func(x=1):
|
||||
return a + x
|
||||
|
||||
return closure_func
|
||||
|
||||
|
||||
def make_a_closure():
|
||||
# Creates a simple async closure function.
|
||||
a = 1
|
||||
|
||||
async def closure_a_func(x=1):
|
||||
return a + x
|
||||
|
||||
return closure_a_func
|
||||
|
||||
|
||||
inner_func = make_inner_func()
|
||||
inner_a_func = make_inner_a_func()
|
||||
cl_func = make_closure()
|
||||
cl_a_func = make_a_closure()
|
||||
|
||||
|
||||
whenable = Event()
|
||||
|
||||
# Each of these should work with the when function.
|
||||
when(whenable, func)
|
||||
when(whenable, a_func)
|
||||
when(whenable, inner_func)
|
||||
when(whenable, inner_a_func)
|
||||
when(whenable, cl_func)
|
||||
when(whenable, cl_a_func)
|
||||
# If we get here, everything worked.
|
||||
assert True
|
||||
|
||||
Reference in New Issue
Block a user