Rembus API
This API provides both approaches to connection handling:
- automatic reconnection in case of network failures
- exception throwing in case of network errors and reconnection explicitly managed by the application.
Rembus API functions:
- component
- connect
- expose
- unexpose
- rpc
- subscribe
- unsubscribe
- publish
- reactive
- unreactive
- wait
- inject
- close
- shutdown
component
component(
    url::AbstractString;
    ws=nothing,
    tcp=nothing,
    zmq=nothing,
    name=missing,
    secure=false,
    authenticated=false,
    policy="first_up",
    failovers=[]
) -> Twin
# for more details
help?> component
Start a component and join the network of Rembus nodes.
Connected Component
rb = component("ws://hostname:8000/mycomponent")
Connect to a broker that listens at the connection point ws://hostname:8000 and return the rb handle used by the other APIs for exchanging data and commands.
If the connection is lost, the underlying supervision logic attempts to reconnect to the broker until it succeeds.
See Connected Components for URL format details.
Broker
rb = component(ws=8000, tcp=8001)
Start a broker that listens on the WebSocket port 8000 and on the TCP port 8001. The broker will accept connections from other components.
Broker and Connected Component
This is an advanced pattern in which a component is also a broker and, at the same time, connects to another broker. It is useful for building a proxy between two brokers, or a component that connects to a broker while acting as a broker for other components.
rb = component("ws://hostname:8000/mycomponent", ws=9000)
Start a broker that listens on the WebSocket port 9000 and connect to a broker defined at the connection point ws://hostname:8000.
connect
connect(url::AbstractString) -> Twin
Connect to the broker and return a connection handle used by the other APIs for exchanging data and commands.
The URL string passed to connect contains the address of a broker, the transport protocol, the port and optionally a persistent unique name for the component.
A disconnection from the remote endpoint will not trigger automatic reconnection, for example:
rb = connect("ws://hostname:8000/mycomponent")
Connects to a broker that listens at the connection point ws://hostname:8000 and returns the rb handle used by the other APIs for exchanging data and commands.
If the broker is not reachable, the connect function throws an Exception. If the connection is lost at a later time, the rb handle becomes disconnected. The status of a component can be checked with the isopen method:
isopen(rb)
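Because connect throws when the broker is unreachable and does not reconnect on its own, the application has to supply the retry logic. The following is a minimal sketch, not part of Rembus: connect_with_retry, its attempts and delay parameters, and the doubling backoff policy are hypothetical choices made for illustration. The connection function is passed in as an argument so the helper can be tried without a broker.

```julia
# Hypothetical helper (not part of Rembus): retry a connection function
# with exponential backoff. `connectfn` is any function that takes a URL
# and either returns a handle or throws, as `connect` is described above.
function connect_with_retry(connectfn, url::AbstractString; attempts::Int=5, delay::Real=0.5)
    for attempt in 1:attempts
        try
            return connectfn(url)
        catch e
            # Give up and propagate the last error after the final attempt.
            attempt == attempts && rethrow()
            @warn "connection attempt $attempt failed, retrying in $delay s" exception = e
            sleep(delay)
            delay *= 2  # assumed policy: double the wait after each failure
        end
    end
end
```

With Rembus loaded, a call might look like rb = connect_with_retry(connect, "ws://hostname:8000/mycomponent"). The application could then poll isopen(rb) and call the helper again when the handle becomes disconnected.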
expose
expose(rb, name::AbstractString, fn::Function)
expose(rb, fn::Function)
Take a Julia function and expose all of its methods.
function myservice(df::DataFrame)
    ...
    return another_dataframe
end
function myservice(map::Dict)
    ...
    return 0
end
expose(rb, myservice)
The exposed function will become available to RPC clients using the @rpc macro or the rpc function.
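In Julia a single function name can carry several methods, selected by argument type, which is why exposing myservice above covers both the DataFrame and the Dict variants. The sketch below uses only Base Julia and a hypothetical summarize_input function (with Dict and Vector methods, chosen to avoid a DataFrames dependency) to show what "all of its methods" refers to.

```julia
# Hypothetical function with two methods, selected by argument type.
summarize_input(m::Dict) = "dict with $(length(m)) entries"
summarize_input(v::Vector) = "vector with $(length(v)) elements"

# One function object, two methods: passing the function covers both.
method_count = length(methods(summarize_input))

# Dispatch picks the method that matches the argument type.
from_dict = summarize_input(Dict("a" => 1))
from_vector = summarize_input([1, 2, 3])
```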
unexpose
unexpose(rb, topic::AbstractString)
unexpose(rb, fn::Function)
Stop serving remote requests via rpc or @rpc.
rpc
rpc(
    rb::Twin,
    service::AbstractString,
    data...
)
# for more details
help?> rpc
Request a remote method and wait for a response.
response = rpc(rb, "my_service", Dict("name"=>"foo", "tickets"=>3))
The service name and the arguments are CBOR-encoded and transported to the remote site, where the method my_service that expects a Dict as argument is called.
The return value of my_service is transported back to the RPC client calling site and taken as the return value of rpc.
If the remote method throws an Exception, the local RPC client throws an Exception reporting the reason of the remote error.
If the exposed method expects many arguments, send an array of values, where each value is an argument:
# exposer side
function my_service(x,y,z)
    @assert x == 1
    @assert y == 2
    @assert z == 3
    return x+y+z
end
# rpc client side
rpc(rb, "my_service", [1, 2, 3])
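The mapping from an array of values to positional arguments can be pictured with Julia's splat operator. This is only a local analogy in plain Julia and assumes nothing about how Rembus dispatches the call internally.

```julia
# Same shape as the service above, defined locally for illustration.
function my_service(x, y, z)
    return x + y + z
end

args = [1, 2, 3]

# Splatting turns each element of the array into one positional argument,
# mirroring how the values sent by the RPC client line up with x, y and z.
result = my_service(args...)
```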
subscribe
subscribe(rb, topic::AbstractString, fn::Function, from=Rembus.Now)
subscribe(rb, fn::Function, from=Rembus.Now)
# for more details
help?> subscribe
Declare interest in messages published on the topic logical channel.
If the topic is not specified, the function fn is subscribed to the topic with the same name as the function.
To enable the reception of published messages, reactive must be called.
function mytopic(x, y)
    @info "consuming x=$x and y=$y"
end
rb = connect()
subscribe(rb, mytopic)
reactive(rb)
By default subscribe consumes only messages published after the component connects to the broker; messages sent previously are lost.
To receive messages published while the component was offline, it is mandatory to set a component name and to declare interest in old messages with the from argument set to LastReceived:
rb = connect("myname")
subscribe(rb, mytopic, LastReceived)
reactive(rb)
The subscribed function will be called each time a component produces a message with the @publish macro or the publish function.
unsubscribe
unsubscribe(rb::Twin, topic::AbstractString)
unsubscribe(rb::Twin, fn::Function)
Stop the function from receiving messages produced with publish or @publish.
publish
publish(rb::Twin, topic::AbstractString, data...; qos=Rembus.QOS0)
# for more details
help?> publish
Publish a message on the topic channel.
rb = connect()
publish(rb, "metric", Dict("name"=>"trento/castello", "var"=>"T", "value"=>21.0))
close(rb)
metric is the message topic and the Dict value is the message content.
If the subscribed method expects many arguments, send the values as a Vararg list:
# subscriber side
function my_topic(x,y,z)
    @assert x == 1
    @assert y == 2
    @assert z == 3
end
# publisher side
publish(rb, "my_topic", 1, 2, 3)
reactive
reactive(
    rb::Twin,
    from::Union{Real,Period,Dates.CompoundPeriod}=Day(1),
)
# for more details
help?> reactive
Enable the reception of published messages from subscribed topics.
Reactiveness is a property of a component and is applied to all subscribed topics.
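According to the signature, the from argument accepts a Real, a Period, or a Dates.CompoundPeriod. The sketch below uses only the Dates standard library to build one value of each kind and check it against that union. The alias FromArg and the specific values are illustrative assumptions, and how a bare Real is interpreted is not covered here.

```julia
using Dates

# The type accepted by the `from` argument, copied from the signature above.
# `FromArg` is a hypothetical alias used only in this sketch.
FromArg = Union{Real,Period,Dates.CompoundPeriod}

one_day = Day(1)                # a Period (the documented default)
mixed = Hour(2) + Minute(30)    # adding different periods gives a CompoundPeriod
plain_number = 3600             # a Real

# Check each candidate value against the accepted union type.
accepted = [v isa FromArg for v in (one_day, mixed, plain_number)]
```

With a connected handle, a call such as reactive(rb, Hour(2) + Minute(30)) would therefore match the signature shown above.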
unreactive
unreactive(rb::Twin)
Stop receiving published messages.
wait
wait(rb::Twin)
Needed for components that expose and/or subscribe methods. Wait forever for rpc requests or pub/sub messages.
inject
inject(rb::Twin, state::Any)
Bind a state object to the component.
inject is handy when a state must be shared between the subscribed methods, the exposed methods and the application.
When a state is injected two additional arguments are passed to the subscribed/exposed methods:
- the first argument is the state value;
- the second argument is the node handle;
The following example shows how to use a shared state:
- the struct MyState manages the state;
- the inject method binds the state object to the component;
- the subscribed and the exposed methods must declare the state object as first argument and the node handle as second argument.
mutable struct MyState
    counter::UInt
    data::Dict
    MyState() = new(0, Dict())
end
mystate = MyState()
# subscribed method: state first, node handle second, then the message payload
function add_metric(mystate::MyState, handle, measure)
    mystate.counter += 1 # count the received measures
    try
        indicator = measure["name"]
        mystate.data[indicator] = measure["value"]
    catch e
        @error "metrics: $e"
    end
end
# exposed method: state first, node handle second
function fetch_metrics(mystate::MyState, handle)
    return mystate.data
end
rb = connect("ingestor")
inject(rb, mystate)
# declare interest to messages produced with
# publish(rb, "add_metric", Dict("name"=>"pressure", "value"=>1.5))
subscribe(rb, add_metric)
# implement a service that may be requested with
# rpc(rb, "fetch_metrics")
expose(rb, fetch_metrics)
# enable the reception of published messages
reactive(rb)
wait(rb)
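Since handlers receive the state as their first argument, their logic can be exercised locally by calling them directly, without a broker. The sketch below is an illustration under stated assumptions: MetricState is a hypothetical struct that mirrors MyState with concrete field types, and nothing is passed in place of the node handle.

```julia
# Hypothetical state type mirroring MyState above.
mutable struct MetricState
    counter::UInt
    data::Dict{String,Float64}
    MetricState() = new(0, Dict{String,Float64}())
end

# Handlers in the shape described above: state first, node handle second.
function add_metric(state::MetricState, handle, measure)
    state.counter += 1
    state.data[measure["name"]] = measure["value"]
    return nothing
end

fetch_metrics(state::MetricState, handle) = state.data

# Call the handlers directly; `nothing` stands in for the node handle.
state = MetricState()
add_metric(state, nothing, Dict("name" => "pressure", "value" => 1.5))
add_metric(state, nothing, Dict("name" => "temperature", "value" => 21.0))
metrics = fetch_metrics(state, nothing)
```

Keeping the state in an explicit argument rather than in a global makes this kind of direct call possible, which is convenient for unit tests.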
close
close(rb::Twin)
Close the network connections associated with the rb handle and terminate the supervised processes related to the handle.
close(rb)
shutdown
shutdown(rb::Twin)
Terminate all the active supervised processes. The method shutdown(rb) is equivalent to close(rb).