Python

Getting Started

Install rembus with pip or uv:

pip install rembus
uv add rembus

Then create a component object to interact with the others components:

# sync API
rb = rembus.node()

# async API
rb = await rembus.component()

The rb object provides methods for exposing functions implementation, subscribing to topics, publishing messages and requesting services:

  • rpc
  • publish
  • expose
  • subscribe

RPC

flowchart LR
  C(("Client")) --> S((Server))
  style C fill:#009E73, color:white
  style S fill:#0072B2, color:white

In this example a RPC request is made for the service stats which is implemented in a Julia S server:

# 🔵 stats.jl
using Statistics
using Rembus

# Return a stat summary of dataframes
# values from the value column.
function stats(df)
    return Dict(
        "min" => min(df.value...),
        "max" => max(df.value...),
        "mean" => mean(df.value),
        "std" => std(df.value)
    )
end

rb = component()

expose(rb, stats)

println("up and running")
wait(rb)

The python C client creates a pandas DataFrame and requests the stats service:

# 🟢 client.py
import rembus
from random import random
import pandas as pd

nrows = 1000000 
df = pd.DataFrame({
    "name": [f"kpi_{i}" for i in range(1,nrows)],
    "ts": range(1,nrows),
    "value": [random() for i in range(1,nrows)]
})

rb = rembus.node()
summary = rb.rpc("stats", df)

The sequence of steps is:

  • On the C client side a python pandas dataframe is created and used as argument of the rpc method;
  • On the S server side the stats method is called with a Julia DataFrame as argument;
  • On the S server side the return value of stats is a Julia Dictionary;
  • On the C client side the response is a Python dictionary.

Publish-Subscribe

flowchart LR
  C(("Publish")) --> S1((Sub-1))
  C(("Publish")) --> S2((Sub-2))
  style C fill:#009E73, color:white
  style S1 fill:#0072B2, color:white
  style S2 fill:#0072B2, color:white

# 🔵 sub-1.py
import rembus

def mytopic(data):
    println("[Sub-1] mytopic:$data")
end

rb = await component(name="sub-1")
await subscribe(rb, mytopic)
await wait(rb)
# 🔵 sub-2.jl
import rembus

def mytopic(data):
    print(f"[Sub-2] DB update: {data}")

rb = rembus.node(name="sub-2")
rb.subscribe(mytopic)
rb.wait()
# 🟢 publish.jl
using Rembus

rb = component([
  "ws://:3001/client",
  "ws://:3002/client"
])

sensor1 = Dict(
  "T" => 18.3, "H" => 45.2
)
sensor2 = Dict(
  "P" => 2.3
)

publish(
    rb,
    "mytopic",
    Dict(
        "sensor#1" => sensor1,
        "sensor#2" => sensor2,
    )
)