Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add preliminary check before registering new receivers in the log handler #173

Merged
merged 4 commits into from
Feb 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/controllers/websocket/mux.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package websocket

import (
"fmt"
"net/http"

"github.com/gorilla/websocket"
Expand All @@ -18,6 +19,7 @@ type PartitionKeyGenerator interface {
type PartitionRegistry interface {
Register(key string, buffer chan string)
Deregister(key string, buffer chan string)
IsKeyExists(key string) bool
}

// Multiplexor is responsible for handling the data transfer between the storage layer
Expand Down Expand Up @@ -51,6 +53,11 @@ func (m *Multiplexor) Handle(w http.ResponseWriter, r *http.Request) error {
return errors.Wrapf(err, "generating partition key")
}

// check if the job ID exists before registering receiver
if !m.registry.IsKeyExists(key) {
return fmt.Errorf("invalid key: %s", key)
}

// Buffer size set to 1000 to ensure messages get queued.
// TODO: make buffer size configurable
buffer := make(chan string, 1000)
Expand Down
42 changes: 42 additions & 0 deletions server/handlers/mocks/mock_project_command_output_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 14 additions & 9 deletions server/handlers/project_command_output_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type ProjectCommandOutputHandler interface {
// Deregister removes a channel from successive updates and closes it.
Deregister(jobID string, receiver chan string)

IsKeyExists(key string) bool

// Listens for msg from channel
Handle()

Expand Down Expand Up @@ -114,6 +116,13 @@ func NewAsyncProjectCommandOutputHandler(
}
}

func (p *AsyncProjectCommandOutputHandler) IsKeyExists(key string) bool {
p.receiverBuffersLock.RLock()
defer p.receiverBuffersLock.RUnlock()
_, ok := p.receiverBuffers[key]
return ok
}

func (p *AsyncProjectCommandOutputHandler) Send(ctx models.ProjectCommandContext, msg string, operationComplete bool) {
p.projectCmdOutput <- &ProjectCmdOutputLine{
JobID: ctx.JobID,
Expand Down Expand Up @@ -219,12 +228,7 @@ func (p *AsyncProjectCommandOutputHandler) writeLogLine(jobID string, line strin
select {
case ch <- line:
default:
// Client ws conn could be closed in two ways:
// 1. Client closes the conn gracefully -> the closeHandler() is executed which
// closes the channel and cleans up resources.
// 2. Client does not close the conn and the closeHandler() is not executed -> the
// receiverChan will be blocking for N number of messages (equal to buffer size)
// before we delete the channel and clean up the resources.
// Delete buffered channel if it's blocking.
delete(p.receiverBuffers[jobID], ch)
}
}
Expand Down Expand Up @@ -274,9 +278,6 @@ func (p *AsyncProjectCommandOutputHandler) CleanUp(pullContext PullContext) {
delete(p.projectOutputBuffers, jobID)
p.projectOutputBuffersLock.Unlock()

// Only delete the pull record from receiver buffers.
// WS channel will be closed when the user closes the browser tab
// in closeHanlder().
p.receiverBuffersLock.Lock()
delete(p.receiverBuffers, jobID)
p.receiverBuffersLock.Unlock()
Expand Down Expand Up @@ -305,3 +306,7 @@ func (p *NoopProjectOutputHandler) SetJobURLWithStatus(ctx models.ProjectCommand

func (p *NoopProjectOutputHandler) CleanUp(pullContext PullContext) {
}

func (p *NoopProjectOutputHandler) IsKeyExists(key string) bool {
return false
}