Visor
Visor is a task supervisor that provides a set of policies to control how tasks are shut down and restarted.
Visor aims to support reliable, long-running and fault-tolerant applications written in Julia.
Visor is influenced by the Erlang Supervisor design principles and aims to fit as "naturally" as possible into the Julia task system.
Getting Started
using Pkg
Pkg.add("Visor")
# demo.jl:
using Visor
struct AppData
    count::Int
end
# Task implementation, the first argument is a task descriptor
function level_1_task(td)
    @info "starting $td"
    for msg in td.inbox
        if isshutdown(msg)
            break
        elseif isa(msg, AppData)
            @info "$td recv: $msg"
        end
    end
    @info "shutting down $td"
end
function level_2_task(td)
    @info "starting $td"
    n = 0
    while true
        sleep(0.5)
        if isshutdown(td)
            break
        end
        # send a data message to process named level_1_task
        cast("level_1_task", AppData(n))
        n += 1
    end
    @info "shutting down $td"
end
supervise([process(level_1_task), process(level_2_task)])
On Linux machines Visor captures the SIGINT signal and terminates the application tasks in the reverse order of startup:
$ julia demo.jl
[ Info: starting level_1_task
[ Info: starting level_2_task
[ Info: level_1_task recv: AppData(0)
[ Info: level_1_task recv: AppData(1)
[ Info: level_1_task recv: AppData(2)
^C[ Info: shutting down level_2_task
[ Info: shutting down level_1_task
A Process is a supervised task that is started/stopped in a deterministic way and that may be restarted in case of failure.
The task function of a process takes a mandatory first argument, the task descriptor td, which is used to:
- check whether a shutdown request is pending, with isshutdown;
- receive messages from the inbox Channel.
function level_2_task(td)
    while true
        # do something ...
        if isshutdown(td)
            break
        end
    end
end
If the task function is designed to receive messages, the argument of isshutdown MUST BE the message and not the task object:
function level_1_task(td)
    for msg in td.inbox
        # check if it is a shutdown request 
        if isshutdown(msg)
            break
        elseif isa(msg, AppMessage)
            # do something with the application message
            # that some task sent to level_1_task.
            do_something(msg)
        end
    end
end
supervise manages processes and supervisors until all of them terminate or a shutdown is requested.
processes = [process(level_1_task), process(level_2_task)]
supervise(processes)
supervise applied to a supervisor is an example of a hierarchical supervision tree:
sv = supervisor("bucket", [process(foo), process(bar)])
supervise(sv)
In this case the supervisor "bucket" manages the tasks foo and bar, whereas in the previous example level_1_task and level_2_task are managed by the root supervisor.
See Supervisor documentation for more details.
Restart policy example
The following example shows the restart-on-failure scenario: a task failure is simulated with an error exception that terminates the consumer.
using Visor
# Consumer process task: read messages from process inbox and print to stdout.
# The mandatory first argument td is the task descriptor object.
# With the task descriptor it is possible to receive any type of messages, for example
# shutdown requests, and send messages to other processes via call and cast methods.
function consumer(td)
    while true
        # Fetch data or control messages, 
        # for example a request to shutdown the task.
        msg = take!(td.inbox)
        # Check if msg is the shutdown control message ...
        !isshutdown(msg) || break
        println(msg)
        if msg == 5
            error("fatal error simulation")
        end
    end
end
# Producer task.
# In this case the shutdown request is not captured by checking the inbox messages but checking
# the task descriptor.
function producer(td)
    count = 1
    while true
        sleep(1)
        # send count value to consumer task
        cast("consumer", count)
        # check whether a shutdown was requested (for example by a SIGINT signal)
        !isshutdown(td) || break
        count += 1
    end
end
# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
    process(consumer)
    process(producer, thread=true)
]
# Supervise the tasks and apply restart policies in case of process failure.
# In this case a maximum of 2 process restarts is allowed in a 1-second time window.
# If the failure rate exceeds this limit, the processes are terminated and the supervisor returns.
supervise(tasks, intensity=2, period=1);
The producer process sends a number every second to the consumer process, which takes it from the inbox channel and prints it to stdout.
When the number equals 5, an exception is raised, the task fails, and the supervisor restarts the consumer process:
$ julia producer_consumer.jl 
1
2
3
4
5
┌ Error: [consumer] exception: ErrorException("fatal error simulation")
└ @ Visor ~/dev/Visor/src/Visor.jl:593
6
7
^C8
9
- Visor.DEFAULT_INTENSITY
- Visor.Request
- Visor.SupervisorResync
- Visor.add_node
- Visor.add_processes
- Visor.call
- Visor.call
- Visor.cast
- Visor.cast
- Visor.from
- Visor.from_name
- Visor.from_name
- Visor.hassupervised
- Visor.ifrestart
- Visor.isrequest
- Visor.isshutdown
- Visor.isshutdown
- Visor.process
- Visor.process
- Visor.receive
- Visor.reply
- Visor.resync
- Visor.setname
- Visor.setroot
- Visor.setsupervisor
- Visor.shutdown
- Visor.shutdown
- Visor.startup
- Visor.startup
- Visor.supervise
- Visor.supervise
- Visor.supervisor
- Visor.@isshutdown
Visor.DEFAULT_INTENSITY — Constant
Maximum number of restarts allowed in period seconds.
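The relationship between intensity and period can be pictured as a sliding time window over recent restarts. The following is a minimal sketch in plain Julia, not Visor's implementation: the helper within_restart_limit! and its arguments are hypothetical names introduced only for illustration.

```julia
# Hypothetical helper, NOT part of Visor: record a restart that happens at
# time `now` (in seconds) and report whether the restart rate is still
# within `intensity` restarts per `period` seconds.
function within_restart_limit!(restarts::Vector{Float64}, now::Real;
                               intensity::Int, period::Real)
    # Forget the restarts that fall outside the time window.
    filter!(t -> now - t <= period, restarts)
    # Record the current restart.
    push!(restarts, now)
    return length(restarts) <= intensity
end

restarts = Float64[]
ok1 = within_restart_limit!(restarts, 0.0; intensity=2, period=1)
ok2 = within_restart_limit!(restarts, 0.4; intensity=2, period=1)
ok3 = within_restart_limit!(restarts, 0.8; intensity=2, period=1) # third restart within 1 s
ok4 = within_restart_limit!(restarts, 5.0; intensity=2, period=1) # the window has moved on
```

With intensity=2 and period=1 the third restart inside the same second exceeds the limit, while a restart several seconds later is accepted again because the older entries have left the window.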
Visor.Request — Type
A response value is expected when a Request message is pushed to the target process inbox.
Visor.SupervisorResync — Type
Trigger a supervisor resync.
Visor.add_node — Method
add_node(supervisor::Supervisor, proc::Supervised)
Add supervised proc to the children's collection of the controlling supervisor.
Visor.add_processes — Method
add_processes(
    svisor::Supervisor,
    processes;
    intensity,
    period,
    strategy,
    terminateif::Symbol=:empty
)::Supervisor
Set up the hierarchy relationship between a supervisor and a list of supervised processes, and configure the supervisor behavior.
Visor.call — Method
call(name::String, request::Any; timeout::Real=3)
Send a request to the process identified by full name and wait for a response.
If timeout is -1, call waits forever; otherwise, if a response is not received within timeout seconds, an ErrorException is raised.
The message sent to the target task is a Request struct that contains the request and a channel for sending back the response.
using Visor
function server(task)
    for msg in task.inbox
        isshutdown(msg) && break
        put!(msg.inbox, msg.request * 2)
    end
    println("server done")
end
function requestor(task)
    request = 10
    response = call("server", request)
    println("server(",request,")=",response)
    shutdown()
end
supervise([process(server), process(requestor)])
Visor.call — Method
call(target::Supervised, request::Any; timeout::Real=-1)
Send a request to target process and wait for a response.
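The request/response exchange behind call can be pictured with plain Julia channels. The sketch below is not Visor's implementation: DemoRequest and demo_call are hypothetical names, only Base functions (Channel, timedwait) are used, and the special timeout value -1 is not handled.

```julia
# Hypothetical stand-in for a request message: the payload plus a channel
# on which the receiver sends back the response.
struct DemoRequest
    payload::Any
    reply_to::Channel{Any}
end

# Hypothetical helper: push a request to `inbox` and wait up to `timeout`
# seconds for the response.
function demo_call(inbox::Channel, payload; timeout::Real=3)
    reply_to = Channel{Any}(1)
    put!(inbox, DemoRequest(payload, reply_to))
    # timedwait polls the callback until it returns true or the timeout expires.
    status = timedwait(() -> isready(reply_to), timeout)
    status === :ok || error("no response received in $timeout seconds")
    return take!(reply_to)
end

inbox = Channel{Any}(10)

# A server task that doubles every request it receives.
server = @async for msg in inbox
    msg isa DemoRequest && put!(msg.reply_to, msg.payload * 2)
end

result = demo_call(inbox, 10)

# Closing the inbox ends the server loop.
close(inbox)
wait(server)
```

Giving each request its own reply channel keeps responses from different callers separate, which is the same idea the Request description above conveys.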
Visor.cast — Method
cast(name::String, message::Any)
Send a message to the process with full name without waiting for a response.
Visor.cast — Method
cast(process::Supervised, message)
The message value is sent to target process without waiting for a response.
Visor.from — Method
from(name::String)::Supervised
Return the supervised node identified by full name.
Given for example the process mytask supervised by mysupervisor:
supervisor("mysupervisor", [process(mytask)])
then the full name of mytask process is mysupervisor.mytask.
Visor.from_name — Method
from_name(name::AbstractString)
Return the process supervised by the root supervisor that is identified by name, or nothing if no such process exists.
Visor.from_name — Method
from_name(container::Supervised, name::AbstractString)
Return the process identified by name that is supervised by container, or nothing if no such process exists.
It may be useful when the process name contains dots, because it does not look up the supervision hierarchy using the dot as separator between nodes.
Visor.hassupervised — Method
hassupervised(name::String)
Determine whether the supervised node identified by name exists.
Visor.ifrestart — Method
ifrestart(fn, process)
Call the no-argument function fn if the process restarted.
The function fn is not executed at the first start of process.
Visor.isrequest — Method
isrequest(message)
Return true if message is a Request.
Visor.isshutdown — Method
isshutdown(msg)
Returns true if message msg is a shutdown command.
Visor.isshutdown — Method
function isshutdown(process::Supervised)
Returns true if process has a shutdown command in its inbox.
As a side effect, it removes messages from the process inbox until a shutdown request is found.
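The side effect described above explains why a task that consumes its inbox should test each message rather than the process itself. The following plain-Julia sketch only mimics the documented behaviour: DemoShutdown and demo_isshutdown are hypothetical names, and Visor's real implementation may differ.

```julia
# Hypothetical marker type standing in for a shutdown control message.
struct DemoShutdown end

# Hypothetical helper mimicking the documented behaviour: take messages
# from the inbox until a shutdown marker is found or the inbox is empty.
function demo_isshutdown(inbox::Channel)
    while isready(inbox)
        take!(inbox) isa DemoShutdown && return true
    end
    return false
end

inbox = Channel{Any}(10)
put!(inbox, 1)
put!(inbox, 2)
put!(inbox, DemoShutdown())
put!(inbox, 3)

found = demo_isshutdown(inbox)  # discards 1 and 2 while searching
remaining = take!(inbox)        # only the message queued after the marker is left
```

In this sketch the data messages 1 and 2 are lost during the check, so a message-driven task would never see them.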
Visor.process — Method
process(id, fn;
        args=(),
        namedargs=(;),
        force_interrupt_after::Real=1.0,
        stop_waiting_after::Real=Inf,
        debounce_time=NaN,
        trace_exception=false,
        thread=true,
        restart=:transient)::ProcessSpec
Declare a supervised task that may be forcibly interrupted.
id is the process name and fn is the task function.
process returns only a specification: the task function has to be started with supervise.
See process online docs for more details.
Visor.process — Method
process(fn;
        args=(),
        namedargs=(;),
        force_interrupt_after::Real=1.0,
        stop_waiting_after::Real=Inf,
        debounce_time=NaN,
        thread=true,
        restart=:transient)::ProcessSpec
The process name is set to string(fn).
Visor.receive — Method
function receive(fn::Function, pd::Process)
Execute fn(msg) for every message msg delivered to the pd Process.
Return if a Shutdown control message is received.
Visor.reply — Method
reply(request::Request, response::Any)
Send the response to the call method that issued the request.
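As an illustration, a server process could combine isrequest and reply to answer only the messages that expect a response. This is a sketch built from the signatures listed on this page: the process names doubler and client and the RESULT variable are made up for the example, and the request field is assumed to be accessible as in the call example above.

```julia
using Visor

# Holds the response so that it can be inspected after supervise returns.
const RESULT = Ref{Any}(nothing)

# Server: answer requests, ignore other messages, stop at shutdown.
function doubler(td)
    for msg in td.inbox
        isshutdown(msg) && break
        if isrequest(msg)
            # msg.request is the value passed to call (assumed field name,
            # taken from the call example on this page).
            reply(msg, msg.request * 2)
        end
    end
end

# Client: issue one request, store the response, then stop everything.
function client(td)
    RESULT[] = call("doubler", 21)
    shutdown()
end

supervise([process(doubler), process(client)])
```

Checking isrequest before replying keeps the server safe when other processes also send plain messages with cast.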
Visor.resync — Method
resync(supervisor)
Restart processes previously stopped by supervisor policies.
Return true if all supervised processes terminated.
Visor.setname — Method
setname(process::Process, new_name::AbstractString)
Change the process name.
Visor.setroot — Method
setroot(;
    intensity::Int=1,
    period::Int=DEFAULT_PERIOD,
    strategy::Symbol=:one_for_one,
    terminateif::Symbol=:empty,
    handler::Union{Nothing,Function}=nothing,
)
Set up the root supervisor settings.
Visor.setsupervisor — Method
setsupervisor(sv::Supervisor;
    intensity::Int=1,
    period::Int=DEFAULT_PERIOD,
    strategy::Symbol=:one_for_one,
    terminateif::Symbol=:empty,
    handler::Union{Nothing,Function}=nothing,
)
Set up the supervisor settings.
Visor.shutdown — Function
shutdown(node)
Try to shut down a process or a supervisor.
If node is a supervisor, it first attempts to shut down all child nodes and then stops the supervisor. If some process refuses to shut down, the supervisor is not stopped.
Visor.shutdown — Method
shutdown()
Shut down all supervised nodes.
Visor.startup — Method
startup(proc::Supervised)
Start the supervised process defined by proc as a child of the root supervisor.
Visor.startup — Method
startup(supervisor::Supervisor, proc::Supervised)
Start the supervised process defined by proc as a child of supervisor.
julia> using Visor
julia> foo(self) = println("foo process started");
julia> main(self) = startup(self.supervisor, process(foo));
julia> supervise([process(main)]);
foo process started
Visor.supervise — Method
supervise(processes::Vector{<:Supervised};
          intensity::Int=DEFAULT_INTENSITY,
          period::Int=DEFAULT_PERIOD,
          strategy::Symbol=:one_for_one,
          terminateif::Symbol=:empty,
          handler::Union{Nothing, Function}=nothing,
          wait::Bool=true)::Supervisor
The root supervisor starts a family of supervised processes.
Return the root supervisor or wait for supervisor termination if wait is true.
Arguments
- intensity::Int: maximum number of restarts allowed in period seconds.
- period::Int: time interval that controls the restart intensity.
- strategy::Symbol: defines the restart strategy:
  - :one_for_one: only the terminated task is restarted.
  - :one_for_all: if a child task terminates, all other tasks are terminated, and then all children, including the terminated one, are restarted.
  - :rest_for_one: if a child task terminates, the rest of the child tasks (that is, the child tasks after the terminated process in start order) are terminated. Then the terminated child task and the rest of the child tasks are restarted.
  - :one_terminate_all: if a child task terminates then the remaining tasks will be concurrently terminated (the startup order is not respected).
- terminateif::Symbol:
  - :empty: terminate the supervisor when all child tasks terminate.
  - :shutdown: the supervisor terminates at shutdown.
- handler: a callback function with prototype fn(process, event), invoked when process events occur: when a process task throws an exception and when a process terminates because of a ProcessFatal reason.
- wait::Bool: wait for supervised nodes termination.
    children = [process(worker, args=(15,"myid"))]
    supervise(children)
Visor.supervise — Method
supervise(proc::Supervised;
          intensity::Int=DEFAULT_INTENSITY,
          period::Int=DEFAULT_PERIOD,
          strategy::Symbol=:one_for_one,
          terminateif::Symbol=:empty,
          handler::Union{Nothing, Function}=nothing,
          wait::Bool=true)::Supervisor
The root supervisor starts a supervised process defined by proc.
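The three restart strategies accepted by the strategy keyword differ only in which children are restarted after a failure. The sketch below models that selection on a list of names in start order. It is an illustration in plain Julia, not Visor code: children_to_restart is a hypothetical helper, and :one_terminate_all is left out because it describes termination rather than restart.

```julia
# Hypothetical helper, NOT part of Visor: given the children in start order
# and the name of the failed child, return the children to restart.
function children_to_restart(children::Vector{String}, failed::String, strategy::Symbol)
    idx = findfirst(==(failed), children)
    idx === nothing && throw(ArgumentError("unknown child: $failed"))
    if strategy === :one_for_one
        # Only the terminated task is restarted.
        return [failed]
    elseif strategy === :one_for_all
        # Every child, including the terminated one, is restarted.
        return copy(children)
    elseif strategy === :rest_for_one
        # The terminated child and the children started after it.
        return children[idx:end]
    else
        throw(ArgumentError("strategy not covered by this sketch: $strategy"))
    end
end

children = ["a", "b", "c"]
one = children_to_restart(children, "b", :one_for_one)
all_children = children_to_restart(children, "b", :one_for_all)
rest = children_to_restart(children, "b", :rest_for_one)
```

Here a failure of "b" restarts only "b" under :one_for_one, all three children under :one_for_all, and "b" plus "c" under :rest_for_one.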
Visor.supervisor — Function
supervisor(
    id, processes;
    intensity=DEFAULT_INTENSITY,
    period=DEFAULT_PERIOD,
    strategy=:one_for_one,
    terminateif=:empty)::SupervisorSpec
Declare a supervisor of one or more processes.
processes may be a Process or an array of Process.
julia> using Visor
julia> mytask(pd) = ();
julia> supervisor("mysupervisor", process(mytask))
mysupervisor
julia> using Visor
julia> tsk1(pd) = ();
julia> tsk2(pd) = ();
julia> supervisor("mysupervisor", [process(tsk1), process(tsk2)])
mysupervisor
See Supervisor documentation for more details.
Visor.@isshutdown — Macro
@isshutdown process_descriptor
@isshutdown msg
Break the loop if a shutdown control message is received.
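A possible use of the macro inside a polling loop is sketched below. The process names ticker and stopper and the TICKS counter are invented for the example, and the sketch assumes that @isshutdown is exported by Visor and must be placed directly inside the loop it is meant to break.

```julia
using Visor

# Counts the loop iterations so the result can be inspected afterwards.
const TICKS = Ref(0)

# Periodic task: do some work, then leave the loop if a shutdown is pending.
function ticker(td)
    while true
        sleep(0.1)
        TICKS[] += 1
        @isshutdown td
    end
end

# Request a global shutdown after half a second.
function stopper(td)
    sleep(0.5)
    shutdown()
end

supervise([process(ticker), process(stopper)])
```

Compared with an explicit if isshutdown(td) ... break ... end block, the macro keeps the loop body short. The behaviour is expected to be the same under the assumptions stated above.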