Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update synchronizer and use Jitter Buffer #118

Merged
merged 5 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ require (
github.com/gorilla/mux v1.8.0
github.com/livekit/go-rtmp v0.0.0-20230317185657-6e9cfa387c7e
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230517210015-117bec6a19a8
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135
github.com/livekit/protocol v1.5.8-0.20230614173826-89359963fc24
github.com/livekit/psrpc v0.3.1
github.com/livekit/server-sdk-go v1.0.11-0.20230504163637-22d8afafa6b0
github.com/livekit/server-sdk-go v1.0.12-0.20230614223322-5fdaa0386d4a
github.com/pion/interceptor v0.1.17
github.com/pion/rtcp v1.2.10
github.com/pion/rtp v1.7.13
Expand Down Expand Up @@ -51,7 +51,7 @@ require (
github.com/klauspost/compress v1.16.5 // indirect
github.com/lithammer/shortuuid/v4 v4.0.0 // indirect
github.com/mackerelio/go-osstat v0.2.4 // indirect
github.com/magefile/mage v1.14.0 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mattn/go-pointer v0.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,18 @@ github.com/livekit/go-rtmp v0.0.0-20230317185657-6e9cfa387c7e h1:Fw7uyi8OK3M7iAp
github.com/livekit/go-rtmp v0.0.0-20230317185657-6e9cfa387c7e/go.mod h1:X+CliWDrjhm5C+NgmxVt2ncdO3MnKDlbZHTwkuf0808=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230517210015-117bec6a19a8 h1:YgBDljjYPJc57sSwaoyUgiviThQDyS7SyWsXJSRsZH8=
github.com/livekit/mediatransportutil v0.0.0-20230517210015-117bec6a19a8/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34=
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135 h1:lWYbsondvqG69czxoACDwaJ/BoyD57BahCo70ZH+m4U=
github.com/livekit/mediatransportutil v0.0.0-20230612070454-d5299b956135/go.mod h1:MRc0zSOSzXuFt0X218SgabzlaKevkvCckPgBEoHYc34=
github.com/livekit/protocol v1.5.8-0.20230614173826-89359963fc24 h1:RKVbd4Ck114gnM0dP61VKqr9B9EMwdAnH7giXHxiBx0=
github.com/livekit/protocol v1.5.8-0.20230614173826-89359963fc24/go.mod h1:Y+jl7rD7u8ZMfIUzGr41DU7G5j+34rtgefTCrD/ApZc=
github.com/livekit/psrpc v0.3.1 h1:KfylgJHvoLQcc22t/oflwMOeSnx0c14G7cWsS+9MYS4=
github.com/livekit/psrpc v0.3.1/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/livekit/server-sdk-go v1.0.11-0.20230504163637-22d8afafa6b0 h1:FazC6kxZZJ9n8GsUlhfZNzXMfknb9tEMXbDMN1qgyFQ=
github.com/livekit/server-sdk-go v1.0.11-0.20230504163637-22d8afafa6b0/go.mod h1:6RR74y85Hd3DjI1T66mhPJkOWDTquQ9MebQMyIY086A=
github.com/livekit/server-sdk-go v1.0.12-0.20230614223322-5fdaa0386d4a h1:hMCE2b1txjepy0tWv4rBfu+QHPuVUyTfgDmaO70AXwU=
github.com/livekit/server-sdk-go v1.0.12-0.20230614223322-5fdaa0386d4a/go.mod h1:2Q1KLiWyLjG1NGhe7y2r35qs8OVMwXzjyQeCXqGyAKU=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo=
github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
Expand Down
6 changes: 2 additions & 4 deletions pkg/lksdk_output/lksdk_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ func (s *LKSDKOutput) AddAudioTrack(output lksdk.SampleProvider, mimeType string
logger.Errorw("could not unpublish audio track", err)
}
}
// ensure that OnUnbind is called as it may not be in case of read failure
output.OnUnbind()
output.Close()
}
track.OnBind(func() {
if err := track.StartWrite(output, onComplete); err != nil {
Expand Down Expand Up @@ -111,8 +110,7 @@ func (s *LKSDKOutput) AddVideoTrack(outputs []VideoSampleProvider, layers []*liv
}
}
}
// ensure that OnUnbind is called as it may not be in case of read failure
output.OnUnbind()
output.Close()
}

onRTCP := func(pkt rtcp.Packet) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/media/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ func (e *Output) OnBind() error {
func (e *Output) OnUnbind() error {
e.logger.Infow("sample provider unbound")

return e.Close()
}

func (e *Output) Close() error {

e.fuse.Break()

return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/whip/relay_media_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func (rs *RelayMediaSink) SetWriter(w io.WriteCloser) error {
return rs.mediaBuffer.SetWriter(w)
}

func (rs *RelayMediaSink) Close() {
func (rs *RelayMediaSink) Close() error {
rs.mediaBuffer.Close()

return nil
}
4 changes: 3 additions & 1 deletion pkg/whip/sdk_media_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ func (sp *SDKMediaSink) SetWriter(w io.WriteCloser) error {
return psrpc.Unimplemented
}

func (sp *SDKMediaSink) Close() {
func (sp *SDKMediaSink) Close() error {
sp.fuse.Break()

return nil
}

func (sp *SDKMediaSink) ensureTrackInitialized(s *media.Sample) error {
Expand Down
128 changes: 94 additions & 34 deletions pkg/whip/whip_track_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package whip

import (
"bytes"
"io"
"net"
"strings"
Expand All @@ -10,7 +11,7 @@ import (
"github.com/frostbyte73/core"
"github.com/livekit/ingress/pkg/errors"
"github.com/livekit/protocol/logger"
"github.com/livekit/server-sdk-go/pkg/samplebuilder"
"github.com/livekit/server-sdk-go/pkg/jitter"
"github.com/livekit/server-sdk-go/pkg/synchronizer"
"github.com/pion/rtcp"
"github.com/pion/rtp"
Expand All @@ -20,24 +21,25 @@ import (
)

const (
maxVideoLate = 300 // nearly 2s for fhd video
maxAudioLate = 25 // 4s for audio
maxVideoLatency = 600 * time.Millisecond
maxAudioLatency = time.Second
)

type MediaSink interface {
PushSample(s *media.Sample, ts time.Duration) error
Close()
Close() error
}

type whipTrackHandler struct {
logger logger.Logger
remoteTrack *webrtc.TrackRemote
receiver *webrtc.RTPReceiver
sb *samplebuilder.SampleBuilder
mediaSink MediaSink
sync *synchronizer.TrackSynchronizer
writePLI func(ssrc webrtc.SSRC)
onRTCP func(packet rtcp.Packet)
logger logger.Logger
remoteTrack *webrtc.TrackRemote
receiver *webrtc.RTPReceiver
depacketizer rtp.Depacketizer
jb *jitter.Buffer
mediaSink MediaSink
sync *synchronizer.TrackSynchronizer
writePLI func(ssrc webrtc.SSRC)
onRTCP func(packet rtcp.Packet)

firstPacket sync.Once
fuse core.Fuse
Expand All @@ -63,11 +65,17 @@ func newWHIPTrackHandler(
fuse: core.NewFuse(),
}

sb, err := t.createSampleBuilder()
jb, err := t.createJitterBuffer()
if err != nil {
return nil, err
}
t.sb = sb
t.jb = jb

depacketizer, err := t.createDepacketizer()
if err != nil {
return nil, err
}
t.depacketizer = depacketizer

return t, nil
}
Expand Down Expand Up @@ -142,19 +150,51 @@ func (t *whipTrackHandler) processRTPPacket() error {

t.firstPacket.Do(func() {
t.logger.Debugw("first packet received")
t.sync.FirstPacketForTrack(pkt)
t.sync.Initialize(pkt)
})

t.sb.Push(pkt)
t.jb.Push(pkt)

for {
s, rtpTs := t.sb.PopWithTimestamp()
if s == nil {
pkts := t.jb.Pop(false)
if len(pkts) == 0 {
break
}

ts, err := t.sync.GetPTS(rtpTs)
if err != nil {
return err
var ts time.Duration
var buffer bytes.Buffer // TODO reuse the same buffer across calls, after resetting it if buffer allocation is a performane bottleneck
for _, pkt := range pkts {
ts, err = t.sync.GetPTS(pkt)
switch err {
case nil, synchronizer.ErrBackwardsPTS:
err = nil
default:
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll now return an ErrBackwardsPTS that you might want to ignore

}

if len(pkt.Payload) <= 2 {
// Padding
continue
}

buf, err := t.depacketizer.Unmarshal(pkt.Payload)
if err != nil {
return err
}

_, err = buffer.Write(buf)
if err != nil {
return err
}
}

// This returns the average duration, not the actual duration of the specific sample
// SampleBuilder is using the duration of the previous sample, which is inaccurate as well
sampleDuration := t.sync.GetFrameDuration()

s := &media.Sample{
Data: buffer.Bytes(),
Duration: sampleDuration,
}

err = t.mediaSink.PushSample(s, ts)
Expand Down Expand Up @@ -202,35 +242,55 @@ func (t *whipTrackHandler) startRTCPReceiver() {
}()
}

func (t *whipTrackHandler) createSampleBuilder() (*samplebuilder.SampleBuilder, error) {
func (t *whipTrackHandler) createDepacketizer() (rtp.Depacketizer, error) {
var depacketizer rtp.Depacketizer
var maxLate uint16
var writePLI func()

switch strings.ToLower(t.remoteTrack.Codec().MimeType) {
case strings.ToLower(webrtc.MimeTypeVP8):
depacketizer = &codecs.VP8Packet{}
maxLate = maxVideoLate
writePLI = func() { t.writePLI(t.remoteTrack.SSRC()) }

case strings.ToLower(webrtc.MimeTypeH264):
depacketizer = &codecs.H264Packet{}
maxLate = maxVideoLate
writePLI = func() { t.writePLI(t.remoteTrack.SSRC()) }

case strings.ToLower(webrtc.MimeTypeOpus):
depacketizer = &codecs.OpusPacket{}
maxLate = maxAudioLate

default:
return nil, errors.ErrUnsupportedDecodeFormat
}

return depacketizer, nil
}

func (t *whipTrackHandler) createJitterBuffer() (*jitter.Buffer, error) {
var maxLatency time.Duration
options := []jitter.Option{jitter.WithLogger(t.logger)}

depacketizer, err := t.createDepacketizer()
if err != nil {
return nil, err
}

switch strings.ToLower(t.remoteTrack.Codec().MimeType) {
case strings.ToLower(webrtc.MimeTypeVP8):
maxLatency = maxVideoLatency
options = append(options, jitter.WithPacketDroppedHandler(func() { t.writePLI(t.remoteTrack.SSRC()) }))

case strings.ToLower(webrtc.MimeTypeH264):
maxLatency = maxVideoLatency
options = append(options, jitter.WithPacketDroppedHandler(func() { t.writePLI(t.remoteTrack.SSRC()) }))

case strings.ToLower(webrtc.MimeTypeOpus):
maxLatency = maxAudioLatency
// No PLI for audio

default:
return nil, errors.ErrUnsupportedDecodeFormat
}

sb := samplebuilder.New(
maxLate, depacketizer, t.remoteTrack.Codec().ClockRate,
samplebuilder.WithPacketDroppedHandler(writePLI),
)
clockRate := t.remoteTrack.Codec().ClockRate

jb := jitter.NewBuffer(depacketizer, clockRate, maxLatency, options...)

return sb, nil
return jb, nil
}