Rembus
Rembus is a Julia package designed for building distributed applications using both Publish/Subscribe (Pub/Sub) and Remote Procedure Call (RPC) communication patterns.
A key distinguishing feature of Rembus is its highly flexible role system, allowing a single application to act as a client, server, publisher, subscriber, and even a message broker concurrently.
This unique capability enables the implementation of a wide range of distributed architectures.
Key Features:
- Support multiple transport protocol: WebSocket, TCP, and ZeroMQ.
- Efficient CBOR encoding for primitive types.
- Optimized Arrow Table Format for encodings DataFrames.
Application Roles:
An application utilizing Rembus can assume one or more of the following roles:
- RPC Client (Requestor): Initiates requests for services from other components.
- RPC Server (Exposer): Provides and executes services in response to requests.
- Pub/Sub Publisher: Produces and disseminates messages to interested subscribers.
- Pub/Sub Subscriber: Consumes messages published on specific topics.
- Broker: Routes messages between connected components, potentially across different transport protocols.
- Broker and Component: Combines the routing capabilities of a broker with the application logic of a component.
- Server: Accepts connections from clients but does not route messages between them in the same way a broker does.
Installation
using Pkg
Pkg.add("Rembus")
Broker
A Rembus Broker acts as a central message router, facilitating communication between components. Importantly, a Broker can bridge components using different transport protocols (e.g., a ZeroMQ component can communicate with a WebSocket component).
Starting a basic WebSocket Broker:
using Rembus
component() # Starts a WebSocket server listening on port 8000
The connection point for this broker is ws://host:8000
.
A Broker can also function as a Component, connecting to another broker while simultaneously acting as a local broker:
using Rembus
rb = component("ws://myhost:8000/mynode", ws=9000)
Here, the mynode
component connects to the broker at myhost:8000
and also acts as a broker, accepting WebSocket connections on port 9000
and routing messages between its connected components.
Component
A Rembus Component is a process that embodies one or more of the communication roles (Publisher, Subscriber, Requestor, Exposer). To connect to a broker, a component uses a URL with the broker's connection point and a unique component identifier:
component_url = "[<protocol>://][<host>][:<port>/][<cid>]"
Where <protocol>
is one of ws
, wss
, tcp
, tls
, or zmq
. <host>
and <port>
specify the broker's address, and <cid>
is the component's unique name (optional for anonymous components).
Example connecting a named component:
rb = component("ws://host:8000/my_component")
A Component can also act as a Broker:
pub = component("ws://host:8000/my_pub", ws=9000)
The my_pub
component communicates with the broker at host:8000
and simultaneously acts as a WebSocket broker on port 9000
for other components..
Types of Components:
- Anonymous: Assumes a random, ephemeral identity on each connection. Useful when message origin tracing isn't required, for subscribers uninterested in offline messages, and for prototyping.
- Named: Possesses a unique, persistent name, enabling it to receive messages published while offline.
- Authenticated: A named component with cryptographic credentials (private key or shared secret) to prove its identity, allowing access to private Pub/Sub topics and RPC methods.
Server
Rembus simplifies the client-server architecture with a dedicated server API for creating components that accept client connections without acting as general-purpose message routers:
rb = server(ws=9876)
A server can expose RPC services and subscribe to Pub/Sub topics (typical server roles) but can also publish messages or request RPC services from its connected clients.
A Simple Broker Script
#!/bin/bash
#=
SDIR=$( dirname -- "${BASH_SOURCE[0]}" )
BINDIR=$( cd -- $SDIR &> /dev/null && pwd )
exec julia -t auto --color=no -e "include(popfirst!(ARGS))" \
--project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using Rembus
Rembus.brokerd()
This script starts a Rembus broker with a default WebSocket server on port 8000
. Use command-line arguments (e.g., ./broker -t 8001 -z 8002
) to enable TCP and ZeroMQ transports.
shell> ./broker
usage: broker [-n NAME] [-x] [-s] [-a] [-p HTTP] [-m PROMETHEUS]
[-w WS] [-t TCP] [-z ZMQ] [-r POLICY] [-d] [-i] [-h]
optional arguments:
-n, --name NAME broker name (default: "broker")
-x, --reset factory reset, clean up broker configuration
-s, --secure accept wss and tls connections
-a, --authenticated only authenticated components allowed
-p, --http HTTP accept HTTP clients on port HTTP (type:
UInt16)
-m, --prometheus PROMETHEUS
prometheus exposer port (type: UInt16)
-w, --ws WS accept WebSocket clients on port WS (type:
UInt16)
-t, --tcp TCP accept tcp clients on port TCP (type: UInt16)
-z, --zmq ZMQ accept zmq clients on port ZMQ (type: UInt16)
-r, --policy POLICY set the broker routing policy: first_up,
round_robin, less_busy (default: "first_up")
-d, --debug enable debug logs
-i, --info enable info logs
-h, --help show this help message and exit
See Configuration for customizing the runtime setting.
Index
Rembus.RembusError
Rembus.RembusTimeout
Rembus.RpcMethodException
Rembus.RpcMethodLoopback
Rembus.RpcMethodNotFound
Rembus.RpcMethodUnavailable
Base.close
Base.isopen
Base.wait
Rembus.authorize
Rembus.broker
Rembus.component
Rembus.component
Rembus.connect
Rembus.do_connect
Rembus.expose
Rembus.get_private_topics
Rembus.ifdown_block
Rembus.inject
Rembus.isauthenticated
Rembus.private_topic
Rembus.public_topic
Rembus.publish
Rembus.reactive
Rembus.register
Rembus.request_timeout
Rembus.request_timeout!
Rembus.rid
Rembus.rpc
Rembus.server
Rembus.start_broker
Rembus.subscribe
Rembus.subscribe
Rembus.unauthorize
Rembus.unexpose
Rembus.unexpose
Rembus.unreactive
Rembus.unregister
Rembus.unsubscribe
Rembus.unsubscribe
Visor.shutdown
Rembus.@component
Rembus.@expose
Rembus.@expose
Rembus.@inject
Rembus.@publish
Rembus.@reactive
Rembus.@rpc
Rembus.@subscribe
Rembus.@subscribe
Rembus.@unexpose
Rembus.@unreactive
Rembus.@unsubscribe
Rembus.@wait
Rembus.RembusError
— Typestruct RembusError <: Rembus.RembusException
Generic Rembus error.
Fields
code::UInt8
: error codetopic::Union{Nothing, String}
: topic name if availablereason::Union{Nothing, String}
: detailed error message
Rembus.RembusTimeout
— Typestruct RembusTimeout{T} <: Rembus.RembusException
Exception thrown when a response is not received before the request timeout expires.
Fields
msg::Any
: request message
Rembus.RpcMethodException
— Typestruct RpcMethodException <: Rembus.RembusException
Thrown when a RPC method throws an exception.
Fields
topic::String
: service namereason::String
: remote exception description
Example
A component exposes a method that expect a string argument.
@expose foo(name::AbstractString) = "hello " * name
A RPC client invoke the method with an integer argument.
try
@rpc foo(1)
catch e
@error e.reason
end
The result is an exception:
┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│ foo(!Matched::AbstractString)
│ @ Main REPL[2]:1
└ @ Main REPL
Rembus.RpcMethodLoopback
— Typestruct RpcMethodLoopback <: Rembus.RembusException
Thrown when a RPC request would invoke a locally exposed method.
Fields
topic::String
: service name
Rembus.RpcMethodNotFound
— Typestruct RpcMethodNotFound <: Rembus.RembusException
Exception thrown from a rpc request when the called method is unknown.
Fields
topic::String
: service name
Example
An RPC Client request a method that does not exist.
@rpc coolservice()
The result is an exception:
ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...
Rembus.RpcMethodUnavailable
— Typestruct RpcMethodUnavailable <: Rembus.RembusException
Thrown when a RPC method is unavailable.
A method is considered unavailable when some component that exposed the method is currently disconnected from the broker.
Fields
topic::String
: service name
Base.close
— Methodclose(rb::Rembus.Twin)
Close the connection and terminate the component.
Base.isopen
— Methodisopen(rb::Rembus.Twin) -> Bool
Check if the component is connected to the broker.
Base.wait
— Methodwait(rb::Rembus.Twin)
Wait for RPC requests and Pub/Sub messages.
Rembus.authorize
— Methodfunction authorize(rb, client::AbstractString, topic::AbstractString)
Authorize the client
component to use the private topic
.
The component must have the admin role for granting topic accessibility.
Rembus.broker
— Functionbroker(; <keyword arguments>)
Start a broker node and return a handle for interacting with it.
The broker acts as a central node to manage routing of RPC requests and Pub/Sub messages between nodes.
It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security, authentication, and routing policies.
Keyword arguments
name::AbstractString="broker"
: The unique identifier for the broker supervisor process.ws=nothing
: The WebSocket (ws/wss) listening port. Set tonothing
to disable.tcp=nothing
: The TCP (tcp/tls) listening port. Set tonothing
to disable.zmq=nothing
: The ZMQ Router listening port. Set tonothing
to disable.prometheus=nothing
: The Prometheus port for scraping monitoring metrics. Set tonothing
to disable.secure=false
: Iftrue
, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.authenticated=false
: Iftrue
, only allows connections from named and authenticated nodes.policy::String="first_up"
: The routing policy used when topics are served by multiple nodes. Possible options include:"first_up"
: Selects the first connected node from the list of nodes exposing the RPC method."round_robin"
: Distributes requests evenly across nodes in a round-robin fashion."less_busy"
: Chooses the node with fewer outstanding requests.
Default Behavior
If
ws
,tcp
, andzmq
are all set tonothing
, the broker will default to listening for WebSocket connections on port8000
.
Rembus.component
— Methodcomponent(url::AbstractString; <keyword arguments>)
Start a component node and return a handle for interacting with it.
In case of connection lost the underlying supervision logic attempts to reconnect to the broker until it succeed.
The url
argument specifies the connection details for the component. For example, the URL ws://127.0.0.1:8000/foo
specifies:
- Protocol:
ws
(WebSocket). Other supported protocols:wss
,tcp
,tls
,zmq
. - Address:
127.0.0.1
(localhost). - Port:
8000
. - Component Name:
foo
.
Anonymous connections omit the path part of the URL.
If not specified, Rembus considers the above values as the default values.
This means the URL ws://127.0.0.1:8000/foo
is equivalent to simply foo
.
Additionally, a component may listen for incoming connections on configured ports, enabling it to act as a broker. These ports are specified using keyword arguments.
Keyword Arguments
name=missing
: Unique string identifier for the component's supervisor process. Defaults to the path part of theurl
argument ifmissing
.ws=nothing
: WebSocket (ws/wss) listening port. Set tonothing
to disable.tcp=nothing
: TCP (tcp/tls) listening port. Set tonothing
to disable.zmq=nothing
: ZMQ Router listening port. Set tonothing
to disable.secure=false
: Iftrue
, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.authenticated=false
: Iftrue
, only allows connections from named and authenticated nodes.policy::String="first_up"
: The routing policy used when topics are served by multiple nodes. Options:"first_up"
: Selects the first connected node from the list of nodes exposing the RPC method."round_robin"
: Distributes requests evenly across nodes in a round-robin fashion."less_busy"
: Chooses the node with fewer outstanding requests.
Rembus.component
— Methodcomponent(urls::Vector)
Start a component that connects to a pool of nodes defined by the urls
array.
Rembus.connect
— Methodconnect(url::AbstractString) -> Rembus.Twin
Connect to the remote endpoint defined by url
.
A disconnection from the remote endpoint will not trigger automatic reconnection.
Example
rb = connect("ws://127.0.0.1:8000/mycomponent")
Rembus.do_connect
— Methoddo_connect(twin::Twin)
Connect to the endpoint declared with REMBUS_BASE_URL
env variable.
REMBUS_BASE_URL
default to ws://127.0.0.1:8000
A component is considered anonymous when a different and random UUID is used as component identifier each time the application connect to the broker.
Rembus.expose
— Methodexpose(rb, topic::AbstractString, fn::Function)
expose(rb, fn::Function)
Expose the methods of function fn
to be executed by rpc clients using topic
as RPC method name.
If the topic
argument is omitted the function name equals to the RPC method name.
fn
returns the RPC response.
Expose the methods of function fn
to be invoked by RPC clients, using topic
as the RPC method name.
If the topic
argument is omitted, the function name is used as the RPC method name.
fn
is expected to return the RPC response. Any exceptions thrown by fn
are caught and returned as an RPC exception.
Rembus.get_private_topics
— Methodget_private_topics(rb)
Return a dictionary mapping private topics to their lists of authorized components.
Rembus.ifdown_block
— Methodifdown_block(rb::Rembus.Twin)
Block rpc
and publish
calls until the twin is connected to the broker.
Rembus.inject
— Functioninject(rb, ctx)
Bind a ctx
context to the rb
component.
When a ctx
context is bound, the method signatures of subscribed and exposed functions change as follows:
- the first argument is the
ctx
context. - the second argument is the
rb
component. - The remaining arguments correspond to the RPC request's arguments.
The ctx
is useful for maintaining local state contextualized to the rb
component.
Example
using Rembus
# keep the number of processed messages
mutable struct Context
msgcount::UInt
end
function topic(context::Context, rb, arg1, arg2)
context.msgcount += 1
some_logic(arg1, arg2)
end
ctx = Context(0)
rb = component("myname")
subscribe(rb, topic)
inject(rb, ctx)
In this example, when a message is published:
publish(rb, "topic", arg1, arg2)
The invoked method will receive the context and component as the first two arguments:
foo(container, rb, arg2, arg2)
Rembus.isauthenticated
— Methodisauthenticated(rb)
Return true if the component is authenticated.
Rembus.private_topic
— Methodprivate_topic(rb, topic::AbstractString)
Set the specified topic
to private.
The component must have the admin role to change the privacy level.
Rembus.public_topic
— Methodpublic_topic(rb, topic::AbstractString)
Set the specified topic
to public.
The component must have the admin role to change the privacy level.
Rembus.publish
— Methodpublish(rb, topic::AbstractString, data...; qos=Rembus.QOS0)
Publish (Vararg
) data values to a specified topic
.
Each item in data
is mapped to an argument of the remote method subscribed to the topic
.
The data
values can be of any type. However, if the components are implemented in different languages, the values must be either DataFrames
or primitive types that are CBOR-encodable (see RFC 8949) for interoperability.
The keywork argument qos
defines the quality of service (QoS) for message delivery. Possible values:
Rembus.QOS0
: (default): At most one message is delivered to the subscriber (message may be lost).Rembus.QOS1
: At least one message is delivered to the subscriber (message may be duplicated).Rembus.QOS2
: Exactly one message is delivered to the subscriber.
Examples
If the subscriber is a method that expects two arguments:
mytopic(x, y) = ... # do something with x and y
You can publish a message with (Vararg
) data consisting of two values:
rb = component("myname")
publish(rb, "mytopic", 1, 2)
If the remote subscribed method has no arguments invoke publish
as:
publish(rb, "mytopic")
Rembus.reactive
— Functionreactive(rb, from::Union{Real,Period,Dates.CompoundPeriod}=Day(1))
Enable the reception of published messages for topics to which the node is subscribed via subscribe
.
The from
argument specifies the starting point in time from which messages published while the component was offline will be delivered upon reconnection.
This value applies to all subscribed topics but can be overridden by the from
argument in the subscribe
method for a specific topic — though only to define narrower time ranges.
Possible from
values:
Rembus.Now
: Equivalent to0.0
, ignores old messages, and starts receiving only new messages from now.Rembus.LastReceived
: Receives all messages published since the last disconnection.n::Float64
: Receives messages published within the lastn
seconds.Dates.CompoundPeriod
: Defines a custom period using aCompoundPeriod
value.
Example
rb = component("myname")
subscribe(rb, "mytopic1", Rembus.Now)
subscribe(rb, "mytopic2", Rembus.LastReceived)
subscribe(rb, "mytopic3", Hour(1))
reactive(rb, Day(1))
In this example:
mytopic1
will receive messages starting from now.mytopic2
will receive messages published within the last day, even ifsubscribe()
usesRembus.LastReceived
.mytopic3
will receive messages published within the last hour.
Rembus.register
— Methodregister(
cid::AbstractString,
pin::AbstractString;
scheme::UInt8
)
Register the component with name cid
.
To register a component a single pin
or a set of tenants must be configured in the tenants.json
file.
The pin
shared secret is a 8 hex digits string (for example "deedbeef").
Rembus.request_timeout!
— Methodrequest_timeout!(rb, value::Real)
Set the request timeout value for the component rb
.
Rembus.request_timeout
— Methodrequest_timeout(rb)
Get the request timeout value for the component rb
.
Rembus.rid
— Methodrid(rb::Rembus.Twin) -> String
Return the identifier of the component (R
embus ID
entifier).
rb = component("ws://myhost.org:8000/myname")
rid(rb) === "myname"
Rembus.rpc
— Methodrpc(rb, service::AbstractString, data...)
Make a request for a remote service
method using Vararg data
values as method arguments.
Return a value or throw an error if a request timeout occurs or the remotely invoked method throws an exception.
Example
rb = component("myclient")
# invoke mysum(1, 2) on the remote site
result = rpc(rb, "mysum", 1, 2)
Rembus.server
— Methodserver(; <keyword arguments>)
Start a server node and return a handle for interacting with it.
The server accepts connection from client nodes.
It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security and authentication.
Keyword arguments
name::AbstractString="broker"
: The unique identifier for the server supervisor process.ws=nothing
: The WebSocket (ws/wss) listening port. Set tonothing
to disable.tcp=nothing
: The TCP (tcp/tls) listening port. Set tonothing
to disable.zmq=nothing
: The ZMQ Router listening port. Set tonothing
to disable.prometheus=nothing
: The Prometheus port for scraping monitoring metrics. Set tonothing
to disable.secure=false
: Iftrue
, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.authenticated=false
: Iftrue
, only allows connections from named and authenticated nodes.
Default Behavior
If ws
, tcp
, and zmq
are all set to nothing
, the broker will default to listening for WebSocket connections on port 8000
.
Rembus.start_broker
— Methodstart_broker(;
wait=true,
secure=nothing,
ws=nothing,
tcp=nothing,
zmq=nothing,
http=nothing,
prometheus=nothing,
name="broker",
authenticated=false,
reset=nothing,
plugin=nothing,
context=nothing
)
Start the node.
Return immediately when wait
is false, otherwise blocks until shutdown is requested.
Overwrite command line arguments if args is not empty.
Rembus.subscribe
— Functionsubscribe(rb, fn::Function, from=Rembus.Now)
Subscribe to messages published to a topic
and register a callback function fn
to handle incoming messages.
If the topic
argument is omitted, the function name must be equal to the topic name.
The from
(default=Rembus.Now
) argument defines the starting point in time from which messages published while the component was offline will be sent upon reconnection. Possible from
values:
Rembus.Now
: Equivalent to0.0
, ignores old messages, and starts receiving only new messages from now.Rembus.LastReceived
: Receives all messages published since the last disconnection.n::Float64
: Receives messages published within the lastn
seconds.Dates.CompoundPeriod
: Defines a custom period using aCompoundPeriod
value.
To enable the reception of published messages, the reactive
function must be called.
Example
rb = component("myname")
# Define a callback function for the "mytopic" topic
function mytopic(data)
println("Received: ", data)
end
# Subscribe to "mytopic", receiving only new messages from now
subscribe(rb, mytopic, from=Rembus.Now)
reactive(rb)
Rembus.subscribe
— Functionsubscribe(rb, topic::AbstractString, fn::Function, from=Rembus.Now)
Rembus.unauthorize
— Methodfunction unauthorize(rb, client::AbstractString, topic::AbstractString)
Revoke authorization to the client
component for use of the private topic
.
The component must have the admin role for revoking topic accessibility.
Rembus.unexpose
— Methodunexpose(rb, service::AbstractString)
Stop servicing RPC requests targeting service
.
Rembus.unexpose
— Methodunexpose(rb, fn::Function)
Stop servicing RPC requests targeting fn
function.
Rembus.unreactive
— Methodunreactive(rb)
Stops the delivery of published messages to the rb
component.
Rembus.unregister
— Methodunregister(twin)
Unregister the connected component.
Only a connected and authenticated component may execute the unregister command.
using Rembus
twin = connect("authenticated_component")
Rembus.unregister(twin)
close(twin)
Rembus.unsubscribe
— Methodunsubscribe(rb, topic::AbstractString)
Stops delivering messages published on the specified topic
to the rb
component.
Rembus.unsubscribe
— Methodunsubscribe(rb, fn::Function)
Stops delivering messages to the specified fn
function.
Visor.shutdown
— Methodshutdown(rb::Rembus.Twin)
Close the connection and terminate the component.
Rembus.@component
— Macro@component "url"
Set the name of the component and the protocol for connecting to the broker.
url
may be:
- "myname": use $REMBUS_BASE_URL for connection parameters
- "tcp://host:port/myname": tcp connection
- "ws://host:port/myname": web socket connection
- "zmq://host:port/myname": ZeroMQ connection
Rembus.@expose
— Macro@expose function fn(arg1,...)
...
end
Expose the function expression.
Rembus.@expose
— Macro@expose fn
Expose all the methods of the function fn
.
Example
Expose the function mycalc
that implements a service that may accept two numbers or a string and number:
mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y
@expose mycalc
Call mycal
service using the correct types of arguments:
# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003
# ok
julia> response = @rpc mycalc("hello",2.0)
10.0
If the RPC client call mycalc
with the argument's type that do not respect the signatures of the exposed service then it throws RpcMethodException
julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
Rembus.@inject
— Macro @inject container
Binds a container
object, which is passed as the first argument to subscribed component functions.
See inject
for more details.
Rembus.@publish
— Macro@publish topic(arg1,arg2,...)
Publish a message to topic
logic channel.
The function topic(arg1,arg2,...)
will be called on each connected component subscribed to topic
.
Publisher
@publish foo("gfr", 54.2)
Subscriber
function foo(name, value)
println("do something with $name=$value")
end
@subscribe foo
@reactive
supervise()
Rembus.@reactive
— Macro@reactive
The subscribed methods start to handle published messages.
Rembus.@rpc
— Macro@rpc service(arg1,...)
Call the remote service
method and return its outcome.
The outcome may be the a return value or a RpcMethodException
if the remote throws an exception.
The service
method must match the signature of an exposed remote service
method.
Components may subscribe to service
for receiving the service
request.
Exposer
function mymethod(x, y)
return evaluate(x,y)
end
@expose mymethod
supervise()
RPC client
response = @rpc mymethod(x,y)
Subscriber
function service(x, y)
...
end
@subscribe service
@reactive
supervise()
Rembus.@subscribe
— Macro@subscribe topic [mode]
Setup a subscription to topic
logic channel to handle messages from @publish
or @rpc
.
mode
values`:
from_now
(default): receive messages published from now.before_now
: receive messages published when the component was offline.
Messages starts to be delivered to topic
when reactivity is enabled with @reactive
macro.
Subscriber
function foo(arg1, arg2)
...
end
@subscribe foo
@reactive
supervise()
Publisher
@publish foo("gfr", 54.2)
Rembus.@subscribe
— Macro@subscribe function fn(args...)
...
end [mode]
Subscribe the function expression.
Rembus.@unexpose
— Macro@unexpose fn
The methods of fn
function is no more available to rpc clients.
Rembus.@unreactive
— Macro@unreactive
The subscribed methods stop to handle published messages.
Rembus.@unsubscribe
— Macro@unsubscribe mytopic
mytopic
's methods stop to handle messages published to topic mytopic
.
Rembus.@wait
— Macro@wait
Block forever waiting for Ctrl-C/InterruptException or root supervisor shutdown.