flowchart LR
C(("Client")) --> S((Server))
style C fill:#009E73, color:white
style S fill:#0072B2, color:white
Julia
Getting Started
Install the Rembus package and wait for the download and precompilation steps to complete:
julia> import Pkg; Pkg.add("Rembus")
Resolving package versions...
Installed Rembus ─ v1.0.0
...
Precompiling project...
1 dependency successfully precompiled in 73 seconds. 72 already precompiled.
julia>Rembus Node Types
Rembus defines three node types:
C Component: connects to a
BrokerorServer.B Broker: accepts connections and routes RPC and Pub/Sub messages between nodes.
S Server: accepts connections from other nodes.
Rembus Addressing
Addressing consists of two parts:
- The name of a node;
- The address of a remote node accepting connections;
Both are combined into a string that defines a node and its network link.
Example: A Component named my_node connecting to a Broker listening on WebSockets at port 8000:
rb = component("ws://localhost:8000/my_node")Rembus provides sensible defaults:
- Protocol:
ws - Host:
127.0.0.1 - Port:
8000
So the following is equivalent:
rb = component("my_node")Rembus Hello World
# 🔵 server.jl
using Rembus
greet(who) = "ciao $who"
rb = component(name="server")
expose(rb, greet)
wait(rb)# 🟢 client.jl
using Rembus
rb = component("client")
rpc(rb, "greet", "mondo")The greet service accepts an argument of type Any. This means you can pass values other than strings:
rpc(rb, "greet", 3.14))
rpc(rb, "greet", ["apple", "banana"]))
rpc(rb, "greet", Dict("name"=>"duck"))You you want stricter data validation, declare argument types:
greet(who::AbstractString) = "ciao $who"Now only string arguments are accepted. Passing another type will raise a remote exception:
rpc(rb, "greet", 3.14)ERROR: RpcMethodException("greet", "MethodError(Main.greet, (3.14,), 0x...)")Handling Disconnections
Rembus components automatically handle disconnection events.
If the server stops on the client side you have:
isopen(rb)
false
rpc(rb, "greet", "mondo")
ERROR: connection downWhen the server restarts, the client handle reconnects in background:
rpc(rb, "greet", "mondo")
"ciao mondo"Publish-Subscribe
flowchart LR
C(("Publisher")) --> S1((Subscriber 1))
C(("Publisher")) --> S2((Subscriber 2))
style C fill:#009E73, color:white
style S1 fill:#0072B2, color:white
style S2 fill:#0072B2, color:white
Subscriber 1
# 🔵 sub-1.jl
using Rembus
mytopic(data) = println("[Sub-1] mytopic:$data")
foo() = "sub-1.jl"
rb = component(name="sub-1", ws=3001)
subscribe(rb, mytopic)
expose(rb, foo)
println("up and running")
wait(rb)Subscriber 2
# 🔵 sub-2.jl
using Rembus
mytopic(data) = println("[Sub-2] mytopic:$data")
foo() = "sub-2.jl"
rb = component(name="sub-2", ws=3002)
subscribe(rb, mytopic)
expose(rb, foo)
println("up and running")
wait(rb)Publisher 1
# 🟢 publish.jl
using Rembus
rb = component([
"ws://:3001/client",
"ws://:3002/client"
])
sensor1 = Dict(
"T" => 18.3, "H" => 45.2
)
sensor2 = Dict(
"P" => 2.3
)
publish(
rb,
"mytopic",
Dict(
"sensor#1" => sensor1,
"sensor#2" => sensor2,
)
)A simple Broker example
flowchart LR
C(("Client")) --> B((Broker))
B --> S((Server))
style C fill:#009E73, color:white
style B fill:#F0E442, color:black
style S fill:#0072B2, color:white
B Broker
Run the B broker from the terminal:
shell> julia -e 'using Rembus; Rembus.brokerd()'Or in the REPL:
using Rembus
rb = component()By default the broker accepts WebSocket connections on port 8000. Other endpoints (TCP, ZeroMQ, HTTP) are also supported:
julia -e 'using Rembus; Rembus.brokerd()' -- --ws 8000 --tcp 8001 --zmq 8002 --http 9000
[ Info: [serve_http] starting at port 9000
[ Info: Listening on: 0.0.0.0:9000, thread id: 1Or in the REPL:
using Rembus
rb = component(ws=8000, tc=8001, zmq=8002, http=9000)If the http endpoint is active you can check that the broker is up and running with JSON-RPC:
sh> curl -X POST http://localhost:9000 \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","method":"uptime","id":"1234"}'
sh> {"id":"1234","jsonrpc":"2.0","result":"up for 15 seconds"} S Server
A server component exposes RPC services by defining functions.
Example: a stats service for a DataFrame:
using Statistics
using Rembus
# Return a stat summary of dataframes
# values from the value column.
function stats(df)
return Dict(
"min" => min(df.value...),
"max" => max(df.value...),
"mean" => mean(df.value),
"std" => std(df.value)
)
end
@expose statsThe @expose macro makes the stats method available to other components.
The following example shows a Julia component that requests the stats service and in the Python section there is the equivalent RPC request for a pandas dataframe.
C Client
A client can call stats using @rpc:
using DataFrames
using Rembus
df = DataFrame(
:name=>["kpi_$i" for i in 1:5],
:ts=>1:5,
:value=>rand(5)
)
summary = @rpc stats(df)As a side note keep in mind that the broker implements the version and uptime RPC services that unsurprisingly returns the broker version and the time in seconds since last reboot.
They may be useful when a C client wants to verify that the B broker is up and running:
using Rembus
@rpc version()
"0.2.0"
@rpc uptime()
"up for 15 seconds"Brokers in Series
flowchart LR
C(("Client")) --> B1((Broker1))
B1 --> B2((Broker2))
B2 --> S((Server))
style C fill:#009E73, color:white
style B1 fill:#F0E442, color:black
style B2 fill:#F0E442, color:black
style S fill:#0072B2, color:white
# 🟢 client.jl
using Rembus
rb = component("ws://localhost:3001/cli")
response = rpc(rb, "foo", 12.0)
close(rb)# 🟡 broker1.jl
using Rembus
rb = component("ws://localhost:3002/broker1", ws=3001)
wait(rb)# 🟡 broker2.jl
using Rembus
rb = component(ws=3002)
wait(rb)# 🔵 server.jl
using Rembus
foo(x) = 2x
rb = component("ws://localhost:3002/srv")
expose(rb, foo)
wait(rb)terminal-1> j broker-1.jl
terminal-2> j broker-2.jlBrokers in Parallel
flowchart LR
C(("Client")) --> B1((Broker1))
C(("Client")) --> B2((Broker2))
B1 --> S((Server))
B2 --> S((Server))
style C fill:#009E73, color:white
style B1 fill:#F0E442, color:black
style B2 fill:#F0E442, color:black
style S fill:#0072B2, color:white
# 🟢 client.jl
using Rembus
rb = component(["ws://localhost:3001/cli", "ws://localhost:3002/cli"])
rpc(rb, "foo", 1.0)# 🟡 broker1.jl
using Rembus
rb = component(ws=3001)
wait(rb)# 🟡 broker2.jl
using Rembus
rb = component(ws=3002)
wait(rb)# 🔵 server.jl
using Rembus
foo(x) = 2x
rb = component(["ws://localhost:3001/srv", "ws://localhost:3002/srv"])
expose(rb, foo)
wait(rb)Security
Rembus Authentication supports:
- RSA digital signature
- ECDSA digital signature
- Shared secret (plaintext)
Autorization is based on private topics accessible only to entitled components.
The register/unregister methods handle automatic provisioning and deprovisioning.
See the Security documentation for details.
Settings
Rembus behavior is configurable via environment variables and settings.json file.
These control parameters such as:
- Data directory location
- Message timeouts
- Keep-alives
- Connection handling
See the Configuration for more details.