Skip to content

Commit

Permalink
Implement generic fail, run, and exit channels for agent workload execution
Browse files Browse the repository at this point in the history
  • Loading branch information
kthomas committed Dec 22, 2023
1 parent fd241ef commit 203ed1a
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 61 deletions.
23 changes: 18 additions & 5 deletions agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,33 @@ import (
// WorkloadCacheBucket is an internal, non-public bucket for sharing files between host and agent
const WorkloadCacheBucket = "NEXCACHE"

// DefaultRunloopSleepTimeoutMillis default number of milliseconds to sleep during execution runloops
const DefaultRunloopSleepTimeoutMillis = 25

// ExecutionProviderParams parameters for initializing a specific execution provider
type ExecutionProviderParams struct {
WorkRequest

Stderr io.Writer `json:"-"`
Stdout io.Writer `json:"-"`
TmpFilename string `json:"-"`
VmID string `json:"-"`
// Fail channel receives bool upon command failing to start
Fail chan bool `json:"-"`

// Run channel receives bool upon command successfully starting
Run chan bool `json:"-"`

// Exit channel receives int exit code upon command exit
Exit chan int `json:"-"`

Stderr io.Writer `json:"-"`
Stdout io.Writer `json:"-"`

TmpFilename string `json:"-"`
VmID string `json:"-"`
}

type WorkRequest struct {
WorkloadName string `json:"workload_name"`
Hash string `json:"hash"`
TotalBytes int `json:"total_bytes"`
TotalBytes int32 `json:"total_bytes"`
Environment map[string]string `json:"environment"`
WorkloadType string `json:"workload_type,omitempty"`

Expand Down
59 changes: 49 additions & 10 deletions nex-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ type Agent struct {
started time.Time
}

// InitAgent initializes a new agent to facilitate communications with
// NewAgent initializes a new agent to facilitate communications with
// the host node and dispatch workloads
func InitAgent() (*Agent, error) {
func NewAgent() (*Agent, error) {
metadata, err := GetMachineMetadata()
if err != nil {
return nil, err
Expand Down Expand Up @@ -120,19 +120,13 @@ func (a *Agent) handleWorkDispatched(m *nats.Msg) {
return
}

tmpFile, err := a.cacheExecutableArtifact(&request)
params, err := a.newExecutionProviderParams(&request)
if err != nil {
a.workAck(m, false, err.Error())
return
}

provider, err := providers.ExecutionProviderFactory(&agentapi.ExecutionProviderParams{
WorkRequest: request,
Stderr: &logEmitter{stderr: true, name: request.WorkloadName, logs: a.agentLogs},
Stdout: &logEmitter{stderr: false, name: request.WorkloadName, logs: a.agentLogs},
TmpFilename: *tmpFile,
VmID: a.md.VmId,
})
provider, err := providers.NewExecutionProvider(params)
if err != nil {
msg := fmt.Sprintf("Failed to initialize workload execution provider; %s", err)
a.LogError(msg)
Expand Down Expand Up @@ -177,6 +171,51 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.WorkRequest) (*string, err
return &tempFile, nil
}

// newExecutionProviderParams initializes new execution provider params
// for the given work request and starts a goroutine that listens on the
// Fail, Run and Exit channels of the returned params, publishing the
// corresponding workload event for each signal it receives.
//
// The workload artifact is cached first; if caching fails, the error is
// returned and no goroutine is started. The listener goroutine returns
// after the first Fail or Exit signal.
func (a *Agent) newExecutionProviderParams(req *agentapi.WorkRequest) (*agentapi.ExecutionProviderParams, error) {
	tmpFile, err := a.cacheExecutableArtifact(req)
	if err != nil {
		return nil, err
	}

	params := &agentapi.ExecutionProviderParams{
		WorkRequest: *req,
		Stderr:      &logEmitter{stderr: true, name: req.WorkloadName, logs: a.agentLogs},
		Stdout:      &logEmitter{stderr: false, name: req.WorkloadName, logs: a.agentLogs},
		TmpFilename: *tmpFile,
		VmID:        a.md.VmId,

		Fail: make(chan bool),
		Run:  make(chan bool),
		Exit: make(chan int),
	}

	go func() {
		for {
			// Block until the provider signals. A blocking select needs
			// neither a default case nor a sleep; the previous polling
			// loop slept for 25ns (an untyped 25 passed as a
			// time.Duration), which amounted to a busy spin.
			select {
			case <-params.Fail:
				// The Fail channel only carries a bool, so there is no
				// underlying error to include; err from the enclosing
				// scope is always nil once this goroutine is running.
				a.PublishWorkloadExited(params.VmID, params.WorkloadName, "Failed to start workload", true, -1)
				return

			case <-params.Run:
				a.PublishWorkloadStarted(params.VmID, params.WorkloadName, params.TotalBytes)

			case exit := <-params.Exit:
				msg := fmt.Sprintf("Exited workload with status: %d", exit)
				a.PublishWorkloadExited(params.VmID, params.WorkloadName, msg, exit != 0, exit)
				// The workload is done; stop listening so this goroutine
				// does not outlive it.
				// NOTE(review): assumes the provider signals Run before
				// Exit -- confirm against the provider implementations.
				return
			}
		}
	}()

	return params, nil
}

// workAck ACKs the provided NATS message by responding with the
// accepted status of the attempted work request and associated message
func (a *Agent) workAck(m *nats.Msg, accepted bool, msg string) error {
Expand Down
2 changes: 1 addition & 1 deletion nex-agent/cmd/nex-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func main() {
ctx := context.Background()

agent, err := nexagent.InitAgent()
agent, err := nexagent.NewAgent()
if err != nil {
haltVM(err)
}
Expand Down
29 changes: 19 additions & 10 deletions nex-agent/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
agentapi "github.com/ConnectEverything/nex/agent-api"
)

// NexEventSourceNexAgent is the Source value set on log entries emitted by the agent.
const NexEventSourceNexAgent = "nex-agent"

// LogError submits msg as a log entry with level agentapi.LogLevelError.
func (a *Agent) LogError(msg string) {
a.submitLog(msg, agentapi.LogLevelError)
}
Expand All @@ -21,18 +23,19 @@ func (a *Agent) LogInfo(msg string) {
func (a *Agent) submitLog(msg string, lvl agentapi.LogLevel) {
select {
case a.agentLogs <- &agentapi.LogEntry{
Source: "nex-agent",
Source: NexEventSourceNexAgent,
Level: lvl,
Text: msg,
}: // noop
default: // noop
}
}

func (a *Agent) PublishWorkloadStarted(vmID string, workloadName string, totalBytes int32) {
// FIXME-- revisit error handling
func (a *Agent) PublishWorkloadStarted(vmID, workloadName string, totalBytes int32) {
select {
case a.agentLogs <- &agentapi.LogEntry{
Source: "nex-agent",
Source: NexEventSourceNexAgent,
Level: agentapi.LogLevelInfo,
Text: fmt.Sprintf("Workload %s started", workloadName),
}: // noop
Expand All @@ -48,24 +51,30 @@ func (a *Agent) PublishWorkloadStarted(vmID string, workloadName string, totalBy
}
}

// PublishWorkloadStopped publishes a workload stopped message
func (a *Agent) PublishWorkloadStopped(vmId string, workloadName string, err bool, message string) {
// PublishWorkloadExited publishes a workload failed or stopped message
// FIXME-- revisit error handling
func (a *Agent) PublishWorkloadExited(vmID, workloadName, message string, err bool, code int) {
level := agentapi.LogLevelInfo
code := 0
if err {
level = agentapi.LogLevelError
code = -1
}

// FIXME-- this hack is here to get things working... refactor me
txt := fmt.Sprintf("Workload %s stopped", workloadName)
if code == -1 {
txt = fmt.Sprintf("Workload %s failed to start", workloadName)
}

select {
case a.agentLogs <- &agentapi.LogEntry{
Source: "nex-agent",
Source: NexEventSourceNexAgent,
Level: agentapi.LogLevel(level),
Text: fmt.Sprintf("Workload %s stopped", workloadName),
Text: txt,
}: // noop
default: // noop
}

evt := agentapi.NewAgentEvent(vmId, agentapi.WorkloadStoppedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName, Code: code, Message: message})
evt := agentapi.NewAgentEvent(vmID, agentapi.WorkloadStoppedEventType, agentapi.WorkloadStatusEvent{WorkloadName: workloadName, Code: code, Message: message})
select {
case a.eventLogs <- &evt: // noop
default:
Expand Down
4 changes: 2 additions & 2 deletions nex-agent/providers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type ExecutionProvider interface {
Validate() error
}

// ExecutionProviderFactory initializes and returns an execution provider for a given work request
func ExecutionProviderFactory(params *agentapi.ExecutionProviderParams) (ExecutionProvider, error) {
// NewExecutionProvider initializes and returns an execution provider for a given work request
func NewExecutionProvider(params *agentapi.ExecutionProviderParams) (ExecutionProvider, error) {
if params.WorkloadType == "" { // FIXME-- should req.WorkloadType be a *string for better readability? e.g., json.Unmarshal will set req.Type == "" even if it is not provided.
return nil, errors.New("execution provider factory requires a workload type parameter")
}
Expand Down
61 changes: 34 additions & 27 deletions nex-agent/providers/lib/elf.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os/exec"
"strings"
"time"

agentapi "github.com/ConnectEverything/nex/agent-api"
)
Expand All @@ -19,7 +20,9 @@ type ELF struct {
totalBytes int32
vmID string

// started chan int
fail chan bool
run chan bool
exit chan int

stderr io.Writer
stdout io.Writer
Expand All @@ -33,38 +36,38 @@ func (e *ELF) Execute() error {
cmd.Stdout = e.stdout
cmd.Stderr = e.stderr

envVars := make([]string, len(e.environment))
cmd.Env = make([]string, len(e.environment))
for k, v := range e.environment {
item := fmt.Sprintf("%s=%s", strings.ToUpper(k), v)
envVars = append(envVars, item)
cmd.Env = append(cmd.Env, item)
}
cmd.Env = envVars

err := cmd.Start()

err := cmd.Start() // this doesn't actually have to be in the goroutine, and we could just return the error...
if err != nil {
// FIXME
// msg := fmt.Sprintf("Failed to start workload: %s", err)
// //e.agent.LogError(msg)
// //e.agent.PublishWorkloadStopped(e.vmID, e.name, true, msg)
// return
e.fail <- true
return
}

//e.agent.PublishWorkloadStarted(e.vmID, e.name, e.totalBytes)

// if cmd.Wait has unblocked, it means the workload has stopped
err = cmd.Wait()

// FIXME
// var msg string
// if err != nil {
// msg = fmt.Sprintf("Workload stopped unexpectedly: %s", err)
// } else {
// msg = "OK"
// }

//e.agent.PublishWorkloadStopped(e.vmID, e.name, err != nil, msg)

go func() {
for {
if cmd.Process != nil {
e.run <- true
return
}

// TODO-- implement a timeout after which we dispatch e.fail

time.Sleep(time.Millisecond * agentapi.DefaultRunloopSleepTimeoutMillis)
}
}()

if err = cmd.Wait(); err != nil { // blocking until exit
if exitError, ok := err.(*exec.ExitError); ok {
e.exit <- exitError.ExitCode() // this is here for now for review but can likely be simplified to one line: `e.exit <- cmd.ProcessState.ExitCode()``
}
} else {
e.exit <- cmd.ProcessState.ExitCode()
}
}()

return nil
Expand All @@ -82,11 +85,15 @@ func InitNexExecutionProviderELF(params *agentapi.ExecutionProviderParams) *ELF
environment: params.Environment,
name: params.WorkloadName,
tmpFilename: params.TmpFilename,
totalBytes: int32(params.TotalBytes),
totalBytes: params.TotalBytes,
vmID: params.VmID,

stderr: params.Stderr,
stdout: params.Stdout,

fail: params.Fail,
run: params.Run,
exit: params.Exit,
}
}

Expand Down
10 changes: 7 additions & 3 deletions nex-agent/providers/lib/oci.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package lib

import agentapi "github.com/ConnectEverything/nex/agent-api"
import (
"errors"

agentapi "github.com/ConnectEverything/nex/agent-api"
)

// OCI execution provider implementation
type OCI struct {
}

func (o *OCI) Execute() error {
return nil
return errors.New("oci execution provider not yet implemented")
}

func (o *OCI) Validate() error {
return nil
return errors.New("oci execution provider not yet implemented")
}

// InitNexExecutionProviderOCI convenience method to initialize an OCI execution provider
Expand Down
10 changes: 7 additions & 3 deletions nex-agent/providers/lib/wasm.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package lib

import agentapi "github.com/ConnectEverything/nex/agent-api"
import (
"errors"

agentapi "github.com/ConnectEverything/nex/agent-api"
)

// Wasm execution provider implementation
type Wasm struct {
}

func (e *Wasm) Execute() error {
return nil
return errors.New("wasm execution provider not yet implemented")
}

func (e *Wasm) Validate() error {
return nil
return errors.New("wasm execution provider not yet implemented")
}

// InitNexExecutionProviderWasm convenience method to initialize a Wasm execution provider
Expand Down

0 comments on commit 203ed1a

Please sign in to comment.