Rembus API

This API provides both approaches to connection handling:

  • automatic reconnection in case of network failures
  • exception throwing in case of network errors and reconnection explicitly managed by the application.

Rembus API functions:

component

Connect to the broker and return a Visor process handle used by the other APIs for exchanging data and commands.

In case of connection lost the underlying supervision logic attempts to reconnect to the broker until it succeed.

rb = component("ws://hostname:8000/mycomponent")

The Macro-based API page documents the URL format.

connect

Connect to the broker and return a connection handle used by the other APIs for exchanging data and commands.

The URL string passed to connect contains the address of a broker, the transport protocol, the port and optionally a persistent unique name for the component.

rb = connect("ws://hostname:8000/mycomponent")

The Macro-based API page documents the URL format.

expose

Take a Julia function and exposes all of its the methods.

function myservice(df::DataFrame)
    ...
    return another_dataframe
end

function myservice(map::Dict)
    ...
    return 0
end

expose(rb, myservice)

The exposed function will became available to RPC clients using the @rpc macro or the rpc function.

unexpose

Stop serving remote requests via rpc or @rpc.

unexpose(rb, myservice)

rpc

Request a remote method and wait for a response.

response = rpc(rb, "my_service", Dict("name"=>"foo", "tickets"=>3))

The service name and the arguments are CBOR-encoded and transported to the remote site and the method my_service that expects a Dict as argument is called.

The return value of my_service is transported back to the RPC client calling site and taken as the return value of rpc.

If the remote method throws an Exception then the local RPC client will throw either an Exception reporting the reason of the remote error.

If the exposed method expects many arguments send an array of values, where each value is an argument:

# exposer side
function my_service(x,y,z)
    @assert x == 1
    @assert y == 2
    @assert z == 3
    return x+y+z
end

# rpc client side
rpc(rb, "my_service", [1, 2, 3])

subscribe

Declare interest for messages published on a logical channel: the topic.

The subscribed Julia methods are named as the topic of interest.

function mytopic(x, y)
    @info "consuming x=$x and y=$y"
end

connect()

subscribe(rb, mytopic)

forever() # or until Ctrl-C 

By default subscribe will consume messages published after the component connect to the broker, messages sent previously are lost.

For receiving messages when the component was offline it is mandatory to set a component name and to declare interest in old messages with the from argument set to LastReceived():

connect("myname")

subscribe(rb, mytopic, from=LastReceived())

forever() # or until Ctrl-C

NOTE By design messages are not persisted until a component declares

interest for a topic. In other words the persistence feature for a topic is enabled at the time of first subscription. If is important not to loose any message the rule is subscribe first and publish after.

The subscribed function will be called each time a component produce a message with the@publish macro or the publish function.

unsubscribe

Stop the function to receive messages produced with publish or @publish.

unsubscribe(rb, myservice)

publish

Publish a message:

rb = connect()

publish(rb, "metric", Dict("name"=>"trento/castello", "var"=>"T", "value"=>21.0))

close(rb)

metric is the message topic and the Dict value is the message content.

If the subscribed method expects many arguments send an array of values, where each value is an argument:

# subscriber side
function my_topic(x,y,z)
    @assert x == 1
    @assert y == 2
    @assert z == 3
end

# publisher side
publish(rb, "my_topic", [1, 2, 3])

reactive

Enable the reception of published messages from subscribed topics.

reactive(rb)

Reactiveness is a property of a component and is applied to all subscribed topics.

The forever function starts the loop that listen for published messages and by default the reactive mode is enabled.

unreactive

Stop receiving published messages.

unreactive(rb)

forever

Needed for components that expose and/or subscribe methods. Wait forever for rpc requests or pub/sub messages.

By default forever enable component reactiveness, see reactive.

inject

inject is handy when a state must be shared between the subscribed methods, the exposed methods and the application.

When a state is injected two additional arguments are passed to the subscribed/exposed methods:

  • the first argument is the state value;
  • the second argument is the node handle;

The following example shows how to use a shared state:

  • the struct MyState manages the state;
  • the inject method binds the state object to the component;
  • the subscribed and the exposed method must declare as first argument the state object and as second argument the node handle;
mutable struct MyState
    counter::UInt
    data::Dict()
    MyState() = new(0, Dict())
end

mystate = MyState()

function add_metric(mystate::MyState, handle::RBHandle, measure)
    mystate.counter += 1 # count the received measures

    try
        indicator = measure["name"]
        mystate.data[indicator] = measure["value"]
    catch e
        @error "metrics: $e"
    end
end

function fetch_metrics(mystate)
    return mystate.data
end

rb = connect("ingestor")
inject(rb, mystate)

# declare interest to messages produced with
# publish(rb, "add_metric", Dict("name"=>"pressure", "value"=>1.5))
subscribe(rb, add_metric) 

# implement a service that may be requested with
# rpc(rb, "fetch_metrics")
expose(rb, fetch_metrics)

forever()

close

Close the network connection.

close(rb)

NOTE: close applies to connections setup by connect api.

terminate

Close the network connection and shutdown the supervised process associated with the component.

terminate(rb)

NOTE: terminate applies to connections setup by component api.