from sveltish.stores import writable
Sveltish
Svelte Stores are one of the secret weapons of the Svelte framework (recently voted the most loved web framework).
Stores allow easy reactive programming by presenting an Observer pattern that is as simple as necessary, but not simpler.
Install
pip install sveltish
How to use
Sometimes, you’ll have values that need to be accessed by multiple unrelated objects.
For that, you can use stores. It is a very simple implementation (around 100 lines of code) of the Observer/Observable pattern.
A store is simply an object with a subscribe method that allows interested parties to be notified when its value changes.
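To make the idea concrete, here is a minimal sketch of such an object. `MiniStore` is a hypothetical name used only for illustration; it is not the Sveltish implementation, and the choice to call a new subscriber immediately with the current value is an assumption of this sketch.

```python
from typing import Any, Callable, List


class MiniStore:
    """Illustrative observable value (hypothetical, not the sveltish code)."""

    def __init__(self, value: Any = None) -> None:
        self.value = value
        self.subscribers: List[Callable[[Any], None]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> Callable[[], None]:
        self.subscribers.append(callback)
        callback(self.value)  # assumption: new subscribers get the current value

        def unsubscribe() -> None:
            # Safe to call more than once: only remove if still registered.
            if callback in self.subscribers:
                self.subscribers.remove(callback)

        return unsubscribe

    def set(self, value: Any) -> None:
        self.value = value
        for callback in list(self.subscribers):
            callback(value)


seen = []
store = MiniStore(0)
stop = store.subscribe(seen.append)
store.set(1)
stop()
store.set(2)  # nobody is listening any more
print(seen)   # [0, 1]
```

The subscriber list is copied before notifying so that a callback may unsubscribe itself without disturbing the loop.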
Writable Stores
count = writable(0)
history = []  # logging for testing

# subscribe returns an unsubscriber
def record(x):
    history.append(x)
    print(history)

stop = count.subscribe(record)

test_eq(history, [0])
[0]
We just created a count store. Its value can be accessed via a callback that we pass to the count.subscribe method.
A Writable can be set from the outside. When it happens, all its subscribers will react.
def increment(): count.update(lambda x: x + 1)
def decrement(): count.update(lambda x: x - 1)
def reset(): count.set(0)

count.set(3)
increment()
decrement()
decrement()
reset()
count.set(42)

test_eq(history, [0, 3, 4, 3, 2, 0, 42])
[0, 3]
[0, 3, 4]
[0, 3, 4, 3]
[0, 3, 4, 3, 2]
[0, 3, 4, 3, 2, 0]
[0, 3, 4, 3, 2, 0, 42]
The unsubscriber, in this example the stop function, stops the notifications to the subscriber.
stop()
reset()
count.set(22)
test_eq(history, [0, 3, 4, 3, 2, 0, 42])
count
w<0> $int: 22
Notice that you can still change the store, but there was no print message this time. There was no observer listening.
When we subscribe new callbacks, they will be promptly informed of the current state of the store.
stop = count.subscribe(lambda x: print(f"Count is now {x}"))
stop2 = count.subscribe(lambda x: print(f"double of count is {2*x}"))
Count is now 22
double of count is 44
reset()
Count is now 0
double of count is 0
stop()
stop2()
You can create an empty Writable Store.
store = writable()
history = []
unsubscribe = store.subscribe(lambda x: history.append(x))
unsubscribe()
test_eq(history, [None])
If you try to unsubscribe twice, it won’t break. It just does nothing the second time… and in the third time… and…
unsubscribe(), unsubscribe(), unsubscribe()
(None, None, None)
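Stores assume mutable objects: setting the same object again still notifies the subscribers, because its contents may have changed.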
class Bunch:
    __init__ = lambda self, **kw: setattr(self, '__dict__', kw)

obj = Bunch()
called = 0
store = writable(obj)
def callback(x):
    global called
    called += 1
stop = store.subscribe(callback)

test_eq(called, 1)
obj.a = 1 #type: ignore
store.set(obj)
test_eq(called, 2)
Readable Stores
However… It is clear that not all stores should be writable by whoever has a reference to them. Many times you want a single publisher of changes to a store that is only consumed (subscribed to) by many other objects. For those cases, we have readable stores.
from sveltish.stores import readable
A Readable store without a start function is a constant value and has no meaning for us. Therefore, start is a required argument.
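The role of start can be pictured with a small sketch. `MiniReadable` is a hypothetical class written for this illustration only, not the Sveltish code. It assumes that start receives the store's private setter when the first subscriber arrives, and that the function start returns is called when the last subscriber leaves.

```python
from typing import Any, Callable, List, Optional


class MiniReadable:
    """Hypothetical sketch: only the start function can change the value."""

    def __init__(self, value: Any, start: Callable) -> None:
        self._value = value
        self._start = start
        self._stop: Optional[Callable[[], None]] = None
        self._subscribers: List[Callable[[Any], None]] = []

    def _set(self, value: Any) -> None:
        self._value = value
        for callback in list(self._subscribers):
            callback(value)

    def subscribe(self, callback: Callable[[Any], None]) -> Callable[[], None]:
        if not self._subscribers:
            # First subscriber: hand the setter to the publisher.
            self._stop = self._start(self._set)
        self._subscribers.append(callback)
        callback(self._value)

        def unsubscribe() -> None:
            if callback not in self._subscribers:
                return
            self._subscribers.remove(callback)
            if not self._subscribers and self._stop is not None:
                self._stop()  # last subscriber gone: stop the publisher
                self._stop = None

        return unsubscribe


publisher = {}  # stands in for whatever object owns the setter


def start(set_value):
    publisher["set"] = set_value
    return lambda: publisher.pop("set")


reader = MiniReadable(0, start)
seen = []
stop = reader.subscribe(seen.append)  # start runs here
publisher["set"](5)
stop()                                # the function returned by start runs here
print(seen)  # [0, 5]
```

Because the setter is only handed out through start, code that merely holds a reference to the store can subscribe to it but cannot change it.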
try:
    c = readable(0) # should fail
except Exception as error:
    print(error)

test_fail(lambda: readable(0))
readable() missing 1 required positional argument: 'start'
class Publisher:
    def __init__(self): self.set = lambda x: None
    def set_set(self, set):
        self.set = set
        return lambda: None
    def use_set(self, value): self.set(value)

p = Publisher()
reader = readable(0, p.set_set)
reader
r<0> $int: 0
The store only starts updating after the first subscriber. Here, the publisher does not change the store.
p.use_set(1), reader
(None, r<0> $int: 0)
stop = reader.subscribe(lambda x: print(f"reader is now {x}"))
reader is now 0
p.use_set(2)
reader is now 2
stop()
Another example of Readable Store usage:
from threading import Event, Thread
import time
def start(set): # the start function is the publisher
    stopped = Event()
    def loop(): # needs to be in a separate thread
        while not stopped.wait(1): # in seconds
            set(time.localtime())
    Thread(target=loop).start()
    return stopped.set

now = readable(time.localtime(), start)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)
While there is no subscriber, the Readable will not be updated.
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)
OhPleaseStop = now.subscribe(lambda x: print(time.strftime(f"%H:%M:%S", x), end="\r"))
22:12:41
time.sleep(2)
OhPleaseStop()
22:12:43
Derived Stores
A Derived Store stores a value based on the value of another store.
from sveltish.stores import derived
For example:
count = writable(1)
stopCount = count.subscribe(lambda x: print(f"count is {x}"))
double = derived(count, lambda x: x * 2)
stopDouble = double.subscribe(lambda x: print(f"double is {x}"))
test_eq(double.get(), 2*count.get())
count is 1
double is 2
count.set(2)
test_eq(double.get(), 4)
count is 2
double is 4
stopCount(), stopDouble()
(None, None)
Building on our previous example, we can create a store that derives the elapsed time since the original store was started.
elapsing = None
def calc_elapsed(now):
    global elapsing
    if not elapsing:
        elapsing = now
    return time.mktime(now) - time.mktime(elapsing)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=43, tm_wday=2, tm_yday=67, tm_isdst=0)
elapsed = derived(now, lambda x: calc_elapsed(x))
elapsed
r<0> $float: 0.0
stopElapsed = elapsed.subscribe(lambda x: print(f"Elapsed time of source store: {x} seconds.", end="\r"))
Elapsed time of source store: 0.0 seconds.
time.sleep(1)
stopElapsed()
Elapsed time of source store: 2.0 seconds.
Derived stores allow us to transform the value of a store. In RxPy they are called operators. You can build several operators like: filter, fold, map, zip…
Let’s build a custom filter operator:
user = writable({"name": "John", "age": 32})
stopLog = user.subscribe(lambda x: print(f"User: {x}"))
User: {'name': 'John', 'age': 32}
name = derived(user, lambda x: x["name"])
stopName = name.subscribe(lambda x: print(f"Name: {x}"))
Name: John
user.update(lambda x: x | {"age": 45})
User: {'name': 'John', 'age': 45}
Updating the age does not trigger the name subscriber. Let’s see what happens when we update the name.
user.update(lambda x: x | {"name": "Fred"})
Name: Fred
User: {'name': 'Fred', 'age': 45}
Only changes to the name of the user trigger the name subscriber.
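This behaviour, together with the earlier mutable-object example, is consistent with a change check along the following lines. This is only a guess at the rule, written as a hypothetical `should_notify` helper; it is not taken from the Sveltish source, and the tuple of types treated as immutable is an assumption of the sketch.

```python
from typing import Any

# Types treated as immutable in this sketch (an assumption, not an exhaustive list).
IMMUTABLE = (int, float, complex, str, bytes, tuple, frozenset, type(None))


def should_notify(old: Any, new: Any) -> bool:
    """Hypothetical rule: skip notifications only for unchanged immutable values."""
    if isinstance(new, IMMUTABLE):
        return old != new
    # Mutable objects may have changed in place, so always notify.
    return True


print(should_notify("John", "John"))  # False: the derived name did not change
print(should_notify("John", "Fred"))  # True
user_dict = {"name": "John"}
print(should_notify(user_dict, user_dict))  # True: same dict, possibly mutated
```

Under such a rule the user store (a dict) notifies on every update, while the derived name store (a string) stays silent until the string actually differs.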
stopName(), stopLog()
(None, None)
Another cool thing about Derived Stores is that you can derive from a list of stores. Let’s build a zip operator.
a = writable([1,2,3,4])
b = writable([5,6,7,8])
a,b
(w<0> $list: [1, 2, 3, 4], w<0> $list: [5, 6, 7, 8])
zipper = derived([a,b], lambda a,b: list(zip(a,b)))
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])
While zipper has no subscribers, it keeps the initial value; it is stopped.
a.set([4,3,2,1])
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])
A subscription starts zipper, and it will begin to react to changes in the source stores.
u = zipper.subscribe(lambda x: None)
test_eq(zipper.get(), [(4, 5), (3, 6), (2, 7), (1, 8)])
b.set([8,7,6,5])
test_eq(zipper.get(), [(4, 8), (3, 7), (2, 6), (1, 5)])
u()
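The stopped/started behaviour of zipper can be imitated with a small sketch. `MiniStore` and `MiniDerived` are hypothetical stand-ins written for this illustration, not the Sveltish classes. The sketch assumes that a derived store subscribes to its sources only while it has at least one subscriber of its own.

```python
class MiniStore:
    """Tiny stand-in for a writable store (hypothetical)."""

    def __init__(self, value=None):
        self.value = value
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)
        callback(self.value)

        def unsubscribe():
            if callback in self.subscribers:
                self.subscribers.remove(callback)

        return unsubscribe

    def set(self, value):
        self.value = value
        for callback in list(self.subscribers):
            callback(value)


class MiniDerived:
    """Hypothetical lazy derived store over a list of sources."""

    def __init__(self, sources, fn):
        self.sources, self.fn = sources, fn
        self.value = fn(*[s.value for s in sources])  # initial value
        self.subscribers = []
        self._stops = []

    def _recompute(self, _=None):
        self.value = self.fn(*[s.value for s in self.sources])
        for callback in list(self.subscribers):
            callback(self.value)

    def subscribe(self, callback):
        if not self.subscribers:
            # First subscriber: start listening to every source.
            self._stops = [s.subscribe(self._recompute) for s in self.sources]
        self.subscribers.append(callback)
        callback(self.value)

        def unsubscribe():
            if callback in self.subscribers:
                self.subscribers.remove(callback)
                if not self.subscribers:
                    # Last subscriber gone: detach from the sources.
                    for stop in self._stops:
                        stop()
                    self._stops = []

        return unsubscribe


x, y = MiniStore([1, 2]), MiniStore([3, 4])
zipped = MiniDerived([x, y], lambda p, q: list(zip(p, q)))
x.set([2, 1])                 # zipped is stopped, so it keeps its initial value
stale = zipped.value
seen = []
stop_zipped = zipped.subscribe(seen.append)  # starts zipped and refreshes it
y.set([4, 3])
stop_zipped()
print(stale)  # [(1, 3), (2, 4)]
print(seen)   # [[(2, 3), (1, 4)], [(2, 4), (1, 3)]]
```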
Store composition with pipes
writable(1).pipe(lambda x: x + 1).pipe(lambda x: x * 2)
r<0> $int: 4
writable(1).pipe(lambda x: x+1, lambda x: x*2)
r<0> $int: 4
writable(1) | (lambda x: x+1) | (lambda x: x*2)
r<0> $int: 4
a = writable(1)
u5 = (a
      | (lambda x: x*2)
      | (lambda x: x*2)
      | (lambda x: x*2)).subscribe(lambda x: print(f"u5: {x}"))
u5: 8
a.set(2)
u5: 16
u5()
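The three spellings used in this section (chained pipe calls, one pipe call with several functions, and the | operator) all apply the functions from left to right. A minimal sketch of that equivalence, using a hypothetical `Piped` wrapper around a plain value and deliberately ignoring reactivity, could look like this. It is not how Sveltish implements pipe.

```python
from functools import reduce


class Piped:
    """Hypothetical value wrapper that mimics the pipe syntax (no reactivity)."""

    def __init__(self, value):
        self.value = value

    def pipe(self, *fns):
        # Apply each function to the result of the previous one, left to right.
        return Piped(reduce(lambda acc, fn: fn(acc), fns, self.value))

    def __or__(self, fn):
        # `wrapped | fn` is shorthand for `wrapped.pipe(fn)`.
        return self.pipe(fn)


def add_one(x):
    return x + 1


def double(x):
    return x * 2


chained = Piped(1).pipe(add_one).pipe(double).value
variadic = Piped(1).pipe(add_one, double).value
with_operator = (Piped(1) | add_one | double).value
print(chained, variadic, with_operator)  # 4 4 4
```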
Missing features
You may have noticed that along the way we always had to subscribe and then remember to unsubscribe when we were done. This is a bit of a nuisance. Svelte has a compiler that provides some syntactic sugar to make this easier. They call it auto-subscriptions.
Sveltish does not have auto-subscriptions yet. But if you have a nice idea how to implement it, please let me know.
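Until then, one possible way to avoid forgetting to unsubscribe is a context manager. The sketch below is an assumption, not a Sveltish feature: `subscribed` is a hypothetical helper, and `TinyStore` is a stand-in for any object whose subscribe method returns an unsubscriber.

```python
from contextlib import contextmanager


class TinyStore:
    """Stand-in store (hypothetical): subscribe returns an unsubscriber."""

    def __init__(self, value=None):
        self.value = value
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)
        callback(self.value)
        return lambda: self.subscribers.remove(callback) if callback in self.subscribers else None

    def set(self, value):
        self.value = value
        for callback in list(self.subscribers):
            callback(value)


@contextmanager
def subscribed(store, callback):
    """Hypothetical helper: subscribe on entry, always unsubscribe on exit."""
    stop = store.subscribe(callback)
    try:
        yield store
    finally:
        stop()


seen = []
counter = TinyStore(0)
with subscribed(counter, seen.append):
    counter.set(1)
counter.set(2)  # outside the block: the callback is no longer called
print(seen)     # [0, 1]
```

The try/finally ensures the unsubscriber runs even if the body of the with block raises.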