Rembus
Broker
Starting the broker is as simple as:
julia -e "using Rembus; caronte()"
A startup script can be useful; the following caronte script suffices:
#!/bin/bash
#=
BINDIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
exec julia --threads auto --color=no -e "include(popfirst!(ARGS))" \
--project=$BINDIR/.. --startup-file=no "${BASH_SOURCE[0]}" "$@"
=#
using Rembus
Rembus.caronte()
See Broker environment variables to customize the runtime settings.
Component
A Component is a broker client that uses the Rembus protocol for RPC commands and for streaming data in a Pub/Sub fashion.
The macro @component declares a component whose identity and connection parameters are defined by a URL:
component_url = "[<protocol>://][<host>][:<port>/]<cid>"
@component component_url
<protocol> is one of:
- ws: web socket
- wss: secure web socket
- tcp: tcp socket
- tls: TLS over tcp socket
- zmq: ZeroMQ socket
<host> and <port> are the hostname/ip and the port of the broker listening endpoint.
<cid> is the unique name of the component.
For example:
@component "ws://caronte.org:8000/myclient"
defines the component myclient that communicates with the broker hosted on caronte.org, listening on port 8000 and accepting web socket connections.
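To make the URL structure concrete, here is a minimal sketch that splits a component URL into its parts using plain Julia. The helper `parse_component_url` is hypothetical and not part of Rembus; the regular expression is only one plausible reading of the `[<protocol>://][<host>][:<port>/]<cid>` pattern, and the parser Rembus actually uses may differ.

```julia
# Hypothetical helper, NOT part of Rembus: split a component URL of the form
# [<protocol>://][<host>][:<port>/]<cid> into its parts.
function parse_component_url(url::AbstractString)
    m = match(r"^(?:(\w+)://)?(?:([^:/]*)(?::(\d+))?/)?([^/]+)$", url)
    m === nothing && throw(ArgumentError("invalid component url: $url"))
    protocol, host, port, cid = m.captures
    return (
        protocol = protocol,   # `nothing` when the protocol is omitted
        host = (host === nothing || isempty(host)) ? nothing : host,
        port = port === nothing ? nothing : parse(Int, port),
        cid = cid,             # the component name is the only mandatory part
    )
end

parts = parse_component_url("ws://caronte.org:8000/myclient")
println(parts)

# A bare name leaves protocol, host and port unset.
println(parse_component_url("myclient"))
```

Every part except the component name may be missing, which is why the sketch returns `nothing` for the omitted fields.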
Default component URL parameters
The string that defines a component may be simplified by using the environment variable REMBUS_BASE_URL, which sets the default connection parameters.
For example:
REMBUS_BASE_URL=ws://localhost:8000
defines the default protocol, host and port, so that the above component_url may be simplified to:
@component "myclient"
This component uses the web socket protocol to connect to localhost on port 8000.
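The effect of REMBUS_BASE_URL can be pictured with a small plain-Julia sketch. The helper `resolve_component_url` and its `fallback` value are assumptions made for illustration; they are not Rembus functions, and Rembus may resolve partial URLs differently (for instance a `host:port/cid` string without a protocol is not handled here).

```julia
# Hypothetical helper, NOT part of Rembus: expand a bare component name
# using the REMBUS_BASE_URL environment variable.
function resolve_component_url(name::AbstractString;
                               fallback::AbstractString = "ws://localhost:8000")
    # A string that already contains "://" is treated as a complete URL.
    occursin("://", name) && return String(name)
    # `fallback` is an assumed default used when the variable is not set.
    base = get(ENV, "REMBUS_BASE_URL", fallback)
    return string(rstrip(base, '/'), "/", name)
end

# Temporarily set the variable, as a shell `export` would do.
url = withenv("REMBUS_BASE_URL" => "ws://localhost:8000") do
    resolve_component_url("myclient")
end
println(url)
```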
Index
Rembus.RembusDisconnect
Rembus.RembusTimeout
Rembus.RpcMethodException
Rembus.RpcMethodLoopback
Rembus.RpcMethodNotFound
Rembus.RpcMethodUnavailable
Rembus.authorize
Rembus.broker_parse
Rembus.broker_parse
Rembus.caronte
Rembus.connect
Rembus.connect
Rembus.connected_socket_load
Rembus.create_private_key
Rembus.expose
Rembus.load_owners
Rembus.load_token_app
Rembus.load_twins
Rembus.private_topic
Rembus.public_topic
Rembus.publish
Rembus.reactive
Rembus.register
Rembus.rpc
Rembus.save_configuration
Rembus.save_owners
Rembus.save_token_app
Rembus.save_twins
Rembus.set_broker_context
Rembus.set_broker_plugin
Rembus.shared
Rembus.subscribe
Rembus.unauthorize
Rembus.unexpose
Rembus.unreactive
Rembus.unregister
Rembus.unsubscribe
Rembus.zmq_load
Rembus.zmq_message
Rembus.zmq_message
Rembus.@component
Rembus.@disable_ack
Rembus.@enable_ack
Rembus.@expose
Rembus.@expose
Rembus.@publish
Rembus.@reactive
Rembus.@rpc
Rembus.@rpc_timeout
Rembus.@shared
Rembus.@subscribe
Rembus.@subscribe
Rembus.@terminate
Rembus.@unexpose
Rembus.@unreactive
Rembus.@unsubscribe
Rembus.RembusDisconnect
— Type
RembusDisconnect
Thrown when a Rembus connection goes down unexpectedly.
Rembus.RembusTimeout
— Type
RembusTimeout
Thrown when a response is not received.
Rembus.RpcMethodException
— Type
RpcMethodException
Thrown when an RPC method throws an exception.
Fields
- cid: component name
- topic: service name
- reason: remote exception description
Exposer
@expose foo(name::AbstractString) = "hello " * name
RPC client
try
    @rpc foo(1)
catch e
    @error e.reason
end
Output:
┌ Error: MethodError: no method matching foo(::UInt64)
│
│ Closest candidates are:
│ foo(!Matched::AbstractString)
│ @ Main REPL[2]:1
└ @ Main REPL
Rembus.RpcMethodLoopback
— Type
RpcMethodLoopback
Thrown when an RPC request is made to a locally exposed method.
Fields
- cid: component name
- topic: service name
Rembus.RpcMethodNotFound
— Type
RpcMethodNotFound
Thrown by an RPC request when the remote method is unknown.
Fields
- cid: component name
- topic: service name
RPC client
@rpc coolservice()
Output:
ERROR: Rembus.RpcMethodNotFound("rembus", "coolservice")
Stacktrace:
...
Rembus.RpcMethodUnavailable
— Type
RpcMethodUnavailable
Thrown when an RPC method is unavailable.
A method is considered unavailable when a component that exposes the method is currently disconnected from the broker.
Fields
- cid: component name
- topic: service name
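The exception types documented above can be handled together on the client side. The following is a minimal sketch: `safe_rpc` is a hypothetical wrapper, not a Rembus function, and it assumes that `rpc` raises these types as described and that the `timeout` keyword is expressed in seconds.

```julia
using Rembus

# Hypothetical wrapper, NOT part of Rembus: call a remote method and map the
# documented exception types to a (status, value) tuple.
function safe_rpc(rb, topic, data=nothing; timeout=5)
    try
        return (:ok, rpc(rb, topic, data; timeout=timeout))
    catch e
        if e isa Rembus.RpcMethodNotFound
            return (:not_found, e.topic)      # no component exposes the method
        elseif e isa Rembus.RpcMethodUnavailable
            return (:unavailable, e.topic)    # the exposer is disconnected
        elseif e isa Rembus.RpcMethodException
            return (:remote_error, e.reason)  # the remote method threw
        elseif e isa Rembus.RembusTimeout
            return (:timeout, nothing)        # no response was received
        else
            rethrow()
        end
    end
end
```

A caller can then branch on the status symbol, for example `status, value = safe_rpc(rb, "coolservice")`, instead of wrapping every request in its own `try` block.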
Rembus.authorize
— Method
authorize(router, twin, msg)
Administration command to authorize a component to publish/subscribe to a private topic.
Rembus.broker_parse
— Method
broker_parse(pkt)
Get a Rembus message from a CBOR encoded packet.
The decoding is performed at the broker side.
Rembus.broker_parse
— Method
broker_parse(router::Router, pkt::ZMQPacket)
The Broker parser of ZeroMQ messages.
pkt is the ZeroMQ message decoded as [identity, header, data].
Rembus.caronte
— Method
caronte(; wait=true, exit_when_done=true)
Start the broker.
Return immediately when wait is false, otherwise block until shutdown.
Return instead of exiting if exit_when_done is false.
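As an illustration of the two keyword arguments, the sketch below starts the broker from within an existing Julia session without blocking it. The wrapper name `start_embedded_broker` is hypothetical; only the `caronte` call and its keywords come from the signature above.

```julia
using Rembus

# Hypothetical wrapper: start the broker without blocking the calling task
# (wait=false) and without terminating the Julia process when the broker
# shuts down (exit_when_done=false).
function start_embedded_broker()
    caronte(wait=false, exit_when_done=false)
end
```

This combination may be convenient in a REPL or in a test suite, where the session has to remain usable after the broker has been started.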
Rembus.connect
— Method
connect(url::AbstractString)::RBHandle
Connect to the broker.
The returned Rembus handle does not auto-reconnect in case of a fault condition.
The returned RBHandle represents a connected component used for the Rembus APIs. For example:
using Rembus
rb = connect("mycomponent")
publish(rb, "temperature", ["room_1", 21.5])
Rembus.connect
— Method
connect()
Connect anonymously to the broker.
A random v4 UUID is used as component identifier.
Rembus.connected_socket_load
— Method
connected_socket_load(pkt)
Get a Rembus message from a CBOR encoded packet.
The decoding is performed at the client side.
Rembus.create_private_key
— Method
create_private_key(cid::AbstractString)
Create a private key for the cid component and return its public key.
Rembus.expose
— Method
expose(rb::RBHandle, topic::AbstractString, fn::Function; exceptionerror=true)
The methods of function fn are registered to be executed when an RPC topic request is received.
The value returned by fn is sent to the RPC client as the RPC response.
Rembus.load_owners
— Method
load_owners()
Return the owners dataframe.
Rembus.load_token_app
— Method
load_token_app()
Return the token_app dataframe.
Rembus.load_twins
— Method
load_twins(router)
Instantiates twins that have one or more interests.
Rembus.private_topic
— Method
private_topic(router, twin, msg)
Administration command to declare a private topic.
Rembus.public_topic
— Method
public_topic(router, twin, msg)
Administration command to reset a topic to public.
Rembus.publish
— Function
publish(rb::RBHandle, topic::AbstractString, data=[])
Publish data values on topic topic.
data may be any type of data, but if the components are implemented in different languages then data has to be a DataFrame or a CBOR basic data type.
Rembus.reactive
— Method
reactive(rb::RBHandle, timeout=5; exceptionerror=true)
Start the delivery of published messages for which an interest was declared with subscribe.
Rembus.register
— Method
register(cid::AbstractString)
Register the client identified by cid.
Rembus.rpc
— Function
rpc(rb::RBHandle,
    topic::AbstractString,
    data=nothing;
    exceptionerror=true,
    timeout=request_timeout())
Call the remote topic method with arguments extracted from data.
Exposer
using Rembus
using Statistics
@expose service_noargs() = "success"
@expose service_name(name) = "hello " * name
@expose service_dictionary(d) = mean(values(d))
@expose function service_multiple_args(name, score, flags)
    isa(name, String) && isa(score, Float64) && isa(flags, Vector)
end
RPC client
using Rembus
rb = connect()
rpc(rb, "service_noargs")
rpc(rb, "service_name", "hello world")
rpc(rb, "service_dictionary", Dict("r1"=>13.3, "r2"=>3.0))
rpc(rb, "service_multiple_args", ["name", 1.0, ["red"=>1,"blue"=>2,"yellow"=>3]])
Rembus.save_configuration
— Method
save_configuration(router::Router)
Persist router configuration on disk.
Rembus.save_owners
— Method
save_owners(owners_df)
Save the owners table.
Rembus.save_token_app
— Method
save_token_app(df)
Save the token_app table.
Rembus.save_twins
— Method
save_twins(router)
Persist twins to storage.
Save twins configuration only if twin has a name.
Persist undelivered messages if they are queued in memory.
Rembus.set_broker_context
— Method
set_broker_context(ctx)
Set the object to be passed as the first argument to functions related to twin lifecycle.
Currently the functions that use ctx are:
- twin_initialize
- twin_finalize
- park
- unpark
Rembus.set_broker_plugin
— Method
set_broker_plugin(extension::Module)
Inject the module that implements the functions related to twin lifecycle.
Rembus.shared
— Method
shared(rb::RBHandle, ctx)
Bind a ctx context object to the rb component.
When a ctx context object is bound, it is passed as the first argument of subscribed and exposed methods.
Rembus.subscribe
— Function
subscribe(
    rb::RBHandle, topic::AbstractString, fn::Function, retroactive::Bool=false;
    exceptionerror=true
)
Declare interest in messages published on topic.
The function fn is called when a message is received on topic and reactive has put the rb component in reactive mode.
If retroactive is true then the rb component will receive messages published while it was offline.
Rembus.unauthorize
— Method
unauthorize(router, twin, msg)
Administration command to revoke the authorization of a component to publish/subscribe to a private topic.
Rembus.unexpose
— Method
unexpose(rb::RBHandle, topic::AbstractString; exceptionerror=true)
Stop servicing RPC topic requests.
Rembus.unreactive
— Method
unreactive(rb::RBHandle, timeout=5; exceptionerror=true)
Stop the delivery of published messages.
Rembus.unregister
— Method
unregister(cid::AbstractString)
Unregister the client identified by cid.
The secret pin is not needed because only an already connected and authenticated component may execute the unregister command.
Rembus.unsubscribe
— Method
unsubscribe(rb::RBHandle, topic::AbstractString; exceptionerror=true)
No more messages published on topic will be delivered to the rb component.
Rembus.zmq_load
— Method
zmq_load(socket::ZMQ.Socket)
Get a Rembus message from a ZeroMQ multipart message.
The decoding is performed at the client side.
Rembus.zmq_message
— Method
zmq_message(router::Router)::ZMQPacket
Receive a multipart ZeroMQ message.
Return the packet identity, header and data values extracted from a ROUTER socket.
Rembus.zmq_message
— Method
zmq_message(socket::ZMQ.Socket)::ZMQDealerPacket
Receive a multipart ZeroMQ message.
Return the packet header and data values extracted from a DEALER socket.
Rembus.@component
— Macro
@component "url"
Set the name of the component and the protocol for connecting to the broker.
url may be:
- "myname": use $REMBUS_BASE_URL for connection parameters
- "tcp://host:port/myname": tcp connection
- "ws://host:port/myname": web socket connection
- "zmq://host:port/myname": ZeroMQ connection
Rembus.@disable_ack
— Macro
@disable_ack
Disable the acknowledgment of receipt of published messages.
When enabled, this feature ensures that messages are delivered at least once to the subscribed component.
Rembus.@enable_ack
— Macro
@enable_ack
Enable the acknowledgment of receipt of published messages.
This feature ensures that messages are delivered at least once to the subscribed component.
By default the acknowledgment is disabled.
Rembus.@expose
— Macro
@expose function fn(arg1,...)
    ...
end
Expose the function expression.
Rembus.@expose
— Macro
@expose fn
Expose all the methods of the function fn.
Example
Expose the function mycalc, which implements a service that accepts either two numbers or a string and a number:
mycalc(x::Number, y::Number) = x+y
mycalc(x::String, y::Number) = length(x)*y
@expose mycalc
Call the mycalc service using the correct argument types:
# ok
julia> response = @rpc mycalc(1,2)
0x0000000000000003
# ok
julia> response = @rpc mycalc("hello",2.0)
10.0
If the RPC client calls mycalc with argument types that do not match the signatures of the exposed service, an RpcMethodException is thrown:
julia> response = @rpc mycalc("hello","world")
ERROR: RpcMethodException("rembus", "mycalc", "MethodError: no method matching mycalc(::String, ::String) ...
Rembus.@publish
— Macro
@publish topic(arg1,arg2,...)
Publish a message to the topic logic channel.
The function topic(arg1,arg2,...) will be called on each connected component subscribed to topic.
Publisher
@publish foo("gfr", 54.2)
Subscriber
function foo(name, value)
    println("do something with $name=$value")
end
@subscribe foo
@reactive
supervise()
Rembus.@reactive
— Macro
@reactive
The subscribed methods start handling published messages.
Rembus.@rpc
— Macro
@rpc service(arg1,...)
Call the remote service method and return its outcome.
The outcome may be a return value or an RpcMethodException if the remote method throws an exception.
The service method must match the signature of an exposed remote service method.
Components may subscribe to service to receive the service request.
Exposer
function mymethod(x, y)
    return evaluate(x,y)
end
@expose mymethod
supervise()
RPC client
response = @rpc mymethod(x,y)
Subscriber
function service(x, y)
    ...
end
@subscribe service
@reactive
supervise()
Rembus.@rpc_timeout
— Macro
@rpc_timeout value
Set the rpc request timeout in seconds.
Rembus.@shared
— Macro
@shared container
Bind a container object that is passed as the first argument of the subscribed component functions.
The container is useful for maintaining state.
using Rembus
# keep the number of processed messages
mutable struct Context
    msgcount::UInt
end
function topic(context::Context, arg1, arg2)
    context.msgcount += 1
    some_logic(arg1, arg2)
end
ctx = Context(0)
@subscribe topic
@shared ctx
@reactive
Using @shared to set a container object means that if some component publishes topic(arg1,arg2) then the method topic(container,arg1,arg2) will be called.
Rembus.@subscribe
— Macro
@subscribe function fn(args...)
    ...
end [mode]
Subscribe the function expression.
Rembus.@subscribe
— Macro
@subscribe topic [mode]
Set up a subscription to the topic logic channel to handle messages from @publish or @rpc.
mode values:
- from_now (default): receive messages published from now.
- before_now: receive messages published when the component was offline.
Messages start to be delivered to topic when reactivity is enabled with the @reactive macro.
Subscriber
function foo(arg1, arg2)
    ...
end
@subscribe foo
@reactive
supervise()
Publisher
@publish foo("gfr", 54.2)
Rembus.@terminate
— Macro
@terminate
Close the connection and terminate the component.
Rembus.@unexpose
— Macro
@unexpose fn
The methods of the fn function are no longer available to RPC clients.
Rembus.@unreactive
— Macro
@unreactive
The subscribed methods stop handling published messages.
Rembus.@unsubscribe
— Macro
@unsubscribe mytopic
The methods of the mytopic function stop handling messages published to topic mytopic.