Skip to content

Commit

Permalink
log emissions working (sorta)
Browse files Browse the repository at this point in the history
  • Loading branch information
autodidaddict committed Nov 16, 2023
1 parent c9cdf98 commit 7d10c08
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 34 deletions.
3 changes: 1 addition & 2 deletions control-api/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,12 @@ func NewRunRequest(opts ...RequestOption) (*RunRequest, error) {
// This will validate a request's workload JWT, decrypt the request environment. It will not
// perform a comparison of the hash found in the claims with a recipient's expected hash
func (request *RunRequest) Validate(myKey nkeys.KeyPair) (*jwt.GenericClaims, error) {
fmt.Printf("%v", request)

claims, err := jwt.DecodeGeneric(request.WorkloadJwt)
if err != nil {
return nil, fmt.Errorf("could not decode workload JWT: %s", err)
}
request.DecodedClaims = *claims
fmt.Printf("decodeed claims %+v\n", request.DecodedClaims)
if !validWorkloadName.MatchString(claims.Subject) {
return nil, fmt.Errorf("workload name claim ('%s') does not match requirements of all lowercase letters", claims.Subject)
}
Expand Down
2 changes: 2 additions & 0 deletions examples/echoservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func main() {
req.Respond(req.Data())
}

fmt.Println("Starting echo service")

_, err = services.AddService(nc, services.Config{
Name: "EchoService",
Version: "1.0.0",
Expand Down
17 changes: 17 additions & 0 deletions examples/nodeconfigs/limited_issuers.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"kernel_path": "/home/kevin/lab/firecracker/vmlinux-5.10",
"rootfs_path": "/home/kevin/lab/firecracker/rootfs.ext4",
"machine_pool_size": 1,
"cni": {
"network_name": "fcnet",
"interface_name": "veth0"
},
"machine_template": {
"vcpu_count": 1,
"memsize_mib": 256
},
"valid_issuers": [
"AARBEQDCEKB7NYZLZRXAOAF6QGYGCN636VTN45USLIIW4QLG7Z2MBGH4",
"ABAGWNQ5V5H6LVODYATY5Q27OBQASRLSUG23FYBDWUR5BFI5UIMQ5GOQ"
]
}
25 changes: 25 additions & 0 deletions examples/nodeconfigs/rate_limited.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"kernel_path": "/home/kevin/lab/firecracker/vmlinux-5.10",
"rootfs_path": "/home/kevin/lab/firecracker/rootfs.ext4",
"machine_pool_size": 1,
"cni": {
"network_name": "fcnet",
"interface_name": "veth0"
},
"machine_template": {
"vcpu_count": 1,
"memsize_mib": 256
},
"rate_limiters": {
"bandwidth": {
"one_time_burst": 1048576,
"refill_time": 500,
"size": 1048576
},
"iops": {
"one_time_burst": 100,
"refill_time": 1000,
"size": 100
}
}
}
13 changes: 13 additions & 0 deletions examples/nodeconfigs/simple.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"kernel_path": "/home/kevin/lab/firecracker/vmlinux-5.10",
"rootfs_path": "/home/kevin/lab/firecracker/rootfs.ext4",
"machine_pool_size": 1,
"cni": {
"network_name": "fcnet",
"interface_name": "veth0"
},
"machine_template": {
"vcpu_count": 1,
"memsize_mib": 256
}
}
2 changes: 2 additions & 0 deletions examples/sampleissuer.nk
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SAACFXIHHBJMPWCV4BVIYNSFVUGZIQPVKATH5YHDD2ZY5GS7KGRABEBZ7E

2 changes: 2 additions & 0 deletions examples/samplexkey.xk
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
SXAAIDDDZFRNRZPSQIVNL4LVATOMBIRXN7ECDOVJWGDDTS5CZGUWIKWEFU

9 changes: 5 additions & 4 deletions nex-agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func RunWorkload(name string, totalBytes int32, tempFile *os.File, runtimeEnviro
// This has to be backgrounded because the workload could be a long-running process/service
go func() {
cmd := exec.Command(tempFile.Name())
cmd.Stdout = &logEmitter{stderr: false}
cmd.Stderr = &logEmitter{stderr: true}
cmd.Stdout = &logEmitter{stderr: false, name: name}
cmd.Stderr = &logEmitter{stderr: true, name: name}

envVars := make([]string, len(runtimeEnvironment))
for k, v := range runtimeEnvironment {
Expand All @@ -34,7 +34,7 @@ func RunWorkload(name string, totalBytes int32, tempFile *os.File, runtimeEnviro
err = cmd.Wait()
msg := ""
if err != nil {
msg = fmt.Sprintf("%v", err)
msg = fmt.Sprintf("Workload stopped unexpectedly: %s", err)
} else {
msg = "OK"
}
Expand All @@ -47,6 +47,7 @@ func RunWorkload(name string, totalBytes int32, tempFile *os.File, runtimeEnviro

type logEmitter struct {
stderr bool
name string
}

func (l *logEmitter) Write(bytes []byte) (int, error) {
Expand All @@ -57,7 +58,7 @@ func (l *logEmitter) Write(bytes []byte) (int, error) {
lvl = agentapi.LogLevel_LEVEL_DEBUG
}
entry := &agentapi.LogEntry{
Source: "workload",
Source: l.name,
Level: lvl,
Text: string(bytes),
}
Expand Down
2 changes: 1 addition & 1 deletion nex-cli/cmd/nex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func main() {
run.Arg("url", "URL pointing to the file to run").Required().URLVar(&cli.RunOpts.WorkloadUrl)
run.Arg("id", "Public key of the target node to run the workload").Required().StringVar(&cli.RunOpts.TargetNode)

run.Flag("xkey", "Publisher's Xkey required to encrypt environment").Required().ExistingFileVar(&cli.RunOpts.PublisherXkeyFile)
run.Flag("xkey", "Path to publisher's Xkey required to encrypt environment").Required().ExistingFileVar(&cli.RunOpts.PublisherXkeyFile)
run.Flag("issuer", "Path to a seed key to sign the workload JWT as the issuer").Required().ExistingFileVar(&cli.RunOpts.ClaimsIssuerFile)
run.Arg("env", "Environment variables to pass to workload").StringMapVar(&cli.RunOpts.Env)
run.Flag("name", "Name of the workload. Must be alphabetic (lowercase)").Required().StringVar(&cli.RunOpts.Name)
Expand Down
6 changes: 3 additions & 3 deletions nex-node/cmd/nex-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
log.SetReportCaller(false)

lvl, ok := os.LookupEnv("LOG_LEVEL")
// LOG_LEVEL not set, let's default to debug

if !ok || len(strings.TrimSpace(lvl)) == 0 {
lvl = "debug"
}
Expand Down Expand Up @@ -85,15 +85,15 @@ func cmdUp(opts *nexnode.CliOptions, ctx context.Context, cancel context.CancelF

nc, err := generateConnectionFromOpts(opts)
if err != nil {
fmt.Println("Failed to connect to NATS", err)
log.WithError(err).Error("Failed to connect to NATS")
os.Exit(1)
}

log.Infof("Established node NATS connection to: %s", opts.Servers)

config, err := nex.LoadNodeConfiguration(opts.MachineConfigFile)
if err != nil {
fmt.Println("Failed to load machine configuration file", err)
log.WithError(err).WithField("file", opts.MachineConfigFile).Error("Failed to load machine configuration file")
os.Exit(1)
}

Expand Down
14 changes: 7 additions & 7 deletions nex-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (
)

type NodeConfiguration struct {
KernelPath string `json:"kernel_path"`
RootFsPath string `json:"rootfs_path"`
MachinePoolSize int `json:"machine_pool_size"`
CNI CNIDefinition `json:"cni"`
MachineTemplate MachineTemplate `json:"machine_template"`
RateLimiters *Limiters `json:"rate_limiters,omitempty"`
RequesterPublicKeys []string `json:"requester_public_keys"`
KernelPath string `json:"kernel_path"`
RootFsPath string `json:"rootfs_path"`
MachinePoolSize int `json:"machine_pool_size"`
CNI CNIDefinition `json:"cni"`
MachineTemplate MachineTemplate `json:"machine_template"`
RateLimiters *Limiters `json:"rate_limiters,omitempty"`
ValidIssuers []string `json:"valid_issuers,omitempty"`
}

type Limiters struct {
Expand Down
19 changes: 16 additions & 3 deletions nex-node/controlapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"runtime"
"slices"
"strconv"
"time"

Expand Down Expand Up @@ -93,6 +94,11 @@ func handleRun(api *ApiListener) func(m *nats.Msg) {
return
}
request.DecodedClaims = *decodedClaims
if !validateIssuer(request.DecodedClaims.Issuer, api.mgr.config.ValidIssuers) {
err := fmt.Errorf("invalid workload issuer: %s", request.DecodedClaims.Issuer)
api.log.WithError(err).Error("Workload validation failed")
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("%s", err))
}

payloadFile, err := api.payloadCache.GetPayload(&request)
if err != nil {
Expand All @@ -107,15 +113,16 @@ func handleRun(api *ApiListener) func(m *nats.Msg) {
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Failed to pull warm VM from ready pool: %s", err))
return
}
runningVm.workloadStarted = time.Now().UTC()
runningVm.workloadSpecification = request
specMap[runningVm.vmmID] = request

_, err = runningVm.agentClient.PostWorkload(payloadFile.Name(), request.WorkloadEnvironment)
if err != nil {
api.log.WithError(err).Error("Failed to start workload in VM")
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unable to submit workload to agent process: %s", err))
return
}
runningVm.workloadStarted = time.Now().UTC()
runningVm.workloadSpecification = request

res := controlapi.NewEnvelope(controlapi.RunResponseType, controlapi.RunResponse{
Started: true,
Expand Down Expand Up @@ -178,7 +185,6 @@ func summarizeMachines(vms *map[string]runningFirecracker) []controlapi.MachineS
machines := make([]controlapi.MachineSummary, 0)
now := time.Now().UTC()
for _, v := range *vms {
fmt.Printf("VM: %+v\n", v)
machine := controlapi.MachineSummary{
Id: v.vmmID,
Healthy: true, // TODO cache last health status
Expand All @@ -197,6 +203,13 @@ func summarizeMachines(vms *map[string]runningFirecracker) []controlapi.MachineS
return machines
}

func validateIssuer(issuer string, validIssuers []string) bool {
if len(validIssuers) == 0 {
return true
}
return slices.Contains(validIssuers, issuer)
}

// This is the same uptime code as the NATS server, for consistency
func myUptime(d time.Duration) string {
// Just use total seconds for uptime, and display days / years
Expand Down
20 changes: 13 additions & 7 deletions nex-node/fc_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
func createAndStartVM(ctx context.Context, config *NodeConfiguration) (*runningFirecracker, error) {
vmmID := xid.New().String()

copy(config.RootFsPath, "/tmp/rootfs-"+vmmID+".ext4")

fcCfg, err := generateFirecrackerConfig(vmmID, config)
copy(config.RootFsPath, *fcCfg.Drives[0].PathOnHost)

if err != nil {
log.Errorf("Error: %s", err)
return nil, err
Expand Down Expand Up @@ -59,8 +59,6 @@ func createAndStartVM(ctx context.Context, config *NodeConfiguration) (*runningF
cmd := firecracker.VMCommandBuilder{}.
WithBin(firecrackerBinary).
WithSocketPath(fcCfg.SocketPath).
// WithStdin(os.Stdin).
// WithStdout(os.Stdout).
WithStderr(os.Stderr).
Build(ctx)

Expand All @@ -80,9 +78,9 @@ func createAndStartVM(ctx context.Context, config *NodeConfiguration) (*runningF
return nil, fmt.Errorf("failed to start machine: %v", err)
}

log.WithField("ip", m.Cfg.NetworkInterfaces[0].StaticConfiguration.IPConfiguration.IPAddr.IP).Info("machine started")

ip := m.Cfg.NetworkInterfaces[0].StaticConfiguration.IPConfiguration.IPAddr.IP
log.WithField("ip", ip).Info("machine started")

return &runningFirecracker{
vmmCtx: vmmCtx,
vmmCancel: vmmCancel,
Expand All @@ -105,14 +103,15 @@ func copy(src string, dst string) error {

func generateFirecrackerConfig(id string, config *NodeConfiguration) (firecracker.Config, error) {
socket := getSocketPath(id)
rootPath := getRootFsPath(id)

return firecracker.Config{
SocketPath: socket,
KernelImagePath: config.KernelPath,
LogPath: fmt.Sprintf("%s.log", socket),
Drives: []models.Drive{{
DriveID: firecracker.String("1"),
PathOnHost: firecracker.String("/tmp/rootfs-" + id + ".ext4"),
PathOnHost: &rootPath,
IsRootDevice: firecracker.Bool(true),
IsReadOnly: firecracker.Bool(false),
// RateLimiter: firecracker.NewRateLimiter(
Expand Down Expand Up @@ -147,6 +146,13 @@ func generateFirecrackerConfig(id string, config *NodeConfiguration) (firecracke
}, nil
}

func getRootFsPath(vmmID string) string {
filename := fmt.Sprintf("rootfs-%s.ext4", vmmID)
dir := os.TempDir()

return filepath.Join(dir, filename)
}

func getSocketPath(vmmID string) string {
filename := strings.Join([]string{
".firecracker.sock",
Expand Down
20 changes: 14 additions & 6 deletions nex-node/machine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

agentapi "github.com/ConnectEverything/nex/agent-api"
controlapi "github.com/ConnectEverything/nex/control-api"
cloudevents "github.com/cloudevents/sdk-go"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -44,6 +45,7 @@ type emittedLog struct {

func (m *MachineManager) Start() error {
m.log.Info("Virtual machine manager starting")
specMap = make(map[string]controlapi.RunRequest)
go m.fillPool()

return nil
Expand Down Expand Up @@ -139,7 +141,7 @@ func (m *MachineManager) fillPool() {
m.log.WithError(err).Error("Failed to get channels from client for gRPC subscriptions")
return
}
go dispatchLogs(vm, m.kp, m.nc, logs, m.log)
go dispatchLogs(vm.vmmID, vm.ip.String(), m.kp, m.nc, logs, m.log)
go dispatchEvents(m, vm, m.kp, m.nc, events, m.log)

// Add the new microVM to the pool.
Expand All @@ -151,21 +153,27 @@ func (m *MachineManager) fillPool() {
}
}

func dispatchLogs(vm *runningFirecracker, kp nkeys.KeyPair, nc *nats.Conn, logs <-chan *agentapi.LogEntry, log *logrus.Logger) {
func dispatchLogs(vmId string, ip string, kp nkeys.KeyPair, nc *nats.Conn, logs <-chan *agentapi.LogEntry, log *logrus.Logger) {
pk, _ := kp.PublicKey()
for {
entry := <-logs
spec := specMap[vmId]
workloadName := spec.DecodedClaims.Subject
lvl := getLogrusLevel(entry)
emitLog := emittedLog{
Text: entry.Text,
Level: lvl,
MachineId: vm.vmmID,
MachineId: vmId,
}
logBytes, _ := json.Marshal(emitLog)
workloadName := vm.workloadSpecification.DecodedClaims.Subject
// $NEX.LOGS.{host}.{workload}.{vm}
nc.Publish(fmt.Sprintf("%s.%s.%s.%s", logSubjectPrefix, pk, workloadName, vm.vmmID), logBytes)
log.WithField("vmid", vm.vmmID).WithField("ip", vm.ip).Log(lvl, entry.Text)
subject := fmt.Sprintf("%s.%s.%s.%s", logSubjectPrefix, pk, workloadName, vmId)
fmt.Printf("SUBJECT: %s\n", subject)
err := nc.Publish(subject, logBytes)
if err != nil {
log.WithField("vmid", vmId).WithField("ip", ip).WithField("subject", subject).Warn("Failed to publish log on logs subject")
}
log.WithField("vmid", vmId).WithField("ip", ip).Log(lvl, entry.Text)

}
}
Expand Down
7 changes: 6 additions & 1 deletion nex-node/running_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
log "github.com/sirupsen/logrus"
)

var (
specMap map[string]controlapi.RunRequest
)

// Represents an instance of a single firecracker VM containing the nex agent.
type runningFirecracker struct {
vmmCtx context.Context
Expand Down Expand Up @@ -44,7 +48,8 @@ func (vm runningFirecracker) shutDown() {
if err != nil {
log.WithError(err).Warn("Failed to delete firecracker socket")
}
err = os.Remove("/tmp/rootfs-" + vm.vmmID + ".ext4")
rootFs := getRootFsPath(vm.vmmID)
err = os.Remove(rootFs)
if err != nil {
log.WithError(err).Warn("Failed to delete firecracker rootfs")
}
Expand Down

0 comments on commit 7d10c08

Please sign in to comment.