flowchart LR
C(("Client")) --> S((Server))
style C fill:#009E73, color:white
style S fill:#0072B2, color:white
Python
Getting Started
Install rembus with pip or uv:
pip install rembusuv add rembusThen create a component object to interact with the others components:
# sync API
rb = rembus.node()
# async API
rb = await rembus.component()The rb object provides methods for exposing functions implementation, subscribing to topics, publishing messages and requesting services:
rpcpublishexposesubscribe
RPC
In this example a RPC request is made for the service stats which is implemented in a Julia
S
server:
# 🔵 stats.jl
using Statistics
using Rembus
# Return a stat summary of dataframes
# values from the value column.
function stats(df)
return Dict(
"min" => min(df.value...),
"max" => max(df.value...),
"mean" => mean(df.value),
"std" => std(df.value)
)
end
rb = component()
expose(rb, stats)
println("up and running")
wait(rb)The python
C
client creates a pandas DataFrame and requests the stats service:
# 🟢 client.py
import rembus
from random import random
import pandas as pd
nrows = 1000000
df = pd.DataFrame({
"name": [f"kpi_{i}" for i in range(1,nrows)],
"ts": range(1,nrows),
"value": [random() for i in range(1,nrows)]
})
rb = rembus.node()
summary = rb.rpc("stats", df)The sequence of steps is:
- On the
C
client side a python pandas dataframe is created and used as argument of the
rpcmethod; - On the
S
server side the
statsmethod is called with a Julia DataFrame as argument; - On the
S
server side the return value of
statsis a Julia Dictionary; - On the C client side the response is a Python dictionary.
Publish-Subscribe
flowchart LR
C(("Publish")) --> S1((Sub-1))
C(("Publish")) --> S2((Sub-2))
style C fill:#009E73, color:white
style S1 fill:#0072B2, color:white
style S2 fill:#0072B2, color:white
# 🔵 sub-1.py
import rembus
def mytopic(data):
println("[Sub-1] mytopic:$data")
end
rb = await component(name="sub-1")
await subscribe(rb, mytopic)
await wait(rb)# 🔵 sub-2.jl
import rembus
def mytopic(data):
print(f"[Sub-2] DB update: {data}")
rb = rembus.node(name="sub-2")
rb.subscribe(mytopic)
rb.wait()# 🟢 publish.jl
using Rembus
rb = component([
"ws://:3001/client",
"ws://:3002/client"
])
sensor1 = Dict(
"T" => 18.3, "H" => 45.2
)
sensor2 = Dict(
"P" => 2.3
)
publish(
rb,
"mytopic",
Dict(
"sensor#1" => sensor1,
"sensor#2" => sensor2,
)
)