Rembus

Rembus is a Julia package designed for building distributed applications using both Publish/Subscribe (Pub/Sub) and Remote Procedure Call (RPC) communication patterns.

A key distinguishing feature of Rembus is its highly flexible role system, allowing a single application to act as a client, server, publisher, subscriber, and even a message broker concurrently.

This unique capability enables the implementation of a wide range of distributed architectures.

Key Features:

  • Support multiple transport protocol: WebSocket, TCP, and ZeroMQ.
  • Efficient CBOR encoding for primitive types.
  • Optimized Arrow Table Format for encodings DataFrames.

Application Roles:

An application utilizing Rembus can assume one or more of the following roles:

  • RPC Client (Requestor): Initiates requests for services from other components.
  • RPC Server (Exposer): Provides and executes services in response to requests.
  • Pub/Sub Publisher: Produces and disseminates messages to interested subscribers.
  • Pub/Sub Subscriber: Consumes messages published on specific topics.
  • Broker: Routes messages between connected components, potentially across different transport protocols.
  • Broker and Component: Combines the routing capabilities of a broker with the application logic of a component.
  • Server: Accepts connections from clients but does not route messages between them in the same way a broker does.

Installation

using Pkg
Pkg.add("Rembus")

Broker

A Rembus Broker acts as a central message router, facilitating communication between components. Importantly, a Broker can bridge components using different transport protocols (e.g., a ZeroMQ component can communicate with a WebSocket component).

Starting a basic WebSocket Broker:

using Rembus

component() # Starts a WebSocket server listening on port 8000

The connection point for this broker is ws://host:8000.

A Broker can also function as a Component, connecting to another broker while simultaneously acting as a local broker:

using Rembus
rb = component("ws://myhost:8000/mynode", ws=9000)

Here, the mynode component connects to the broker at myhost:8000 and also acts as a broker, accepting WebSocket connections on port 9000 and routing messages between its connected components.

Component

A Rembus Component is a process that embodies one or more of the communication roles (Publisher, Subscriber, Requestor, Exposer). To connect to a broker, a component uses a URL with the broker's connection point and a unique component identifier:

component_url = "[<protocol>://][<host>][:<port>/][<cid>]"

Where <protocol> is one of ws, wss, tcp, tls, or zmq. <host> and <port> specify the broker's address, and <cid> is the component's unique name (optional for anonymous components).

Example connecting a named component:

rb = component("ws://host:8000/my_component")

A Component can also act as a Broker:

pub = component("ws://host:8000/my_pub", ws=9000)

The my_pub component communicates with the broker at host:8000 and simultaneously acts as a WebSocket broker on port 9000 for other components..

Types of Components:

  • Anonymous: Assumes a random, ephemeral identity on each connection. Useful when message origin tracing isn't required, for subscribers uninterested in offline messages, and for prototyping.
  • Named: Possesses a unique, persistent name, enabling it to receive messages published while offline.
  • Authenticated: A named component with cryptographic credentials (private key or shared secret) to prove its identity, allowing access to private Pub/Sub topics and RPC methods.

Server

Rembus simplifies the client-server architecture with a dedicated server API for creating components that accept client connections without acting as general-purpose message routers:

rb = server(ws=9876)

A server can expose RPC services and subscribe to Pub/Sub topics (typical server roles) but can also publish messages or request RPC services from its connected clients.

A Simple Broker Script

#!/bin/bash
#=
SDIR=$( dirname -- "${BASH_SOURCE[0]}" )
BINDIR=$( cd -- $SDIR &> /dev/null && pwd )
exec julia -t auto --color=no -e "include(popfirst!(ARGS))" \
 --project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using Rembus
Rembus.brokerd()

This script starts a Rembus broker with a default WebSocket server on port 8000. Use command-line arguments (e.g., ./broker -t 8001 -z 8002) to enable TCP and ZeroMQ transports.

shell> ./broker
usage: broker [-n NAME] [-x] [-s] [-a] [-p HTTP] [-m PROMETHEUS]
              [-w WS] [-t TCP] [-z ZMQ] [-r POLICY] [-d] [-i] [-h]

optional arguments:
  -n, --name NAME       broker name (default: "broker")
  -x, --reset           factory reset, clean up broker configuration
  -s, --secure          accept wss and tls connections
  -a, --authenticated   only authenticated components allowed
  -p, --http HTTP       accept HTTP clients on port HTTP (type:
                        UInt16)
  -m, --prometheus PROMETHEUS
                        prometheus exposer port (type: UInt16)
  -w, --ws WS           accept WebSocket clients on port WS (type:
                        UInt16)
  -t, --tcp TCP         accept tcp clients on port TCP (type: UInt16)
  -z, --zmq ZMQ         accept zmq clients on port ZMQ (type: UInt16)
  -r, --policy POLICY   set the broker routing policy: first_up,
                        round_robin, less_busy (default: "first_up")
  -d, --debug           enable debug logs
  -i, --info            enable info logs
  -h, --help            show this help message and exit

See Configuration for customizing the runtime setting.

Index

Rembus.RembusErrorType
struct RembusError <: Rembus.RembusException

Generic Rembus error.

Fields

  • code::UInt8: error code

  • topic::Union{Nothing, String}: topic name if available

  • reason::Union{Nothing, String}: detailed error message

source
Rembus.RembusTimeoutType
struct RembusTimeout{T} <: Rembus.RembusException

Exception thrown when a response is not received before the request timeout expires.

Fields

  • msg::Any: request message
source
Rembus.RpcMethodExceptionType
struct RpcMethodException <: Rembus.RembusException

Thrown when a RPC method throws an exception.

Fields

  • topic::String: service name

  • reason::String: remote exception description

Example

A component exposes a method that expect a string argument.

@expose foo(name::AbstractString) = "hello " * name

A RPC client invoke the method with an integer argument.

try
    @rpc foo(1)
catch e
    @error e.reason
end

The result is an exception:

┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│   foo(!Matched::AbstractString)
│    @ Main REPL[2]:1
└ @ Main REPL
source
Rembus.RpcMethodLoopbackType
struct RpcMethodLoopback <: Rembus.RembusException

Thrown when a RPC request would invoke a locally exposed method.

Fields

  • topic::String: service name
source
Rembus.RpcMethodNotFoundType
struct RpcMethodNotFound <: Rembus.RembusException

Exception thrown from a rpc request when the called method is unknown.

Fields

  • topic::String: service name

Example

An RPC Client request a method that does not exist.

@rpc coolservice()

The result is an exception:

ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...
source
Rembus.RpcMethodUnavailableType
struct RpcMethodUnavailable <: Rembus.RembusException

Thrown when a RPC method is unavailable.

A method is considered unavailable when some component that exposed the method is currently disconnected from the broker.

Fields

  • topic::String: service name
source
Base.closeMethod
close(rb::Rembus.Twin)

Close the connection and terminate the component.

source
Base.isopenMethod
isopen(rb::Rembus.Twin) -> Bool

Check if the component is connected to the broker.

source
Base.waitMethod
wait(rb::Rembus.Twin)

Wait for RPC requests and Pub/Sub messages.

source
Rembus.authorizeMethod
function authorize(rb, client::AbstractString, topic::AbstractString)

Authorize the client component to use the private topic.

The component must have the admin role for granting topic accessibility.

source
Rembus.brokerFunction
broker(; <keyword arguments>)

Start a broker node and return a handle for interacting with it.

The broker acts as a central node to manage routing of RPC requests and Pub/Sub messages between nodes.

It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security, authentication, and routing policies.

Keyword arguments

  • name::AbstractString="broker": The unique identifier for the broker supervisor process.

  • ws=nothing: The WebSocket (ws/wss) listening port. Set to nothing to disable.

  • tcp=nothing: The TCP (tcp/tls) listening port. Set to nothing to disable.

  • zmq=nothing: The ZMQ Router listening port. Set to nothing to disable.

  • prometheus=nothing: The Prometheus port for scraping monitoring metrics. Set to nothing to disable.

  • secure=false: If true, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.

  • authenticated=false: If true, only allows connections from named and authenticated nodes.

  • policy::String="first_up": The routing policy used when topics are served by multiple nodes. Possible options include:

    • "first_up": Selects the first connected node from the list of nodes exposing the RPC method.
    • "round_robin": Distributes requests evenly across nodes in a round-robin fashion.
    • "less_busy": Chooses the node with fewer outstanding requests.

    Default Behavior

    If ws, tcp, and zmq are all set to nothing, the broker will default to listening for WebSocket connections on port 8000.

source
Rembus.componentMethod
component(url::AbstractString; <keyword arguments>)

Start a component node and return a handle for interacting with it.

In case of connection lost the underlying supervision logic attempts to reconnect to the broker until it succeed.

The url argument specifies the connection details for the component. For example, the URL ws://127.0.0.1:8000/foo specifies:

  • Protocol: ws (WebSocket). Other supported protocols: wss, tcp, tls, zmq.
  • Address: 127.0.0.1 (localhost).
  • Port: 8000.
  • Component Name: foo.

Anonymous connections omit the path part of the URL.

If not specified, Rembus considers the above values as the default values.

This means the URL ws://127.0.0.1:8000/foo is equivalent to simply foo.

Additionally, a component may listen for incoming connections on configured ports, enabling it to act as a broker. These ports are specified using keyword arguments.

Keyword Arguments

  • name=missing: Unique string identifier for the component's supervisor process. Defaults to the path part of the url argument if missing.
  • ws=nothing: WebSocket (ws/wss) listening port. Set to nothing to disable.
  • tcp=nothing: TCP (tcp/tls) listening port. Set to nothing to disable.
  • zmq=nothing: ZMQ Router listening port. Set to nothing to disable.
  • secure=false: If true, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.
  • authenticated=false: If true, only allows connections from named and authenticated nodes.
  • policy::String="first_up": The routing policy used when topics are served by multiple nodes. Options:
    • "first_up": Selects the first connected node from the list of nodes exposing the RPC method.
    • "round_robin": Distributes requests evenly across nodes in a round-robin fashion.
    • "less_busy": Chooses the node with fewer outstanding requests.
source
Rembus.componentMethod
component(urls::Vector)

Start a component that connects to a pool of nodes defined by the urls array.

source
Rembus.connectMethod
connect(url::AbstractString) -> Rembus.Twin

Connect to the remote endpoint defined by url.

A disconnection from the remote endpoint will not trigger automatic reconnection.

Example

rb = connect("ws://127.0.0.1:8000/mycomponent")

source
Rembus.do_connectMethod
do_connect(twin::Twin)

Connect to the endpoint declared with REMBUS_BASE_URL env variable.

REMBUS_BASE_URL default to ws://127.0.0.1:8000

A component is considered anonymous when a different and random UUID is used as component identifier each time the application connect to the broker.

source
Rembus.exposeMethod
expose(rb, topic::AbstractString, fn::Function)
expose(rb, fn::Function)

Expose the methods of function fn to be executed by rpc clients using topic as RPC method name.

If the topic argument is omitted the function name equals to the RPC method name.

fn returns the RPC response.

Expose the methods of function fn to be invoked by RPC clients, using topic as the RPC method name.

If the topic argument is omitted, the function name is used as the RPC method name.

fn is expected to return the RPC response. Any exceptions thrown by fn are caught and returned as an RPC exception.

source
Rembus.injectFunction
inject(rb, ctx)

Bind a ctx context to the rb component.

When a ctx context is bound, the method signatures of subscribed and exposed functions change as follows:

  • the first argument is the ctx context.
  • the second argument is the rb component.
  • The remaining arguments correspond to the RPC request's arguments.

The ctx is useful for maintaining local state contextualized to the rb component.

Example

using Rembus

# keep the number of processed messages
mutable struct Context
    msgcount::UInt
end

function topic(context::Context, rb, arg1, arg2)
    context.msgcount += 1
    some_logic(arg1, arg2)
end

ctx = Context(0)
rb = component("myname")
subscribe(rb, topic)
inject(rb, ctx)

In this example, when a message is published:

publish(rb, "topic", arg1, arg2)

The invoked method will receive the context and component as the first two arguments:

foo(container, rb, arg2, arg2)
source
Rembus.private_topicMethod
private_topic(rb, topic::AbstractString)

Set the specified topic to private.

The component must have the admin role to change the privacy level.

source
Rembus.public_topicMethod
public_topic(rb, topic::AbstractString)

Set the specified topic to public.

The component must have the admin role to change the privacy level.

source
Rembus.publishMethod
publish(rb, topic::AbstractString, data...; qos=Rembus.QOS0)

Publish (Vararg) data values to a specified topic.

Each item in data is mapped to an argument of the remote method subscribed to the topic.

The data values can be of any type. However, if the components are implemented in different languages, the values must be either DataFrames or primitive types that are CBOR-encodable (see RFC 8949) for interoperability.

The keywork argument qos defines the quality of service (QoS) for message delivery. Possible values:

  • Rembus.QOS0: (default): At most one message is delivered to the subscriber (message may be lost).
  • Rembus.QOS1: At least one message is delivered to the subscriber (message may be duplicated).
  • Rembus.QOS2: Exactly one message is delivered to the subscriber.

Examples

If the subscriber is a method that expects two arguments:

mytopic(x, y) = ...  # do something with x and y

You can publish a message with (Vararg) data consisting of two values:

rb = component("myname")
publish(rb, "mytopic", 1, 2)

If the remote subscribed method has no arguments invoke publish as:

publish(rb, "mytopic")
source
Rembus.reactiveFunction
reactive(rb, from::Union{Real,Period,Dates.CompoundPeriod}=Day(1))

Enable the reception of published messages for topics to which the node is subscribed via subscribe.

The from argument specifies the starting point in time from which messages published while the component was offline will be delivered upon reconnection.

This value applies to all subscribed topics but can be overridden by the from argument in the subscribe method for a specific topic — though only to define narrower time ranges.

Possible from values:

  • Rembus.Now: Equivalent to 0.0, ignores old messages, and starts receiving only new messages from now.
  • Rembus.LastReceived: Receives all messages published since the last disconnection.
  • n::Float64: Receives messages published within the last n seconds.
  • Dates.CompoundPeriod: Defines a custom period using a CompoundPeriod value.

Example

rb = component("myname")
subscribe(rb, "mytopic1", Rembus.Now)
subscribe(rb, "mytopic2", Rembus.LastReceived)
subscribe(rb, "mytopic3", Hour(1))

reactive(rb, Day(1))

In this example:

  • mytopic1 will receive messages starting from now.
  • mytopic2 will receive messages published within the last day, even if subscribe() uses Rembus.LastReceived.
  • mytopic3 will receive messages published within the last hour.
source
Rembus.registerMethod
register(
    cid::AbstractString,
    pin::AbstractString;
    scheme::UInt8
)

Register the component with name cid.

To register a component a single pin or a set of tenants must be configured in the tenants.json file.

The pin shared secret is a 8 hex digits string (for example "deedbeef").

source
Rembus.ridMethod
rid(rb::Rembus.Twin) -> String

Return the identifier of the component (Rembus IDentifier).

rb = component("ws://myhost.org:8000/myname")
rid(rb) === "myname"
source
Rembus.rpcMethod
rpc(rb, service::AbstractString, data...)

Make a request for a remote service method using Vararg data values as method arguments.

Return a value or throw an error if a request timeout occurs or the remotely invoked method throws an exception.

Example

rb = component("myclient")
# invoke mysum(1, 2) on the remote site
result = rpc(rb, "mysum", 1, 2)
source
Rembus.serverMethod
server(; <keyword arguments>)

Start a server node and return a handle for interacting with it.

The server accepts connection from client nodes.

It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security and authentication.

Keyword arguments

  • name::AbstractString="broker": The unique identifier for the server supervisor process.
  • ws=nothing: The WebSocket (ws/wss) listening port. Set to nothing to disable.
  • tcp=nothing: The TCP (tcp/tls) listening port. Set to nothing to disable.
  • zmq=nothing: The ZMQ Router listening port. Set to nothing to disable.
  • prometheus=nothing: The Prometheus port for scraping monitoring metrics. Set to nothing to disable.
  • secure=false: If true, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.
  • authenticated=false: If true, only allows connections from named and authenticated nodes.

Default Behavior

If ws, tcp, and zmq are all set to nothing, the broker will default to listening for WebSocket connections on port 8000.

source
Rembus.start_brokerMethod
start_broker(;
    wait=true,
    secure=nothing,
    ws=nothing,
    tcp=nothing,
    zmq=nothing,
    http=nothing,
    prometheus=nothing,
    name="broker",
    authenticated=false,
    reset=nothing,
    plugin=nothing,
    context=nothing
)

Start the node.

Return immediately when wait is false, otherwise blocks until shutdown is requested.

Overwrite command line arguments if args is not empty.

source
Rembus.subscribeFunction
subscribe(rb, fn::Function, from=Rembus.Now)

Subscribe to messages published to a topic and register a callback function fn to handle incoming messages.

If the topic argument is omitted, the function name must be equal to the topic name.

The from (default=Rembus.Now) argument defines the starting point in time from which messages published while the component was offline will be sent upon reconnection. Possible from values:

  • Rembus.Now: Equivalent to 0.0, ignores old messages, and starts receiving only new messages from now.
  • Rembus.LastReceived: Receives all messages published since the last disconnection.
  • n::Float64: Receives messages published within the last n seconds.
  • Dates.CompoundPeriod: Defines a custom period using a CompoundPeriod value.

To enable the reception of published messages, the reactive function must be called.

Example

rb = component("myname")

# Define a callback function for the "mytopic" topic
function mytopic(data)
    println("Received: ", data)
end

# Subscribe to "mytopic", receiving only new messages from now
subscribe(rb, mytopic, from=Rembus.Now)
reactive(rb)
source
Rembus.unauthorizeMethod
function unauthorize(rb, client::AbstractString, topic::AbstractString)

Revoke authorization to the client component for use of the private topic.

The component must have the admin role for revoking topic accessibility.

source
Rembus.unexposeMethod
unexpose(rb, service::AbstractString)

Stop servicing RPC requests targeting service.

source
Rembus.unexposeMethod
unexpose(rb, fn::Function)

Stop servicing RPC requests targeting fn function.

source
Rembus.unregisterMethod
unregister(twin)

Unregister the connected component.

Only a connected and authenticated component may execute the unregister command.

using Rembus

twin = connect("authenticated_component")
Rembus.unregister(twin)
close(twin)
source
Rembus.unsubscribeMethod
unsubscribe(rb, topic::AbstractString)

Stops delivering messages published on the specified topic to the rb component.

source
Visor.shutdownMethod
shutdown(rb::Rembus.Twin)

Close the connection and terminate the component.

source
Rembus.@componentMacro
@component "url"

Set the name of the component and the protocol for connecting to the broker.

url may be:

  • "myname": use $REMBUS_BASE_URL for connection parameters
  • "tcp://host:port/myname": tcp connection
  • "ws://host:port/myname": web socket connection
  • "zmq://host:port/myname": ZeroMQ connection
source
Rembus.@exposeMacro
@expose fn

Expose all the methods of the function fn.

Example

Expose the function mycalc that implements a service that may accept two numbers or a string and number:

mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y

@expose mycalc

Call mycal service using the correct types of arguments:

# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003

# ok
julia> response = @rpc mycalc("hello",2.0)
10.0

If the RPC client call mycalc with the argument's type that do not respect the signatures of the exposed service then it throws RpcMethodException

julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
source
Rembus.@injectMacro
 @inject container

Binds a container object, which is passed as the first argument to subscribed component functions.

See inject for more details.

source
Rembus.@publishMacro
@publish topic(arg1,arg2,...)

Publish a message to topic logic channel.

The function topic(arg1,arg2,...) will be called on each connected component subscribed to topic.

Publisher

@publish foo("gfr", 54.2)

Subscriber

function foo(name, value)
    println("do something with $name=$value")
end

@subscribe foo
@reactive

supervise()
source
Rembus.@rpcMacro
@rpc service(arg1,...)

Call the remote service method and return its outcome.

The outcome may be the a return value or a RpcMethodException if the remote throws an exception.

The service method must match the signature of an exposed remote service method.

Components may subscribe to service for receiving the service request.

Exposer

function mymethod(x, y)
    return evaluate(x,y)
end

@expose mymethod
supervise()

RPC client

response = @rpc mymethod(x,y)

Subscriber

function service(x, y)
    ...
end

@subscribe service
@reactive

supervise()
source
Rembus.@subscribeMacro
@subscribe topic [mode]

Setup a subscription to topic logic channel to handle messages from @publish or @rpc.

mode values`:

  • from_now (default): receive messages published from now.
  • before_now: receive messages published when the component was offline.

Messages starts to be delivered to topic when reactivity is enabled with @reactive macro.

Subscriber

function foo(arg1, arg2)
    ...
end

@subscribe foo
@reactive

supervise()

Publisher

@publish foo("gfr", 54.2)
source
Rembus.@waitMacro
@wait

Block forever waiting for Ctrl-C/InterruptException or root supervisor shutdown.

source