Rembus

Rembus is a middleware for Pub/Sub and RPC communication styles.

There are two types of processes: Components and Brokers.

  • A Component connects to a Broker;
  • A Broker dispatches messages between Components;
  • A Component exposes RPC services and/or subscribes to Pub/Sub topics;
  • A Component makes RPC requests and/or publishes messages to Pub/Sub topics.

Installation

using Pkg
Pkg.add("Rembus")

Rembus installs and compiles in a minute or two.

Broker

A Broker is a process that routes messages between components.

A Broker lets components that use different transport protocols talk to each other. For example, a component that uses a ZeroMQ socket may talk to a component that uses the WebSocket protocol.

Starting a Broker is as simple as:

using Rembus

broker()

A startup script can be useful; the following broker script will do:

#!/bin/bash
#=
BINDIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
exec julia --threads auto --color=no -e "include(popfirst!(ARGS))" \
 --project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using Rembus

broker()

By default the broker starts a WebSocket server listening on port 8000. To enable the tcp and/or zmq transports, use the appropriate arguments:

shell> ./broker
usage: broker [-r] [-s] [-p HTTP] [-w WS] [-t TCP] [-z ZMQ] [-d] [-h]

optional arguments:
  -r, --reset      factory reset, clean up broker configuration
  -s, --secure     accept wss and tls connections
  -p, --http HTTP  accept HTTP clients on port HTTP (type: UInt16)
  -w, --ws WS      accept WebSocket clients on port WS (type: UInt16)
  -t, --tcp TCP    accept tcp clients on port TCP (type: UInt16)
  -z, --zmq ZMQ    accept zmq clients on port ZMQ (type: UInt16)
  -d, --debug      enable debug logs
  -h, --help       show this help message and exit

See Broker environment variables for customizing the runtime settings.

Component

A Component is a process that plays one or more of the following roles:

  • Publisher (Pub/Sub): produces messages;
  • Subscriber (Pub/Sub): consumes published messages;
  • Requestor (RPC): requests a service;
  • Exposer (RPC): executes a service request and gives back a response.

There are three types of components:

  • Anonymous
  • Named
  • Authenticated

An Anonymous component assumes a random and ephemeral identity each time it connects to the broker. Anonymous components may be used, for example:

  • when it is not required to trace the originating source of messages;
  • for a Subscriber not interested in receiving messages published when it was offline;
  • for preliminary prototyping.

A Named component has a unique and persistent name that makes it possible to receive messages published when the component was offline.

An Authenticated component is a named component that owns a private key or a shared secret with which it can prove its identity.

Only authenticated components may use Pub/Sub private topics and private RPC methods.

A URL string defines the identity and the connection parameters of a component. The Macro-based API page documents the URL format.

Index

Rembus.RpcMethodException - Type
RpcMethodException

Thrown when an RPC method throws an exception.

Fields

  • cid: component name

  • topic: service name

  • reason: remote exception description

Exposer

@expose foo(name::AbstractString) = "hello " * name

RPC client

try
    @rpc foo(1)
catch e
    @error e.reason
end

Output:

┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│   foo(!Matched::AbstractString)
│    @ Main REPL[2]:1
└ @ Main REPL

Rembus.RpcMethodLoopback - Type
RpcMethodLoopback

Thrown when an RPC request would invoke a locally exposed method.

Fields

  • cid: component name

  • topic: service name

Rembus.RpcMethodNotFound - Type

RpcMethodNotFound is thrown from an RPC request when the called method is unknown.

Fields

  • cid: component name

  • topic: service name

RPC Client

@rpc coolservice()

Output:

ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...

Rembus.RpcMethodUnavailable - Type
RpcMethodUnavailable

Thrown when an RPC method is unavailable.

A method is considered unavailable when some component that exposed the method is currently disconnected from the broker.

Fields

  • cid: component name

  • topic: service name

Rembus.authorize - Method
function authorize(
    rb::RBHandle, client::AbstractString, topic::AbstractString;
    raise=true
)

Authorize the client component to use the private topic.

The component must have the admin role to grant access to the topic.

Rembus.broker - Method
broker(;
    wait=true,
    secure=nothing,
    ws=nothing,
    tcp=nothing,
    zmq=nothing,
    http=nothing,
    name="broker",
    policy=:first_up,
    mode=nothing,
    reset=nothing,
    log="info",
    plugin=nothing,
    context=nothing
)

Start the broker.

Returns immediately when wait is false; otherwise it blocks until shutdown is requested.

Overwrite command line arguments if args is not empty.

Rembus.component - Function
component(url)

Connect the Rembus component defined by url.

The connection is supervised: a network fault triggers reconnection attempts until one succeeds.

Rembus.component - Function
component(urls::Vector)

Connect the component to the remotes defined by the urls array.

The connection pool is supervised.

Rembus.connect - Method
connect(url::AbstractString)::RBHandle

Connect to the broker.

The returned Rembus handle does not auto-reconnect in case of a fault condition.

The returned RBHandle handle represents a connected component used for the Rembus APIs. For example:

using Rembus
rb = connect("mycomponent")
publish(rb, "temperature", ["room_1", 21.5])

The url argument string is formatted as:

url = [<protocol>://][<host>][:<port>/]<cid>

<protocol> is one of:

  • ws: web socket
  • wss: secure web socket
  • tcp: tcp socket
  • tls: TLS over tcp socket
  • zmq: ZeroMQ socket

<host> and <port> are the hostname/ip and the port of the listening broker.

<cid> is the unique name of the component.
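
To make the format concrete, here is a minimal sketch in plain Julia that splits a url string into its parts. The helper name split_component_url and its regular expression are illustrative assumptions, not part of the Rembus API; the parser Rembus actually uses may behave differently, in particular for defaults, which this sketch simply leaves as nothing.

```julia
# Hypothetical helper, NOT part of Rembus: split a component url of the form
#   [<protocol>://][<host>][:<port>/]<cid>
# into a named tuple. Parts that are absent are returned as `nothing`.
function split_component_url(url::AbstractString)
    m = match(r"^(?:(\w+)://)?(?:([^:/]*)(?::(\d+))?/)?([^/]+)$", url)
    m === nothing && throw(ArgumentError("malformed component url: $url"))
    protocol, host, port, cid = m.captures
    return (
        protocol = protocol === nothing ? nothing : String(protocol),
        host = (host === nothing || isempty(host)) ? nothing : String(host),
        port = port === nothing ? nothing : parse(Int, port),
        cid = String(cid),
    )
end
```

With this sketch, "tcp://host:8001/myname" splits into protocol "tcp", host "host", port 8001 and cid "myname", while a bare "myname" yields only a cid.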

Rembus.connect - Method
connect()

Connect anonymously to the endpoint declared with REMBUS_BASE_URL env variable.

REMBUS_BASE_URL defaults to ws://127.0.0.1:8000

A component is considered anonymous when a different, random UUID is used as the component identifier each time the application connects to the broker.

Rembus.expose - Method
expose(rb::RBHandle, fn::Function; raise=true, wait=true)
expose(rb::RBHandle, topic::AbstractString, fn::Function; raise=true, wait=true)

Expose the methods of function fn to be executed by rpc clients using topic as RPC method name.

If the topic argument is omitted, the function name is used as the RPC method name.

fn returns the RPC response.

Rembus.forever - Method
forever(rb::RBHandle)

Start the event loop awaiting to execute exposed and subscribed methods.

Rembus.forever - Method
forever(server::Server; wait=true, secure=false)

Start an embedded server and accept connections.

Rembus.forever - Method
forever(rb::Visor.Process)

Start the event loop awaiting to execute exposed and subscribed methods.

Rembus.inject - Function
inject(rb::RBHandle, ctx)

Bind a ctx context object to the rb component.

When a ctx context object is bound then it will be the first argument of subscribed and exposed methods.

See @inject for more details.

Rembus.private_topic - Method
private_topic(rb::RBHandle, topic::AbstractString; raise=true)

Set the topic to private.

The component must have the admin role for changing the privateness level.

Rembus.public_topic - Method
public_topic(rb::RBHandle, topic::AbstractString; raise=true)

Set the topic to public.

The component must have the admin role for changing the privateness level.

Rembus.publish - Function
publish(rb::RBHandle, topic::AbstractString, data=[]; qos=QOS0)

Publish data values on topic.

data may be a value or a vector of values. The values map to the arguments of the subscribed method.

For example, if the subscriber is a method that expects two arguments:

mytopic(x,y) = @info "x=$x, y=$y"

The published message needs an array of two elements:

publish(rb, "mytopic", [1, 2])

When a subscribed method expects one argument, it may be better to pass the value directly instead of an array of one element:

mytopic(x) = @info "x=$x"

publish(rb, "mytopic", 1)

If the subscribed method has no arguments invoke publish as:

mytopic() = @info "mytopic invoked"

publish(rb, "mytopic")

The data array may contain values of any type, but if the components are implemented in different languages then data has to be a DataFrame or a primitive type that is CBOR encodable.
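
The mapping between data and the arguments of the subscribed method can be pictured with a small plain-Julia sketch. The deliver function below is a hypothetical stand-in for the dispatch step, written only for illustration; it is not Rembus code and says nothing about how Rembus implements delivery. The handlers return strings instead of logging so the results are easy to inspect.

```julia
# The three subscriber shapes discussed above, as methods of one function.
mytopic(x, y) = "x=$x, y=$y"
mytopic(x) = "x=$x"
mytopic() = "mytopic invoked"

# Hypothetical stand-in for the delivery step (NOT the Rembus implementation):
# a vector is splatted into positional arguments, the default empty vector
# calls the zero-argument method, and any other value is passed as-is.
deliver(fn, data::AbstractVector=[]) = fn(data...)
deliver(fn, data) = fn(data)

two_args = deliver(mytopic, [1, 2])   # calls mytopic(1, 2)
one_arg = deliver(mytopic, 1)         # calls mytopic(1)
no_args = deliver(mytopic)            # calls mytopic()
```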

Rembus.reactive - Method
reactive(
    rb::RBHandle;
    from::Union{Real,Period,Dates.CompoundPeriod}=Day(1),
    raise=true,
    wait=true
)

Start the delivery of published messages for which interest was declared with subscribe.

Rembus.register - Method
register(
    cid::AbstractString,
    pin::AbstractString;
    tenant::Union{Nothing, AbstractString} = nothing,
    scheme::UInt8
)

Register the component with name cid.

To register a component a single pin or a set of tenants must be configured in the tenants.json file.

The pin shared secret is a string of 8 hex digits (for example "deedbeef").
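
A pin can be checked against this shape before attempting a registration. The one-liner below is a sketch in plain Julia; is_valid_pin is a hypothetical helper, not a Rembus function, and accepting uppercase hex digits is an assumption of the sketch.

```julia
# Hypothetical helper, NOT part of Rembus: true when pin is exactly 8 hex digits.
# Whether Rembus accepts uppercase digits is not stated; this sketch allows them.
is_valid_pin(pin::AbstractString) = occursin(r"^[0-9a-fA-F]{8}\z", pin)
```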

Rembus.rpc - Function
rpc(rb,
    topic::AbstractString,
    data=nothing;
    raise=true,
    timeout=request_timeout())

Call the remote topic method with arguments extracted from data.

The rb handle may be an RBHandle or a Server value.

Exposer

using Rembus
using Statistics

@expose service_noargs() = "success"

@expose service_name(name) = "hello " * name

@expose service_dictionary(d) = mean(values(d))

@expose function service_multiple_args(name, score, flags)
    isa(name, String) && isa(score, Float64) && isa(flags, Vector)
end

RPC client

using Rembus

rb = connect()

rpc(rb, "service_noargs")

rpc(rb, "service_name", "hello world")

rpc(rb, "service_dictionary", Dict("r1"=>13.3, "r2"=>3.0))

rpc(rb, "service_multiple_args", ["name", 1.0, ["red"=>1,"blue"=>2,"yellow"=>3]])

Rembus.server - Function
server(
    ctx=nothing;
    secure=false,
    ws=nothing,
    tcp=nothing,
    http=nothing,
    zmq=nothing,
    name="server",
    mode=nothing,
    log=TRACE_INFO
)

Initialize a server node.

Rembus.subscribe - Method
subscribe(rb::RBHandle, fn::Function; from=Now(), raise=true)
subscribe(
    rb::RBHandle, topic::AbstractString, fn::Function; from=Now(),
    raise=true
)

Declare interest in messages published on the topic logical channel.

The function fn is called when a message is received on topic, once reactive has put the rb component in reactive mode.

If the topic argument is omitted, the function name must be equal to the topic name.

If from is LastReceived() then rb component will receive messages published when it was offline.
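
As a rough sketch of how these calls fit together, the function below wires up a subscriber using only functions documented on this page (connect, subscribe, reactive, forever). The component name "mysubscriber", the temperature handler and the run_subscriber wrapper are hypothetical, and the sketch assumes a broker is reachable and that the component is allowed to use the name; it has not been verified against a particular Rembus release.

```julia
using Rembus

# The handler name doubles as the topic name because the topic argument is omitted.
# It matches messages such as publish(rb, "temperature", ["room_1", 21.5]).
temperature(room, value) = @info "temperature" room value

# Hypothetical wrapper: nothing connects until run_subscriber() is called.
function run_subscriber(url="mysubscriber")
    rb = connect(url)
    # LastReceived() asks for messages published while the component was offline
    subscribe(rb, temperature, from=LastReceived())
    reactive(rb)   # start the delivery of published messages
    forever(rb)    # enter the event loop
end
```

A named component is used here because, as described for Named components, a persistent name is what makes receiving messages published while offline possible.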

Rembus.unauthorize - Method
function unauthorize(
    rb::RBHandle, client::AbstractString, topic::AbstractString;
    raise=true
)

Revoke authorization to the client component for use of the private topic.

The component must have the admin role to revoke access to the topic.

Rembus.unexpose - Method
unexpose(rb::RBHandle, fn::Function; raise=true)
unexpose(rb::RBHandle, topic::AbstractString; raise=true)

Stop servicing RPC requests targeting topic or fn methods.

Rembus.unreactive - Method
unreactive(rb::RBHandle, timeout=5; raise=true, wait=true)

Stop the delivery of published messages.

Rembus.unregister - Method
unregister(rb)

Unregister the connected component.

Only a connected and authenticated component may execute the unregister command.

using Rembus

rb = connect("authenticated_component")
Rembus.unregister(rb)
close(rb)

Rembus.unsubscribe - Method
unsubscribe(rb::RBHandle, topic::AbstractString; raise=true, wait=true)
unsubscribe(rb::RBHandle, fn::Function; raise=true, wait=true)

Stop delivering to the rb component the messages published on the topic logical channel or, when a function fn is given, on the topic whose name equals the name of the subscribed function.

Rembus.@component - Macro
@component "url"

Set the name of the component and the protocol for connecting to the broker.

url may be:

  • "myname": use $REMBUS_BASE_URL for connection parameters
  • "tcp://host:port/myname": tcp connection
  • "ws://host:port/myname": web socket connection
  • "zmq://host:port/myname": ZeroMQ connection

Rembus.@expose - Macro
@expose fn

Expose all the methods of the function fn.

Example

Expose the function mycalc that implements a service that may accept two numbers or a string and a number:

mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y

@expose mycalc

Call the mycalc service using the correct types of arguments:

# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003

# ok
julia> response = @rpc mycalc("hello",2.0)
10.0

If the RPC client calls mycalc with argument types that do not match the signatures of the exposed service, an RpcMethodException is thrown:

julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
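
The reason string carried by the exception is the text of an ordinary Julia MethodError raised by dispatch on the exposer side. The sketch below reproduces that dispatch locally, with no broker involved; try_call is a hypothetical stand-in for the remote invocation and is not part of Rembus.

```julia
mycalc(x::Number, y::Number) = x + y
mycalc(x::String, y::Number) = length(x) * y

# Hypothetical local stand-in for a remote call: return the result, or the
# error text when no method matches the argument types.
function try_call(fn, args...)
    try
        return fn(args...)
    catch e
        e isa MethodError || rethrow()
        return sprint(showerror, e)
    end
end

ok_numbers = try_call(mycalc, 1, 2)            # dispatches to (Number, Number)
ok_string = try_call(mycalc, "hello", 2.0)     # dispatches to (String, Number)
failure = try_call(mycalc, "hello", "world")   # no matching method: error text
```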

Rembus.@forever - Macro
@forever

Start the event loop awaiting to execute exposed and subscribed methods.

Rembus.@inject - Macro
@inject container

Bind a container object that is passed as the first argument of the subscribed component functions.

The container is useful for maintaining state.

using Rembus

# keep the number of processed messages
mutable struct Context
    msgcount::UInt
end

function topic(context::Context, arg1, arg2)
    context.msgcount += 1
    some_logic(arg1, arg2)
end

ctx = Context(0)
@subscribe topic
@inject ctx

Using @inject to set a container object means that if some component publishes topic(arg1,arg2) then the method topic(container,arg1,arg2) will be called.
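
The effect of a bound container can be imitated in plain Julia. In the sketch below invoke_handler is a hypothetical dispatcher written for illustration (it is not how Rembus is implemented), and arg1 + arg2 stands in for some_logic so the snippet runs on its own.

```julia
# keep the number of processed messages
mutable struct Context
    msgcount::UInt
end

function topic(context::Context, arg1, arg2)
    context.msgcount += 1
    return arg1 + arg2   # placeholder for some_logic(arg1, arg2)
end

# Hypothetical dispatcher, NOT Rembus code: when a context is bound it is
# prepended to the published arguments, otherwise the handler gets them as-is.
invoke_handler(fn, ctx, args) = ctx === nothing ? fn(args...) : fn(ctx, args...)

ctx = Context(0)
result = invoke_handler(topic, ctx, [1, 2])   # same as topic(ctx, 1, 2)
```

After the call, ctx.msgcount has been incremented once, which is the kind of state the container is meant to carry between messages.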

Rembus.@publish - Macro
@publish topic(arg1,arg2,...)

Publish a message to the topic logical channel.

The function topic(arg1,arg2,...) will be called on each connected component subscribed to topic.

Publisher

@publish foo("gfr", 54.2)

Subscriber

function foo(name, value)
    println("do something with $name=$value")
end

@subscribe foo
@reactive

supervise()

Rembus.@rpc - Macro
@rpc service(arg1,...)

Call the remote service method and return its outcome.

The outcome may be a return value or an RpcMethodException if the remote method throws an exception.

The service method must match the signature of an exposed remote service method.

Components may subscribe to service for receiving the service request.

Exposer

function mymethod(x, y)
    return evaluate(x,y)
end

@expose mymethod
supervise()

RPC client

response = @rpc mymethod(x,y)

Subscriber

function service(x, y)
    ...
end

@subscribe service
@reactive

supervise()

Rembus.@subscribe - Macro
@subscribe topic [mode]

Set up a subscription to the topic logical channel to handle messages from @publish or @rpc.

mode values:

  • from_now (default): receive messages published from now.
  • before_now: receive messages published when the component was offline.

Messages start to be delivered to topic when reactivity is enabled with the @reactive macro.

Subscriber

function foo(arg1, arg2)
    ...
end

@subscribe foo
@reactive

supervise()

Publisher

@publish foo("gfr", 54.2)

Rembus.@unsubscribe - Macro
@unsubscribe mytopic

The methods of the mytopic function stop handling messages published to the topic mytopic.
