Rembus API

This API provides both approaches to connection handling:

  • automatic reconnection in case of network failures
  • exception throwing in case of network errors and reconnection explicitly managed by the application.

Rembus API functions:

component

component(
    url::AbstractString;
    ws=nothing,
    tcp=nothing,
    zmq=nothing,
    name=missing,
    secure=false,
    authenticated=false,
    policy="first_up",
    failovers=[]
) -> Twin

# for more details
help?> component

Start a component and join the network of Rembus nodes.

Connected Component

rb = component("ws://hostname:8000/mycomponent")

Connect to a broker that listens at the connection point ws://hostname:8000 and return the rb handle used by the other APIs for exchanging data and commands.

In case of connection lost the underlying supervision logic attempts to reconnect to the broker until it succeed.

See Connected Components for URL format details.

Broker

rb = component(ws=8000, tcp=8001)

Start a broker that listens on the web socket port 8000 and on the TCP port 8001. The broker will accept connections from other components.

Broker and Connected Component

This is an advanced pattern that allows to create a component that is also a broker and that is able to connect to another broker. This pattern is useful for creating a component that is able to act as a proxy between two brokers or to create a component that is able to connect to a broker and at the same time to act as a broker for other components.

rb = component("ws://hostname:8000/mycomponent", ws=9000)

Start a broker that listens on the WebSocket port 9000 and connect to a broker defined at the connection point ws://hostname:8000.

connect

connect(url::AbstractString) -> Twin

Connect to the broker and return a connection handle used by the other APIs for exchanging data and commands.

The URL string passed to connect contains the address of a broker, the transport protocol, the port and optionally a persistent unique name for the component.

A disconnection from the remote endpoint will not trigger automatic reconnection, for example:

rb = connect("ws://hostname:8000/mycomponent")

Connects to a broker that listens at the connection point ws://hostname:8000 and returns the rb handle used by the other APIs for exchanging data and commands.

If the broker is not reachable the connect function will throw an Exception and if the connection is lost at a later time the rb handle becomes disconnected. The status of a component can be checked with the isopen method:

isopen(rb)

expose

expose(rb, name::AbstractString, fn::Function)
expose(rb, fn::Function)

Take a Julia function and exposes all of its the methods.

function myservice(df::DataFrame)
    ...
    return another_dataframe
end

function myservice(map::Dict)
    ...
    return 0
end

expose(rb, myservice)

The exposed function will became available to RPC clients using the @rpc macro or the rpc function.

unexpose

unexpose(rb, topic::AbstractString)
unexpose(rb, fn::Function)

Stop serving remote requests via rpc or @rpc.

rpc

rpc(
    rb::Twin,
    service::AbstractString,
    data...
)

# for more details
help?> rpc

Request a remote method and wait for a response.

response = rpc(rb, "my_service", Dict("name"=>"foo", "tickets"=>3))

The service name and the arguments are CBOR-encoded and transported to the remote site and the method my_service that expects a Dict as argument is called.

The return value of my_service is transported back to the RPC client calling site and taken as the return value of rpc.

If the remote method throws an Exception then the local RPC client will throw either an Exception reporting the reason of the remote error.

If the exposed method expects many arguments send an array of values, where each value is an argument:

# exposer side
function my_service(x,y,z)
    @assert x == 1
    @assert y == 2
    @assert z == 3
    return x+y+z
end

# rpc client side
rpc(rb, "my_service", [1, 2, 3])

subscribe

subscribe(rb, topic::AbstractString, fn::Function, from=Rembus.Now)

subscribe(rb, fn::Function, from=Rembus.Now)

# for more details
help?> subscribe

Declare interest for messages published on the topic logical channel.

If the topic is not specified the function fn is subscribed to the topic of the same name of the function.

To enable the reception of published messages, reactive must be called.

function mytopic(x, y)
    @info "consuming x=$x and y=$y"
end

rb = connect()

subscribe(rb, mytopic)

reactive(rb) 

By default subscribe will consume messages published after the component connect to the broker, messages sent previously are lost.

For receiving messages when the component was offline it is mandatory to set a component name and to declare interTYPEDSIGNATURESest in old messages with the from argument set to LastReceived:

rb = connect("myname")

subscribe(rb, mytopic, LastReceived)

reactive(rb) 

The subscribed function will be called each time a component produce a message with the@publish macro or the publish function.

unsubscribe

unsubscribe(rb::Twin, topic::AbstractString)
unsubscribe(rb::Twin, fn::Function)

Stop the function to receive messages produced with publish or @publish.

publish

publish(rb::Twin, topic::AbstractString, data...; qos=Rembus.QOS0)

# for more details
help?> subscribe

Publish a message on the topic channel.

rb = connect()

publish(rb, "metric", Dict("name"=>"trento/castello", "var"=>"T", "value"=>21.0))

close(rb)

metric is the message topic and the Dict value is the message content.

If the subscribed method expects many arguments send the values as a Vararg list:

# subscriber side
function my_topic(x,y,z)
    @assert x == 1
    @assert y == 2
    @assert z == 3
end

# publisher side
publish(rb, "my_topic", 1, 2, 3)

reactive

reactive(
    rb::Twin,
    from::Union{Real,Period,Dates.CompoundPeriod}=Day(1),
)

# for more details
help?> reactive

Enable the reception of published messages from subscribed topics.

Reactiveness is a property of a component and is applied to all subscribed topics.

unreactive

unreactive(rb::Twin)

Stop receiving published messages.

wait

wait(rb::Twin)

Needed for components that expose and/or subscribe methods. Wait forever for rpc requests or pub/sub messages.

inject

inject(rb::Twin, state::Any)

Bind a state object to the component.

inject is handy when a state must be shared between the subscribed methods, the exposed methods and the application.

When a state is injected two additional arguments are passed to the subscribed/exposed methods:

  • the first argument is the state value;
  • the second argument is the node handle;

The following example shows how to use a shared state:

  • the struct MyState manages the state;
  • the inject method binds the state object to the component;
  • the subscribed and the exposed method must declare as first argument the state object and as second argument the node handle;
mutable struct MyState
    counter::UInt
    data::Dict()
    MyState() = new(0, Dict())
end

mystate = MyState()

function add_metric(mystate::MyState, handle::RBHandle, measure)
    mystate.counter += 1 # count the received measures

    try
        indicator = measure["name"]
        mystate.data[indicator] = measure["value"]
    catch e
        @error "metrics: $e"
    end
end

function fetch_metrics(mystate)
    return mystate.data
end

rb = connect("ingestor")
inject(rb, mystate)

# declare interest to messages produced with
# publish(rb, "add_metric", Dict("name"=>"pressure", "value"=>1.5))
subscribe(rb, add_metric) 

# implement a service that may be requested with
# rpc(rb, "fetch_metrics")
expose(rb, fetch_metrics)

wait(rb)

close

close(rb::Twin)

Close the network connections associated with the rb handle and terminate the supervised processes related to the handle.

close(rb)

shutdown

shutdown(rb::Twin)

Terminate all the active supervised processes: The method shutdown(rb) is equivalent to close(rb).