Julia

Getting Started

Install the Rembus package and wait for the download and precompilation steps to complete:

julia> import Pkg; Pkg.add("Rembus")
   Resolving package versions...
   Installed Rembus ─ v1.0.0
   ...
Precompiling project...
  1 dependency successfully precompiled in 73 seconds. 72 already precompiled.

julia>

Rembus Node Types

Rembus defines three node types:

  • C Component: connects to a Broker or Server.

  • B Broker: accepts connections and routes RPC and Pub/Sub messages between nodes.

  • S Server: accepts connections from other nodes.

Rembus Addressing

Addressing consists of two parts:

  • The name of a node;
  • The address of a remote node accepting connections.

Both are combined into a string that defines a node and its network link.

Example: a Component named my_node connecting to a Broker that listens for WebSocket connections on port 8000:

rb = component("ws://localhost:8000/my_node")

Rembus provides sensible defaults:

  • Protocol: ws
  • Host: 127.0.0.1
  • Port: 8000

So the following is equivalent:

rb = component("my_node")
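
The defaulting rule above can be illustrated with a minimal sketch. The expand_address helper is hypothetical and is not part of the Rembus API; it only shows how a bare node name could be completed with the default protocol, host, and port listed above.

```julia
# Hypothetical helper, NOT part of Rembus: completes a short address
# with the documented defaults (ws, 127.0.0.1, 8000).
function expand_address(addr::AbstractString;
                        protocol="ws", host="127.0.0.1", port=8000)
    # An address that already contains a scheme is returned unchanged.
    occursin("://", addr) && return String(addr)
    return string(protocol, "://", host, ":", port, "/", addr)
end

println(expand_address("my_node"))                      # ws://127.0.0.1:8000/my_node
println(expand_address("ws://localhost:8000/my_node"))  # ws://localhost:8000/my_node
```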

Rembus Hello World

flowchart LR
  C(("Client")) --> S((Server))
  style C fill:#009E73, color:white
  style S fill:#0072B2, color:white

# 🔵 server.jl
using Rembus

greet(who) = "ciao $who"

rb = component(name="server")
expose(rb, greet)
wait(rb)

# 🟢 client.jl
using Rembus

rb = component("client")

rpc(rb, "greet", "mondo")

The greet service accepts an argument of type Any. This means you can pass values other than strings:

rpc(rb, "greet", 3.14)
rpc(rb, "greet", ["apple", "banana"])
rpc(rb, "greet", Dict("name"=>"duck"))

If you want stricter data validation, declare argument types:

greet(who::AbstractString) = "ciao $who"

Now only string arguments are accepted. Passing another type will raise a remote exception:

rpc(rb, "greet", 3.14)
ERROR: RpcMethodException("greet", "MethodError(Main.greet, (3.14,), 0x...)")
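
The remote exception wraps an ordinary Julia MethodError, so the same check can be reproduced locally without any network connection. The sketch below uses only Julia Base; the accepts helper is a hypothetical name introduced for illustration.

```julia
greet(who::AbstractString) = "ciao $who"

# Hypothetical helper: true when `greet` has a method for the argument's type.
accepts(arg) = hasmethod(greet, Tuple{typeof(arg)})

println(accepts("mondo"))  # true
println(accepts(3.14))     # false

# Calling greet with a Float64 fails at dispatch time, before any body runs.
try
    greet(3.14)
catch err
    println(err isa MethodError)  # true
end
```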

Handling Disconnections

Rembus components automatically handle disconnection events.

If the server stops, the client side reports a closed connection:

isopen(rb)
false

rpc(rb, "greet", "mondo")
ERROR: connection down

When the server restarts, the client handle reconnects in the background:

rpc(rb, "greet", "mondo")
"ciao mondo"

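Because a request issued while the link is down raises an error, a caller may want to retry until the background reconnection succeeds. The following is a minimal sketch in plain Julia; with_retry is a hypothetical helper, not a Rembus function, and the flaky function merely simulates a server that becomes reachable on the third attempt.

```julia
# Hypothetical helper: call `f()` up to `attempts` times, sleeping `delay`
# seconds after each failure, and rethrow the error of the last attempt.
function with_retry(f; attempts=5, delay=1.0)
    for i in 1:attempts
        try
            return f()
        catch
            i == attempts && rethrow()
            sleep(delay)
        end
    end
end

# Simulated request: fails twice, then succeeds.
calls = Ref(0)
flaky() = (calls[] += 1) < 3 ? error("connection down") : "ciao mondo"

println(with_retry(flaky; attempts=5, delay=0.01))  # ciao mondo
```

With a real component the same helper could wrap the request from the example above, for instance with_retry(() -> rpc(rb, "greet", "mondo")).
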
Publish-Subscribe

flowchart LR
  C(("Publisher")) --> S1((Subscriber 1))
  C(("Publisher")) --> S2((Subscriber 2))
  style C fill:#009E73, color:white
  style S1 fill:#0072B2, color:white
  style S2 fill:#0072B2, color:white

Subscriber 1

# 🔵 sub-1.jl
using Rembus

mytopic(data) = println("[Sub-1] mytopic:$data")

foo() = "sub-1.jl"

rb = component(name="sub-1", ws=3001)
subscribe(rb, mytopic)
expose(rb, foo)

println("up and running")
wait(rb)

Subscriber 2

# 🔵 sub-2.jl
using Rembus

mytopic(data) = println("[Sub-2] mytopic:$data")

foo() = "sub-2.jl"

rb = component(name="sub-2", ws=3002)
subscribe(rb, mytopic)
expose(rb, foo)

println("up and running")
wait(rb)

Publisher

# 🟢 publish.jl
using Rembus

rb = component([
  "ws://:3001/client",
  "ws://:3002/client"
])

sensor1 = Dict(
  "T" => 18.3, "H" => 45.2
)
sensor2 = Dict(
  "P" => 2.3
)

publish(
    rb,
    "mytopic",
    Dict(
        "sensor#1" => sensor1,
        "sensor#2" => sensor2,
    )
)

A Simple Broker Example

flowchart LR
  C(("Client")) --> B((Broker))
  B --> S((Server))
  style C fill:#009E73, color:white
  style B fill:#F0E442, color:black
  style S fill:#0072B2, color:white

B Broker

Run the broker from the terminal:

shell> julia -e 'using Rembus; Rembus.brokerd()'

Or in the REPL:

using Rembus

rb = component()

By default the broker accepts WebSocket connections on port 8000. Other endpoints (TCP, ZeroMQ, HTTP) are also supported:

shell> julia -e 'using Rembus; Rembus.brokerd()' -- --ws 8000 --tcp 8001 --zmq 8002 --http 9000
[ Info: [serve_http] starting at port 9000
[ Info: Listening on: 0.0.0.0:9000, thread id: 1

Or in the REPL:

using Rembus

rb = component(ws=8000, tcp=8001, zmq=8002, http=9000)

If the http endpoint is active you can check that the broker is up and running with JSON-RPC:

sh> curl -X POST http://localhost:9000 \
  -H "Content-Type: application/json" \
  -d '{"jsonrpc":"2.0","method":"uptime","id":"1234"}'

{"id":"1234","jsonrpc":"2.0","result":"up for 15 seconds"}

S Server

A server component exposes RPC services by defining functions.

Example: a stats service for a DataFrame:

using Statistics
using Rembus

# Return summary statistics for the `value` column of a DataFrame.
function stats(df)
    return Dict(
        # minimum/maximum reduce over the column without splatting it
        "min" => minimum(df.value),
        "max" => maximum(df.value),
        "mean" => mean(df.value),
        "std" => std(df.value)
    )
end

@expose stats

The @expose macro makes the stats method available to other components.
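
Before exposing a service, it can help to check the function locally. The sketch below is an illustration that does not use Rembus or DataFrames: a NamedTuple stands in for the DataFrame, since both support the df.value field access that stats relies on, and the sample values are invented for the example.

```julia
using Statistics

function stats(df)
    return Dict(
        "min" => minimum(df.value),
        "max" => maximum(df.value),
        "mean" => mean(df.value),
        "std" => std(df.value)
    )
end

# Stand-in for a DataFrame with a `value` column (illustrative data).
table = (value = [1.0, 2.0, 3.0, 4.0],)

result = stats(table)
println(result["min"], " ", result["max"], " ", result["mean"])  # 1.0 4.0 2.5
```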

The following example shows a Julia component that requests the stats service. The Python section contains the equivalent RPC request for a pandas DataFrame.

C Client

A client can call stats using @rpc:

using DataFrames
using Rembus

df = DataFrame(
  :name=>["kpi_$i" for i in 1:5],
  :ts=>1:5, 
  :value=>rand(5)
)

summary = @rpc stats(df)

Note that the broker also implements the version and uptime RPC services, which return the broker version and the time in seconds since the last reboot.

They are useful when a client wants to verify that the broker is up and running:

using Rembus

@rpc version()
"0.2.0"

@rpc uptime()
"up for 15 seconds"

Brokers in Series

flowchart LR
  C(("Client")) --> B1((Broker1))
  B1 --> B2((Broker2))
  B2 --> S((Server))
  style C fill:#009E73, color:white
  style B1 fill:#F0E442, color:black
  style B2 fill:#F0E442, color:black
  style S fill:#0072B2, color:white

# 🟢 client.jl
using Rembus

rb = component("ws://localhost:3001/cli")
response = rpc(rb, "foo", 12.0)

close(rb)

# 🟡 broker1.jl
using Rembus

rb = component("ws://localhost:3002/broker1", ws=3001)
wait(rb)

# 🟡 broker2.jl
using Rembus

rb = component(ws=3002)
wait(rb)

# 🔵 server.jl
using Rembus

foo(x) = 2x

rb = component("ws://localhost:3002/srv")
expose(rb, foo)

wait(rb)

terminal-1> julia broker1.jl
terminal-2> julia broker2.jl

Brokers in Parallel

flowchart LR
  C(("Client")) --> B1((Broker1))
  C(("Client")) --> B2((Broker2))
  B1 --> S((Server))
  B2 --> S((Server))
  style C fill:#009E73, color:white
  style B1 fill:#F0E442, color:black
  style B2 fill:#F0E442, color:black
  style S fill:#0072B2, color:white

# 🟢 client.jl
using Rembus

rb = component(["ws://localhost:3001/cli", "ws://localhost:3002/cli"])

rpc(rb, "foo", 1.0)

# 🟡 broker1.jl
using Rembus

rb = component(ws=3001)
wait(rb)

# 🟡 broker2.jl
using Rembus

rb = component(ws=3002)
wait(rb)

# 🔵 server.jl
using Rembus

foo(x) = 2x

rb = component(["ws://localhost:3001/srv", "ws://localhost:3002/srv"])
expose(rb, foo)

wait(rb)

Security

Rembus authentication supports:

  • RSA digital signature
  • ECDSA digital signature
  • Shared secret (plaintext)

Authorization is based on private topics accessible only to entitled components.

The register/unregister methods handle automatic provisioning and deprovisioning.

See the Security documentation for details.

Settings

Rembus behavior is configurable via environment variables and a settings.json file.

These control parameters such as:

  • Data directory location
  • Message timeouts
  • Keep-alives
  • Connection handling

See the Configuration documentation for more details.