flowchart LR
  C(("Client")) --> S((Server))
  style C fill:#009E73, color:white
  style S fill:#0072B2, color:white
Julia
Getting Started
Install the Rembus package and wait for the download and precompilation steps to complete:
julia> import Pkg; Pkg.add("Rembus")
Resolving package versions...
Installed Rembus ─ v1.0.0
...
Precompiling project...
1 dependency successfully precompiled in 73 seconds. 72 already precompiled.
julia>
Rembus Node Types
Rembus defines three node types:
- C Component: connects to a Broker or Server.
- B Broker: accepts connections and routes RPC and Pub/Sub messages between nodes.
- S Server: accepts connections from other nodes.
Rembus Addressing
Addressing consists of two parts:
- The name of a node.
- The address of a remote node accepting connections.
Both are combined into a string that defines a node and its network link.
Example: a Component named my_node connecting to a Broker listening on WebSockets at port 8000:
rb = component("ws://localhost:8000/my_node")
Rembus provides sensible defaults:
- Protocol: ws
- Host: 127.0.0.1
- Port: 8000
So the following is equivalent:
rb = component("my_node")
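As a rough illustration of how the defaults complete a partial address, the sketch below splits an address string into protocol, host, port and node name. `parse_address` is a hypothetical helper written for this page. It is not part of the Rembus API and does not claim to mirror how Rembus parses addresses internally.

```julia
# Hypothetical helper (not part of Rembus): split "protocol://host:port/name"
# into its parts, falling back to the defaults listed above.
function parse_address(url::AbstractString)
    m = match(r"^(?:(\w+)://([^:/]*)(?::(\d+))?/)?([^/]+)$", url)
    m === nothing && throw(ArgumentError("invalid address: $url"))
    proto, host, port, name = m.captures
    return (
        protocol = proto === nothing ? "ws" : String(proto),
        host = (host === nothing || isempty(host)) ? "127.0.0.1" : String(host),
        port = port === nothing ? 8000 : parse(Int, port),
        name = String(name),
    )
end

full = parse_address("ws://localhost:8000/my_node")
short = parse_address("my_node")   # every part except the name is defaulted
```

The short form carries only the node name, so the sketch fills the protocol, host and port from the defaults.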
Rembus Hello World
# 🔵 server.jl
using Rembus
greet(who) = "ciao $who"
rb = component(name="server")
expose(rb, greet)
wait(rb)
# 🟢 client.jl
using Rembus
rb = component("client")
rpc(rb, "greet", "mondo")
The greet service accepts an argument of type Any. This means you can pass values other than strings:
rpc(rb, "greet", 3.14)
rpc(rb, "greet", ["apple", "banana"])
rpc(rb, "greet", Dict("name"=>"duck"))
If you want stricter data validation, declare argument types:
greet(who::AbstractString) = "ciao $who"
Now only string arguments are accepted. Passing another type will raise a remote exception:
rpc(rb, "greet", 3.14)
ERROR: RpcMethodException("greet", "MethodError(Main.greet, (3.14,), 0x...)")
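The remote MethodError mirrors ordinary Julia dispatch. The sketch below reproduces it locally, without any Rembus connection, to show that the type annotation alone is what rejects the argument.

```julia
# Local illustration only: no Rembus connection is involved.
greet(who::AbstractString) = "ciao $who"

accepted = greet("mondo")   # matches the AbstractString method

# A Float64 has no matching method, so Julia throws a MethodError.
rejected = try
    greet(3.14)
    false
catch err
    err isa MethodError
end
```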
Handling Disconnections
Rembus components automatically handle disconnection events.
If the server stops, the client side reports:
isopen(rb)
false
rpc(rb, "greet", "mondo")
ERROR: connection down
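While the connection is down a call fails with an error, so a caller that must not lose a request may want to retry it. A minimal sketch of such a wrapper follows. `with_retry` and its `attempts` and `delay` parameters are hypothetical names, not part of Rembus, and the wrapper works with any function that throws.

```julia
# Hypothetical helper (not part of Rembus): call `f` until it succeeds,
# sleeping `delay` seconds between attempts and rethrowing the last error.
function with_retry(f; attempts::Integer=5, delay::Real=1.0)
    for attempt in 1:attempts
        try
            return f()
        catch
            attempt == attempts && rethrow()
            sleep(delay)
        end
    end
end

# Stand-in for a remote call that fails twice before the link is back.
calls = Ref(0)
flaky() = (calls[] += 1) < 3 ? error("connection down") : "ciao mondo"

result = with_retry(flaky; delay=0.01)
```

With a Rembus handle the call would read `with_retry(() -> rpc(rb, "greet", "mondo"))`.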
When the server restarts, the client handle reconnects in the background:
rpc(rb, "greet", "mondo")
"ciao mondo"
Publish-Subscribe
flowchart LR
  C(("Publisher")) --> S1((Subscriber 1))
  C(("Publisher")) --> S2((Subscriber 2))
  style C fill:#009E73, color:white
  style S1 fill:#0072B2, color:white
  style S2 fill:#0072B2, color:white
Subscriber 1
# 🔵 sub-1.jl
using Rembus
mytopic(data) = println("[Sub-1] mytopic:$data")
foo() = "sub-1.jl"
rb = component(name="sub-1", ws=3001)
subscribe(rb, mytopic)
expose(rb, foo)
println("up and running")
wait(rb)
Subscriber 2
# 🔵 sub-2.jl
using Rembus
mytopic(data) = println("[Sub-2] mytopic:$data")
foo() = "sub-2.jl"
rb = component(name="sub-2", ws=3002)
subscribe(rb, mytopic)
expose(rb, foo)
println("up and running")
wait(rb)
Publisher 1
# 🟢 publish.jl
using Rembus
rb = component([
    "ws://:3001/client",
    "ws://:3002/client"
])
sensor1 = Dict("T" => 18.3, "H" => 45.2)
sensor2 = Dict("P" => 2.3)
publish(
    rb,
    "mytopic",
    Dict(
        "sensor#1" => sensor1,
        "sensor#2" => sensor2,
    )
)
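The published value is a nested dictionary, so a subscriber would typically walk it by sensor and then by quantity. The sketch below shows one way to do that in plain Julia. `format_readings` is a hypothetical helper, and it assumes the subscriber receives the payload as a dictionary with the same keys the publisher sent.

```julia
# Hypothetical helper (not part of Rembus): flatten a nested
# sensor => (quantity => value) dictionary into sorted text lines.
function format_readings(data::AbstractDict)
    lines = String[]
    for sensor in sort(collect(keys(data)))
        readings = data[sensor]
        for quantity in sort(collect(keys(readings)))
            push!(lines, "$sensor $quantity=$(readings[quantity])")
        end
    end
    return lines
end

payload = Dict(
    "sensor#1" => Dict("T" => 18.3, "H" => 45.2),
    "sensor#2" => Dict("P" => 2.3),
)
lines = format_readings(payload)
```

A handler such as `mytopic(data) = foreach(println, format_readings(data))` could then print one reading per line.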
A simple Broker example
flowchart LR
  C(("Client")) --> B((Broker))
  B --> S((Server))
  style C fill:#009E73, color:white
  style B fill:#F0E442, color:black
  style S fill:#0072B2, color:white
B Broker
Run the B broker from the terminal:
shell> julia -e 'using Rembus; Rembus.brokerd()'
Or in the REPL:
using Rembus
rb = component()
By default the broker accepts WebSocket connections on port 8000. Other endpoints (TCP, ZeroMQ, HTTP) are also supported:
julia -e 'using Rembus; Rembus.brokerd()' -- --ws 8000 --tcp 8001 --zmq 8002 --http 9000
[ Info: [serve_http] starting at port 9000
[ Info: Listening on: 0.0.0.0:9000, thread id: 1
Or in the REPL:
using Rembus
rb = component(ws=8000, tcp=8001, zmq=8002, http=9000)
If the http endpoint is active, you can check that the broker is up and running with JSON-RPC:
sh> curl -X POST http://localhost:9000 \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"uptime","id":"1234"}'
{"id":"1234","jsonrpc":"2.0","result":"up for 15 seconds"}
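The same check can be scripted from Julia. The sketch below builds the JSON-RPC 2.0 request body by hand and posts it with the `Downloads` standard library. `jsonrpc_body` and `broker_uptime` are hypothetical helper names, and the default URL assumes the http endpoint on port 9000 shown above.

```julia
using Downloads

# Hypothetical helper: JSON-RPC 2.0 request body for a method without parameters.
# The method name and id are inserted verbatim, so they must not contain quotes.
jsonrpc_body(method::AbstractString, id::AbstractString) =
    "{\"jsonrpc\":\"2.0\",\"method\":\"$method\",\"id\":\"$id\"}"

# Hypothetical helper: POST the request and return the raw JSON response text.
# It needs a running broker with the http endpoint enabled.
function broker_uptime(url::AbstractString="http://localhost:9000")
    output = IOBuffer()
    Downloads.request(
        url;
        method = "POST",
        headers = ["Content-Type" => "application/json"],
        input = IOBuffer(jsonrpc_body("uptime", "1234")),
        output = output,
    )
    return String(take!(output))
end

body = jsonrpc_body("uptime", "1234")   # same payload as the curl command
```

Calling `broker_uptime()` against a live broker should return the same JSON text that curl prints.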
S Server
A server component exposes RPC services by defining functions.
Example: a stats service for a DataFrame:
using Statistics
using Rembus
# Return summary statistics for the
# value column of a dataframe.
function stats(df)
    return Dict(
        "min" => minimum(df.value),
        "max" => maximum(df.value),
        "mean" => mean(df.value),
        "std" => std(df.value)
    )
end
@expose stats
The @expose macro makes the stats method available to other components.
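Because stats is an ordinary Julia function, it can be checked locally before it is exposed. The sketch below calls it on a named tuple standing in for a DataFrame. This works only because the function touches nothing but the `value` field, and the sample values are made up for illustration.

```julia
using Statistics

# Same logic as the service, written with minimum/maximum so that
# the column is not splatted into individual arguments.
function stats(df)
    return Dict(
        "min" => minimum(df.value),
        "max" => maximum(df.value),
        "mean" => mean(df.value),
        "std" => std(df.value),
    )
end

# A named tuple with a `value` field stands in for a DataFrame here.
sample = (value = [1.0, 2.0, 3.0],)
summary_stats = stats(sample)
```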
The following example shows a Julia component that requests the stats service. The Python section shows the equivalent RPC request for a pandas dataframe.
C Client
A client can call stats using @rpc:
using DataFrames
using Rembus
df = DataFrame(
    :name=>["kpi_$i" for i in 1:5],
    :ts=>1:5,
    :value=>rand(5)
)
summary = @rpc stats(df)
As a side note, the broker implements the version and uptime RPC services, which return the broker version and the time in seconds since the broker was last started.
They may be useful when a C client wants to verify that the B broker is up and running:
using Rembus
@rpc version()
"0.2.0"
@rpc uptime()
"up for 15 seconds"
Brokers in Series
flowchart LR
  C(("Client")) --> B1((Broker1))
  B1 --> B2((Broker2))
  B2 --> S((Server))
  style C fill:#009E73, color:white
  style B1 fill:#F0E442, color:black
  style B2 fill:#F0E442, color:black
  style S fill:#0072B2, color:white
# 🟢 client.jl
using Rembus
rb = component("ws://localhost:3001/cli")
response = rpc(rb, "foo", 12.0)
close(rb)
# 🟡 broker1.jl
using Rembus
rb = component("ws://localhost:3002/broker1", ws=3001)
wait(rb)
# 🟡 broker2.jl
using Rembus
rb = component(ws=3002)
wait(rb)
# 🔵 server.jl
using Rembus
foo(x) = 2x
rb = component("ws://localhost:3002/srv")
expose(rb, foo)
wait(rb)
terminal-1> julia broker1.jl
terminal-2> julia broker2.jl
Brokers in Parallel
flowchart LR
  C(("Client")) --> B1((Broker1))
  C(("Client")) --> B2((Broker2))
  B1 --> S((Server))
  B2 --> S((Server))
  style C fill:#009E73, color:white
  style B1 fill:#F0E442, color:black
  style B2 fill:#F0E442, color:black
  style S fill:#0072B2, color:white
# 🟢 client.jl
using Rembus
rb = component(["ws://localhost:3001/cli", "ws://localhost:3002/cli"])
rpc(rb, "foo", 1.0)
# 🟡 broker1.jl
using Rembus
rb = component(ws=3001)
wait(rb)
# 🟡 broker2.jl
using Rembus
rb = component(ws=3002)
wait(rb)
# 🔵 server.jl
using Rembus
foo(x) = 2x
rb = component(["ws://localhost:3001/srv", "ws://localhost:3002/srv"])
expose(rb, foo)
wait(rb)
Security
Rembus Authentication supports:
- RSA digital signature
- ECDSA digital signature
- Shared secret (plaintext)
Authorization is based on private topics accessible only to entitled components.
The register/unregister methods handle automatic provisioning and deprovisioning.
See the Security documentation for details.
Settings
Rembus behavior is configurable via environment variables and a settings.json file.
These control parameters such as:
- Data directory location
- Message timeouts
- Keep-alives
- Connection handling
See the Configuration documentation for more details.