A persistent IndexedDB store for PyScript (#2101)

A persistent IndexedDB store for PyScript
This commit is contained in:
Andrea Giammarchi
2024-06-19 14:11:57 +02:00
committed by GitHub
parent aab015b9b8
commit d0406be84c
9 changed files with 489 additions and 167 deletions

View File

@@ -43,6 +43,7 @@ from pyscript.magic_js import (
sync,
window,
)
from pyscript.storage import Storage, storage
from pyscript.websocket import WebSocket
try:

View File

@@ -0,0 +1,148 @@
# https://www.npmjs.com/package/flatted
import json as _json
class _Known:
def __init__(self):
self.key = []
self.value = []
class _String:
def __init__(self, value):
self.value = value
def _array_keys(value):
keys = []
i = 0
for _ in value:
keys.append(i)
i += 1
return keys
def _object_keys(value):
keys = []
for key in value:
keys.append(key)
return keys
def _is_array(value):
return isinstance(value, list) or isinstance(value, tuple)
def _is_object(value):
return isinstance(value, dict)
def _is_string(value):
return isinstance(value, str)
def _index(known, input, value):
input.append(value)
index = str(len(input) - 1)
known.key.append(value)
known.value.append(index)
return index
def _loop(keys, input, known, output):
for key in keys:
value = output[key]
if isinstance(value, _String):
_ref(key, input[int(value.value)], input, known, output)
return output
def _ref(key, value, input, known, output):
if _is_array(value) and not value in known:
known.append(value)
value = _loop(_array_keys(value), input, known, value)
elif _is_object(value) and not value in known:
known.append(value)
value = _loop(_object_keys(value), input, known, value)
output[key] = value
def _relate(known, input, value):
if _is_string(value) or _is_array(value) or _is_object(value):
try:
return known.value[known.key.index(value)]
except:
return _index(known, input, value)
return value
def _transform(known, input, value):
if _is_array(value):
output = []
for val in value:
output.append(_relate(known, input, val))
return output
if _is_object(value):
obj = {}
for key in value:
obj[key] = _relate(known, input, value[key])
return obj
return value
def _wrap(value):
if _is_string(value):
return _String(value)
if _is_array(value):
i = 0
for val in value:
value[i] = _wrap(val)
i += 1
elif _is_object(value):
for key in value:
value[key] = _wrap(value[key])
return value
def parse(value, *args, **kwargs):
json = _json.loads(value, *args, **kwargs)
wrapped = []
for value in json:
wrapped.append(_wrap(value))
input = []
for value in wrapped:
if isinstance(value, _String):
input.append(value.value)
else:
input.append(value)
value = input[0]
if _is_array(value):
return _loop(_array_keys(value), input, [value], value)
if _is_object(value):
return _loop(_object_keys(value), input, [value], value)
return value
def stringify(value, *args, **kwargs):
known = _Known()
input = []
output = []
i = int(_index(known, input, value))
while i < len(input):
output.append(_transform(known, input, input[i]))
i += 1
return _json.dumps(output, *args, **kwargs)

View File

@@ -0,0 +1,60 @@
from polyscript import storage as _storage
from pyscript.flatted import parse as _parse
from pyscript.flatted import stringify as _stringify
# convert a Python value into an IndexedDB compatible entry
def _to_idb(value):
if value is None:
return _stringify(["null", 0])
if isinstance(value, (bool, float, int, str, list, dict, tuple)):
return _stringify(["generic", value])
if isinstance(value, bytearray):
return _stringify(["bytearray", [v for v in value]])
if isinstance(value, memoryview):
return _stringify(["memoryview", [v for v in value]])
raise TypeError(f"Unexpected value: {value}")
# convert an IndexedDB compatible entry into a Python value
def _from_idb(value):
(
kind,
result,
) = _parse(value)
if kind == "null":
return None
if kind == "generic":
return result
if kind == "bytearray":
return bytearray(result)
if kind == "memoryview":
return memoryview(bytearray(result))
return value
class Storage(dict):
def __init__(self, store):
super().__init__({k: _from_idb(v) for k, v in store.entries()})
self.__store__ = store
def __delitem__(self, attr):
self.__store__.delete(attr)
super().__delitem__(attr)
def __setitem__(self, attr, value):
self.__store__.set(attr, _to_idb(value))
super().__setitem__(attr, value)
def clear(self):
self.__store__.clear()
super().clear()
async def sync(self):
await self.__store__.sync()
async def storage(name="", storage_class=Storage):
if not name:
raise ValueError("The storage name must be defined")
return storage_class(await _storage(f"@pyscript/{name}"))