Skip to content

Commit

Permalink
separates subscribe from log output, moves monitor to api client (#13)
Browse files Browse the repository at this point in the history
* separates subscribe from log output, moves monitor to api client

* lintergit add .
  • Loading branch information
autodidaddict authored Dec 29, 2023
1 parent 6fe9043 commit d590f07
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 132 deletions.
138 changes: 132 additions & 6 deletions control-api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)

// API subjects:
Expand All @@ -20,16 +23,19 @@ type apiClient struct {
nc *nats.Conn
timeout time.Duration
namespace string
log *logrus.Logger
}

// Creates a new client to communicate with a group of NEX nodes
func NewApiClient(nc *nats.Conn, timeout time.Duration) *apiClient {
return NewApiClientWithNamespace(nc, timeout, "default")
// NewApiClient creates a client for communicating with a group of NEX nodes.
// Requests that need a namespace use "default"; see NewApiClientWithNamespace
// to choose a different one.
func NewApiClient(nc *nats.Conn, timeout time.Duration, log *logrus.Logger) *apiClient {
	const defaultNamespace = "default"

	return NewApiClientWithNamespace(nc, timeout, defaultNamespace, log)
}

// Creates a new client to communicate with a group of NEX nodes all within a given namespace
func NewApiClientWithNamespace(nc *nats.Conn, timeout time.Duration, namespace string) *apiClient {
return &apiClient{nc: nc, timeout: timeout, namespace: namespace}
// NewApiClientWithNamespace creates a client for communicating with a group of
// NEX nodes, all within the given namespace. The namespace is used for the
// requests where one is mandatory.
//
// The log entry handler calls methods on the client's logger without checking
// it, so a nil log is replaced with a freshly created logrus logger rather than
// being stored as-is.
func NewApiClientWithNamespace(nc *nats.Conn, timeout time.Duration, namespace string, log *logrus.Logger) *apiClient {
	if log == nil {
		log = logrus.New()
	}

	return &apiClient{nc: nc, timeout: timeout, namespace: namespace, log: log}
}

// Attempts to stop a running workload. This can fail for a wide variety of reasons, the most common
Expand Down Expand Up @@ -122,6 +128,126 @@ func (api *apiClient) ListNodes() ([]PingResponse, error) {
return responses, nil
}

// MonitorAllLogs is a convenience wrapper around MonitorLogs that applies no
// filter at all (every subject token is the '*' wildcard) and delivers entries
// on an unbuffered, blocking channel.
func (api *apiClient) MonitorAllLogs() (chan EmittedLog, error) {
	const (
		wildcard   = "*"
		unbuffered = 0
	)

	return api.MonitorLogs(wildcard, wildcard, wildcard, wildcard, unbuffered)
}

// MonitorLogs subscribes to the log subject described by the four filters and
// returns the channel on which matching entries are delivered.
//
// Pass "*" (not an empty string) for any filter that should match everything.
// bufferLength is the capacity of the returned channel; 0 yields an unbuffered
// (blocking) channel.
func (api *apiClient) MonitorLogs(
	namespaceFilter string,
	nodeFilter string,
	workloadFilter string,
	vmFilter string,
	bufferLength int) (chan EmittedLog, error) {

	// Subject layout: {prefix}.logs.{namespace}.{node}.{workload}.{vm}
	subject := strings.Join([]string{
		APIPrefix,
		"logs",
		namespaceFilter,
		nodeFilter,
		workloadFilter,
		vmFilter,
	}, ".")

	entries := make(chan EmittedLog, bufferLength)
	if _, err := api.nc.Subscribe(subject, handleLogEntry(api, entries)); err != nil {
		return nil, err
	}

	return entries, nil
}

// MonitorAllEvents is a convenience wrapper around MonitorEvents that applies
// no namespace or event type filter and delivers events on an unbuffered
// (blocking) channel.
func (api *apiClient) MonitorAllEvents() (chan EmittedEvent, error) {
	const (
		wildcard   = "*"
		unbuffered = 0
	)

	return api.MonitorEvents(wildcard, wildcard, unbuffered)
}

// MonitorEvents subscribes to the event subject described by the filters and
// returns the channel on which matching events are delivered.
//
// Pass "*" for namespaceFilter and/or eventTypeFilter to match everything. An
// empty eventTypeFilter is also treated as "*", so callers that relied on the
// event type previously being ignored keep working. bufferLength is the
// capacity of the returned channel; 0 yields an unbuffered (blocking) channel.
//
// Unless namespaceFilter is "*" or "system", a second subscription covering all
// events in the "system" namespace is added and feeds the same channel.
func (api *apiClient) MonitorEvents(
	namespaceFilter string,
	eventTypeFilter string,
	bufferLength int) (chan EmittedEvent, error) {

	if eventTypeFilter == "" {
		eventTypeFilter = "*"
	}

	// Subject layout: {prefix}.events.{namespace}.{event_type}. The event type
	// token must come from the caller's filter; hard-coding '*' here would make
	// the eventTypeFilter parameter a no-op.
	subscribeSubject := fmt.Sprintf("%s.events.%s.%s", APIPrefix, namespaceFilter, eventTypeFilter)

	eventChannel := make(chan EmittedEvent, bufferLength)

	sub, err := api.nc.Subscribe(subscribeSubject, handleEventEntry(api, eventChannel))
	if err != nil {
		return nil, err
	}

	// Add a monitor for the system namespace if the supplied filter doesn't
	// already include it
	if namespaceFilter != "*" && namespaceFilter != "system" {
		systemSub := fmt.Sprintf("%s.events.system.*", APIPrefix)
		if _, err = api.nc.Subscribe(systemSub, handleEventEntry(api, eventChannel)); err != nil {
			// The caller gets no channel back, so drop the first subscription
			// instead of leaving it delivering into a channel nobody reads.
			// Best effort: the subscribe error is the one worth reporting.
			_ = sub.Unsubscribe()
			return nil, err
		}
	}

	return eventChannel, nil
}

// handleEventEntry builds the NATS message callback used by the event monitor.
// For each message it splits the subject into tokens, decodes the payload as a
// cloud event, and sends the result on ch wrapped in an EmittedEvent.
//
// Messages whose subject does not have exactly four dot-separated tokens are
// ignored. Payloads that fail to decode are dropped after being logged through
// the client's logger (when one is available), matching how log entries that
// fail to decode are reported.
func handleEventEntry(api *apiClient, ch chan EmittedEvent) func(m *nats.Msg) {
	return func(m *nats.Msg) {
		// Subject layout: {prefix}.events.{namespace}.{event_type}
		tokens := strings.Split(m.Subject, ".")
		if len(tokens) != 4 {
			return
		}

		event := cloudevents.NewEvent()
		if err := json.Unmarshal(m.Data, &event); err != nil {
			// Guard the logger so a client built without one cannot crash
			// the subscription callback.
			if api != nil && api.log != nil {
				api.log.WithError(err).Error("Event deserialization failure")
			}
			return
		}

		ch <- EmittedEvent{
			Event:     event,
			Namespace: tokens[2],
			EventType: tokens[3],
		}
	}
}

// handleLogEntry builds the NATS message callback used by the log monitor.
// Each message's payload is decoded into a rawLog and sent on ch as an
// EmittedLog, with namespace, node ID and workload taken from the subject.
//
// Messages whose subject does not have exactly six dot-separated tokens are
// ignored; payloads that fail to decode are logged and dropped.
func handleLogEntry(api *apiClient, ch chan EmittedLog) func(m *nats.Msg) {
	// Subject layout: $NEX.logs.{namespace}.{node}.{workload}.{vm}
	const subjectTokenCount = 6

	return func(msg *nats.Msg) {
		parts := strings.Split(msg.Subject, ".")
		if len(parts) != subjectTokenCount {
			return
		}

		var entry rawLog
		if err := json.Unmarshal(msg.Data, &entry); err != nil {
			api.log.WithError(err).Error("Log entry deserialization failure")
			return
		}

		// A level of 0 (for example, one omitted from the payload) is
		// reported as debug.
		if entry.Level == 0 {
			entry.Level = logrus.DebugLevel
		}

		emitted := EmittedLog{
			Namespace: parts[2],
			NodeId:    parts[3],
			Workload:  parts[4],
			rawLog:    entry,
		}
		ch <- emitted
	}
}

// Helper that submits data, gets a standard envelope back, and returns the inner data
// payload as JSON
func (api *apiClient) performRequest(subject string, raw interface{}) ([]byte, error) {
Expand Down
84 changes: 84 additions & 0 deletions control-api/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package controlapi

import (
"encoding/json"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)

// TestEventMonitor publishes a cloud event on an event subject and verifies
// that the event monitor delivers it with the namespace and event type taken
// from the subject and with the event data intact. It needs a NATS server
// listening on nats.DefaultURL.
func TestEventMonitor(t *testing.T) {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		t.Fatalf("Failed to connect to NATS at %s: %s", nats.DefaultURL, err)
	}
	defer nc.Close()

	log := logrus.New()
	apiClient := NewApiClient(nc, time.Second, log)
	ch, err := apiClient.MonitorEvents("*", "*", 0)
	if err != nil {
		t.Fatalf("Failed to create event monitor: %s", err)
	}

	type testStruct struct {
		Bob   string `json:"bob"`
		Alice string `json:"alice"`
	}

	evt := cloudevents.NewEvent()
	evt.SetType("workload_started")
	evt.SetID("1")
	evt.SetSource("testing")
	if err = evt.SetData(testStruct{
		Bob:   "1",
		Alice: "2",
	}); err != nil {
		t.Fatalf("Failed to set event data: %s", err)
	}

	bytes, err := json.Marshal(evt)
	if err != nil {
		t.Fatalf("Failed to marshal event: %s", err)
	}
	if err = nc.Publish("$NEX.events.default.workload_started", bytes); err != nil {
		t.Fatalf("Failed to publish event: %s", err)
	}

	// Bound the wait so a lost or dropped message fails the test instead of
	// hanging it until the global test timeout.
	var actualEvent EmittedEvent
	select {
	case actualEvent = <-ch:
	case <-time.After(2 * time.Second):
		t.Fatal("Timed out waiting for the monitored event")
	}

	if actualEvent.EventType != "workload_started" {
		t.Fatal("Event wrapper didn't maintain event type")
	}
	if actualEvent.Namespace != "default" {
		t.Fatal("Event wrapper didn't maintain namespace")
	}
	var ts testStruct
	err = actualEvent.DataAs(&ts)
	if err != nil {
		t.Fatalf("Event wrapper lost fidelity of event data: %s", err)
	}
	if ts.Alice != "2" || ts.Bob != "1" {
		t.Fatalf("Lost data in event round trip!: %+v", ts)
	}
}

// TestLogMonitor publishes a raw log payload on a log subject and verifies
// that the log monitor delivers it with namespace, node ID and workload taken
// from the subject and with the payload intact. It needs a NATS server
// listening on nats.DefaultURL.
func TestLogMonitor(t *testing.T) {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		t.Fatalf("Failed to connect to NATS at %s: %s", nats.DefaultURL, err)
	}
	defer nc.Close()

	log := logrus.New()
	apiClient := NewApiClient(nc, time.Second, log)
	ch, err := apiClient.MonitorLogs("*", "*", "*", "*", 0)
	if err != nil {
		t.Fatalf("Failed to create log monitor: %s", err)
	}

	// Named 'expected' rather than 'rawLog' so the variable does not shadow
	// the rawLog type for the rest of the function.
	expected := rawLog{Text: "hey from test", Level: logrus.DebugLevel, MachineId: "vm1234"}
	bytes, err := json.Marshal(expected)
	if err != nil {
		t.Fatalf("Failed to marshal log entry: %s", err)
	}

	if err = nc.Publish("$NEX.logs.default.Nxxxx.echoservice.vm1234", bytes); err != nil {
		t.Fatalf("Failed to publish log entry: %s", err)
	}

	// Bound the wait so a lost or dropped message fails the test instead of
	// hanging it until the global test timeout.
	var actualEntry EmittedLog
	select {
	case actualEntry = <-ch:
	case <-time.After(2 * time.Second):
		t.Fatal("Timed out waiting for the monitored log entry")
	}

	if actualEntry.Namespace != "default" {
		t.Fatalf("namespace in log should be default, found %s", actualEntry.Namespace)
	}
	if actualEntry.NodeId != "Nxxxx" {
		t.Fatalf("node ID failed to propogate, should be Nxxxx found %s", actualEntry.NodeId)
	}
	if actualEntry.Workload != "echoservice" {
		t.Fatalf("workload failed to propogate, should be echoservice, found %s", actualEntry.Workload)
	}
	if actualEntry.MachineId != "vm1234" {
		t.Fatalf("did not get the right machine ID. expected vm1234, found %s", actualEntry.MachineId)
	}
	if actualEntry.rawLog != expected {
		t.Fatalf("Failed to wrap the raw on the wire log: %+v", actualEntry)
	}
}
27 changes: 27 additions & 0 deletions control-api/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package controlapi

import (
cloudevents "github.com/cloudevents/sdk-go"
"github.com/sirupsen/logrus"
)

const (
APIPrefix = "$NEX"
)
Expand Down Expand Up @@ -65,6 +70,28 @@ type Envelope struct {
Error interface{} `json:"error,omitempty"`
}

// EmittedLog is a single log entry as delivered on a log monitor channel. It
// wraps the payload that goes across the wire (the embedded rawLog) with
// context taken from the subject the entry was published on: handleLogEntry
// fills Namespace, NodeId and Workload from subject tokens 2, 3 and 4.
type EmittedLog struct {
// Namespace is the namespace token of the log subject.
Namespace string
// NodeId is the node token of the log subject.
NodeId string
// Workload is the workload token of the log subject.
Workload string
// rawLog is the decoded message payload; its fields are promoted onto
// EmittedLog.
rawLog
}

// rawLog is the JSON payload of a log message, before any context from the
// subject is attached.
type rawLog struct {
// Text is the log message text.
Text string `json:"text"`
// Level is the logrus level of the entry.
Level logrus.Level `json:"level"`
// MachineId is carried in the payload under "machine_id".
MachineId string `json:"machine_id"`
}

// EmittedEvent is a cloud event as delivered on an event monitor channel. It
// is a wrapper that adds context to a cloud event and is not intended to be
// sent on the wire as-is. handleEventEntry fills Namespace and EventType from
// tokens 2 and 3 of the subject the event was published on.
type EmittedEvent struct {
// Event is the decoded cloud event; its methods are promoted onto
// EmittedEvent.
cloudevents.Event
// Namespace is the namespace token of the event subject.
Namespace string
// EventType is the event type token of the event subject.
EventType string
}

func NewEnvelope(dataType string, data interface{}, err *string) Envelope {
var e interface{}
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions nex-agent/cmd/nex-agent/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build linux

package main

import (
Expand Down
4 changes: 3 additions & 1 deletion nex-cli/devrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/choria-io/fisk"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/sirupsen/logrus"
)

var (
Expand Down Expand Up @@ -46,7 +47,8 @@ func RunDevWorkload(ctx *fisk.ParseContext) error {
}
// developer mode can have a smaller discovery timeout, since we're assuming there's a NEX
// node "nearby"
nodeClient := controlapi.NewApiClientWithNamespace(nc, 750*time.Millisecond, "default")
log := logrus.New()
nodeClient := controlapi.NewApiClientWithNamespace(nc, 750*time.Millisecond, "default", log)

candidates, err := nodeClient.ListNodes()
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions nex-cli/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
controlapi "github.com/ConnectEverything/nex/control-api"
"github.com/choria-io/fisk"
"github.com/nats-io/natscli/columns"
"github.com/sirupsen/logrus"
)

// Uses a control API client to request a node list from a NATS environment
Expand All @@ -17,7 +18,8 @@ func ListNodes(ctx *fisk.ParseContext) error {
if err != nil {
return err
}
nodeClient := controlapi.NewApiClient(nc, Opts.Timeout)
log := logrus.New()
nodeClient := controlapi.NewApiClient(nc, Opts.Timeout, log)

nodes, err := nodeClient.ListNodes()
if err != nil {
Expand All @@ -36,7 +38,8 @@ func NodeInfo(ctx *fisk.ParseContext) error {
if err != nil {
return err
}
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace)
log := logrus.New()
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace, log)
id := ctx.SelectedCommand.Model().Args[0].Value.String()
nodeInfo, err := nodeClient.NodeInfo(id)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions nex-cli/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
controlapi "github.com/ConnectEverything/nex/control-api"
"github.com/choria-io/fisk"
"github.com/nats-io/nkeys"
"github.com/sirupsen/logrus"
)

// Issues a request to stop a running workload
Expand All @@ -15,7 +16,8 @@ func StopWorkload(ctx *fisk.ParseContext) error {
if err != nil {
return err
}
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace)
log := logrus.New()
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace, log)

issuerSeed, err := os.ReadFile(StopOpts.ClaimsIssuerFile)
if err != nil {
Expand Down Expand Up @@ -47,7 +49,8 @@ func RunWorkload(ctx *fisk.ParseContext) error {
if err != nil {
return err
}
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace)
log := logrus.New()
nodeClient := controlapi.NewApiClientWithNamespace(nc, Opts.Timeout, Opts.Namespace, log)

// Get node info so we can get public xkey from the target for env encryption
nodeInfo, err := nodeClient.NodeInfo(RunOpts.TargetNode)
Expand Down
Loading

0 comments on commit d590f07

Please sign in to comment.