|
7 | 7 | "net"
|
8 | 8 | "strconv"
|
9 | 9 | "sync"
|
| 10 | + "testing" |
10 | 11 | "time"
|
11 | 12 |
|
12 | 13 | . "github.com/onsi/ginkgo"
|
@@ -739,3 +740,88 @@ var _ = Describe("Ring Tx timeout", func() {
|
739 | 740 | testTimeout()
|
740 | 741 | })
|
741 | 742 | })
|
| 743 | + |
| 744 | +func TestRingSetAddrsContention(t *testing.T) { |
| 745 | + const ( |
| 746 | + ringShard1Name = "ringShardOne" |
| 747 | + ringShard2Name = "ringShardTwo" |
| 748 | + ) |
| 749 | + |
| 750 | + for _, port := range []string{ringShard1Port, ringShard2Port} { |
| 751 | + if _, err := startRedis(port); err != nil { |
| 752 | + t.Fatal(err) |
| 753 | + } |
| 754 | + } |
| 755 | + |
| 756 | + t.Cleanup(func() { |
| 757 | + for _, p := range processes { |
| 758 | + if err := p.Close(); err != nil { |
| 759 | + t.Errorf("Failed to stop redis process: %v", err) |
| 760 | + } |
| 761 | + } |
| 762 | + processes = nil |
| 763 | + }) |
| 764 | + |
| 765 | + ring := redis.NewRing(&redis.RingOptions{ |
| 766 | + Addrs: map[string]string{ |
| 767 | + "ringShardOne": ":" + ringShard1Port, |
| 768 | + }, |
| 769 | + NewClient: func(opt *redis.Options) *redis.Client { |
| 770 | + // Simulate slow shard creation |
| 771 | + time.Sleep(100 * time.Millisecond) |
| 772 | + return redis.NewClient(opt) |
| 773 | + }, |
| 774 | + }) |
| 775 | + |
| 776 | + if _, err := ring.Ping(context.Background()).Result(); err != nil { |
| 777 | + t.Fatal(err) |
| 778 | + } |
| 779 | + |
| 780 | + // Continuously update addresses by adding and removing one address |
| 781 | + updatesDone := make(chan struct{}) |
| 782 | + defer func() { close(updatesDone) }() |
| 783 | + go func() { |
| 784 | + ticker := time.NewTicker(10 * time.Millisecond) |
| 785 | + defer ticker.Stop() |
| 786 | + for i := 0; ; i++ { |
| 787 | + select { |
| 788 | + case <-ticker.C: |
| 789 | + if i%2 == 0 { |
| 790 | + ring.SetAddrs(map[string]string{ |
| 791 | + ringShard1Name: ":" + ringShard1Port, |
| 792 | + }) |
| 793 | + } else { |
| 794 | + ring.SetAddrs(map[string]string{ |
| 795 | + ringShard1Name: ":" + ringShard1Port, |
| 796 | + ringShard2Name: ":" + ringShard2Port, |
| 797 | + }) |
| 798 | + } |
| 799 | + case <-updatesDone: |
| 800 | + return |
| 801 | + } |
| 802 | + } |
| 803 | + }() |
| 804 | + |
| 805 | + var pings, errClosed int |
| 806 | + timer := time.NewTimer(1 * time.Second) |
| 807 | + for running := true; running; pings++ { |
| 808 | + select { |
| 809 | + case <-timer.C: |
| 810 | + running = false |
| 811 | + default: |
| 812 | + if _, err := ring.Ping(context.Background()).Result(); err != nil { |
| 813 | + if err == redis.ErrClosed { |
| 814 | + // The shard client could be closed while ping command is in progress |
| 815 | + errClosed++ |
| 816 | + } else { |
| 817 | + t.Fatal(err) |
| 818 | + } |
| 819 | + } |
| 820 | + } |
| 821 | + } |
| 822 | + |
| 823 | + t.Logf("Number of pings: %d, errClosed: %d", pings, errClosed) |
| 824 | + if pings < 10_000 { |
| 825 | + t.Errorf("Expected at least 10k pings, got: %d", pings) |
| 826 | + } |
| 827 | +} |
0 commit comments