-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsubscriptions.go
130 lines (111 loc) · 3.8 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package mqtt
import (
"errors"
"strings"
)
// subscriptions is a work in progress.
// It provides clients and servers with a way to manage the topics they have
// requested published messages for. subscriptions is an abstraction over state,
// not input/output operations, so calls to Subscribe should not write bytes
// over a transport.
type subscriptions interface {
// Subscribe registers topic. It takes a []byte to make it explicit and
// abundantly clear that the implementation is in charge of the memory
// corresponding to subscription topics: it should copy the topic contents
// into its own storage mechanism or allocate the topic on the heap.
Subscribe(topic []byte) error
// Match finds all subscribers to a topic or a filter.
// Successfully matched topics are stored in userBuffer and returned
// as a slice of byte slices.
Match(topicFilter string, userBuffer []byte) ([][]byte, error)
// Unsubscribe has the same signature as Match.
// NOTE(review): the map-backed implementation in this file reports matches
// exactly like Match and additionally removes every matched topic;
// presumably that is the intended contract for all implementations — confirm.
Unsubscribe(topicFilter string, userBuffer []byte) ([][]byte, error)
}
// TODO(soypat): Add an AVL tree implementation like the one in github.com/soypat/go-canard; it is supposedly the best data structure for this [citation needed].
// Compile-time check that subscriptionsMap satisfies the subscriptions interface.
var _ subscriptions = subscriptionsMap{}
// subscriptionsMap implements the subscriptions interface with a map used as a
// set: the keys are the subscribed topics and the empty-struct values carry no
// data.
// It performs allocations.
type subscriptionsMap map[string]struct{}
// Subscribe records topic in the set.
//
// The bytes of topic are converted to a string key, which copies them, so the
// caller remains free to reuse or modify topic after the call returns.
// An error is returned, and the set is left unchanged, when the topic is
// already present.
func (sm subscriptionsMap) Subscribe(topic []byte) error {
	key := string(topic)
	_, exists := sm[key]
	if exists {
		return errors.New("topic already exists in subscriptions")
	}
	sm[key] = struct{}{}
	return nil
}
// Unsubscribe reports every subscription selected by topicFilter and removes
// each of them from sm. It delegates to match with deletion enabled; the
// returned slices and error are exactly those produced by match.
func (sm subscriptionsMap) Unsubscribe(topicFilter string, userBuffer []byte) (matched [][]byte, err error) {
	const removeMatched = true
	matched, err = sm.match(topicFilter, userBuffer, removeMatched)
	return matched, err
}
// Match reports every subscription selected by topicFilter without modifying
// sm. It delegates to match with deletion disabled; the returned slices and
// error are exactly those produced by match.
func (sm subscriptionsMap) Match(topicFilter string, userBuffer []byte) (matched [][]byte, err error) {
	const removeMatched = false
	matched, err = sm.match(topicFilter, userBuffer, removeMatched)
	return matched, err
}
// match is the shared implementation behind Match and Unsubscribe.
//
// It splits topicFilter on "/", validates its wildcards, and then copies every
// key of sm selected by the filter into userBuffer. Each element of matched is
// a sub-slice of userBuffer holding exactly one topic; its capacity is clipped
// to its length so that appending to one result cannot overwrite the next.
// When deleteMatches is true every reported topic is also removed from sm.
//
// Errors:
//   - the error from validateWildcards when topicFilter is malformed
//     (matched is nil);
//   - ErrUserBufferFull when userBuffer cannot hold the next topic. Topics
//     matched before the buffer ran out are still returned and, when
//     deleteMatches is set, have already been removed from sm.
//
// An exact match for topicFilter, if present, is always reported first and
// only once; the remaining results follow map iteration order.
func (sm subscriptionsMap) match(topicFilter string, userBuffer []byte, deleteMatches bool) (matched [][]byte, err error) {
	filterParts := strings.Split(topicFilter, "/")
	if err := validateWildcards(filterParts); err != nil {
		return nil, err
	}

	// Fast path: a subscription whose key is literally topicFilter.
	if _, exact := sm[topicFilter]; exact {
		if len(topicFilter) > len(userBuffer) {
			return nil, ErrUserBufferFull
		}
		// n is the size of this topic only; userBuffer is advanced past it, so
		// a running total must not be used to slice the remaining buffer.
		n := copy(userBuffer, topicFilter)
		matched = append(matched, userBuffer[:n:n])
		userBuffer = userBuffer[n:]
		if deleteMatches {
			delete(sm, topicFilter)
		}
	}

	// Removing entries from a map while ranging over it is well defined in Go.
	for k := range sm {
		// The literal key was already reported by the fast path above; without
		// this guard it would be returned a second time when not deleting.
		if k == topicFilter {
			continue
		}
		if !matches(filterParts, strings.Split(k, "/")) {
			continue
		}
		if len(k) > len(userBuffer) {
			return matched, ErrUserBufferFull
		}
		n := copy(userBuffer, k)
		matched = append(matched, userBuffer[:n:n])
		userBuffer = userBuffer[n:]
		if deleteMatches {
			delete(sm, k)
		}
	}
	return matched, nil
}
// matches reports whether the topic levels in topicParts are selected by the
// filter levels in filter. Both arguments are topics already split on "/".
//
// A "+" filter level accepts any single topic level. A "#" filter level
// accepts that level and everything after it, and also accepts a topic that
// ends just before it, so finance/stock/ibm/# matches finance/stock/ibm.
func matches(filter, topicParts []string) bool {
	for idx, level := range topicParts {
		switch {
		case idx >= len(filter):
			// The topic has more levels than the filter.
			return false
		case filter[idx] == "#":
			// Everything up to here matched and "#" accepts the rest.
			return true
		case filter[idx] != "+" && filter[idx] != level:
			// Literal mismatch with no "+" to excuse it.
			return false
		}
	}
	// Every topic level was consumed by the filter.
	consumed := len(topicParts)
	if consumed == len(filter) {
		return true
	}
	// The only filter level left over may be a trailing "#".
	return consumed == len(filter)-1 && filter[consumed] == "#"
}
// isWildcard reports whether topic contains either of the wildcard
// characters '#' or '+' anywhere in it.
func isWildcard(topic string) bool {
	return strings.ContainsAny(topic, "#+")
}
// validateWildcards checks the levels of a topic filter that has already been
// split on "/". It returns an error when
//   - a level contains a wildcard character but is longer than that single
//     character (for example "finance#"), or
//   - a "#" level appears anywhere other than the final position.
//
// It returns nil when every level passes both checks.
func validateWildcards(wildcards []string) error {
	last := len(wildcards) - 1
	for idx, level := range wildcards {
		// A wildcard must occupy its level on its own.
		if len(level) != 1 && isWildcard(level) {
			return errors.New("malformed wildcard of style \"finance#\"")
		}
		// "#" is only allowed as the last level.
		if level == "#" && idx != last {
			return errors.New("last wildcard is single \"#\"")
		}
	}
	return nil
}