Rembus API
This API provides both approaches to connection handling:
- automatic reconnection in case of network failures
- exception throwing in case of network errors and reconnection explicitly managed by the application.
Rembus API functions:
- component
- connect
- expose
- unexpose
- rpc
- subscribe
- unsubscribe
- publish
- put
- reactive
- unreactive
- wait
- inject
- close
- shutdown
component
component(
url::AbstractString;
ws=nothing,
tcp=nothing,
zmq=nothing,
name=missing,
secure=false,
authenticated=false,
policy="first_up",
enc=Rembus.CBOR,
failovers=[]
) -> Twin
# for more details
help?> componentStart a component and join the network of Rembus nodes.
Connected Component
rb = component("ws://hostname:8000/mycomponent")Connect to a broker that listens at the connection point ws://hostname:8000 and return the rb handle used by the other APIs for exchanging data and commands.
In case of connection lost the underlying supervision logic attempts to reconnect to the broker until it succeed.
See Component for URL format details.
Broker
rb = component(ws=8000, tcp=8001)Start a broker that listens on the web socket port 8000 and on the TCP port 8001. The broker will accept connections from other components.
Broker and Connected Component
This is an advanced pattern that allows to create a component that is also a broker and that is able to connect to another broker. This pattern is useful for creating a component that is able to act as a proxy between two brokers or to create a component that is able to connect to a broker and at the same time to act as a broker for other components.
rb = component("ws://hostname:8000/mycomponent", ws=9000)Start a broker that listens on the WebSocket port 9000 and connect to a broker defined at the connection point ws://hostname:8000.
connect
connect(url::AbstractString) -> TwinConnect to the broker and return a connection handle used by the other APIs for exchanging data and commands.
The URL string passed to connect contains the address of a broker, the transport protocol, the port and optionally a persistent unique name for the component.
A disconnection from the remote endpoint will not trigger automatic reconnection, for example:
rb = connect("ws://hostname:8000/mycomponent")Connects to a broker that listens at the connection point ws://hostname:8000 and returns the rb handle used by the other APIs for exchanging data and commands.
If the broker is not reachable the connect function will throw an Exception and if the connection is lost at a later time the rb handle becomes disconnected. The status of a component can be checked with the isopen method:
isopen(rb)expose
expose(rb, name::AbstractString, fn::Function)
expose(rb, fn::Function)Take a Julia function and exposes all of its the methods.
function myservice(df::DataFrame)
...
return another_dataframe
end
function myservice(map::Dict)
...
return 0
end
expose(rb, myservice)The exposed function will became available to RPC clients using the @rpc macro or the rpc function.
unexpose
unexpose(rb, topic::AbstractString)
unexpose(rb, fn::Function)Stop serving remote requests via rpc or @rpc.
rpc
rpc(
rb::Twin,
service::AbstractString,
data...
)
# for more details
help?> rpcRequest a remote method and wait for a response.
response = rpc(rb, "my_service", Dict("name"=>"foo", "tickets"=>3))The service name and the arguments are CBOR-encoded and transported to the remote site and the method my_service that expects a Dict as argument is called.
The return value of my_service is transported back to the RPC client calling site and taken as the return value of rpc.
If the remote method throws an Exception then the local RPC client will throw either an Exception reporting the reason of the remote error.
If the exposed method expects many arguments send an array of values, where each value is an argument:
# exposer side
function my_service(x,y,z)
@assert x == 1
@assert y == 2
@assert z == 3
return x+y+z
end
# rpc client side
rpc(rb, "my_service", [1, 2, 3])subscribe
subscribe(rb, topic::AbstractString, fn::Function, from=Rembus.Now)
subscribe(rb, fn::Function, from=Rembus.Now)
# for more details
help?> subscribeDeclare interest for messages published on the topic logical channel.
If the topic is not specified the function fn is subscribed to the topic of the same name of the function.
The subscribed function will be called each time a component produce a message with the@publish macro or the publish function.
To enable the reception of published messages, reactive must be called.
function mytopic(x, y)
@info "consuming x=$x and y=$y"
end
rb = connect()
subscribe(rb, mytopic)
reactive(rb) The from (default=Rembus.Now) argument defines the starting point in time from which messages published while the component was offline will be sent upon reconnection. Possible from values:
Rembus.Now: Equivalent to0.0, ignores old messages, and starts receiving only new messages from now.Rembus.LastReceived: Receives all messages published since the last disconnection.n::Float64: Receives messages published within the lastnseconds.Dates.CompoundPeriod: Defines a custom period using aCompoundPeriodvalue.
Example
rb = connect("myname")
subscribe(rb, mytopic, Rembus.LastReceived)
reactive(rb) Receive all messages published since the last component disconnection.
unsubscribe
unsubscribe(rb::Twin, topic::AbstractString)
unsubscribe(rb::Twin, fn::Function)Stop the function to receive messages produced with publish or @publish.
publish
publish(rb::Twin, topic::AbstractString, data...; qos=Rembus.QOS0)Publish one or more data values to a topic.
Each element in data is passed as an argument to the remote method subscribed to topic.
The data arguments may be of any type. For interoperability across components implemented in different languages, the values should be either DataFrames or primitive types that are CBOR-encodable (see RFC 8949).
The keyword argument qos specifies the quality of service (QoS) for message delivery:
Rembus.QOS0(default): At most once delivery. A message may be delivered zero or one time.Rembus.QOS1: At least once delivery. A message is delivered one or more times.Rembus.QOS2: Exactly once delivery. A message is delivered once and only once.
Examples
If the subscriber is a method that expects two arguments:
mytopic(x, y) = ... # do something with x and yYou can publish a message with two data values::
rb = component("myname")
publish(rb, "mytopic", 1, 2)If the remote subscribed method has no arguments, publish without data:
publish(rb, "mytopic")put
put(rb::Twin, topic::AbstractString, data...; qos=Rembus.QOS0)Publish one or more data values to a channel identified by topic, which is automatically prefixed with the component name.
Each element in data is passed as an argument to the remote method subscribed to topic.
The data arguments may be of any type. For interoperability across components implemented in different languages, the values should be either DataFrames or primitive types that are CBOR-encodable (see RFC 8949).
The keyword argument qos specifies the quality of service (QoS) for message delivery:
Rembus.QOS0(default): At most once delivery. A message may be delivered zero or one time.Rembus.QOS1: At least once delivery. A message is delivered one or more times.Rembus.QOS2: Exactly once delivery. A message is delivered once and only once.
Example
rb = component("myname")
put(rb, "mytopic", 1, 2)This publishes the values 1 and 2 to the topic myname/mytopic.
reactive
reactive(
rb::Twin,
from::Union{Real,Period,Dates.CompoundPeriod}=Day(1),
)
# for more details
help?> reactiveEnable the reception of published messages from subscribed topics.
Reactiveness is a property of a component and is applied to all subscribed topics.
unreactive
unreactive(rb::Twin)Stop receiving published messages.
wait
wait(rb::Twin)Needed for components that expose and/or subscribe methods. Wait forever for rpc requests or pub/sub messages.
inject
inject(rb::Twin, state::Any)Bind a state object to the component.
inject is handy when a state must be shared between the subscribed methods, the exposed methods and the application.
When a state is injected two additional arguments are passed to the subscribed/exposed methods:
- the first argument is the state value;
- the second argument is the node handle;
The following example shows how to use a shared state:
- the struct
MyStatemanages the state; - the
injectmethod binds the state object to the component; - the subscribed and the exposed method must declare as first argument the state object and as second argument the node handle;
mutable struct MyState
counter::UInt
data::Dict()
MyState() = new(0, Dict())
end
mystate = MyState()
function add_metric(mystate::MyState, handle::RBHandle, measure)
mystate.counter += 1 # count the received measures
try
indicator = measure["name"]
mystate.data[indicator] = measure["value"]
catch e
@error "metrics: $e"
end
end
function fetch_metrics(mystate)
return mystate.data
end
rb = connect("ingestor")
inject(rb, mystate)
# declare interest to messages produced with
# publish(rb, "add_metric", Dict("name"=>"pressure", "value"=>1.5))
subscribe(rb, add_metric)
# implement a service that may be requested with
# rpc(rb, "fetch_metrics")
expose(rb, fetch_metrics)
wait(rb)close
close(rb::Twin)Close the network connections associated with the rb handle and terminate the supervised processes related to the handle.
close(rb)shutdown
shutdown(rb::Twin)Terminate all the active supervised processes: The method shutdown(rb) is equivalent to close(rb).