Visor

Visor is a task supervisor that provides a set of policies controlling how tasks are shut down and restarted.

Visor aims to support reliable, long-running, fault-tolerant applications written in Julia.

Visor is influenced by the design principles of Erlang supervisors, with the aim of fitting as "naturally" as possible into the Julia task system.

Getting Started

using Pkg
Pkg.add("Visor")

# demo.jl:

using Visor

struct AppData
    count::Int
end

# Task implementation, the first argument is a task descriptor
function level_1_task(td)
    @info "starting $td"
    for msg in td.inbox
        if isshutdown(msg)
            break
        elseif isa(msg, AppData)
            @info "$td recv: $msg"
        end
    end
    @info "shutting down $td"
end

function level_2_task(td)
    @info "starting $td"
    n = 0
    while true
        sleep(0.5)
        if isshutdown(td)
            break
        end
        # send a data message to process named level_1_task
        cast("level_1_task", AppData(n))
        n += 1
    end
    @info "shutting down $td"
end

supervise([process(level_1_task), process(level_2_task)])

On Linux machines Visor captures the SIGINT signal and terminates the application tasks in reverse startup order:

$ julia demo.jl

[ Info: starting level_1_task
[ Info: starting level_2_task
[ Info: level_1_task recv: AppData(0)
[ Info: level_1_task recv: AppData(1)
[ Info: level_1_task recv: AppData(2)
^C[ Info: shutting down level_2_task
[ Info: shutting down level_1_task

A process is a supervised task that is started and stopped in a deterministic way and that may be restarted in case of failure.

The task function of a process has a mandatory first argument, the task descriptor td, that is used to:

  • check whether a shutdown request is pending, with isshutdown;
  • receive messages from the inbox Channel.
function level_2_task(td)
    while true
        # do something ...
        if isshutdown(td)
            break
        end
    end
end

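If the task function is designed to receive messages, the argument of isshutdown MUST be the message and not the task descriptor, because isshutdown(td) removes messages from the inbox while looking for a shutdown request, and application messages would be lost: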

function level_1_task(td)
    for msg in td.inbox
        # check if it is a shutdown request 
        if isshutdown(msg)
            break
        elseif isa(msg, AppMessage)
            # do something with the application message
            # that some task sent to level_1_task.
            do_something(msg)
        end
    end
end

supervise manages processes and supervisors until all of them terminate or a shutdown is requested.

processes = [process(level_1_task), process(level_2_task)]
supervise(processes)

Applying supervise to a supervisor builds a hierarchical supervision tree:

sv = supervisor("bucket", [process(foo), process(bar)])
supervise(sv)

In this case the supervisor "bucket" manages the tasks foo and bar, whereas in the previous example level_1_task and level_2_task are managed by the root supervisor.
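A minimal sketch of such a tree, assuming Visor is installed, that a nested process can be addressed by its full name (supervisor name, a dot, then process name, as described for from in the reference below) and that supervise accepts a list mixing supervisor and process specifications. The task names foo and main and the received buffer are hypothetical:

```julia
using Visor

# hypothetical buffer collecting what foo receives
const received = Any[]

# child of the "bucket" supervisor
function foo(td)
    for msg in td.inbox
        isshutdown(msg) && break
        push!(received, msg)
    end
end

# child of the root supervisor: it addresses foo by its full name
function main(td)
    cast("bucket.foo", "hello")
    sleep(0.5)
    shutdown()
end

# bucket (with foo) starts first; main starts last and is shut down first
supervise([supervisor("bucket", [process(foo)]), process(main)])
```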

See the Supervisor documentation for more details.

Restart policy example

The following example shows the restart-on-failure scenario: a task failure is simulated by raising an error that terminates the consumer.

using Visor

# Consumer process task: read messages from the process inbox and print them to stdout.
# The mandatory first argument td is the task descriptor.
# Through the task descriptor it is possible to receive any type of message, for example
# shutdown requests, and to send messages to other processes via the call and cast methods.
function consumer(td)

    while true
        # Fetch data or control messages, 
        # for example a request to shutdown the task.
        msg = take!(td.inbox)

        # Check if msg is the shutdown control message ...
        !isshutdown(msg) || break

        println(msg)
        if msg == 5
            error("fatal error simulation")
        end

    end
end

# Producer task.
# In this case the shutdown request is detected not by inspecting the inbox messages
# but by checking the task descriptor.
function producer(td)
    count = 1
    while true
        sleep(1)

        # send count value to consumer task
        cast("consumer", count)

        # check whether a shutdown was requested (for example by a SIGINT signal)
        !isshutdown(td) || break

        count += 1
    end
end

# Tasks are started following the list order and are shut down in reverse order:
# producer is started last and terminated first.
tasks = [
    process(consumer)
    process(producer, thread=true)
]

# Supervise the tasks and apply the restart policies in case of process failure.
# Here a maximum of 2 process restarts is allowed in a 1 second time window.
# If the failure rate exceeds this limit, the processes are terminated and supervise returns.
supervise(tasks, intensity=2, period=1);

The producer process sends a number every second to the consumer process, which takes it from the inbox channel and prints it to stdout.

When the number equals 5, an exception is raised, the task fails, and the supervisor restarts the consumer process:

$ julia producer_consumer.jl 
1
2
3
4
5
┌ Error: [consumer] exception: ErrorException("fatal error simulation")
└ @ Visor ~/dev/Visor/src/Visor.jl:593
6
7
^C8
9
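The intensity/period rule used above can be modeled as a sliding window of restart timestamps. The following Base-only sketch is a hypothetical model for illustration: RestartWindow and allow_restart! are not Visor functions, and Visor's internal bookkeeping may differ.

```julia
# Hypothetical model of the restart intensity rule, not Visor's implementation.
mutable struct RestartWindow
    intensity::Int              # maximum restarts allowed within `period`
    period::Float64             # window length in seconds
    restarts::Vector{Float64}   # timestamps of the restarts still in the window
end

RestartWindow(intensity::Int, period::Real) =
    RestartWindow(intensity, Float64(period), Float64[])

# Register a failure at time `t` (seconds). Return true if the child may be
# restarted, false if the limit is exceeded and the supervisor should give up.
function allow_restart!(w::RestartWindow, t::Real)
    # drop restarts that are at least `period` seconds old
    filter!(ts -> t - ts < w.period, w.restarts)
    length(w.restarts) >= w.intensity && return false
    push!(w.restarts, Float64(t))
    return true
end

# intensity=2, period=1 as in supervise(tasks, intensity=2, period=1)
w = RestartWindow(2, 1)
allow_restart!(w, 0.0)   # true
allow_restart!(w, 0.3)   # true
allow_restart!(w, 0.6)   # false: a third restart within 1 second
```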

Visor.Request (Type)

A response value is expected when a Request message is pushed to the target process inbox.

Visor.add_node (Method)
add_node(supervisor::Supervisor, proc::Supervised)

Add the supervised node proc to the children of supervisor.

Visor.add_processes (Method)
add_processes(
    svisor::Supervisor,
    processes;
    intensity,
    period,
    strategy,
    terminateif::Symbol=:empty
)::Supervisor

Set up the hierarchical relationship between svisor and the supervised list of processes, and configure the supervisor behavior.

Visor.call (Method)
call(name::String, request::Any; timeout::Real=3)

Send a request to the process identified by full name and wait for a response.

If timeout is -1, call waits forever; otherwise, if no response is received within timeout seconds, an ErrorException is raised.

The message sent to the target task is a Request struct that contains the request and a channel for sending back the response.

using Visor

function server(task)
    for msg in task.inbox
        isshutdown(msg) && break
        put!(msg.inbox, msg.request * 2)
    end
    println("server done")
end

function requestor(task)
    request = 10
    response = call("server", request)
    println("server(",request,")=",response)
    shutdown()
end

supervise([process(server), process(requestor)])
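The Request round trip behind call can be modeled with plain Julia channels. In this Base-only sketch RequestSketch and call_sketch are hypothetical names, not Visor API; like the call described above, it waits forever when timeout is -1 and otherwise raises an ErrorException if no response arrives in time.

```julia
# Hypothetical stand-in for Visor's Request: the payload plus a reply channel.
struct RequestSketch
    request::Any
    inbox::Channel{Any}   # the server puts the response here
end

# Send `request` to the `target` inbox and wait for the response.
function call_sketch(target::Channel, request; timeout::Real=3)
    reply_ch = Channel{Any}(1)
    put!(target, RequestSketch(request, reply_ch))
    if timeout != -1
        # poll until a response is available or `timeout` seconds elapse
        status = timedwait(() -> isready(reply_ch), timeout)
        status === :ok || error("no response within $timeout seconds")
    end
    return take!(reply_ch)
end

# A server task that doubles every request, like the server above.
server_inbox = Channel{Any}(10)
@async for msg in server_inbox
    put!(msg.inbox, msg.request * 2)
end

call_sketch(server_inbox, 10)   # 20
```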

Visor.call (Method)
call(target::Supervised, request::Any; timeout::Real=-1)

Send a request to target process and wait for a response.

Visor.cast (Method)
cast(name::String, message::Any)

Send a message to the process identified by full name without waiting for a response.

Visor.cast (Method)
cast(process::Supervised, message)

Send the message value to the target process without waiting for a response.

Visor.from (Method)
from(name::String)::Supervised

Return the supervised node identified by full name.

For example, given the process mytask supervised by mysupervisor:

supervisor("mysupervisor", [process(mytask)])

then the full name of the mytask process is mysupervisor.mytask.

Visor.from_name (Method)
from_name(name::AbstractString)

Return the process identified by name that is supervised by the root supervisor, or nothing if no such process exists.

Visor.from_name (Method)
from_name(container::Supervised, name::AbstractString)

Return the process identified by name that is supervised by container, or nothing if no such process exists.

This is useful when the process name contains dots, because the name is not split on dots to walk the supervision hierarchy.
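The difference between from, which walks the hierarchy one dot-separated segment at a time, and from_name, which compares the whole name against the children of a single container, can be sketched with a toy tree. Node, child and resolve are hypothetical helpers written for illustration, not Visor functions:

```julia
# Hypothetical toy supervision tree: a node with a name and its children.
struct Node
    name::String
    children::Vector{Node}
end
Node(name::String) = Node(name, Node[])

# Like from_name(container, name): match the whole name, dots included.
function child(container::Node, name::AbstractString)
    idx = findfirst(n -> n.name == name, container.children)
    return idx === nothing ? nothing : container.children[idx]
end

# Like from(name): each dot-separated segment selects one hierarchy level.
function resolve(root::Node, fullname::AbstractString)
    node = root
    for part in split(fullname, '.')
        node = child(node, part)
        node === nothing && return nothing
    end
    return node
end

root = Node("root", [Node("mysupervisor", [Node("mytask"), Node("my.task")])])

resolve(root, "mysupervisor.mytask").name            # "mytask"
resolve(root, "mysupervisor.my.task")                # nothing: "my" is not a child
child(child(root, "mysupervisor"), "my.task").name   # "my.task"
```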

Visor.ifrestart (Method)
ifrestart(fn, process)

Call the no-argument function fn if the process restarted.

The function fn is not executed at the first start of the process.
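A sketch of ifrestart in a task that fails on its first run, assuming Visor is installed and that the default restart=:transient policy restarts a process after an exception but not after a normal return. flaky, starts and restarted are hypothetical names:

```julia
using Visor

const starts = Ref(0)        # hypothetical: number of times flaky started
const restarted = Ref(false) # hypothetical: set by the ifrestart callback

function flaky(td)
    starts[] += 1
    # runs only when flaky is restarted, not at its first start
    ifrestart(td) do
        restarted[] = true
    end
    # simulate a failure during the first run only
    starts[] == 1 && error("simulated failure")
end

# assumed :transient semantics: restart after the exception,
# no restart after the normal return of the second run
supervise(process(flaky), intensity=2)
```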

Visor.isshutdown (Method)
function isshutdown(process::Supervised)

Returns true if process has a shutdown command in its inbox.

As a side effect, it removes messages from the process inbox until a shutdown request is found.
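This side effect is why a task that consumes application messages must call isshutdown on each received message rather than on the descriptor. The following Base-only sketch models the behavior with a plain Channel; Shutdown and isshutdown_sketch are hypothetical stand-ins, and Visor's actual implementation may differ:

```julia
# Hypothetical shutdown control message.
struct Shutdown end

# Mimics the documented side effect of isshutdown(td): messages are
# removed from the inbox until a shutdown request is found.
function isshutdown_sketch(inbox::Channel)
    while isready(inbox)
        take!(inbox) isa Shutdown && return true
    end
    return false
end

inbox = Channel{Any}(10)
put!(inbox, "application data")   # will be discarded by the check below
put!(inbox, Shutdown())
isshutdown_sketch(inbox)          # true, and "application data" is gone
```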

Visor.process (Method)
process(id, fn;
        args=(),
        namedargs=(;),
        force_interrupt_after::Real=1.0,
        stop_waiting_after::Real=Inf,
        debounce_time=NaN,
        trace_exception=false,
        thread=true,
        restart=:transient)::ProcessSpec

Declare a supervised task that may be forcibly interrupted.

id is the process name and fn is the task function.

process returns only a specification: the task function has to be started with supervise.

See the process online documentation for more details.

Visor.process (Method)
process(fn;
        args=(),
        namedargs=(;),
        force_interrupt_after::Real=1.0,
        stop_waiting_after::Real=Inf,
        debounce_time=NaN,
        thread=true,
        restart=:transient)::ProcessSpec

The process name is set to string(fn).

Visor.receive (Method)
function receive(fn::Function, pd::Process)

Execute fn(msg) for every message msg delivered to the pd Process.

Return when a Shutdown control message is received.
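A minimal sketch of a task built on receive, assuming Visor is installed and that td, the first argument of the task function, is the Process value receive expects. collector, feeder and collected are hypothetical names:

```julia
using Visor

# hypothetical buffer filled by the collector task
const collected = Any[]

function collector(td)
    # the do-block runs for every message; receive returns on Shutdown
    receive(td) do msg
        push!(collected, msg)
    end
end

function feeder(td)
    for i in 1:3
        cast("collector", i)
    end
    # let collector handle the messages before shutting down
    sleep(0.5)
    shutdown()
end

supervise([process(collector), process(feeder)])
```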

Visor.reply (Method)
reply(request::Request, response::Any)

Send the response to the call method that issued the request.
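A server can answer with reply instead of writing to the request channel directly, as the call example above does with put!. A minimal sketch, assuming Visor is installed; doubler, client and answers are hypothetical names:

```julia
using Visor

# hypothetical buffer storing the responses received by client
const answers = Any[]

function doubler(td)
    for msg in td.inbox
        isshutdown(msg) && break
        # msg is a Request: reply unblocks the caller waiting in call
        reply(msg, msg.request * 2)
    end
end

function client(td)
    push!(answers, call("doubler", 21))
    shutdown()
end

supervise([process(doubler), process(client)])
```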

Visor.resync (Method)
resync(supervisor)

Restart processes previously stopped by supervisor policies.

Return true if all supervised processes terminated.

Visor.setname (Method)
setname(process::Process, new_name::AbstractString)

Change the name of process to new_name.

Visor.setroot (Method)
setroot(;
    intensity::Int=1,
    period::Int=DEFAULT_PERIOD,
    strategy::Symbol=:one_for_one,
    terminateif::Symbol=:empty,
    handler::Union{Nothing,Function}=nothing,
)

Set up the root supervisor settings.

Visor.setsupervisor (Method)
setsupervisor(sv::Supervisor;
    intensity::Int=1,
    period::Int=DEFAULT_PERIOD,
    strategy::Symbol=:one_for_one,
    terminateif::Symbol=:empty,
    handler::Union{Nothing,Function}=nothing,
)

Set up the settings of the supervisor sv.

Visor.shutdown (Function)
shutdown(node)

Try to shut down a process or a supervisor.

If node is a supervisor, it first attempts to shut down all child nodes and then stops the supervisor. If some process refuses to shut down, the node supervisor is not stopped.

Visor.startup (Method)
startup(proc::Supervised)

Start the supervised process defined by proc as a child of the root supervisor.

Visor.startup (Method)
startup(supervisor::Supervisor, proc::Supervised)

Start the supervised process defined by proc as child of supervisor.

julia> using Visor

julia> foo(self) = println("foo process started");

julia> main(self) = startup(self.supervisor, process(foo));

julia> supervise([process(main)]);
foo process started

Visor.supervise (Method)
supervise(processes::Vector{<:Supervised};
          intensity::Int=DEFAULT_INTENSITY,
          period::Int=DEFAULT_PERIOD,
          strategy::Symbol=:one_for_one,
          terminateif::Symbol=:empty,
          handler::Union{Nothing, Function}=nothing,
          wait::Bool=true)::Supervisor

The root supervisor starts a family of supervised processes.

Return the root supervisor or wait for supervisor termination if wait is true.

Arguments

  • intensity::Int: maximum number of restarts allowed in period seconds.
  • period::Int: time interval that controls the restart intensity.
  • strategy::Symbol: defines the restart strategy:
    • :one_for_one: only the terminated task is restarted.
    • :one_for_all: if a child task terminates, all other tasks are terminated, and then all children, including the terminated one, are restarted.
    • :rest_for_one: if a child task terminates, the rest of the children tasks (that is, the child tasks after the terminated process in start order) are terminated. Then the terminated child task and the rest of the child tasks are restarted.
    • :one_terminate_all: if a child task terminates then the remaining tasks will be concurrently terminated (the startup order is not respected).
  • terminateif::Symbol:
    • :empty: terminate the supervisor when all child tasks terminate.
    • :shutdown: the supervisor terminates at shutdown.
  • handler: a callback function with prototype fn(process, event), invoked when process events occur: when a process task throws an exception and when a process terminates because of a ProcessFatal reason.
  • wait::Bool: wait for the termination of the supervised nodes.

Example:

children = [process(worker, args=(15,"myid"))]
supervise(children)
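The effect of each restart strategy on a list of children in start order can be tabulated with a small Base-only helper. apply_strategy is hypothetical and not part of Visor; for :one_terminate_all it assumes that no child is restarted.

```julia
# Hypothetical helper: given children in start order and the index of the
# child that terminated, return which other children are terminated and
# which children are (re)started under each strategy.
function apply_strategy(strategy::Symbol, children::Vector{String}, failed::Int)
    others = [c for (i, c) in enumerate(children) if i != failed]
    if strategy === :one_for_one
        # only the terminated child is restarted
        return (terminated = String[], restarted = [children[failed]])
    elseif strategy === :one_for_all
        # all other children are stopped, then everyone is restarted
        return (terminated = others, restarted = copy(children))
    elseif strategy === :rest_for_one
        # children started after the failed one are stopped and restarted too
        return (terminated = children[failed+1:end], restarted = children[failed:end])
    elseif strategy === :one_terminate_all
        # remaining children are terminated; assumed: nothing is restarted
        return (terminated = others, restarted = String[])
    else
        throw(ArgumentError("unknown strategy: $strategy"))
    end
end

apply_strategy(:rest_for_one, ["a", "b", "c"], 2)
```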

Visor.supervise (Method)
supervise(proc::Supervised;
          intensity::Int=DEFAULT_INTENSITY,
          period::Int=DEFAULT_PERIOD,
          strategy::Symbol=:one_for_one,
          terminateif::Symbol=:empty,
          handler::Union{Nothing, Function}=nothing,
          wait::Bool=true)::Supervisor

The root supervisor starts a supervised process defined by proc.

Visor.supervisor (Function)
supervisor(
    id, processes;
    intensity=DEFAULT_INTENSITY,
    period=DEFAULT_PERIOD,
    strategy=:one_for_one,
    terminateif=:empty)::SupervisorSpec

Declare a supervisor of one or more processes.

processes may be a Process or an array of Process.

julia> using Visor

julia> mytask(pd) = ();

julia> supervisor("mysupervisor", process(mytask))
mysupervisor

julia> using Visor

julia> tsk1(pd) = ();

julia> tsk2(pd) = ();

julia> supervisor("mysupervisor", [process(tsk1), process(tsk2)])
mysupervisor

See Supervisor documentation for more details.

Visor.@isshutdown (Macro)
@isshutdown process_descriptor
@isshutdown msg

Break the loop if a shutdown control message is received.
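A sketch of @isshutdown in a polling loop, assuming Visor is installed and that the macro behaves like isshutdown(td) followed by break; inside a for msg in td.inbox loop the message form @isshutdown msg applies instead. ticker, stopper and ticks are hypothetical names:

```julia
using Visor

const ticks = Ref(0)   # hypothetical counter of loop iterations

function ticker(td)
    while true
        # leaves the while loop when a shutdown request is pending
        @isshutdown td
        ticks[] += 1
        sleep(0.1)
    end
end

function stopper(td)
    sleep(1)
    shutdown()
end

supervise([process(ticker), process(stopper)])
```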
