Rembus
Rembus is a middleware for Pub/Sub and RPC communication styles.
A Rembus node may play one o more roles:
- RPC client
- RPC server
- Pub/Sub publisher
- Pub/Sub subscriber
- Broker
This meshup of roles enables a to implements a set of distributed architectures.
Installation
using Pkg
Pkg.add("Rembus")
Broker
A Broker
is a process that routes messages between components.
A Broker
is capable of making components that use different transport protocols talk to each other. For example a component that uses a ZeroMQ socket may talk to a component that uses the WebSocket protocol.
Starting a Broker
is simple as:
using Rembus
broker()
Calling broker
without arguments start by default a WebSocket server listening on port 8000.
A startup script could be useful and the following broker
script will do:
#!/bin/bash
#=
SDIR=$( dirname -- "${BASH_SOURCE[0]}" )
BINDIR=$( cd -- $SDIR &> /dev/null && pwd )
exec julia -t auto --color=no -e "include(popfirst!(ARGS))" \
--project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using ArgParse
using Rembus
function command_line(default_name="broker")
s = ArgParseSettings()
@add_arg_table! s begin
"--name", "-n"
help = "broker name"
default = default_name
arg_type = String
"--reset", "-r"
help = "factory reset, clean up broker configuration"
action = :store_true
"--secure", "-s"
help = "accept wss and tls connections"
action = :store_true
"--authenticated", "-a"
help = "only authenticated components allowed"
action = :store_true
"--http", "-p"
help = "accept HTTP clients on port HTTP"
arg_type = UInt16
"--prometheus", "-m"
help = "prometheus exposer port"
arg_type = UInt16
"--ws", "-w"
help = "accept WebSocket clients on port WS"
arg_type = UInt16
"--tcp", "-t"
help = "accept tcp clients on port TCP"
arg_type = UInt16
"--zmq", "-z"
help = "accept zmq clients on port ZMQ"
arg_type = UInt16
"--policy"
help = "set the broker routing policy"
arg_type = Symbol
"--debug", "-d"
help = "enable debug logs"
action = :store_true
"--info", "-i"
help = "enable info logs"
action = :store_true
end
return parse_args(s)
end
args = command_line()
name = args["name"]
if args["reset"]
Rembus.broker_reset(name)
end
if args["debug"]
Rembus.debug!()
elseif args["info"]
Rembus.info!()
end
wait(Rembus.broker(
name=name,
ws=args["ws"],
tcp=args["tcp"],
zmq=args["zmq"],
prometheus=args["prometheus"],
authenticated=args["authenticated"]
))
broker
starts by default a WebSocket server listening on port 8000, for enabling tcp
and/or zmq
transports use the appropriate arguments:
shell> ./broker
usage: broker [-r] [-s] [-p HTTP] [-w WS] [-t TCP] [-z ZMQ] [-d] [-h]
optional arguments:
-r, --reset factory reset, clean up broker configuration
-s, --secure accept wss and tls connections
-p, --http HTTP accept HTTP clients on port HTTP (type: UInt16)
-w, --ws WS accept WebSocket clients on port WS (type: UInt16)
-t, --tcp TCP accept tcp clients on port TCP (type: UInt16)
-z, --zmq ZMQ accept zmq clients on port ZMQ (type: UInt16)
-d, --debug enable debug logs
-h, --help show this help message and exit
See Broker environment variables for customizing the runtime setting.
Component
A Component
is a process that plays one or more of the following roles:
- Publisher (Pub/Sub) : produce messages;
- Subscriber (Pub/Sub): consume published messages;
- Requestor (RPC): request a service;
- Exposer (RPC): execute a service request and give back a response;
There are three type of components:
- Anonymous
- Named
- Authenticated
An Anonymous
component assume a random and ephemeral identity each time it connects to the broker. Example usage for anonymous components may be:
- when it is not required to trace the originating source of messages;
- for a
Subscriber
not interested to receive messages published when it was offline; - for preliminary prototyping;
A Named
component has a unique and persistent name that make possible to receive messages published when the component was offline.
An Authenticated
component is a named component that own a private key or a shared secret which can prove its identity.
Only authenticated components may use Pub/Sub private topics and private RPC methods.
An URL string defines the identity and the connection parameters of a component. The Macro-based API page documents the URL format.
Index
Rembus.RembusError
Rembus.RembusTimeout
Rembus.RpcMethodException
Rembus.RpcMethodLoopback
Rembus.RpcMethodNotFound
Rembus.RpcMethodUnavailable
Rembus.authorize
Rembus.broker
Rembus.component
Rembus.component
Rembus.do_connect
Rembus.expose
Rembus.get_private_topics
Rembus.ifdown_block
Rembus.inject
Rembus.isauthenticated
Rembus.private_topic
Rembus.public_topic
Rembus.publish
Rembus.reactive
Rembus.register
Rembus.request_timeout
Rembus.request_timeout
Rembus.request_timeout!
Rembus.request_timeout!
Rembus.rid
Rembus.rpc
Rembus.server
Rembus.start_broker
Rembus.subscribe
Rembus.subscribe
Rembus.unauthorize
Rembus.unexpose
Rembus.unexpose
Rembus.unreactive
Rembus.unregister
Rembus.unsubscribe
Rembus.unsubscribe
Rembus.@component
Rembus.@expose
Rembus.@expose
Rembus.@inject
Rembus.@publish
Rembus.@reactive
Rembus.@rpc
Rembus.@subscribe
Rembus.@subscribe
Rembus.@unexpose
Rembus.@unreactive
Rembus.@unsubscribe
Rembus.@wait
Rembus.RembusError
— Typestruct RembusError <: Rembus.RembusException
Generic Rembus error.
Fields
code::UInt8
: error codetopic::Union{Nothing, String}
: topic name if availablereason::Union{Nothing, String}
: detailed error message
Rembus.RembusTimeout
— Typestruct RembusTimeout{T} <: Rembus.RembusException
Exception thrown when a response is not received before the request timeout expires.
Fields
msg::Any
: request message
Rembus.RpcMethodException
— Typestruct RpcMethodException <: Rembus.RembusException
Thrown when a RPC method throws an exception.
Fields
topic::String
: service namereason::String
: remote exception description
Example
A component exposes a method that expect a string argument.
@expose foo(name::AbstractString) = "hello " * name
A RPC client invoke the method with an integer argument.
try
@rpc foo(1)
catch e
@error e.reason
end
The result is an exception:
┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│ foo(!Matched::AbstractString)
│ @ Main REPL[2]:1
└ @ Main REPL
Rembus.RpcMethodLoopback
— Typestruct RpcMethodLoopback <: Rembus.RembusException
Thrown when a RPC request would invoke a locally exposed method.
Fields
topic::String
: service name
Rembus.RpcMethodNotFound
— Typestruct RpcMethodNotFound <: Rembus.RembusException
Exception thrown from a rpc request when the called method is unknown.
Fields
topic::String
: service name
Example
An RPC Client request a method that does not exist.
@rpc coolservice()
The result is an exception:
ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...
Rembus.RpcMethodUnavailable
— Typestruct RpcMethodUnavailable <: Rembus.RembusException
Thrown when a RPC method is unavailable.
A method is considered unavailable when some component that exposed the method is currently disconnected from the broker.
Fields
topic::String
: service name
Rembus.authorize
— Methodfunction authorize(rb, client::AbstractString, topic::AbstractString)
Authorize the client
component to use the private topic
.
The component must have the admin role for granting topic accessibility.
Rembus.broker
— Functionbroker(; <keyword arguments>)
Start a broker node and return a handle for interacting with it.
The broker acts as a central node to manage routing of RPC requests and Pub/Sub messages between nodes.
It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security, authentication, and routing policies.
Keyword arguments
name::AbstractString="broker"
: The unique identifier for the broker supervisor process.ws=nothing
: The WebSocket (ws/wss) listening port. Set tonothing
to disable.tcp=nothing
: The TCP (tcp/tls) listening port. Set tonothing
to disable.zmq=nothing
: The ZMQ Router listening port. Set tonothing
to disable.prometheus=nothing
: The Prometheus port for scraping monitoring metrics. Set tonothing
to disable.secure=false
: Iftrue
, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.authenticated=false
: Iftrue
, only allows connections from named and authenticated nodes.policy::String="first_up"
: The routing policy used when topics are served by multiple nodes. Possible options include:"first_up"
: Selects the first connected node from the list of nodes exposing the RPC method."round_robin"
: Distributes requests evenly across nodes in a round-robin fashion."less_busy"
: Chooses the node with fewer outstanding requests.
Default Behavior
If
ws
,tcp
, andzmq
are all set tonothing
, the broker will default to listening for WebSocket connections on port8000
.
Rembus.component
— Methodcomponent(url::AbstractString; <keyword arguments>)
Start a component node and return a handle for interacting with it.
In case of connection lost the underlying supervision logic attempts to reconnect to the broker until it succeed.
The url
argument specifies the connection details for the component. For example, the URL ws://127.0.0.1:8000/foo
specifies:
- Protocol:
ws
(WebSocket). Other supported protocols:wss
,tcp
,tls
,zmq
. - Address:
127.0.0.1
(localhost). - Port:
8000
. - Component Name:
foo
.
Anonymous connections omit the path part of the URL.
If not specified, Rembus considers the above values as the default values.
This means the URL ws://127.0.0.1:8000/foo
is equivalent to simply foo
.
Additionally, a component may listen for incoming connections on configured ports, enabling it to act as a broker. These ports are specified using keyword arguments.
Keyword Arguments
name=missing
: Unique string identifier for the component's supervisor process. Defaults to the path part of theurl
argument ifmissing
.ws=nothing
: WebSocket (ws/wss) listening port. Set tonothing
to disable.tcp=nothing
: TCP (tcp/tls) listening port. Set tonothing
to disable.zmq=nothing
: ZMQ Router listening port. Set tonothing
to disable.secure=false
: Iftrue
, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.authenticated=false
: Iftrue
, only allows connections from named and authenticated nodes.policy::String="first_up"
: The routing policy used when topics are served by multiple nodes. Options:"first_up"
: Selects the first connected node from the list of nodes exposing the RPC method."round_robin"
: Distributes requests evenly across nodes in a round-robin fashion."less_busy"
: Chooses the node with fewer outstanding requests.
Rembus.component
— Methodcomponent(urls::Vector)
Start a component that connects to a pool of nodes defined by the urls
array.
Rembus.do_connect
— Methoddo_connect(twin::Twin)
Connect to the endpoint declared with REMBUS_BASE_URL
env variable.
REMBUS_BASE_URL
default to ws://127.0.0.1:8000
A component is considered anonymous when a different and random UUID is used as component identifier each time the application connect to the broker.
Rembus.expose
— Methodexpose(rb, topic::AbstractString, fn::Function)
expose(rb, fn::Function)
Expose the methods of function fn
to be executed by rpc clients using topic
as RPC method name.
If the topic
argument is omitted the function name equals to the RPC method name.
fn
returns the RPC response.
Expose the methods of function fn
to be invoked by RPC clients, using topic
as the RPC method name.
If the topic
argument is omitted, the function name is used as the RPC method name.
fn
is expected to return the RPC response. Any exceptions thrown by fn
are caught and returned as an RPC exception.
Rembus.get_private_topics
— Methodget_private_topics(rb)
Return a dictionary mapping private topics to their lists of authorized components.
Rembus.ifdown_block
— MethodRembus.inject
— Functioninject(rb, ctx)
Bind a ctx
context to the rb
component.
When a ctx
context is bound, the method signatures of subscribed and exposed functions change as follows:
- the first argument is the
ctx
context. - the second argument is the
rb
component. - The remaining arguments correspond to the RPC request's arguments.
The ctx
is useful for maintaining local state contextualized to the rb
component.
Example
using Rembus
# keep the number of processed messages
mutable struct Context
msgcount::UInt
end
function topic(context::Context, rb, arg1, arg2)
context.msgcount += 1
some_logic(arg1, arg2)
end
ctx = Context(0)
rb = component("myname")
subscribe(rb, topic)
inject(rb, ctx)
In this example, when a message is published:
publish(rb, "topic", arg1, arg2)
The invoked method will receive the context and component as the first two arguments:
foo(container, rb, arg2, arg2)
Rembus.isauthenticated
— Methodisauthenticated(rb)
Return true if the component is authenticated.
Rembus.private_topic
— Methodprivate_topic(rb, topic::AbstractString)
Set the specified topic
to private.
The component must have the admin role to change the privacy level.
Rembus.public_topic
— Methodpublic_topic(rb, topic::AbstractString)
Set the specified topic
to public.
The component must have the admin role to change the privacy level.
Rembus.publish
— Methodpublish(rb, topic::AbstractString, data...; qos=Rembus.QOS0)
Publish (Vararg
) data values to a specified topic
.
Each item in data
is mapped to an argument of the remote method subscribed to the topic
.
The data
values can be of any type. However, if the components are implemented in different languages, the values must be either DataFrames
or primitive types that are CBOR-encodable (see RFC 8949) for interoperability.
The keywork argument qos
defines the quality of service (QoS) for message delivery. Possible values:
Rembus.QOS0
(default): At most one message is delivered to the subscriber (message may be lost).QOS1
: At least one message is delivered to the subscriber (message may be duplicated).Rembus.QOS2
: Exactly one message is delivered to the subscriber.
Examples
If the subscriber is a method that expects two arguments:
mytopic(x, y) = ... # do something with x and y
You can publish a message with (Vararg
) data consisting of two values:
rb = component("myname")
publish(rb, "mytopic", 1, 2)
If the remote subscribed method has no arguments invoke publish
as:
publish(rb, "mytopic")
Rembus.reactive
— Functionreactive(rb, from::Union{Real,Period,Dates.CompoundPeriod}=Day(1))
Enable the reception of published messages for topics to which the node is subscribed via subscribe
.
The from
argument specifies the starting point in time from which messages published while the component was offline will be delivered upon reconnection.
This value applies to all subscribed topics but can be overridden by the from
argument in the subscribe
method for a specific topic — though only to define narrower time ranges.
Possible from
values:
Rembus.Now
: Equivalent to0.0
, ignores old messages, and starts receiving only new messages from now.Rembus.LastReceived
: Receives all messages published since the last disconnection.n::Float64
: Receives messages published within the lastn
seconds.Dates.CompoundPeriod
: Defines a custom period using aCompoundPeriod
value.
Example
rb = component("myname")
subscribe(rb, "mytopic1", Rembus.Now)
subscribe(rb, "mytopic2", Rembus.LastReceived)
subscribe(rb, "mytopic3", Hour(1))
reactive(rb, Day(1))
In this example:
mytopic1
will receive messages starting from now.mytopic2
will receive messages published within the last day, even ifsubscribe()
usesRembus.LastReceived
.mytopic3
will receive messages published within the last hour.
Rembus.register
— Methodregister(
cid::AbstractString,
pin::AbstractString;
tenant=Union{Nothing, AbstractString} = nothing,
scheme::UInt8
)
Register the component with name cid
.
To register a component a single pin
or a set of tenants must be configured in the tenants.json
file.
The pin
shared secret is a 8 hex digits string (for example "deedbeef").
Rembus.request_timeout!
— Methodrequest_timeout!(value::Real)
Set the default request timeout used when creating new nodes with the broker
, component
, connect
, or server
functions.
Rembus.request_timeout!
— Methodrequest_timeout!(rb, value::Real)
Set the request timeout value for the component rb
.
Rembus.request_timeout
— Methodrequest_timeout(rb)
Get the request timeout value for the component rb
.
Rembus.request_timeout
— Methodrequest_timeout()
Returns the default request timeout used when creating new nodes with the broker
, component
, connect
, or server
functions.
Rembus.rid
— Methodrid(rb)
Return the identifier of the component (R
embus ID
entifier).
rb = component("ws://myhost.org:8000/myname")
rid(rb) === "myname"
Rembus.rpc
— Methodrpc(rb, service::AbstractString, data...)
Make a request for a remote service
method using Vararg data
values as method arguments.
Return a value or throw an error if a request timeout occurs or the remotely invoked method throws an exception.
Example
rb = component("myclient")
# invoke mysum(1, 2) on the remote site
result = rpc(rb, "mysum", 1, 2)
Rembus.server
— Methodserver(; <keyword arguments>)
Start a server node and return a handle for interacting with it.
The server accepts connection from client nodes.
It supports multiple communication protocols (WebSocket, TCP, and ZMQ) and allows for customizable security and authentication.
Keyword arguments
name::AbstractString="broker"
: The unique identifier for the server supervisor process.ws=nothing
: The WebSocket (ws/wss) listening port. Set tonothing
to disable.tcp=nothing
: The TCP (tcp/tls) listening port. Set tonothing
to disable.zmq=nothing
: The ZMQ Router listening port. Set tonothing
to disable.prometheus=nothing
: The Prometheus port for scraping monitoring metrics. Set tonothing
to disable.secure=false
: Iftrue
, enables WSS (WebSocket Secure) and TLS (Transport Layer Security) protocols for encrypted connections.authenticated=false
: Iftrue
, only allows connections from named and authenticated nodes.
Default Behavior
If ws
, tcp
, and zmq
are all set to nothing
, the broker will default to listening for WebSocket connections on port 8000
.
Rembus.start_broker
— Methodstart_broker(;
wait=true,
secure=nothing,
ws=nothing,
tcp=nothing,
zmq=nothing,
http=nothing,
prometheus=nothing,
name="broker",
authenticated=false,
reset=nothing,
plugin=nothing,
context=nothing
)
Start the node.
Return immediately when wait
is false, otherwise blocks until shutdown is requested.
Overwrite command line arguments if args is not empty.
Rembus.subscribe
— Functionsubscribe(rb, topic::AbstractString, fn::Function, from=Rembus.Now)
Rembus.subscribe
— Functionsubscribe(rb, fn::Function, from=Rembus.Now)
Subscribe to messages published to a topic
and register a callback function fn
to handle incoming messages.
If the topic
argument is omitted, the function name must be equal to the topic name.
The from
(default=Rembus.Now
) argument defines the starting point in time from which messages published while the component was offline will be sent upon reconnection. Possible from
values:
Rembus.Now
: Equivalent to0.0
, ignores old messages, and starts receiving only new messages from now.Rembus.LastReceived
: Receives all messages published since the last disconnection.n::Float64
: Receives messages published within the lastn
seconds.Dates.CompoundPeriod
: Defines a custom period using aCompoundPeriod
value.
Example
rb = component("myname")
# Define a callback function for the "mytopic" topic
function mytopic(data)
println("Received: ", data)
end
# Subscribe to "mytopic", receiving only new messages from now
subscribe(rb, mytopic, from=Rembus.Now)
Rembus.unauthorize
— Methodfunction unauthorize(rb, client::AbstractString, topic::AbstractString)
Revoke authorization to the client
component for use of the private topic
.
The component must have the admin role for revoking topic accessibility.
Rembus.unexpose
— Methodunexpose(rb, service::AbstractString)
Stop servicing RPC requests targeting service
.
Rembus.unexpose
— Methodunexpose(rb, fn::Function)
Stop servicing RPC requests targeting fn
function.
Rembus.unreactive
— Methodunreactive(rb)
Stops the delivery of published messages to the rb
component.
Rembus.unregister
— Methodunregister(twin)
Unregister the connected component.
Only a connected and authenticated component may execute the unregister command.
using Rembus
twin = connect("authenticated_component")
Rembus.unregister(twin)
close(twin)
Rembus.unsubscribe
— Methodunsubscribe(rb, topic::AbstractString)
Stops delivering messages published on the specified topic
to the rb
component.
Rembus.unsubscribe
— Methodunsubscribe(rb, fn::Function)
Stops delivering messages to the specified fn
function.
Rembus.@component
— Macro@component "url"
Set the name of the component and the protocol for connecting to the broker.
url
may be:
- "myname": use $REMBUS_BASE_URL for connection parameters
- "tcp://host:port/myname": tcp connection
- "ws://host:port/myname": web socket connection
- "zmq://host:port/myname": ZeroMQ connection
Rembus.@expose
— Macro@expose function fn(arg1,...)
...
end
Expose the function expression.
Rembus.@expose
— Macro@expose fn
Expose all the methods of the function fn
.
Example
Expose the function mycalc
that implements a service that may accept two numbers or a string and number:
mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y
@expose mycalc
Call mycal
service using the correct types of arguments:
# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003
# ok
julia> response = @rpc mycalc("hello",2.0)
10.0
If the RPC client call mycalc
with the argument's type that do not respect the signatures of the exposed service then it throws RpcMethodException
julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
Rembus.@inject
— Macro @inject container
Binds a container
object, which is passed as the first argument to subscribed component functions.
See inject
for more details.
Rembus.@publish
— Macro@publish topic(arg1,arg2,...)
Publish a message to topic
logic channel.
The function topic(arg1,arg2,...)
will be called on each connected component subscribed to topic
.
Publisher
@publish foo("gfr", 54.2)
Subscriber
function foo(name, value)
println("do something with $name=$value")
end
@subscribe foo
@reactive
supervise()
Rembus.@reactive
— Macro@reactive
The subscribed methods start to handle published messages.
Rembus.@rpc
— Macro@rpc service(arg1,...)
Call the remote service
method and return its outcome.
The outcome may be the a return value or a RpcMethodException
if the remote throws an exception.
The service
method must match the signature of an exposed remote service
method.
Components may subscribe to service
for receiving the service
request.
Exposer
function mymethod(x, y)
return evaluate(x,y)
end
@expose mymethod
supervise()
RPC client
response = @rpc mymethod(x,y)
Subscriber
function service(x, y)
...
end
@subscribe service
@reactive
supervise()
Rembus.@subscribe
— Macro@subscribe topic [mode]
Setup a subscription to topic
logic channel to handle messages from @publish
or @rpc
.
mode
values`:
from_now
(default): receive messages published from now.before_now
: receive messages published when the component was offline.
Messages starts to be delivered to topic
when reactivity is enabled with @reactive
macro.
Subscriber
function foo(arg1, arg2)
...
end
@subscribe foo
@reactive
supervise()
Publisher
@publish foo("gfr", 54.2)
Rembus.@subscribe
— Macro@subscribe function fn(args...)
...
end [mode]
Subscribe the function expression.
Rembus.@unexpose
— Macro@unexpose fn
The methods of fn
function is no more available to rpc clients.
Rembus.@unreactive
— Macro@unreactive
The subscribed methods stop to handle published messages.
Rembus.@unsubscribe
— Macro@unsubscribe mytopic
mytopic
's methods stop to handle messages published to topic mytopic
.
Rembus.@wait
— Macro@wait
Block forever waiting for Ctrl-C/InterruptException or root supervisor shutdown.