Skip to content

Commit 0884e48

Browse files
committed
chore: improve cluster pipeline retries
1 parent 7335ef6 commit 0884e48

File tree

1 file changed

+75
-43
lines changed

1 file changed

+75
-43
lines changed

cluster.go

+75-43
Original file line numberDiff line numberDiff line change
@@ -846,8 +846,8 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
846846
c.cmdable = c.Process
847847

848848
c.hooks.setProcess(c.process)
849-
c.hooks.setProcessPipeline(c._processPipeline)
850-
c.hooks.setProcessTxPipeline(c._processTxPipeline)
849+
c.hooks.setProcessPipeline(c.processPipeline)
850+
c.hooks.setProcessTxPipeline(c.processTxPipeline)
851851

852852
return c
853853
}
@@ -1187,7 +1187,7 @@ func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error)
11871187
return c.Pipeline().Pipelined(ctx, fn)
11881188
}
11891189

1190-
func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
1190+
func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
11911191
cmdsMap := newCmdsMap()
11921192

11931193
if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil {
@@ -1210,7 +1210,7 @@ func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) erro
12101210
wg.Add(1)
12111211
go func(node *clusterNode, cmds []Cmder) {
12121212
defer wg.Done()
1213-
c._processPipelineNode(ctx, node, cmds, failedCmds)
1213+
c.processPipelineNode(ctx, node, cmds, failedCmds)
12141214
}(node, cmds)
12151215
}
12161216

@@ -1263,22 +1263,38 @@ func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool
12631263
return true
12641264
}
12651265

1266-
func (c *ClusterClient) _processPipelineNode(
1266+
func (c *ClusterClient) processPipelineNode(
12671267
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
12681268
) {
12691269
_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1270-
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
1271-
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
1272-
return writeCmds(wr, cmds)
1273-
}); err != nil {
1274-
setCmdsErr(cmds, err)
1275-
return err
1276-
}
1270+
cn, err := node.Client.getConn(ctx)
1271+
if err != nil {
1272+
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1273+
setCmdsErr(cmds, err)
1274+
return err
1275+
}
12771276

1278-
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
1279-
return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
1280-
})
1281-
})
1277+
err = c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1278+
node.Client.releaseConn(ctx, cn, err)
1279+
return err
1280+
})
1281+
}
1282+
1283+
func (c *ClusterClient) processPipelineNodeConn(
1284+
ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1285+
) error {
1286+
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
1287+
return writeCmds(wr, cmds)
1288+
}); err != nil {
1289+
if shouldRetry(err, true) {
1290+
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1291+
}
1292+
setCmdsErr(cmds, err)
1293+
return err
1294+
}
1295+
1296+
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
1297+
return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
12821298
})
12831299
}
12841300

@@ -1365,7 +1381,7 @@ func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) erro
13651381
return c.TxPipeline().Pipelined(ctx, fn)
13661382
}
13671383

1368-
func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
1384+
func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
13691385
// Trim multi .. exec.
13701386
cmds = cmds[1 : len(cmds)-1]
13711387

@@ -1399,7 +1415,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
13991415
wg.Add(1)
14001416
go func(node *clusterNode, cmds []Cmder) {
14011417
defer wg.Done()
1402-
c._processTxPipelineNode(ctx, node, cmds, failedCmds)
1418+
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
14031419
}(node, cmds)
14041420
}
14051421

@@ -1423,40 +1439,56 @@ func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int
14231439
return cmdsMap
14241440
}
14251441

1426-
func (c *ClusterClient) _processTxPipelineNode(
1442+
func (c *ClusterClient) processTxPipelineNode(
14271443
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
14281444
) {
14291445
cmds = wrapMultiExec(ctx, cmds)
14301446
_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
1431-
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
1432-
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
1433-
return writeCmds(wr, cmds)
1434-
}); err != nil {
1435-
setCmdsErr(cmds, err)
1436-
return err
1437-
}
1447+
cn, err := node.Client.getConn(ctx)
1448+
if err != nil {
1449+
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1450+
setCmdsErr(cmds, err)
1451+
return err
1452+
}
14381453

1439-
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
1440-
statusCmd := cmds[0].(*StatusCmd)
1441-
// Trim multi and exec.
1442-
trimmedCmds := cmds[1 : len(cmds)-1]
1454+
err = c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)
1455+
node.Client.releaseConn(ctx, cn, err)
1456+
return err
1457+
})
1458+
}
14431459

1444-
if err := c.txPipelineReadQueued(
1445-
ctx, rd, statusCmd, trimmedCmds, failedCmds,
1446-
); err != nil {
1447-
setCmdsErr(cmds, err)
1460+
func (c *ClusterClient) processTxPipelineNodeConn(
1461+
ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1462+
) error {
1463+
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
1464+
return writeCmds(wr, cmds)
1465+
}); err != nil {
1466+
if shouldRetry(err, true) {
1467+
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
1468+
}
1469+
setCmdsErr(cmds, err)
1470+
return err
1471+
}
14481472

1449-
moved, ask, addr := isMovedError(err)
1450-
if moved || ask {
1451-
return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
1452-
}
1473+
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
1474+
statusCmd := cmds[0].(*StatusCmd)
1475+
// Trim multi and exec.
1476+
trimmedCmds := cmds[1 : len(cmds)-1]
14531477

1454-
return err
1455-
}
1478+
if err := c.txPipelineReadQueued(
1479+
ctx, rd, statusCmd, trimmedCmds, failedCmds,
1480+
); err != nil {
1481+
setCmdsErr(cmds, err)
14561482

1457-
return pipelineReadCmds(rd, trimmedCmds)
1458-
})
1459-
})
1483+
moved, ask, addr := isMovedError(err)
1484+
if moved || ask {
1485+
return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
1486+
}
1487+
1488+
return err
1489+
}
1490+
1491+
return pipelineReadCmds(rd, trimmedCmds)
14601492
})
14611493
}
14621494

0 commit comments

Comments
 (0)