Visor
Visor is a task supervisor that provides a set of policies to control how tasks are shut down and restarted.
The goal of Visor is to support reliable, long-running and fault-tolerant applications written in Julia.
Visor is influenced by Erlang Supervisor design principles with the aim to fit as "naturally" as possible into the Julia task system.
Getting Started
using Pkg;
Pkg.add("Visor")
# demo.jl:
using Visor

struct AppData
    count::Int
end

# Task implementation, the first argument is a task descriptor
function level_1_task(td)
    @info "starting $td"
    for msg in td.inbox
        if isshutdown(msg)
            break
        elseif isa(msg, AppData)
            @info "$td recv: $msg"
        end
    end
    @info "shutting down $td"
end

function level_2_task(td)
    @info "starting $td"
    n = 0
    while true
        sleep(0.5)
        if isshutdown(td)
            break
        end
        # send a data message to process named level_1_task
        cast("level_1_task", AppData(n))
        n += 1
    end
    @info "shutting down $td"
end

supervise([process(level_1_task), process(level_2_task)])
On Linux machines Visor captures the SIGINT signal and terminates the application tasks in the reverse order of startup:
$ julia demo.jl
[ Info: starting level_1_task
[ Info: starting level_2_task
[ Info: level_1_task recv: AppData(0)
[ Info: level_1_task recv: AppData(1)
[ Info: level_1_task recv: AppData(2)
^C[ Info: shutting down level_2_task
[ Info: shutting down level_1_task
A Process is a supervised task that is started/stopped in a deterministic way and that may be restarted in case of failure.
A process task function has a mandatory first argument, the task descriptor td, which can be used to:
- check whether a shutdown request is pending, with isshutdown;
- receive messages from the inbox Channel.
function level_2_task(td)
    while true
        # do something ...
        if isshutdown(td)
            break
        end
    end
end
If the task function is designed to receive messages, the argument of isshutdown MUST BE the message and not the task object:
function level_1_task(td)
    for msg in td.inbox
        # check if it is a shutdown request
        if isshutdown(msg)
            break
        elseif isa(msg, AppMessage)
            # do something with the application message
            # that some task sent to level_1_task.
            do_something(msg)
        end
    end
end
supervise manages processes and supervisors until all of them terminate or a shutdown is requested.
processes = [process(level_1_task), process(level_2_task)]
supervise(processes)
supervise applied to a supervisor is an example of a hierarchical supervision tree:
sv = supervisor("bucket", [process(foo), process(bar)])
supervise(sv)
In this case the supervisor "bucket" manages the tasks foo and bar, whereas in the previous example level_1_task and level_2_task are managed by the root supervisor.
See the Supervisor documentation for more details.
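A minimal sketch of the "bucket" tree in action, assuming that a process inside a named supervisor is addressed by its dotted full name ("bucket.foo"), as described in the documentation of from. The bodies of foo and bar are hypothetical; bar calls shutdown() only so that the demo terminates.

```julia
using Visor

# Hypothetical worker: logs every message it receives until shutdown.
function foo(td)
    for msg in td.inbox
        isshutdown(msg) && break
        @info "$td recv: $msg"
    end
end

# Hypothetical sender: addresses foo by its full name, then stops everything.
function bar(td)
    cast("bucket.foo", "hello from bar")
    sleep(0.5)   # leave foo some time to handle the message
    shutdown()
end

sv = supervisor("bucket", [process(foo), process(bar)])
supervise(sv)
```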
Restart policy example
The following example shows the restart-on-failure scenario: a task failure is simulated with an error exception that terminates the consumer.
using Visor

# Consumer process task: read messages from the process inbox and print them to stdout.
# The mandatory first argument td is the task descriptor object.
# With the task descriptor it is possible to receive any type of message, for example
# shutdown requests, and to send messages to other processes via the call and cast methods.
function consumer(td)
    while true
        # Fetch data or control messages,
        # for example a request to shut down the task.
        msg = take!(td.inbox)
        # Check if msg is the shutdown control message ...
        !isshutdown(msg) || break
        println(msg)
        if msg == 5
            error("fatal error simulation")
        end
    end
end

# Producer task.
# In this case the shutdown request is not detected by checking the inbox messages
# but by checking the task descriptor.
function producer(td)
    count = 1
    while true
        sleep(1)
        # send count value to consumer task
        cast("consumer", count)
        # check whether a shutdown was requested (for example by a SIGINT signal)
        !isshutdown(td) || break
        count += 1
    end
end

# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
    process(consumer),
    process(producer, thread=true),
]

# Supervise the tasks and apply restart policies in case of process failure.
# In this case a maximum of 2 process restarts is allowed in a 1 second time window.
# If the failure rate exceeds this limit the processes are terminated and the supervisor returns.
supervise(tasks, intensity=2, period=1);
The producer process sends a number every second to the consumer process, which takes it from the inbox channel and prints it to stdout.
When the number equals 5 an exception is raised, the task fails and the supervisor restarts the consumer process:
$ julia producer_consumer.jl
1
2
3
4
5
┌ Error: [consumer] exception: ErrorException("fatal error simulation")
└ @ Visor ~/dev/Visor/src/Visor.jl:593
6
7
^C8
9
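The intensity and period settings can be read as a sliding-window limit on restarts. The following standalone sketch illustrates that idea in plain Julia. RestartWindow and allow_restart! are hypothetical names, and this is not Visor's actual implementation, only one plausible way to enforce "at most intensity restarts within period seconds".

```julia
# Standalone illustration (hypothetical, not Visor code): sliding-window restart limiter.
mutable struct RestartWindow
    intensity::Int             # maximum restarts allowed inside the window
    period::Float64            # window length in seconds
    stamps::Vector{Float64}    # times of the recent failures
end

RestartWindow(intensity, period) = RestartWindow(intensity, period, Float64[])

# Record a failure at time `t_now` and report whether a restart is still allowed.
function allow_restart!(w::RestartWindow, t_now::Float64)
    # Forget failures that fell out of the time window.
    filter!(t -> t_now - t <= w.period, w.stamps)
    push!(w.stamps, t_now)
    return length(w.stamps) <= w.intensity
end

w = RestartWindow(2, 1.0)
results = [allow_restart!(w, t) for t in (0.0, 0.3, 0.6, 5.0)]
println(results)
```

With intensity 2 and period 1.0, the third failure inside one second is refused, while a failure that arrives after a quiet interval is allowed again.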
Visor.DEFAULT_INTENSITY
Visor.Request
Visor.SupervisorResync
Visor.add_node
Visor.add_processes
Visor.call
Visor.call
Visor.cast
Visor.cast
Visor.from
Visor.from_name
Visor.from_name
Visor.hassupervised
Visor.ifrestart
Visor.isrequest
Visor.isshutdown
Visor.isshutdown
Visor.process
Visor.process
Visor.receive
Visor.reply
Visor.resync
Visor.setname
Visor.setroot
Visor.setsupervisor
Visor.shutdown
Visor.shutdown
Visor.startup
Visor.startup
Visor.supervise
Visor.supervise
Visor.supervisor
Visor.@isshutdown
Visor.DEFAULT_INTENSITY — Constant
Maximum number of restarts in period seconds.
Visor.Request — Type
A response value is expected when a Request message is pushed to the target process inbox.
Visor.SupervisorResync — Type
Trigger a supervisor resync.
Visor.add_node — Method
add_node(supervisor::Supervisor, proc::Supervised)
Add supervised proc to the children's collection of the controlling supervisor.
Visor.add_processes — Method
add_processes(
    svisor::Supervisor,
    processes;
    intensity,
    period,
    strategy,
    terminateif::Symbol=:empty
)::Supervisor
Set up the hierarchy relationship between the supervisor and the supervised list of processes, and configure the supervisor behavior.
Visor.call — Method
call(name::String, request::Any; timeout::Real=3)
Send a request to the process identified by full name and wait for a response.
If timeout is equal to -1 the call waits forever; otherwise, if a response is not received within timeout seconds, an ErrorException is raised.
The message sent to the target task is a Request struct that contains the request and a channel for sending back the response.
using Visor

function server(task)
    for msg in task.inbox
        isshutdown(msg) && break
        put!(msg.inbox, msg.request * 2)
    end
    println("server done")
end

function requestor(task)
    request = 10
    response = call("server", request)
    println("server(", request, ")=", response)
    shutdown()
end

supervise([process(server), process(requestor)])
Visor.call — Method
call(target::Supervised, request::Any; timeout::Real=-1)
Send a request to target process and wait for a response.
Visor.cast — Method
cast(name::String, message::Any)
Send a message to process with full name without waiting for a response.
Visor.cast — Method
cast(process::Supervised, message)
The message value is sent to the target process without waiting for a response.
Visor.from — Method
from(name::String)::Supervised
Return the supervised node identified by full name.
Given for example the process mytask supervised by mysupervisor:
supervisor("mysupervisor", [process(mytask)])
then the full name of mytask process is mysupervisor.mytask.
Visor.from_name — Method
from_name(name::AbstractString)
Return the process supervised by the root supervisor that is identified by name, or nothing if such a process does not exist.
Visor.from_name — Method
from_name(container::Supervised, name::AbstractString)
Return the process identified by name that is supervised by container, or nothing if such a process does not exist.
It may be useful when the process name contains dots, because it does not look up the supervision hierarchy using the dot as a separator between nodes.
Visor.hassupervised — Method
hassupervised(name::String)
Determine whether the supervised node identified by name exists.
Visor.ifrestart — Method
ifrestart(fn, process)
Call the no-argument function fn if the process was restarted.
The function fn is not executed at the first start of process.
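A sketch of how ifrestart might be used to run recovery logic only after a failure. It assumes that the task descriptor passed to the task function is the process argument expected by ifrestart, and that pending casts still reach the restarted task. The worker and driver functions and the restarts counter are hypothetical.

```julia
using Visor

restarts = Ref(0)   # hypothetical counter, kept only to observe the callback

function worker(td)
    # Qualified name: works whether or not ifrestart is exported.
    Visor.ifrestart(td) do
        restarts[] += 1
        @info "$td restarted: recovery logic would go here"
    end
    for msg in td.inbox
        isshutdown(msg) && break
        msg == :boom && error("simulated failure")
        @info "$td recv: $msg"
    end
end

# Hypothetical driver: makes the worker fail once, then talks to the restarted task.
function driver(td)
    cast("worker", :boom)
    sleep(1)
    cast("worker", :ok)
    sleep(1)
    shutdown()
end

supervise([process(worker), process(driver)])
println("restart callbacks executed: ", restarts[])
```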
Visor.isrequest — Method
isrequest(message)
Return true if message is a Request.
Visor.isshutdown — Method
isshutdown(msg)
Returns true if message msg is a shutdown command.
Visor.isshutdown — Method
isshutdown(process::Supervised)
Returns true if process has a shutdown command in its inbox.
As a side effect, messages are removed from the process inbox until a shutdown request is found.
Visor.process — Method
process(id, fn;
        args=(),
        namedargs=(;),
        force_interrupt_after::Real=1.0,
        stop_waiting_after::Real=Inf,
        debounce_time=NaN,
        trace_exception=false,
        thread=true,
        restart=:transient)::ProcessSpec
Declare a supervised task that may be forcibly interrupted.
id is the process name and fn is the task function.
process returns only a specification: the task function has to be started with supervise.
See process online docs for more details.
Visor.process — Method
process(fn;
        args=(),
        namedargs=(;),
        force_interrupt_after::Real=1.0,
        stop_waiting_after::Real=Inf,
        debounce_time=NaN,
        thread=true,
        restart=:transient)::ProcessSpec
The process name is set equal to string(fn).
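A minimal sketch of passing arguments to a task function. It assumes that the values in args and namedargs are forwarded after the task descriptor, that is as fn(td, args...; namedargs...). The greeter function and the process names are hypothetical.

```julia
using Visor

# Hypothetical task function: `greeting` is positional, `punctuation` is a keyword.
function greeter(td, greeting; punctuation="!")
    @info "$td says: $greeting$punctuation"
end

specs = [
    # Explicit process names allow the same function to be supervised twice.
    process("polite", greeter; args=("good morning",)),
    process("curious", greeter; args=("hello",), namedargs=(punctuation="?",)),
]

# Both tasks return immediately, so the supervisor is expected to stop
# once no supervised task is left.
supervise(specs)
```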
Visor.receive — Method
receive(fn::Function, pd::Process)
Execute fn(msg) for every message msg delivered to the pd Process.
Return when a Shutdown control message is received.
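A sketch of a message loop written with receive instead of an explicit for loop over the inbox. The printer and feeder functions and the got vector are hypothetical. The do-block is passed as the fn argument.

```julia
using Visor

got = Int[]   # messages seen by the printer, kept for inspection

function printer(td)
    # Qualified name: works whether or not receive is exported.
    Visor.receive(td) do msg
        push!(got, msg)
        @info "$td recv: $msg"
    end
    # Reached only after the shutdown control message has been received.
    @info "shutting down $td"
end

function feeder(td)
    for n in 1:3
        cast("printer", n)
    end
    sleep(0.5)   # give the printer time to drain its inbox
    shutdown()
end

supervise([process(printer), process(feeder)])
```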
Visor.reply — Method
reply(request::Request, response::Any)
Send the response to the call method that issued the request.
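A sketch of a server that mixes one-way messages and requests, using isrequest to tell them apart and reply in place of writing to the response channel directly. The server and client functions and the answer reference are hypothetical. The request field is the one used in the call example.

```julia
using Visor

answer = Ref{Any}(nothing)   # hypothetical holder for the response

function server(td)
    for msg in td.inbox
        isshutdown(msg) && break
        # Qualified names: work whether or not these functions are exported.
        if Visor.isrequest(msg)
            # The caller is blocked in call() until a reply is sent.
            Visor.reply(msg, msg.request * 2)
        else
            @info "$td one-way message: $msg"
        end
    end
end

function client(td)
    cast("server", "hello")          # fire and forget
    answer[] = call("server", 21)    # wait for the response
    shutdown()
end

supervise([process(server), process(client)])
println("response: ", answer[])
```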
Visor.resync — Method
resync(supervisor)
Restart processes previously stopped by supervisor policies.
Return true if all supervised processes terminated.
Visor.setname — Method
setname(process::Process, new_name::AbstractString)
Change the process name.
Visor.setroot — Method
setroot(;
    intensity::Int=1,
    period::Int=DEFAULT_PERIOD,
    strategy::Symbol=:one_for_one,
    terminateif::Symbol=:empty,
    handler::Union{Nothing,Function}=nothing,
)
Configure the root supervisor settings.
Visor.setsupervisor — Method
setsupervisor(sv::Supervisor;
    intensity::Int=1,
    period::Int=DEFAULT_PERIOD,
    strategy::Symbol=:one_for_one,
    terminateif::Symbol=:empty,
    handler::Union{Nothing,Function}=nothing,
)
Configure the supervisor settings.
Visor.shutdown — Function
shutdown(node)
Try to shut down a process or a supervisor.
If node is a supervisor, it first attempts to shut down all child nodes and then stops the supervisor. If some process refuses to shut down, the node supervisor is not stopped.
Visor.shutdown — Method
shutdown()
Shut down all supervised nodes.
Visor.startup — Method
startup(proc::Supervised)
Start the supervised process defined by proc as a child of the root supervisor.
Visor.startup — Method
startup(supervisor::Supervisor, proc::Supervised)
Start the supervised process defined by proc as a child of supervisor.
julia> using Visor
julia> foo(self) = println("foo process started");
julia> main(self) = startup(self.supervisor, process(foo));
julia> supervise([process(main)]);
foo process started
Visor.supervise — Method
supervise(processes::Vector{<:Supervised};
          intensity::Int=DEFAULT_INTENSITY,
          period::Int=DEFAULT_PERIOD,
          strategy::Symbol=:one_for_one,
          terminateif::Symbol=:empty,
          handler::Union{Nothing, Function}=nothing,
          wait::Bool=true)::Supervisor
The root supervisor starts a family of supervised processes.
Return the root supervisor or wait for supervisor termination if wait is true.
Arguments
- intensity::Int: maximum number of restarts allowed in period seconds.
- period::Int: time interval that controls the restart intensity.
- strategy::Symbol: defines the restart strategy:
  - :one_for_one: only the terminated task is restarted.
  - :one_for_all: if a child task terminates, all other tasks are terminated, and then all children, including the terminated one, are restarted.
  - :rest_for_one: if a child task terminates, the rest of the child tasks (that is, the child tasks after the terminated process in start order) are terminated. Then the terminated child task and the rest of the child tasks are restarted.
  - :one_terminate_all: if a child task terminates then the remaining tasks will be concurrently terminated (the startup order is not respected).
- terminateif::Symbol:
  - :empty: terminate the supervisor when all child tasks terminate.
  - :shutdown: the supervisor terminates at shutdown.
- handler: a callback function with prototype fn(process, event), invoked when process events occur: when a process task throws an exception and when a process terminates because of a ProcessFatal reason.
- wait::Bool: wait for supervised nodes termination.
children = [process(worker, args=(15,"myid"))]
supervise(children)
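The example above relies on the default policies. The sketch below combines a restart strategy, a restart limit and an event handler. The flaky and steady task functions and the on_event callback are hypothetical, and the exact content of the event argument is not specified here, so the handler only logs it.

```julia
using Visor

# Hypothetical task that always fails shortly after starting.
function flaky(td)
    sleep(0.2)
    error("simulated failure")
end

# Hypothetical well-behaved task that waits for a shutdown request.
function steady(td)
    for msg in td.inbox
        isshutdown(msg) && break
    end
end

# Callback with the documented prototype fn(process, event).
on_event(process, event) = @warn "event from $process: $event"

# With :one_for_all every failure of flaky also restarts steady.
# When more than 2 restarts happen within 5 seconds the supervisor
# is expected to terminate the processes and return.
supervise(
    [process(steady), process(flaky)];
    intensity=2,
    period=5,
    strategy=:one_for_all,
    handler=on_event,
)
```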
Visor.supervise — Method
supervise(proc::Supervised;
          intensity::Int=DEFAULT_INTENSITY,
          period::Int=DEFAULT_PERIOD,
          strategy::Symbol=:one_for_one,
          terminateif::Symbol=:empty,
          handler::Union{Nothing, Function}=nothing,
          wait::Bool=true)::Supervisor
The root supervisor starts a supervised process defined by proc.
Visor.supervisor — Function
supervisor(
    id, processes;
    intensity=DEFAULT_INTENSITY,
    period=DEFAULT_PERIOD,
    strategy=:one_for_one,
    terminateif=:empty)::SupervisorSpec
Declare a supervisor of one or more processes.
processes may be a Process or an array of Process.
julia> using Visor
julia> mytask(pd) = ();
julia> supervisor("mysupervisor", process(mytask))
mysupervisor
julia> using Visor
julia> tsk1(pd) = ();
julia> tsk2(pd) = ();
julia> supervisor("mysupervisor", [process(tsk1), process(tsk2)])
mysupervisor
See Supervisor documentation for more details.
Visor.@isshutdown — Macro
@isshutdown process_descriptor
@isshutdown msg
Break the loop if a shutdown control message is received.
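A sketch of the macro inside a polling loop, assuming it expands to a break and therefore has to be placed directly in the loop body. The ticker and stopper functions and the ticks counter are hypothetical; stopper exists only to end the demo.

```julia
using Visor

ticks = Ref(0)   # hypothetical counter of completed iterations

function ticker(td)
    while true
        sleep(0.2)
        # Leaves the loop when a shutdown request is pending.
        # Qualified so the example does not depend on the macro being exported.
        Visor.@isshutdown td
        ticks[] += 1
    end
    @info "$td stopped after $(ticks[]) ticks"
end

function stopper(td)
    sleep(1)
    shutdown()
end

supervise([process(ticker), process(stopper)])
```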