flowchart LR C(("Client")) --> S((Server)) style C fill:#009E73, color:white style S fill:#0072B2, color:white
Python
Getting Started
Install rembus
with pip
or uv
:
pip install rembus
uv add rembus
Then create a component object to interact with the others components:
# sync API
= rembus.node()
rb
# async API
= await rembus.component() rb
The rb
object provides methods for exposing functions implementation, subscribing to topics, publishing messages and requesting services:
rpc
publish
expose
subscribe
RPC
In this example a RPC request is made for the service stats
which is implemented in a Julia
S
server:
# 🔵 stats.jl
using Statistics
using Rembus
# Return a stat summary of dataframes
# values from the value column.
function stats(df)
return Dict(
"min" => min(df.value...),
"max" => max(df.value...),
"mean" => mean(df.value),
"std" => std(df.value)
)end
= component()
rb
expose(rb, stats)
println("up and running")
wait(rb)
The python
C
client creates a pandas DataFrame and requests the stats
service:
# 🟢 client.py
import rembus
from random import random
import pandas as pd
= 1000000
nrows = pd.DataFrame({
df "name": [f"kpi_{i}" for i in range(1,nrows)],
"ts": range(1,nrows),
"value": [random() for i in range(1,nrows)]
})
= rembus.node()
rb = rb.rpc("stats", df) summary
The sequence of steps is:
- On the
C
client side a python pandas dataframe is created and used as argument of the
rpc
method; - On the
S
server side the
stats
method is called with a Julia DataFrame as argument; - On the
S
server side the return value of
stats
is a Julia Dictionary; - On the C client side the response is a Python dictionary.
Publish-Subscribe
flowchart LR C(("Publish")) --> S1((Sub-1)) C(("Publish")) --> S2((Sub-2)) style C fill:#009E73, color:white style S1 fill:#0072B2, color:white style S2 fill:#0072B2, color:white
# 🔵 sub-1.py
import rembus
def mytopic(data):
"[Sub-1] mytopic:$data")
println(
end
= await component(name="sub-1")
rb await subscribe(rb, mytopic)
await wait(rb)
# 🔵 sub-2.jl
import rembus
def mytopic(data):
print(f"[Sub-2] DB update: {data}")
= rembus.node(name="sub-2")
rb
rb.subscribe(mytopic) rb.wait()
# 🟢 publish.jl
using Rembus
= component([
rb "ws://:3001/client",
"ws://:3002/client"
])
= Dict(
sensor1 "T" => 18.3, "H" => 45.2
)= Dict(
sensor2 "P" => 2.3
)
publish(
rb,"mytopic",
Dict(
"sensor#1" => sensor1,
"sensor#2" => sensor2,
) )