Rembus
Rembus is a middleware for Pub/Sub and RPC communication styles.
There are two types of processes: Components and Brokers.
- A Component connects to a Broker;
- A Broker dispatches messages between Components;
- A Component exposes RPC services and/or subscribes to Pub/Sub topics;
- A Component makes RPC requests and/or publishes messages to Pub/Sub topics;
Installation
using Pkg
Pkg.add("Rembus")
Rembus installs and compiles in a minute or two.
Broker
A Broker is a process that routes messages between components.
A Broker can make components that use different transport protocols talk to each other. For example, a component that uses a ZeroMQ socket may talk to a component that uses the WebSocket protocol.
Starting a Broker is as simple as:
using Rembus
broker()
A startup script can be useful. The following broker script will do:
#!/bin/bash
#=
BINDIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
exec julia --threads auto --color=no -e "include(popfirst!(ARGS))" \
--project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using Rembus
broker()
By default, broker starts a WebSocket server listening on port 8000. To enable the tcp and/or zmq transports, use the appropriate arguments:
shell> ./broker
usage: broker [-r] [-s] [-p HTTP] [-w WS] [-t TCP] [-z ZMQ] [-d] [-h]
optional arguments:
-r, --reset factory reset, clean up broker configuration
-s, --secure accept wss and tls connections
-p, --http HTTP accept HTTP clients on port HTTP (type: UInt16)
-w, --ws WS accept WebSocket clients on port WS (type: UInt16)
-t, --tcp TCP accept tcp clients on port TCP (type: UInt16)
-z, --zmq ZMQ accept zmq clients on port ZMQ (type: UInt16)
-d, --debug enable debug logs
-h, --help show this help message and exit
See Broker environment variables for customizing the runtime setting.
Component
A Component is a process that plays one or more of the following roles:
- Publisher (Pub/Sub): produces messages;
- Subscriber (Pub/Sub): consumes published messages;
- Requestor (RPC): requests a service;
- Exposer (RPC): executes a service request and gives back a response;
There are three types of components:
- Anonymous
- Named
- Authenticated
An Anonymous component assumes a random and ephemeral identity each time it connects to the broker. Example use cases for anonymous components:
- when it is not required to trace the originating source of messages;
- for a Subscriber not interested in receiving messages published while it was offline;
- for preliminary prototyping;
A Named component has a unique and persistent name that makes it possible to receive messages published while the component was offline.
An Authenticated component is a named component that owns a private key or a shared secret which can prove its identity.
Only authenticated components may use Pub/Sub private topics and private RPC methods.
A URL string defines the identity and the connection parameters of a component. The Macro-based API page documents the URL format.
Index
Rembus.RembusTimeout
Rembus.RpcMethodException
Rembus.RpcMethodLoopback
Rembus.RpcMethodNotFound
Rembus.RpcMethodUnavailable
Rembus.authorize
Rembus.broker
Rembus.component
Rembus.component
Rembus.connect
Rembus.connect
Rembus.expose
Rembus.forever
Rembus.forever
Rembus.forever
Rembus.inject
Rembus.isauthenticated
Rembus.private_topic
Rembus.public_topic
Rembus.publish
Rembus.reactive
Rembus.register
Rembus.rpc
Rembus.server
Rembus.subscribe
Rembus.unauthorize
Rembus.unexpose
Rembus.unreactive
Rembus.unregister
Rembus.unsubscribe
Rembus.@component
Rembus.@expose
Rembus.@expose
Rembus.@forever
Rembus.@inject
Rembus.@publish
Rembus.@reactive
Rembus.@reactive
Rembus.@rpc
Rembus.@rpc_timeout
Rembus.@subscribe
Rembus.@subscribe
Rembus.@terminate
Rembus.@unexpose
Rembus.@unreactive
Rembus.@unsubscribe
Rembus.RembusTimeout — Type
RembusTimeout
Thrown when a response is not received.
Rembus.RpcMethodException — Type
RpcMethodException
Thrown when an RPC method throws an exception.
Fields
- cid: component name
- topic: service name
- reason: remote exception description
Exposer
@expose foo(name::AbstractString) = "hello " * name
RPC client
try
    @rpc foo(1)
catch e
    @error e.reason
end
Output:
┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│ foo(!Matched::AbstractString)
│ @ Main REPL[2]:1
└ @ Main REPL
Rembus.RpcMethodLoopback — Type
RpcMethodLoopback
Thrown when an RPC request would invoke a locally exposed method.
Fields
- cid: component name
- topic: service name
Rembus.RpcMethodNotFound — Type
RpcMethodNotFound
Thrown from an RPC request when the called method is unknown.
Fields
- cid: component name
- topic: service name
RPC client
@rpc coolservice()
Output:
ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...
Rembus.RpcMethodUnavailable — Type
RpcMethodUnavailable
Thrown when an RPC method is unavailable.
A method is considered unavailable when some component that exposed the method is currently disconnected from the broker.
Fields
- cid: component name
- topic: service name
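The exception types listed above can be told apart in a single catch block. The following is a minimal sketch, not taken from the Rembus sources: describe_rpc_failure and call_coolservice are hypothetical helper names, "coolservice" is the method name used in the RpcMethodNotFound example, and a broker is assumed to be reachable at the default endpoint. Only the documented fields (topic, reason) are read, and the type names are module-qualified so the sketch does not depend on which names Rembus exports.

```julia
using Rembus

# Hypothetical helper: map the documented exception types to a readable message.
function describe_rpc_failure(e)
    if e isa Rembus.RpcMethodNotFound
        return "no component exposes $(e.topic)"
    elseif e isa Rembus.RpcMethodUnavailable
        return "the exposer of $(e.topic) is currently disconnected"
    elseif e isa Rembus.RpcMethodException
        return "$(e.topic) failed remotely: $(e.reason)"
    elseif e isa Rembus.RembusTimeout
        return "no response received in time"
    else
        return "unexpected error: $e"
    end
end

# Hypothetical caller: needs a running broker, so it is only defined here.
function call_coolservice()
    rb = connect()                       # anonymous connection
    try
        return rpc(rb, "coolservice")    # may throw one of the types above
    catch e
        @error describe_rpc_failure(e)
        return nothing
    finally
        close(rb)
    end
end
```

Calling call_coolservice() with no exposer connected would be expected to log the RpcMethodNotFound branch, in line with the output shown for that type.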
Rembus.authorize — Method
authorize(
    rb::RBHandle, client::AbstractString, topic::AbstractString;
    raise=true
)
Authorize the client component to use the private topic.
The component must have the admin role to grant access to a topic.
Rembus.broker — Method
broker(;
    wait=true,
    secure=nothing,
    ws=nothing,
    tcp=nothing,
    zmq=nothing,
    http=nothing,
    name="broker",
    policy=:first_up,
    mode=nothing,
    reset=nothing,
    log="info",
    plugin=nothing,
    context=nothing
)
Start the broker.
Returns immediately when wait is false; otherwise blocks until shutdown is requested.
Overwrite command line arguments if args is not empty.
Rembus.component — Function
component(url)
Connect the Rembus component defined by url.
The connection is supervised: after a network fault, connection attempts are retried until one succeeds.
Rembus.component — Function
component(urls::Vector)
Connect the component to the remotes defined by the urls array.
The connection pool is supervised.
Rembus.connect — Method
connect(url::AbstractString)::RBHandle
Connect to the broker.
The returned Rembus handle does not auto-reconnect after a fault condition.
The returned RBHandle represents a connected component and is the handle used with the Rembus APIs. For example:
using Rembus
rb = connect("mycomponent")
publish(rb, "temperature", ["room_1", 21.5])
The url argument string is formatted as:
url = [<protocol>://][<host>][:<port>/]<cid>
<protocol> is one of:
- ws: web socket
- wss: secure web socket
- tcp: tcp socket
- tls: TLS over tcp socket
- zmq: ZeroMQ socket
<host> and <port> are the hostname/ip and the port of the listening broker.
<cid> is the unique name of the component.
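To make the url layout concrete, here is a small pure-Julia sketch that splits a string into the four parts named above. It is an illustration only and not the parser Rembus uses: split_component_url is a hypothetical helper, the regular expression is one reading of the bracketed grammar (the slash before <cid> is required whenever a host or port is given), and missing parts are returned as nothing rather than filled with defaults.

```julia
# Hypothetical helper, not part of Rembus: split a component url into its parts.
function split_component_url(url::AbstractString)
    # [<protocol>://][<host>][:<port>/]<cid>
    pattern = r"^(?:(ws|wss|tcp|tls|zmq)://)?(?:([^:/]*)(?::(\d+))?/)?([^/]+)$"
    m = match(pattern, url)
    m === nothing && throw(ArgumentError("unrecognized component url: $url"))
    protocol, host, port, cid = m.captures
    return (
        protocol = protocol,
        host = (host === nothing || isempty(host)) ? nothing : host,
        port = port === nothing ? nothing : parse(Int, port),
        cid = cid,
    )
end

full = split_component_url("ws://127.0.0.1:8000/mycomponent")
bare = split_component_url("mycomponent")
```

Here full carries all four parts, while bare has only the cid set, which corresponds to the case where the connection parameters come from the environment.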
Rembus.connect — Method
connect()
Connect anonymously to the endpoint declared with the REMBUS_BASE_URL environment variable.
REMBUS_BASE_URL defaults to ws://127.0.0.1:8000
A component is considered anonymous when a different, random UUID is used as the component identifier each time the application connects to the broker.
Rembus.expose — Method
expose(rb::RBHandle, fn::Function; raise=true, wait=true)
expose(rb::RBHandle, topic::AbstractString, fn::Function; raise=true, wait=true)
Expose the methods of function fn to be executed by RPC clients using topic as the RPC method name.
If the topic argument is omitted, the function name is used as the RPC method name.
fn returns the RPC response.
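A minimal sketch of both expose forms with the function-based API follows. It assumes a broker reachable at the default endpoint; "greeter" and "salute" are hypothetical names chosen for illustration, and run_exposer is a hypothetical wrapper so that nothing touches the network until it is called. The Rembus calls are module-qualified so the sketch does not depend on which names are exported.

```julia
using Rembus

# Plain Julia function implementing the service; its return value is the RPC response.
greet(name::AbstractString) = "hello " * name

# Hypothetical wrapper: needs a running broker, so it is only defined here.
function run_exposer()
    rb = connect("greeter")               # hypothetical component name
    Rembus.expose(rb, greet)              # RPC method name taken from the function: "greet"
    Rembus.expose(rb, "salute", greet)    # same function under an explicit method name
    Rembus.forever(rb)                    # wait for incoming requests
end
```

A client connected through another component could then call rpc(rb, "greet", "world") or rpc(rb, "salute", "world").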
Rembus.forever — Method
forever(rb::RBHandle)
Start the event loop awaiting to execute exposed and subscribed methods.
Rembus.forever — Method
forever(server::Server; wait=true, secure=false)
Start an embedded server and accept connections.
Rembus.forever — Method
forever(rb::Visor.Process)
Start the event loop awaiting to execute exposed and subscribed methods.
Rembus.inject — Function
inject(rb::RBHandle, ctx)
Bind a ctx context object to the rb component.
When a ctx context object is bound, it is passed as the first argument of subscribed and exposed methods.
See @inject for more details.
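The effect of a bound context can be sketched with the function-based API as follows. This is an illustration under assumptions: Counter, humidity, run_counting_subscriber and the component name "counting_subscriber" are hypothetical, a broker is assumed to be running at the default endpoint, and the Rembus calls are module-qualified so the sketch does not depend on which names are exported.

```julia
using Rembus

# Hypothetical context type: keeps the number of processed messages.
mutable struct Counter
    msgcount::Int
end

# With a bound context, the handler receives it as its first argument,
# followed by the published values.
function humidity(ctx::Counter, room, value)
    ctx.msgcount += 1
    @info "message $(ctx.msgcount): room=$room value=$value"
end

# Hypothetical wrapper: needs a running broker, so it is only defined here.
function run_counting_subscriber()
    ctx = Counter(0)
    rb = connect("counting_subscriber")
    Rembus.inject(rb, ctx)            # bind the context to the component
    Rembus.subscribe(rb, humidity)    # topic name taken from the function name
    Rembus.reactive(rb)               # start message delivery
    Rembus.forever(rb)
end
```

A publisher would still send only the two values, for example publish(rb, "humidity", ["room_1", 40.0]); the context is supplied on the subscriber side.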
Rembus.isauthenticated — Method
isauthenticated(session)
Return true if the connected component is authenticated.
Rembus.private_topic — Method
private_topic(rb::RBHandle, topic::AbstractString; raise=true)
Set the topic to private.
The component must have the admin role to change the privacy level of a topic.
Rembus.public_topic — Method
public_topic(rb::RBHandle, topic::AbstractString; raise=true)
Set the topic to public.
The component must have the admin role to change the privacy level of a topic.
Rembus.publish — Function
publish(rb::RBHandle, topic::AbstractString, data=[]; qos=QOS0)
Publish data values on topic.
data may be a value or a vector of values. Each value maps to an argument of the subscribed method.
For example, if the subscriber is a method that expects two arguments:
mytopic(x,y) = @info "x=$x, y=$y"
The published message needs an array of two elements:
publish(rb, "mytopic", [1, 2])
When a subscribed method expects one argument, it may be better to pass the value itself instead of an array of one element:
mytopic(x) = @info "x=$x"
publish(rb, "mytopic", 1)
If the subscribed method has no arguments, invoke publish as:
mytopic() = @info "mytopic invoked"
publish(rb, "mytopic")
The data array may contain values of any type, but if the components are implemented in different languages then data has to be a DataFrame or a primitive type that is CBOR encodable.
Rembus.reactive — Method
reactive(
    rb::RBHandle;
    from::Union{Real,Period,Dates.CompoundPeriod}=Day(1),
    raise=true,
    wait=true
)
Start the delivery of published messages for the topics in which interest was declared with subscribe.
Rembus.register — Method
register(
    cid::AbstractString,
    pin::AbstractString;
    tenant::Union{Nothing, AbstractString} = nothing,
    scheme::UInt8
)
Register the component with name cid.
To register a component, a single pin or a set of tenants must be configured in the tenants.json file.
The pin shared secret is a string of 8 hex digits (for example "deedbeef").
Rembus.rpc — Function
rpc(rb,
    topic::AbstractString,
    data=nothing;
    raise=true,
    timeout=request_timeout())
Call the remote topic method with arguments extracted from data.
The rb handle may be an RBHandle or a Server value.
Exposer
using Rembus
using Statistics
@expose service_noargs() = "success"
@expose service_name(name) = "hello " * name
@expose service_dictionary(d) = mean(values(d))
@expose function service_multiple_args(name, score, flags)
    isa(name, String) && isa(score, Float64) && isa(flags, Vector)
end
RPC client
using Rembus
rb = connect()
rpc(rb, "service_noargs")
rpc(rb, "service_name", "hello world")
rpc(rb, "service_dictionary", Dict("r1"=>13.3, "r2"=>3.0))
rpc(rb, "service_multiple_args", ["name", 1.0, ["red"=>1,"blue"=>2,"yellow"=>3]])
Rembus.server — Function
server(
    ctx=nothing;
    secure=false,
    ws=nothing,
    tcp=nothing,
    http=nothing,
    zmq=nothing,
    name="server",
    mode=nothing,
    log=TRACE_INFO
)
Initialize a server node.
Rembus.subscribe — Method
subscribe(rb::RBHandle, fn::Function; from=Now(), raise=true)
subscribe(
    rb::RBHandle, topic::AbstractString, fn::Function; from=Now(),
    raise=true
)
Declare interest in messages published on the topic logical channel.
The function fn is called when a message is received on topic and reactive has put the rb component in reactive mode.
If the topic argument is omitted, the function name must be equal to the topic name.
If from is LastReceived(), the rb component will receive messages published while it was offline.
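The subscribe, reactive and forever calls combine into a subscriber along these lines. This is a sketch under assumptions: a broker is running at the default endpoint, "thermostat_logger" is a hypothetical component name, run_subscriber is a hypothetical wrapper, and the handler matches the two-element message used in the connect example (publish(rb, "temperature", ["room_1", 21.5])). The Rembus calls are module-qualified so the sketch does not depend on which names are exported.

```julia
using Rembus

# Handler for the "temperature" topic: one argument per published value.
temperature(room, value) = @info "room=$room value=$value"

# Hypothetical wrapper: needs a running broker, so it is only defined here.
function run_subscriber()
    rb = connect("thermostat_logger")    # named component (hypothetical name)
    Rembus.subscribe(rb, temperature)    # topic name taken from the function name
    Rembus.reactive(rb)                  # start delivery of published messages
    Rembus.forever(rb)                   # keep the process alive to handle messages
end
```

Because the component is named, passing from=LastReceived() to subscribe should, per the description above, also deliver the messages published while it was offline.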
Rembus.unauthorize — Method
unauthorize(
    rb::RBHandle, client::AbstractString, topic::AbstractString;
    raise=true
)
Revoke the authorization of the client component to use the private topic.
The component must have the admin role to revoke access to a topic.
Rembus.unexpose — Method
unexpose(rb::RBHandle, fn::Function; raise=true)
unexpose(rb::RBHandle, topic::AbstractString; raise=true)
Stop servicing RPC requests targeting topic or fn methods.
Rembus.unreactive — Method
unreactive(rb::RBHandle, timeout=5; raise=true, wait=true)
Stop the delivery of published messages.
Rembus.unregister — Method
unregister(rb)
Unregister the connected component.
Only a connected and authenticated component may execute the unregister command.
using Rembus
rb = connect("authenticated_component")
Rembus.unregister(rb)
close(rb)
Rembus.unsubscribe — Method
unsubscribe(rb::RBHandle, topic::AbstractString; raise=true, wait=true)
unsubscribe(rb::RBHandle, fn::Function; raise=true, wait=true)
Stop delivering to the rb component the messages published on the topic logical channel, or on the topic whose name equals the name of the subscribed function fn.
Rembus.@component — Macro
@component "url"
Set the name of the component and the protocol for connecting to the broker.
url may be:
- "myname": use $REMBUS_BASE_URL for connection parameters
- "tcp://host:port/myname": tcp connection
- "ws://host:port/myname": web socket connection
- "zmq://host:port/myname": ZeroMQ connection
Rembus.@expose — Macro
@expose function fn(arg1,...)
    ...
end
Expose the function expression.
Rembus.@expose — Macro
@expose fn
Expose all the methods of the function fn.
Example
Expose the function mycalc, which implements a service that accepts either two numbers or a string and a number:
mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y
@expose mycalc
Call the mycalc service using the correct argument types:
# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003
# ok
julia> response = @rpc mycalc("hello",2.0)
10.0
If the RPC client calls mycalc with argument types that do not match the signatures of the exposed service, the call throws RpcMethodException:
julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
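The MethodError reported in the reason field is ordinary Julia multiple dispatch at work. The sketch below reproduces the same three calls locally, without Rembus or a broker, to show which method each call selects; it does not claim anything about how the exposer invokes the function internally. Note that the local integer result is an Int, whereas the RPC output above shows the value as it arrives after transport.

```julia
# The same two methods as the exposed service, called locally.
mycalc(x::Number, y::Number) = x + y
mycalc(x::String, y::Number) = length(x) * y

sum_result = mycalc(1, 2)             # selects the (Number, Number) method
len_result = mycalc("hello", 2.0)     # selects the (String, Number) method

# No method accepts (String, String), so Julia raises a MethodError.
err = try
    mycalc("hello", "world")
catch e
    e
end
```

After running the block, err holds the MethodError, the same kind of error whose description the RPC client receives in RpcMethodException.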
Rembus.@forever — Macro
@forever
Start the event loop awaiting to execute exposed and subscribed methods.
Rembus.@inject — Macro
@inject container
Bind a container object that is passed as the first argument of the subscribed component functions.
The container is useful for maintaining state.
using Rembus
# keep the number of processed messages
mutable struct Context
    msgcount::UInt
end
function topic(context::Context, arg1, arg2)
    context.msgcount += 1
    some_logic(arg1, arg2)
end
ctx = Context(0)
@subscribe topic
@inject ctx
Using @inject to set a container object means that if some component publishes topic(arg1,arg2) then the method topic(container,arg1,arg2) will be called.
Rembus.@publish — Macro
@publish topic(arg1,arg2,...)
Publish a message to the topic logical channel.
The function topic(arg1,arg2,...) will be called on each connected component subscribed to topic.
Publisher
@publish foo("gfr", 54.2)
Subscriber
function foo(name, value)
    println("do something with $name=$value")
end
@subscribe foo
@reactive
supervise()
Rembus.@reactive — Macro
@reactive
The subscribed methods start to handle published messages.
Rembus.@rpc — Macro
@rpc service(arg1,...)
Call the remote service method and return its outcome.
The outcome may be a return value or an RpcMethodException if the remote method throws an exception.
The service method must match the signature of an exposed remote service method.
Components may subscribe to service to receive the service requests.
Exposer
function mymethod(x, y)
    return evaluate(x,y)
end
@expose mymethod
supervise()
RPC client
response = @rpc mymethod(x,y)
Subscriber
function service(x, y)
    ...
end
@subscribe service
@reactive
supervise()
Rembus.@rpc_timeout — Macro
@rpc_timeout value
Set the RPC request timeout in seconds.
Rembus.@subscribe — Macro
@subscribe topic [mode]
Set up a subscription to the topic logical channel to handle messages from @publish or @rpc.
mode values:
- from_now (default): receive messages published from now.
- before_now: receive messages published when the component was offline.
Messages start to be delivered to topic when reactivity is enabled with the @reactive macro.
Subscriber
function foo(arg1, arg2)
    ...
end
@subscribe foo
@reactive
supervise()
Publisher
@publish foo("gfr", 54.2)
Rembus.@subscribe — Macro
@subscribe function fn(args...)
    ...
end [mode]
Subscribe the function expression.
Rembus.@terminate — Macro
@terminate
Close the connection and terminate the component.
Rembus.@unexpose — Macro
@unexpose fn
The methods of the fn function are no longer available to RPC clients.
Rembus.@unreactive — Macro
@unreactive
The subscribed methods stop handling published messages.
Rembus.@unsubscribe — Macro
@unsubscribe mytopic
The methods of the mytopic function stop handling messages published to the topic mytopic.