-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathclient.go
332 lines (303 loc) · 10.3 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
package mqtt
import (
"context"
"errors"
"io"
"net"
"sync"
"time"
)
var (
errDisconnected = errors.New("natiu-mqtt: disconnected")
)
// Client is a asynchronous MQTT v3.1.1 client implementation which is
// safe for concurrent use.
type Client struct {
cs clientState
rxlock sync.Mutex
rx Rx
txlock sync.Mutex
tx Tx
}
// ClientConfig is used to configure a new Client.
type ClientConfig struct {
// If a Decoder is not set one will automatically be picked.
Decoder Decoder
// OnPub is executed on every PUBLISH message received. Do not call
// HandleNext or other client methods from within this function.
OnPub func(pubHead Header, varPub VariablesPublish, r io.Reader) error
// TODO: add a backoff algorithm callback here so clients can roll their own.
}
// NewClient creates a new MQTT client with the configuration parameters provided.
// If no Decoder is provided a DecoderNoAlloc will be used.
func NewClient(cfg ClientConfig) *Client {
var onPub func(rx *Rx, varPub VariablesPublish, r io.Reader) error
if cfg.OnPub != nil {
onPub = func(rx *Rx, varPub VariablesPublish, r io.Reader) error {
return cfg.OnPub(rx.LastReceivedHeader, varPub, r)
}
}
if cfg.Decoder == nil {
cfg.Decoder = DecoderNoAlloc{UserBuffer: make([]byte, 4*1024)}
}
c := &Client{cs: clientState{closeErr: errors.New("yet to connect")}}
c.rx.RxCallbacks, c.tx.TxCallbacks = c.cs.callbacks(onPub)
c.rx.userDecoder = cfg.Decoder
return c
}
// HandleNext reads from the wire and decodes MQTT packets.
// If bytes are read and the decoder fails to read a packet the whole
// client fails and disconnects.
// HandleNext only returns an error in the case where the OnPub callback passed
// in the ClientConfig returns an error or if a packet is malformed.
// If HandleNext returns an error the client will be in a disconnected state.
func (c *Client) HandleNext() error {
n, err := c.readNextWrapped()
if err != nil && c.IsConnected() {
if n != 0 || errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
// We disconnect if:
// - We've read a malformed packet: n!=0
// - We receive an error signalling end of data (EOF) or closing of network connection.
// We don't want to disconnect if we've read 0 bytes and get a timeout error (there may be more data in future)
c.cs.OnDisconnect(err)
c.txlock.Lock()
c.tx.WriteSimple(PacketDisconnect) // Try to write disconnect but don't hold your breath. This is probably useless.
c.txlock.Unlock()
} else {
// Not a any of above cases. We stay connected and ignore error, but print it.
println("ignoring error", err.Error())
err = nil
}
}
return err
}
// readNextWrapped is a separate function so mutex locks Rx for minimum amount of time.
func (c *Client) readNextWrapped() (int, error) {
c.rxlock.Lock()
defer c.rxlock.Unlock()
if !c.IsConnected() && c.cs.lastTx.IsZero() {
// Client disconnected and not expecting to receive packets back.
return 0, errDisconnected
}
return c.rx.ReadNextPacket()
}
// StartConnect sends a CONNECT packet over the transport and does not wait for a
// CONNACK response. Client is not guaranteed to be connected after a call to this function.
func (c *Client) StartConnect(rwc io.ReadWriteCloser, vc *VariablesConnect) error {
c.rxlock.Lock()
defer c.rxlock.Unlock()
c.txlock.Lock()
defer c.txlock.Unlock()
c.tx.SetTxTransport(rwc)
c.rx.SetRxTransport(rwc)
if c.cs.IsConnected() {
return errors.New("already connected; disconnect before connecting")
}
return c.tx.WriteConnect(vc)
}
// Connect sends a CONNECT packet over the transport and waits for a
// CONNACK response from the server. The client is connected if the returned error is nil.
func (c *Client) Connect(ctx context.Context, rwc io.ReadWriteCloser, vc *VariablesConnect) error {
err := c.StartConnect(rwc, vc)
if err != nil {
return err
}
backoff := newBackoff()
for !c.IsConnected() && ctx.Err() == nil {
backoff.Miss()
err := c.HandleNext()
if err != nil {
return err
}
}
if c.IsConnected() {
return nil
}
return ctx.Err()
}
// IsConnected returns true if there still has been no disconnect event or an
// unrecoverable error encountered during decoding.
// A Connected client may send and receive MQTT messages.
func (c *Client) IsConnected() bool { return c.cs.IsConnected() }
// Disconnect performs a MQTT disconnect and resets the connection. Future
// calls to Err will return the argument userErr.
func (c *Client) Disconnect(userErr error) error {
if userErr == nil {
panic("nil error argument to Disconnect")
}
c.txlock.Lock()
defer c.txlock.Unlock()
if !c.IsConnected() {
return errDisconnected
}
c.cs.OnDisconnect(userErr)
err := c.tx.WriteSimple(PacketDisconnect)
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
err = nil //if EOF or network closed simply exit.
}
c.rxlock.Lock()
defer c.rxlock.Unlock()
c.rx.rxTrp.Close()
c.tx.txTrp.Close()
return err
}
// StartSubscribe begins subscription to argument topics.
func (c *Client) StartSubscribe(vsub VariablesSubscribe) error {
if err := vsub.Validate(); err != nil {
return err
}
c.txlock.Lock()
defer c.txlock.Unlock()
if !c.IsConnected() {
return errDisconnected
}
if c.AwaitingSuback() {
// TODO(soypat): Allow multiple subscriptions to be queued.
return errors.New("tried to subscribe while still awaiting suback")
}
c.cs.pendingSubs = vsub.Copy()
return c.tx.WriteSubscribe(vsub)
}
// Subscribe writes a SUBSCRIBE packet over the network and waits for the server
// to respond with a SUBACK packet or until the context ends.
func (c *Client) Subscribe(ctx context.Context, vsub VariablesSubscribe) error {
session := c.ConnectedAt()
err := c.StartSubscribe(vsub)
if err != nil {
return err
}
backoff := newBackoff()
for c.cs.PendingSublen() != 0 && ctx.Err() == nil {
if c.ConnectedAt() != session {
// Prevent waiting on subscribes from previous connection or during disconnection.
return errDisconnected
}
backoff.Miss()
c.HandleNext()
}
return ctx.Err()
}
// SubscribedTopics returns list of topics the client successfully subscribed to.
// Returns a copy of a slice so is safe for concurrent use.
func (c *Client) SubscribedTopics() []string {
c.cs.mu.Lock()
defer c.cs.mu.Unlock()
return append([]string{}, c.cs.activeSubs...)
}
// PublishPayload sends a PUBLISH packet over the network on the topic defined by
// varPub.
func (c *Client) PublishPayload(flags PacketFlags, varPub VariablesPublish, payload []byte) error {
if err := varPub.Validate(); err != nil {
return err
}
qos := flags.QoS()
if qos != QoS0 {
return errors.New("only supports QoS0")
}
c.txlock.Lock()
defer c.txlock.Unlock()
if !c.IsConnected() {
return errDisconnected
}
return c.tx.WritePublishPayload(newHeader(PacketPublish, flags, uint32(varPub.Size(qos)+len(payload))), varPub, payload)
}
// Err returns error indicating the cause of client disconnection.
func (c *Client) Err() error {
return c.cs.Err()
}
// StartPing writes a PINGREQ packet over the network without blocking waiting for response.
func (c *Client) StartPing() error {
c.txlock.Lock()
defer c.txlock.Unlock()
if !c.IsConnected() {
return errDisconnected
}
err := c.tx.WriteSimple(PacketPingreq)
if err == nil {
c.cs.PingSent() // Flag the fact that a ping has been sent successfully.
}
return err
}
// Ping writes a ping packet over the network and blocks until it receives the ping
// response back. It uses an exponential backoff algorithm to time checks on the
// status of the ping.
func (c *Client) Ping(ctx context.Context) error {
session := c.ConnectedAt()
err := c.StartPing()
if err != nil {
return err
}
pingTime := c.cs.LastPingTime()
if pingTime.IsZero() {
return nil // Ping completed.
}
backoff := newBackoff()
for pingTime == c.cs.LastPingTime() && ctx.Err() == nil {
if c.ConnectedAt() != session {
// Prevent waiting on subscribes from previous connection or during disconnection.
return errDisconnected
}
backoff.Miss()
c.HandleNext()
}
return ctx.Err()
}
// AwaitingPingresp checks if a ping sent over the wire had no response received back.
func (c *Client) AwaitingPingresp() bool { return c.cs.AwaitingPingresp() }
// ConnectedAt returns the time the client managed to successfully connect. If
// client is disconnected ConnectedAt returns the zero-value for time.Time.
func (c *Client) ConnectedAt() time.Time { return c.cs.ConnectedAt() }
// AwaitingSuback checks if a subscribe request sent over the wire had no suback received back.
// Returns false if client is disconnected.
func (c *Client) AwaitingSuback() bool { return c.cs.AwaitingSuback() }
// LastRx returns the time the last packet was received at.
// If Client is disconnected LastRx returns the zero value of time.Time.
func (c *Client) LastRx() time.Time { return c.cs.LastRx() }
// LastTx returns the time the last successful packet transmission finished at.
// A "successful" transmission does not necessarily mean the packet was received on the other end.
// If Client is disconnected LastTx returns the zero value of time.Time.
func (c *Client) LastTx() time.Time { return c.cs.LastTx() }
func newBackoff() exponentialBackoff {
return exponentialBackoff{
MaxWait: 500 * time.Millisecond,
}
}
// exponentialBackoff implements a [Exponential Backoff]
// delay algorithm to prevent saturation network or processor
// with failing tasks. An exponentialBackoff with a non-zero MaxWait is ready for use.
//
// [Exponential Backoff]: https://en.wikipedia.org/wiki/Exponential_backoff
type exponentialBackoff struct {
// Wait defines the amount of time that Miss will wait on next call.
Wait time.Duration
// Maximum allowable value for Wait.
MaxWait time.Duration
// StartWait is the value that Wait takes after a call to Hit.
StartWait time.Duration
// ExpMinusOne is the shift performed on Wait minus one, so the zero value performs a shift of 1.
ExpMinusOne uint32
}
// Hit sets eb.Wait to the StartWait value.
func (eb *exponentialBackoff) Hit() {
if eb.MaxWait == 0 {
panic("MaxWait cannot be zero")
}
eb.Wait = eb.StartWait
}
// Miss sleeps for eb.Wait and increases eb.Wait exponentially.
func (eb *exponentialBackoff) Miss() {
const k = 1
wait := eb.Wait
maxWait := eb.MaxWait
exp := eb.ExpMinusOne + 1
if maxWait == 0 {
panic("MaxWait cannot be zero")
}
time.Sleep(wait)
wait |= time.Duration(k)
wait <<= exp
if wait > maxWait {
wait = maxWait
}
eb.Wait = wait
}