Python

Getting Started

Install rembus using pip or uv:

pip install rembus
uv add rembus

For the basic concepts of node types and addressing, see the Julia docs.

Rembus provides both synchronous and asynchronous APIs:

import rembus as rb

# Synchronous API
cli = rb.node("mynode")

import rembus as rb

# Asynchronous API (await must run inside a coroutine or an asyncio-aware REPL)
cli = await rb.component("mynode")

The returned cli object represents a Rembus component and exposes the core communication primitives:

  • rpc — request a remote service
  • expose — expose a local function as an RPC service
  • publish — publish messages to a topic
  • subscribe — subscribe to topic updates

RPC Call

flowchart LR
  C(("Client")) --> S((Server))
  style C fill:#009E73, color:white
  style S fill:#0072B2, color:white

This example shows a Python client invoking an RPC service implemented in Julia.

Julia Server

using Statistics
using Rembus

function stats(df)
    return Dict(
        "min"  => minimum(df.value),
        "max"  => maximum(df.value),
        "mean" => mean(df.value),
        "std"  => std(df.value)
    )
end

bro = component()
expose(bro, stats)

println("up and running")
wait(bro)

Python Client

import rembus as rb
import pandas as pd
from random import random

nrows = 1_000_000

df = pd.DataFrame({
    "name": [f"kpi_{i}" for i in range(nrows)],
    "ts": range(nrows),
    "value": [random() for _ in range(nrows)],
})

cli = rb.node("foo")
summary = cli.rpc("stats", df)
cli.close()

Data Flow

  1. The Python client sends a pandas DataFrame.
  2. Rembus converts it into a Julia DataFrame.
  3. The Julia service executes the computation.
  4. The result is returned as a Python dictionary.
  5. The WS connection is closed.
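
As a sanity check on the dictionary returned in step 4, the same summary can be computed locally in Python. This is a minimal sketch and not part of Rembus: `local_stats` is a hypothetical helper, and it assumes the Julia service uses the default `std`, which is the sample standard deviation (n - 1 denominator) and so corresponds to `statistics.stdev` rather than `statistics.pstdev`.

```python
import statistics
from random import random


def local_stats(values):
    """Compute the same summary as the Julia `stats` service (hypothetical helper)."""
    return {
        "min": min(values),
        "max": max(values),
        "mean": statistics.fmean(values),
        # Julia's std defaults to the corrected (n - 1) estimator,
        # which matches statistics.stdev, not statistics.pstdev.
        "std": statistics.stdev(values),
    }


values = [random() for _ in range(1_000)]
expected = local_stats(values)
print(expected)
```

With `summary` obtained from `cli.rpc("stats", df)`, each entry could then be compared against `local_stats(df["value"].tolist())` using a tolerance, for example with `math.isclose`, since floating-point results computed in two runtimes may differ in the last digits.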

Publish / Subscribe

This example shows two subscriber nodes implemented in Python and a publisher implemented in Julia.

flowchart LR
  C(("Publisher")) --> S1((Subscriber 1))
  C --> S2((Subscriber 2))
  style C fill:#009E73, color:white
  style S1 fill:#0072B2, color:white
  style S2 fill:#0072B2, color:white

Subscribers (Python)

import rembus as rb

def mytopic(data):
    print(f"[Sub-1] mytopic: {data}")

sub = rb.node(name="sub-1")
sub.subscribe(mytopic)

print("up and running")
sub.wait()

import rembus

def mytopic(data):
    print(f"[Sub-2] mytopic: {data}")

sub = rembus.node(name="sub-2")
sub.subscribe(mytopic)

print("up and running")
sub.wait()

Publisher (Julia)

using Rembus

pub = component([
  "ws://:3001/client",
  "ws://:3002/client"
])

sensor1 = Dict("T" => 18.3, "H" => 45.2)
sensor2 = Dict("P" => 2.3)

publish(
    pub,
    "mytopic",
    Dict(
        "sensor#1" => sensor1,
        "sensor#2" => sensor2,
    )
)

close(pub)
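
The message published above is a nested dictionary keyed by sensor name. A subscriber callback might flatten it before logging or storing the readings. The following is a minimal sketch: `flatten_readings` is a hypothetical helper, and it assumes the payload reaches the Python callback as a plain `dict` of `dict`s.

```python
def flatten_readings(data):
    """Turn {"sensor#1": {"T": 18.3}} into [("sensor#1", "T", 18.3)] (hypothetical helper)."""
    rows = []
    # Sort keys so the output order does not depend on dictionary ordering.
    for sensor, readings in sorted(data.items()):
        for quantity, value in sorted(readings.items()):
            rows.append((sensor, quantity, value))
    return rows


def mytopic(data):
    # Named after the topic, as in the subscribers above.
    for sensor, quantity, value in flatten_readings(data):
        print(f"[Sub-1] {sensor} {quantity}={value}")


# Local check using the payload shape sent by the Julia publisher.
mytopic({
    "sensor#1": {"T": 18.3, "H": 45.2},
    "sensor#2": {"P": 2.3},
})
```

Keeping the flattening logic in a separate function means it can be exercised without a running publisher, while `mytopic` stays a thin callback.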