Rembus

Rembus is a middleware for Pub/Sub and RPC communication styles.

A Rembus node may play one o more roles:

  • RPC client
  • RPC server
  • Pub/Sub publisher
  • Pub/Sub subscriber
  • Broker

This meshup of roles enables a to implements a set of distributed architectures.

Installation

using Pkg
Pkg.add("Rembus")

Broker

A Broker is a process that routes messages between components.

A Broker is capable of making components that use different transport protocols talk to each other. For example a component that uses a ZeroMQ socket may talk to a component that uses the WebSocket protocol.

Starting a Broker is simple as:

using Rembus

broker()

Calling broker without arguments start by default a WebSocket server listening on port 8000.

A startup script could be useful and the following broker script will do:

#!/bin/bash
#=
SDIR=$( dirname -- "${BASH_SOURCE[0]}" )
BINDIR=$( cd -- $SDIR &> /dev/null && pwd )
exec julia -t auto --color=no -e "include(popfirst!(ARGS))" \
 --project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using ArgParse
using Rembus

function command_line(default_name="broker")
    s = ArgParseSettings()
    @add_arg_table! s begin
        "--name", "-n"
        help = "broker name"
        default = default_name
        arg_type = String
        "--reset", "-r"
        help = "factory reset, clean up broker configuration"
        action = :store_true
        "--secure", "-s"
        help = "accept wss and tls connections"
        action = :store_true
        "--authenticated", "-a"
        help = "only authenticated components allowed"
        action = :store_true
        "--http", "-p"
        help = "accept HTTP clients on port HTTP"
        arg_type = UInt16
        "--prometheus", "-m"
        help = "prometheus exposer port"
        arg_type = UInt16
        "--ws", "-w"
        help = "accept WebSocket clients on port WS"
        arg_type = UInt16
        "--tcp", "-t"
        help = "accept tcp clients on port TCP"
        arg_type = UInt16
        "--zmq", "-z"
        help = "accept zmq clients on port ZMQ"
        arg_type = UInt16
        "--policy"
        help = "set the broker routing policy"
        arg_type = Symbol
        "--debug", "-d"
        help = "enable debug logs"
        action = :store_true
        "--info", "-i"
        help = "enable info logs"
        action = :store_true
    end
    return parse_args(s)
end

args = command_line()
name = args["name"]
if args["reset"]
    Rembus.broker_reset(name)
end

if args["debug"]
    Rembus.debug!()
elseif args["info"]
    Rembus.info!()
end

wait(Rembus.broker(
    name=name,
    ws=args["ws"],
    tcp=args["tcp"],
    zmq=args["zmq"],
    prometheus=args["prometheus"],
    authenticated=args["authenticated"]
))

broker starts by default a WebSocket server listening on port 8000, for enabling tcp and/or zmq transports use the appropriate arguments:

shell> ./broker
usage: broker [-r] [-s] [-p HTTP] [-w WS] [-t TCP] [-z ZMQ] [-d] [-h]

optional arguments:
  -r, --reset      factory reset, clean up broker configuration
  -s, --secure     accept wss and tls connections
  -p, --http HTTP  accept HTTP clients on port HTTP (type: UInt16)
  -w, --ws WS      accept WebSocket clients on port WS (type: UInt16)
  -t, --tcp TCP    accept tcp clients on port TCP (type: UInt16)
  -z, --zmq ZMQ    accept zmq clients on port ZMQ (type: UInt16)
  -d, --debug      enable debug logs
  -h, --help       show this help message and exit

See Broker environment variables for customizing the runtime setting.

Component

A Component is a process that plays one or more of the following roles:

  • Publisher (Pub/Sub) : produce messages;
  • Subscriber (Pub/Sub): consume published messages;
  • Requestor (RPC): request a service;
  • Exposer (RPC): execute a service request and give back a response;

There are three type of components:

  • Anonymous
  • Named
  • Authenticated

An Anonymous component assume a random and ephemeral identity each time it connects to the broker. Example usage for anonymous components may be:

  • when it is not required to trace the originating source of messages;
  • for a Subscriber not interested to receive messages published when it was offline;
  • for preliminary prototyping;

A Named component has a unique and persistent name that make possible to receive messages published when the component was offline.

An Authenticated component is a named component that own a private key or a shared secret which can prove its identity.

Only authenticated components may use Pub/Sub private topics and private RPC methods.

An URL string defines the identity and the connection parameters of a component. The Macro-based API page documents the URL format.

Index

Rembus.RembusErrorType
struct RembusError <: Rembus.RembusException

Generic Rembus error.

Fields

  • code::UInt8: error code

  • topic::Union{Nothing, String}: topic name if available

  • reason::Union{Nothing, String}: detailed error message

source
Rembus.RembusTimeoutType
struct RembusTimeout{T} <: Rembus.RembusException

Exception thrown when a response is not received before the request timeout expires.

Fields

  • msg::Any: request message
source
Rembus.RpcMethodExceptionType
struct RpcMethodException <: Rembus.RembusException

Thrown when a RPC method throws an exception.

Fields

  • topic::String: service name

  • reason::String: remote exception description

Example

A component exposes a method that expect a string argument.

@expose foo(name::AbstractString) = "hello " * name

A RPC client invoke the method with an integer argument.

try
    @rpc foo(1)
catch e
    @error e.reason
end

The result is an exception:

┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│   foo(!Matched::AbstractString)
│    @ Main REPL[2]:1
└ @ Main REPL
source
Rembus.RpcMethodLoopbackType
struct RpcMethodLoopback <: Rembus.RembusException

Thrown when a RPC request would invoke a locally exposed method.

Fields

  • topic::String: service name
source
Rembus.RpcMethodNotFoundType
struct RpcMethodNotFound <: Rembus.RembusException

Exception thrown from a rpc request when the called method is unknown.

Fields

  • topic::String: service name

Example

An RPC Client request a method that does not exist.

@rpc coolservice()

The result is an exception:

ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...
source
Rembus.RpcMethodUnavailableType
struct RpcMethodUnavailable <: Rembus.RembusException

Thrown when a RPC method is unavailable.

A method is considered unavailable when some component that exposed the method is currently disconnected from the broker.

Fields

  • topic::String: service name
source
Rembus.authorizeMethod
function authorize(rb, client::AbstractString, topic::AbstractString)

Authorize the client component to use the private topic.

The component must have the admin role for granting topic accessibility.

source
Rembus.brokerFunction
broker(; <keyword arguments>)

Start a broker node and return a handle for interacting with it.

The broker acts as a central node to manage routing of RPC requests and Pub/Sub messages between nodes.

It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security, authentication, and routing policies.

Keyword arguments

  • name::AbstractString="broker": The unique identifier for the broker supervisor process.

  • ws=nothing: The WebSocket (ws/wss) listening port. Set to nothing to disable.

  • tcp=nothing: The TCP (tcp/tls) listening port. Set to nothing to disable.

  • zmq=nothing: The ZMQ Router listening port. Set to nothing to disable.

  • prometheus=nothing: The Prometheus port for scraping monitoring metrics. Set to nothing to disable.

  • secure=false: If true, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.

  • authenticated=false: If true, only allows connections from named and authenticated nodes.

  • policy::String="first_up": The routing policy used when topics are served by multiple nodes. Possible options include:

    • "first_up": Selects the first connected node from the list of nodes exposing the RPC method.
    • "round_robin": Distributes requests evenly across nodes in a round-robin fashion.
    • "less_busy": Chooses the node with fewer outstanding requests.

    Default Behavior

    If ws, tcp, and zmq are all set to nothing, the broker will default to listening for WebSocket connections on port 8000.

source
Rembus.componentMethod
component(url::AbstractString; <keyword arguments>)

Start a component node and return a handle for interacting with it.

In case of connection lost the underlying supervision logic attempts to reconnect to the broker until it succeed.

The url argument specifies the connection details for the component. For example, the URL ws://127.0.0.1:8000/foo specifies:

  • Protocol: ws (WebSocket). Other supported protocols: wss, tcp, tls, zmq.
  • Address: 127.0.0.1 (localhost).
  • Port: 8000.
  • Component Name: foo.

Anonymous connections omit the path part of the URL.

If not specified, Rembus considers the above values as the default values.

This means the URL ws://127.0.0.1:8000/foo is equivalent to simply foo.

Additionally, a component may listen for incoming connections on configured ports, enabling it to act as a broker. These ports are specified using keyword arguments.

Keyword Arguments

  • name=missing: Unique string identifier for the component's supervisor process. Defaults to the path part of the url argument if missing.
  • ws=nothing: WebSocket (ws/wss) listening port. Set to nothing to disable.
  • tcp=nothing: TCP (tcp/tls) listening port. Set to nothing to disable.
  • zmq=nothing: ZMQ Router listening port. Set to nothing to disable.
  • secure=false: If true, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.
  • authenticated=false: If true, only allows connections from named and authenticated nodes.
  • policy::String="first_up": The routing policy used when topics are served by multiple nodes. Options:
    • "first_up": Selects the first connected node from the list of nodes exposing the RPC method.
    • "round_robin": Distributes requests evenly across nodes in a round-robin fashion.
    • "less_busy": Chooses the node with fewer outstanding requests.
source
Rembus.componentMethod
component(urls::Vector)

Start a component that connects to a pool of nodes defined by the urls array.

source
Rembus.do_connectMethod
do_connect(twin::Twin)

Connect to the endpoint declared with REMBUS_BASE_URL env variable.

REMBUS_BASE_URL default to ws://127.0.0.1:8000

A component is considered anonymous when a different and random UUID is used as component identifier each time the application connect to the broker.

source
Rembus.exposeMethod
expose(rb, topic::AbstractString, fn::Function)
expose(rb, fn::Function)

Expose the methods of function fn to be executed by rpc clients using topic as RPC method name.

If the topic argument is omitted the function name equals to the RPC method name.

fn returns the RPC response.

Expose the methods of function fn to be invoked by RPC clients, using topic as the RPC method name.

If the topic argument is omitted, the function name is used as the RPC method name.

fn is expected to return the RPC response. Any exceptions thrown by fn are caught and returned as an RPC exception.

source
Rembus.injectFunction
inject(rb, ctx)

Bind a ctx context to the rb component.

When a ctx context is bound, the method signatures of subscribed and exposed functions change as follows:

  • the first argument is the ctx context.
  • the second argument is the rb component.
  • The remaining arguments correspond to the RPC request's arguments.

The ctx is useful for maintaining local state contextualized to the rb component.

Example

using Rembus

# keep the number of processed messages
mutable struct Context
    msgcount::UInt
end

function topic(context::Context, rb, arg1, arg2)
    context.msgcount += 1
    some_logic(arg1, arg2)
end

ctx = Context(0)
rb = component("myname")
subscribe(rb, topic)
inject(rb, ctx)

In this example, when a message is published:

publish(rb, "topic", arg1, arg2)

The invoked method will receive the context and component as the first two arguments:

foo(container, rb, arg2, arg2)
source
Rembus.private_topicMethod
private_topic(rb, topic::AbstractString)

Set the specified topic to private.

The component must have the admin role to change the privacy level.

source
Rembus.public_topicMethod
public_topic(rb, topic::AbstractString)

Set the specified topic to public.

The component must have the admin role to change the privacy level.

source
Rembus.publishMethod
publish(rb, topic::AbstractString, data...; qos=Rembus.QOS0)

Publish (Vararg) data values to a specified topic.

Each item in data is mapped to an argument of the remote method subscribed to the topic.

The data values can be of any type. However, if the components are implemented in different languages, the values must be either DataFrames or primitive types that are CBOR-encodable (see RFC 8949) for interoperability.

The keywork argument qos defines the quality of service (QoS) for message delivery. Possible values:

  • Rembus.QOS0 (default): At most one message is delivered to the subscriber (message may be lost).
  • QOS1: At least one message is delivered to the subscriber (message may be duplicated).
  • Rembus.QOS2: Exactly one message is delivered to the subscriber.

Examples

If the subscriber is a method that expects two arguments:

mytopic(x, y) = ...  # do something with x and y

You can publish a message with (Vararg) data consisting of two values:

rb = component("myname")
publish(rb, "mytopic", 1, 2)

If the remote subscribed method has no arguments invoke publish as:

publish(rb, "mytopic")
source
Rembus.reactiveFunction
reactive(rb, from::Union{Real,Period,Dates.CompoundPeriod}=Day(1))

Enable the reception of published messages for topics to which the node is subscribed via subscribe.

The from argument specifies the starting point in time from which messages published while the component was offline will be delivered upon reconnection.

This value applies to all subscribed topics but can be overridden by the from argument in the subscribe method for a specific topic — though only to define narrower time ranges.

Possible from values:

  • Rembus.Now: Equivalent to 0.0, ignores old messages, and starts receiving only new messages from now.
  • Rembus.LastReceived: Receives all messages published since the last disconnection.
  • n::Float64: Receives messages published within the last n seconds.
  • Dates.CompoundPeriod: Defines a custom period using a CompoundPeriod value.

Example

rb = component("myname")
subscribe(rb, "mytopic1", Rembus.Now)
subscribe(rb, "mytopic2", Rembus.LastReceived)
subscribe(rb, "mytopic3", Hour(1))

reactive(rb, Day(1))

In this example:

  • mytopic1 will receive messages starting from now.
  • mytopic2 will receive messages published within the last day, even if subscribe() uses Rembus.LastReceived.
  • mytopic3 will receive messages published within the last hour.
source
Rembus.registerMethod
register(
    cid::AbstractString,
    pin::AbstractString;
    tenant=Union{Nothing, AbstractString} = nothing,
    scheme::UInt8
)

Register the component with name cid.

To register a component a single pin or a set of tenants must be configured in the tenants.json file.

The pin shared secret is a 8 hex digits string (for example "deedbeef").

source
Rembus.ridMethod
rid(rb)

Return the identifier of the component (Rembus IDentifier).

rb = component("ws://myhost.org:8000/myname")
rid(rb) === "myname"
source
Rembus.rpcMethod
rpc(rb, service::AbstractString, data...)

Make a request for a remote service method using Vararg data values as method arguments.

Return a value or throw an error if a request timeout occurs or the remotely invoked method throws an exception.

Example

rb = component("myclient")
# invoke mysum(1, 2) on the remote site
result = rpc(rb, "mysum", 1, 2)
source
Rembus.serverMethod
server(; <keyword arguments>)

Start a server node and return a handle for interacting with it.

The server accepts connection from client nodes.

It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security and authentication.

Keyword arguments

  • name::AbstractString="broker": The unique identifier for the server supervisor process.
  • ws=nothing: The WebSocket (ws/wss) listening port. Set to nothing to disable.
  • tcp=nothing: The TCP (tcp/tls) listening port. Set to nothing to disable.
  • zmq=nothing: The ZMQ Router listening port. Set to nothing to disable.
  • prometheus=nothing: The Prometheus port for scraping monitoring metrics. Set to nothing to disable.
  • secure=false: If true, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.
  • authenticated=false: If true, only allows connections from named and authenticated nodes.

Default Behavior

If ws, tcp, and zmq are all set to nothing, the broker will default to listening for WebSocket connections on port 8000.

source
Rembus.start_brokerMethod
start_broker(;
    wait=true,
    secure=nothing,
    ws=nothing,
    tcp=nothing,
    zmq=nothing,
    http=nothing,
    prometheus=nothing,
    name="broker",
    authenticated=false,
    reset=nothing,
    plugin=nothing,
    context=nothing
)

Start the node.

Return immediately when wait is false, otherwise blocks until shutdown is requested.

Overwrite command line arguments if args is not empty.

source
Rembus.subscribeFunction
subscribe(rb, fn::Function, from=Rembus.Now)

Subscribe to messages published to a topic and register a callback function fn to handle incoming messages.

If the topic argument is omitted, the function name must be equal to the topic name.

The from (default=Rembus.Now) argument defines the starting point in time from which messages published while the component was offline will be sent upon reconnection. Possible from values:

  • Rembus.Now: Equivalent to 0.0, ignores old messages, and starts receiving only new messages from now.
  • Rembus.LastReceived: Receives all messages published since the last disconnection.
  • n::Float64: Receives messages published within the last n seconds.
  • Dates.CompoundPeriod: Defines a custom period using a CompoundPeriod value.

Example

rb = component("myname")

# Define a callback function for the "mytopic" topic
function mytopic(data)
    println("Received: ", data)
end

# Subscribe to "mytopic", receiving only new messages from now
subscribe(rb, mytopic, from=Rembus.Now)
source
Rembus.unauthorizeMethod
function unauthorize(rb, client::AbstractString, topic::AbstractString)

Revoke authorization to the client component for use of the private topic.

The component must have the admin role for revoking topic accessibility.

source
Rembus.unexposeMethod
unexpose(rb, service::AbstractString)

Stop servicing RPC requests targeting service.

source
Rembus.unexposeMethod
unexpose(rb, fn::Function)

Stop servicing RPC requests targeting fn function.

source
Rembus.unregisterMethod
unregister(twin)

Unregister the connected component.

Only a connected and authenticated component may execute the unregister command.

using Rembus

twin = connect("authenticated_component")
Rembus.unregister(twin)
close(twin)
source
Rembus.unsubscribeMethod
unsubscribe(rb, topic::AbstractString)

Stops delivering messages published on the specified topic to the rb component.

source
Rembus.@componentMacro
@component "url"

Set the name of the component and the protocol for connecting to the broker.

url may be:

  • "myname": use $REMBUS_BASE_URL for connection parameters
  • "tcp://host:port/myname": tcp connection
  • "ws://host:port/myname": web socket connection
  • "zmq://host:port/myname": ZeroMQ connection
source
Rembus.@exposeMacro
@expose fn

Expose all the methods of the function fn.

Example

Expose the function mycalc that implements a service that may accept two numbers or a string and number:

mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y

@expose mycalc

Call mycal service using the correct types of arguments:

# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003

# ok
julia> response = @rpc mycalc("hello",2.0)
10.0

If the RPC client call mycalc with the argument's type that do not respect the signatures of the exposed service then it throws RpcMethodException

julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
source
Rembus.@injectMacro
 @inject container

Binds a container object, which is passed as the first argument to subscribed component functions.

See inject for more details.

source
Rembus.@publishMacro
@publish topic(arg1,arg2,...)

Publish a message to topic logic channel.

The function topic(arg1,arg2,...) will be called on each connected component subscribed to topic.

Publisher

@publish foo("gfr", 54.2)

Subscriber

function foo(name, value)
    println("do something with $name=$value")
end

@subscribe foo
@reactive

supervise()
source
Rembus.@rpcMacro
@rpc service(arg1,...)

Call the remote service method and return its outcome.

The outcome may be the a return value or a RpcMethodException if the remote throws an exception.

The service method must match the signature of an exposed remote service method.

Components may subscribe to service for receiving the service request.

Exposer

function mymethod(x, y)
    return evaluate(x,y)
end

@expose mymethod
supervise()

RPC client

response = @rpc mymethod(x,y)

Subscriber

function service(x, y)
    ...
end

@subscribe service
@reactive

supervise()
source
Rembus.@subscribeMacro
@subscribe topic [mode]

Setup a subscription to topic logic channel to handle messages from @publish or @rpc.

mode values`:

  • from_now (default): receive messages published from now.
  • before_now: receive messages published when the component was offline.

Messages starts to be delivered to topic when reactivity is enabled with @reactive macro.

Subscriber

function foo(arg1, arg2)
    ...
end

@subscribe foo
@reactive

supervise()

Publisher

@publish foo("gfr", 54.2)
source
Rembus.@waitMacro
@wait

Block forever waiting for Ctrl-C/InterruptException or root supervisor shutdown.

source