from sveltish.stores import writable
Sveltish
Svelte Stores are one of the secret weapons of the Svelte framework (recently voted the most loved web framework).
Stores allow easy reactive programming by presenting an Observer pattern that is as simple as necessary, but not simpler.
Install
pip install sveltish
How to use
Sometimes, you’ll have values that need to be accessed by multiple unrelated objects.
For that, you can use stores. The implementation is very simple (around 100 lines of code) and follows the Observer/Observable pattern.
A store is simply an object with a subscribe method that allows interested parties to be notified when its value changes.
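To make the idea concrete, here is a minimal sketch of such an object. It is not sveltish's actual code: the class name `MiniStore` and its internals are hypothetical, and it only assumes the contract described above (a `subscribe` method that calls the callback with the current value and returns an unsubscriber).

```python
from typing import Any, Callable, List


class MiniStore:
    """Hypothetical minimal store; an illustration, not sveltish's implementation."""

    def __init__(self, value: Any = None) -> None:
        self._value = value
        self._subscribers: List[Callable[[Any], None]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> Callable[[], None]:
        self._subscribers.append(callback)
        callback(self._value)  # a new subscriber is told the current value

        def unsubscribe() -> None:
            # Safe to call more than once: later calls do nothing.
            if callback in self._subscribers:
                self._subscribers.remove(callback)

        return unsubscribe

    def set(self, value: Any) -> None:
        self._value = value
        for cb in list(self._subscribers):  # copy, in case a callback unsubscribes
            cb(value)

    def update(self, fn: Callable[[Any], Any]) -> None:
        self.set(fn(self._value))


seen = []
store = MiniStore(0)
stop = store.subscribe(seen.append)
store.set(3)
store.update(lambda x: x + 1)
stop()
stop()         # second call is a no-op
store.set(99)  # nobody is listening any more
print(seen)    # [0, 3, 4]
```

This sketch notifies on every `set`, even when the value is unchanged; whether the real library skips such notifications is not something this example tries to reproduce.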
Writable Stores
count = writable(0)
history = [] # logging for testing
# subscribe returns an unsubscriber
def record(x):
    history.append(x)
    print(history)
stop = count.subscribe(record)
test_eq(history, [0])
[0]
We just created a count store. Its value can be accessed via a callback that we pass to the count.subscribe method.
A Writable can be set from the outside. When that happens, all its subscribers will react.
def increment(): count.update(lambda x: x + 1)
def decrement(): count.update(lambda x: x - 1)
def reset(): count.set(0)
count.set(3)
increment()
decrement()
decrement()
reset()
count.set(42)
test_eq(history, [0, 3, 4, 3, 2, 0, 42])
[0, 3]
[0, 3, 4]
[0, 3, 4, 3]
[0, 3, 4, 3, 2]
[0, 3, 4, 3, 2, 0]
[0, 3, 4, 3, 2, 0, 42]
The unsubscriber, in this example the stop function, stops the notifications to the subscriber.
stop()
reset()
count.set(22)
test_eq(history, [0, 3, 4, 3, 2, 0, 42])
count
w<0> $int: 22
Notice that you can still change the store but there was no print message this time. There was no observer listening.
When we subscribe new callbacks, they will be promptly informed of the current state of the store.
stop = count.subscribe(lambda x: print(f"Count is now {x}"))
stop2 = count.subscribe(lambda x: print(f"double of count is {2*x}"))
Count is now 22
double of count is 44
reset()
Count is now 0
double of count is 0
stop()
stop2()
You can create an empty Writable Store.
store = writable()
history = []
unsubscribe = store.subscribe(lambda x: history.append(x))
unsubscribe()
test_eq(history, [None])
If you try to unsubscribe twice, it won’t break. It just does nothing the second time… and the third time… and…
unsubscribe(), unsubscribe(), unsubscribe()
(None, None, None)
Stores assume mutable objects.
In Python everything is an object. Here, we use “object” to mean anything that is not a primitive (e.g. int, bool, etc.).
class Bunch:
    __init__ = lambda self, **kw: setattr(self, '__dict__', kw)
obj = Bunch()
called = 0
store = writable(obj)
def callback(x):
    global called
    called += 1
stop = store.subscribe(callback)
test_eq(called, 1)
obj.a = 1 #type: ignore
store.set(obj)
test_eq(called, 2)
Readable Stores
However, it is clear that not all stores should be writable by whoever has a reference to them. Often you want a single publisher of changes to a store that is only consumed (subscribed to) by many other objects. For those cases, we have readable stores.
The Publisher Subscriber (PubSub) pattern is a variant of the Observable/Observer pattern.
from sveltish.stores import readable
A Readable store without a start function is a constant value and has no meaning for us. Therefore, start is a required argument.
try:
    c = readable(0) # should fail
except Exception as error:
    print(error)
test_fail(lambda: readable(0))
readable() missing 1 required positional argument: 'start'
class Publisher:
    def __init__(self): self.set = lambda x: None
    def set_set(self, set):
        self.set = set
        return lambda: None
    def use_set(self, value): self.set(value)
p = Publisher()
reader = readable(0, p.set_set)
reader
r<0> $int: 0
The store only starts updating after the first subscriber. Here, the publisher does not change the store.
p.use_set(1), reader
(None, r<0> $int: 0)
stop = reader.subscribe(lambda x: print(f"reader is now {x}"))
reader is now 0
p.use_set(2)
reader is now 2
stop()
Another example of Readable Store usage:
from threading import Event, Thread
import time
def start(set): # the start function is the publisher
    stopped = Event()
    def loop(): # needs to be in a separate thread
        while not stopped.wait(1): # in seconds
            set(time.localtime())
    Thread(target=loop).start()
    return stopped.set
now = readable(time.localtime(), start)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)
The loop needs to be in its own thread, otherwise the function would never return and we would wait forever.
While there is no subscriber, the Readable will not be updated.
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=41, tm_wday=2, tm_yday=67, tm_isdst=0)
OhPleaseStop = now.subscribe(lambda x: print(time.strftime(f"%H:%M:%S", x), end="\r"))
22:12:41
time.sleep(2)
OhPleaseStop()
22:12:43
The Svelte Store API allows you to create a Readable Store without a Notifier. See discussion here.
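The lazy behaviour described in this section (the start function runs on the first subscription, and the function it returns runs when the last subscriber leaves) can be sketched roughly as follows. `MiniReadable`, `publisher` and the `events` log are hypothetical names used only for illustration; this is not how sveltish itself is written.

```python
from typing import Any, Callable, List, Optional


class MiniReadable:
    """Hypothetical sketch of a lazily started readable store."""

    def __init__(self, value: Any, start: Callable[[Callable[[Any], None]], Callable[[], None]]) -> None:
        self._value = value
        self._start = start
        self._stop: Optional[Callable[[], None]] = None
        self._subscribers: List[Callable[[Any], None]] = []

    def _set(self, value: Any) -> None:
        # Only the start function receives this setter, so only it can publish.
        self._value = value
        for cb in list(self._subscribers):
            cb(value)

    def subscribe(self, callback: Callable[[Any], None]) -> Callable[[], None]:
        self._subscribers.append(callback)
        if len(self._subscribers) == 1:
            self._stop = self._start(self._set)  # first subscriber starts the store
        callback(self._value)

        def unsubscribe() -> None:
            if callback not in self._subscribers:
                return
            self._subscribers.remove(callback)
            if not self._subscribers and self._stop is not None:
                self._stop()  # last subscriber gone: stop the store
                self._stop = None

        return unsubscribe


events = []     # records when the store starts and stops
publisher = {}  # holds the setter handed to the start function


def start(set_value):
    events.append("started")
    publisher["set"] = set_value
    return lambda: events.append("stopped")


reader = MiniReadable(0, start)
seen = []
stop = reader.subscribe(seen.append)  # triggers start
publisher["set"](2)                   # published value reaches the subscriber
stop()                                # triggers the stop function
print(events)  # ['started', 'stopped']
print(seen)    # [0, 2]
```

In this sketch nothing runs until someone subscribes, which mirrors why the clock example above does not tick while it has no subscribers.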
Derived Stores
A Derived Store stores a value based on the value of another store.
from sveltish.stores import derived
For example:
count = writable(1)
stopCount = count.subscribe(lambda x: print(f"count is {x}"))
double = derived(count, lambda x: x * 2)
stopDouble = double.subscribe(lambda x: print(f"double is {x}"))
test_eq(double.get(), 2*count.get())
count is 1
double is 2
count.set(2)
test_eq(double.get(), 4)
count is 2
double is 4
stopCount(), stopDouble()
(None, None)
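As a rough illustration of what a derived store does, the sketch below builds one on top of a tiny hand-written store. The names `MiniWritable` and `mini_derived` are hypothetical, and two simplifications are assumed: the derived store subscribes to its source eagerly (the real library starts it lazily, on first subscription), and it skips notifications when the derived value has not changed.

```python
from typing import Any, Callable, List


class MiniWritable:
    """Hypothetical minimal writable store used as the source."""

    def __init__(self, value: Any = None) -> None:
        self.value = value
        self.subscribers: List[Callable[[Any], None]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> Callable[[], None]:
        self.subscribers.append(callback)
        callback(self.value)

        def unsubscribe() -> None:
            if callback in self.subscribers:
                self.subscribers.remove(callback)

        return unsubscribe

    def set(self, value: Any) -> None:
        self.value = value
        for cb in list(self.subscribers):
            cb(value)


def mini_derived(source: MiniWritable, fn: Callable[[Any], Any]) -> MiniWritable:
    """Return a store whose value is always fn(source value)."""
    out = MiniWritable(fn(source.value))

    def on_change(value: Any) -> None:
        new_value = fn(value)
        if new_value != out.value:  # assumption: only propagate real changes
            out.set(new_value)

    source.subscribe(on_change)  # eager subscription, a simplification
    return out


count = MiniWritable(1)
double = mini_derived(count, lambda x: x * 2)
seen = []
double.subscribe(seen.append)
count.set(2)
count.set(2)  # derived value is still 4, so no second notification
print(seen)   # [2, 4]
```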
Building on our previous example, we can create a store that derives the elapsed time since the original store was started.
elapsing = None
def calc_elapsed(now):
    global elapsing
    if not elapsing:
        elapsing = now
    return time.mktime(now) - time.mktime(elapsing)
now
r<0> $struct_time: time.struct_time(tm_year=2023, tm_mon=3, tm_mday=8, tm_hour=22, tm_min=12, tm_sec=43, tm_wday=2, tm_yday=67, tm_isdst=0)
elapsed = derived(now, lambda x: calc_elapsed(x))
elapsed
r<0> $float: 0.0
stopElapsed = elapsed.subscribe(lambda x: print(f"Elapsed time of source store: {x} seconds.", end="\r"))
Elapsed time of source store: 0.0 seconds.
time.sleep(1)
stopElapsed()
Elapsed time of source store: 2.0 seconds.
Derived stores allow us to transform the value of a store. In RxPy they are called operators. You can build several operators like: filter, fold, map, zip…
Let’s build a custom filter operator:
user = writable({"name": "John", "age": 32})
stopLog = user.subscribe(lambda x: print(f"User: {x}"))
User: {'name': 'John', 'age': 32}
name = derived(user, lambda x: x["name"])
stopName = name.subscribe(lambda x: print(f"Name: {x}"))
Name: John
user.update(lambda x: x | {"age": 45})
User: {'name': 'John', 'age': 45}
Updating the age does not trigger the name subscriber. Let’s see what happens when we update the name.
user.update(lambda x: x | {"name": "Fred"})
Name: Fred
User: {'name': 'Fred', 'age': 45}
Only changes to the name of the user trigger the name subscriber.
stopName(), stopLog()
(None, None)
Another cool thing about Derived Stores is that you can derive from a list of stores. Let’s build a zip operator.
a = writable([1,2,3,4])
b = writable([5,6,7,8])
a,b
(w<0> $list: [1, 2, 3, 4], w<0> $list: [5, 6, 7, 8])
zipper = derived([a,b], lambda a,b: list(zip(a,b)))
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])
While zipper has no subscribers, it keeps its initial value; it is stopped.
a.set([4,3,2,1])
test_eq(zipper.get(), [(1, 5), (2, 6), (3, 7), (4, 8)])
A subscription starts zipper, and it will start to react to changes in the stores.
u = zipper.subscribe(lambda x: None)
test_eq(zipper.get(), [(4, 5), (3, 6), (2, 7), (1, 8)])
b.set([8,7,6,5])
test_eq(zipper.get(), [(4, 8), (3, 7), (2, 6), (1, 5)])
u()
Store composition with pipes
writable(1).pipe(lambda x: x + 1).pipe(lambda x: x * 2)
r<0> $int: 4
writable(1).pipe(lambda x: x+1, lambda x: x*2)
r<0> $int: 4
writable(1) | (lambda x: x+1) | (lambda x: x*2)
r<0> $int: 4
a = writable(1)
u5 = (a
      | (lambda x: x*2)
      | (lambda x: x*2)
      | (lambda x: x*2)).subscribe(lambda x: print(f"u5: {x}"))
u5: 8
a.set(2)
u5: 16
u5()
Missing features
You may have noticed that along the way we always had to subscribe and then remember to unsubscribe when we were done. This is a bit of a nuisance. Svelte has a compiler that provides some syntactic sugar to make this easier. They call it auto-subscriptions.
Sveltish does not have auto-subscriptions yet. But if you have a nice idea of how to implement them, please let me know.
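Until then, one possible way to avoid forgetting the unsubscribe call is to wrap the subscription in a context manager. This is only a sketch of the idea, not a sveltish feature: the helper `subscribed` and the stand-in `MiniWritable` class are hypothetical, and the helper assumes nothing more than a `subscribe` method that returns an unsubscriber.

```python
from contextlib import contextmanager
from typing import Any, Callable, List


class MiniWritable:
    """Hypothetical stand-in for a store, so the example runs on its own."""

    def __init__(self, value: Any = None) -> None:
        self.value = value
        self.subscribers: List[Callable[[Any], None]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> Callable[[], None]:
        self.subscribers.append(callback)
        callback(self.value)

        def unsubscribe() -> None:
            if callback in self.subscribers:
                self.subscribers.remove(callback)

        return unsubscribe

    def set(self, value: Any) -> None:
        self.value = value
        for cb in list(self.subscribers):
            cb(value)


@contextmanager
def subscribed(store, callback):
    """Subscribe on entry and always unsubscribe on exit, even after an error."""
    unsubscribe = store.subscribe(callback)
    try:
        yield store
    finally:
        unsubscribe()


count = MiniWritable(0)
seen = []
with subscribed(count, seen.append):
    count.set(1)
count.set(2)  # outside the block, the callback is no longer subscribed
print(seen)   # [0, 1]
```

This is not the same thing as Svelte's auto-subscriptions, which are resolved by the compiler; it only scopes a subscription to a block of code.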