Rembus

Broker

Starting the broker is simple as:

julia -e "using Rembus; caronte()"

A startup script could be useful and the following caronte script suffice:

##!/bin/bash
#=
BINDIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
exec julia --threads auto --color=no -e "include(popfirst!(ARGS))" \
 --project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using Rembus

Rembus.caronte()

See Broker environment variables for customizing the runtime setting.

Component

A Component is a broker client who uses the Rembus protocol for RPC commands and for streaming data in a Pub/Sub fashion.

The macro @component declares a component whom identity and the connection parameters are defined with an URL:

component_url = "[<protocol>://][<host>][:<port>/]<cid>"

@component component_url

<protocol> is one of:

  • ws web socket
  • wss secure web socket
  • tcp tcp socket
  • tls TLS over tcp socket
  • zmq ZeroMQ socket

<host> and <port> are the hostname/ip and the port of the broker listening endpoint.

<cid> is the unique name of the component.

For example:

@component "ws://caronte.org:8000/myclient"

defines the component myclient that communicates with the broker hosted on caronte.org, listening on port 8000 and accepting web socket connections.

Default component URL parameters

The string that define a component may be simplified by using the enviroment variable REMBUS_BASE_URL that set the connection default parameters:

For example:

REMBUS_BASE_URL=ws://localhost:8000

define the default protocol, host and port, so that the above component_url may be simplified as:

@component "myclient"

Uses the web socket protocol to connect to localhost on port 8000.

Index

Rembus.RpcMethodExceptionType
RpcMethodException

Thrown when a RPC method throws an exception.

Fields

  • cid: component name

  • topic: service name

  • reason: remote exception description

Exposer

@expose foo(name::AbstractString) = "hello " * name

RPC client

try
    @rpc foo(1)
catch e
    @error e.reason
end

Output:

┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│   foo(!Matched::AbstractString)
│    @ Main REPL[2]:1
└ @ Main REPL
source
Rembus.RpcMethodLoopbackType
RpcMethodLoopback

Thrown when a RPC request to a locally exposed method.

Fields

  • cid: component name

  • topic: service name

source
Rembus.RpcMethodNotFoundType

RpcMethodNotFound is thrown from a rpc request when a remote method is unknown.

fields:

  • cid: component name

  • topic: service name

RPC Client

@rpc coolservice()

Output:

ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...
source
Rembus.RpcMethodUnavailableType
RpcMethodUnavailable

Thrown when a RPC method is unavailable.

A method is considered unavailable when some component that expose the method is currently disconnected from the broker.

Fields

  • cid: component name

  • topic: service name

source
Rembus.authorizeMethod
authorize(router, twin, msg)

Administration command to authorize a component to publish/subscribe to a private topic.

source
Rembus.broker_parseMethod
broker_parse(pkt)

Get a Rembus message from a CBOR encoded packet.

The decoding is performed at the broker side.

source
Rembus.broker_parseMethod
broker_parse(router::Router, pkt::ZMQPacket)

The Broker parser of ZeroMQ messages.

pkt is the zeromq message decoded as [identity, header, data].

source
Rembus.caronteMethod
caronte(; wait=true, exit_when_done=true)

Start the broker.

Return immediately when wait is false, otherwise blocks until shut down.

Return instead of exiting if exit_when_done is false.

source
Rembus.connectMethod
connect(url::AbstractString)::RBHandle

Connect to the broker.

The returned rembus handler do not auto-reconnect in case of a fault condition.

The returned RBHandle handle represents a connected component used for the Rembus APIs. For example:

using Rembus
rb = connect("mycomponent")
publish(rb, "temperature", ["room_1", 21.5])
source
Rembus.connectMethod
connect()

Connect anonymously to the broker.

A random v4 UUID is used as component identifier.

source
Rembus.exposeMethod
expose(rb::RBHandle, topic::AbstractString, fn::Function; exceptionerror=true)

The methods of function fn are registered to be executed when a RPC topic request is received.

The returned value is the RPC response returned to the RPC client.

source
Rembus.publishFunction
publish(rb::RBHandle, topic::AbstractString, data=[])

Publish data values on topic topic.

data may be any type of data, but if the components are implemented in different languages then data has to be a DataFrame or a CBOR basic data type.

source
Rembus.reactiveMethod
reactive(rb::RBHandle, timeout=5; exceptionerror=true)

Start the delivery of published messages for which there was declared an interest with subscribe.

source
Rembus.rpcFunction
rpc(rb::RBHandle,
    topic::AbstractString,
    data=nothing;
    exceptionerror=true,
    timeout=request_timeout())

Call the remote topic method with arguments extracted from data.

Exposer

using Rembus
using Statistics

@expose service_noargs() = "success"

@expose service_name(name) = "hello " * name

@expose service_dictionary(d) = mean(values(d))

@expose function service_multiple_args(name, score, flags)
    isa(name, String) && isa(score, Float64) && isa(flags, Vector)
end

RPC client

using Rembus

rb = connect()

rcp(rb, "service_noargs")

rpc(rb, "service_name", "hello world")

rpc(rb, "service_dictionary", Dict("r1"=>13.3, "r2"=>3.0))

rpc(rb, "service_multiple_args", ["name", 1.0, ["red"=>1,"blue"=>2,"yellow"=>3]])
source
Rembus.save_twinsMethod
save_twins(router)

Persist twins to storage.

Save twins configuration only if twin has a name.

Persist undelivered messages if they are queued in memory.

source
Rembus.set_broker_contextMethod
set_broker_context(ctx)

Set the object to be passed ad first argument to functions related to twin lifecycle.

Actually the functions that use ctx are:

  • twin_initialize
  • twin_finalize
  • park
  • unpark
source
Rembus.set_broker_pluginMethod
set_broker_plugin(extension::Module)

Inject the module that implements the functions related to twin lifecycle.

source
Rembus.sharedMethod
shared(rb::RBHandle, ctx)

Bind a ctx context object to the rb component.

When a ctx context object is bound then it will be the first argument of subscribed and exposed methods.

source
Rembus.subscribeFunction
subscribe(
    rb::RBHandle, topic::AbstractString, fn::Function, retroactive::Bool=false;
    exceptionerror=true
)

Declare interest for messages published on topic.

The function fn is called when a message is received on topic and reactive put the rb component in reactive mode.

If retroactive is true then rb component will receive messages published when it was offline.

source
Rembus.unauthorizeMethod
unauthorize(router, twin, msg)

Administration command to unauthorize a component to publish/subscribe to a private topic.

source
Rembus.unexposeMethod
unexpose(rb::RBHandle, topic::AbstractString; exceptionerror=true)

Stop servicing RPC topic request.

source
Rembus.unreactiveMethod
unreactive(rb::RBHandle, timeout=5; exceptionerror=true)

Stop the delivery of published message.

source
Rembus.unregisterMethod

unregister(cid::AbstractString)

Unregister the client identified by cid.

The secret pin is not needed because only an already connected and authtenticated component may execute the unregister command.

source
Rembus.unsubscribeMethod
unsubscribe(rb::RBHandle, topic::AbstractString; exceptionerror=true)

No more messages published on topic will be delivered to rb component.

source
Rembus.zmq_loadMethod
zmq_load(socket::ZMQ.Socket)

Get a Rembus message from a ZeroMQ multipart message.

The decoding is performed at the client side.

source
Rembus.zmq_messageMethod
zmq_message(router::Router)::ZMQPacket

Receive a Multipart ZeroMQ message.

Return the packet identity, header and data values extracted from a ROUTER socket.

source
Rembus.zmq_messageMethod
zmq_message(socket::ZMQ.Socket)::ZMQDealerPacket

Receive a Multipart ZeroMQ message.

Return the packet header and data values extracted from a DEALER socket.

source
Rembus.@componentMacro
@component "url"

Set the name of the component and the protocol for connecting to the broker.

url may be:

  • "myname": use $REMBUS_BASE_URL for connection parameters
  • "tcp://host:port/myname": tcp connection
  • "ws://host:port/myname": web socket connection
  • "zmq://host:port/myname": ZeroMQ connection
source
Rembus.@disable_ackMacro
@disable_ack

Disable acknowledge receipt of published messages.

This feature assure that messages get delivered at least one to the subscribed component.

source
Rembus.@enable_ackMacro
@enable_ack

Enable acknowledge receipt of published messages.

This feature assure that messages get delivered at least one time to the subscribed component.

For default the acknowledge is disabled.

source
Rembus.@exposeMacro
@expose fn

Expose all the methods of the function fn.

Example

Expose the function mycalc that implements a service that may accept two numbers or a string and number:

mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y

@expose mycalc

Call mycal service using the correct types of arguments:

# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003

# ok
julia> response = @rpc mycalc("hello",2.0)
10.0

If the RPC client call mycalc with the argument's type that do not respect the signatures of the exposed service then it throws RpcMethodException

julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
source
Rembus.@publishMacro
@publish topic(arg1,arg2,...)

Publish a message to topic logic channel.

The function topic(arg1,arg2,...) will be called on each connected component subscribed to topic.

Publisher

@publish foo("gfr", 54.2)

Subscriber

function foo(name, value)
    println("do something with $name=$value")
end

@subscribe foo
@reactive

supervise()
source
Rembus.@rpcMacro
@rpc service(arg1,...)

Call the remote service method and return its outcome.

The outcome may be the a return value or a RpcMethodException if the remote throws an exception.

The service method must match the signature of an exposed remote service method.

Components may subscribe to service for receiving the service request.

Exposer

function mymethod(x, y)
    return evaluate(x,y)
end

@expose mymethod
supervise()

RPC client

response = @rpc mymethod(x,y)

Subscriber

function service(x, y)
    ...
end

@subscribe service
@reactive

supervise()
source
Rembus.@sharedMacro
 @shared container

Bind a container object that is passed as the first argument of the subscribed component functions.

The container is useful for mantaining a state.

using Rembus

# keep the number of processed messages
mutable struct Context
    msgcount::UInt
end

function topic(context::Context, arg1, arg2)
    context.msgcount += 1
    some_logic(arg1, arg2)
end

ctx = Context(0)
@subscribe topic
@shared ctx
@reactive

Using @shared to set a container object means that if some component publish topic(arg1,arg2) then the method foo(container,arg2,arg2) will be called.

source
Rembus.@subscribeMacro
@subscribe topic [mode]

Setup a subscription to topic logic channel to handle messages from @publish or @rpc.

mode values`:

  • from_now (default): receive messages published from now.
  • before_now: receive messages published when the component was offline.

Messages starts to be delivered to topic when reactivity is enabled with @reactive macro.

Subscriber

function foo(arg1, arg2)
    ...
end

@subscribe foo
@reactive

supervise()

Publisher

@publish foo("gfr", 54.2)
source
Rembus.@unsubscribeMacro
@unsubscribe mytopic

The methods of mytopic function stop to handle messages published to topic mytopic.

source