@@ -10,7 +10,6 @@ import (
10
10
"text/tabwriter"
11
11
"time"
12
12
13
- etcdclient "github.com/coreos/etcd/client"
14
13
"github.com/coreos/etcd/clientv3"
15
14
16
15
"bytes"
@@ -24,7 +23,6 @@ import (
24
23
// and organize them by volume.
25
24
type EtcdWriteVolume struct {
26
25
MasterConfigLocation string
27
- V2Client etcdclient.Client
28
26
V3Client * clientv3.Client
29
27
durationSpec string
30
28
duration time.Duration
@@ -55,11 +53,11 @@ func (d *EtcdWriteVolume) AvailableParameters() []types.Parameter {
55
53
}
56
54
57
55
func (d * EtcdWriteVolume ) Complete (logger * log.Logger ) error {
58
- v2Client , v3Client , found , err := findEtcdClients (d .MasterConfigLocation , logger )
56
+ v3Client , found , err := findEtcdClients (d .MasterConfigLocation , logger )
59
57
if err != nil || ! found {
60
58
return err
61
59
}
62
- d .V2Client , d . V3Client = v2Client , v3Client
60
+ d .V3Client = v3Client
63
61
64
62
// determine the duration to run the check from either the deprecated env var, the flag, or the default
65
63
s := os .Getenv ("ETCD_WRITE_VOLUME_DURATION" ) // deprecated way
@@ -78,9 +76,6 @@ func (d *EtcdWriteVolume) Complete(logger *log.Logger) error {
78
76
}
79
77
80
78
func (d * EtcdWriteVolume ) CanRun () (bool , error ) {
81
- if d .V2Client == nil {
82
- return false , fmt .Errorf ("must have a V2 etcd client" )
83
- }
84
79
if d .V3Client == nil {
85
80
return false , fmt .Errorf ("must have a V3 etcd client" )
86
81
}
@@ -90,53 +85,23 @@ func (d *EtcdWriteVolume) CanRun() (bool, error) {
90
85
func (d * EtcdWriteVolume ) Check () types.DiagnosticResult {
91
86
r := types .NewDiagnosticResult (EtcdWriteName )
92
87
93
- var wg sync.WaitGroup
94
-
95
88
ctx := context .Background ()
96
89
ctx , cancel := context .WithDeadline (ctx , time .Now ().Add (d .duration ))
97
90
defer cancel ()
98
91
99
92
keyStats := & keyCounter {}
100
93
stats := & lockedKeyCounter {KeyCounter : keyStats }
101
94
102
- wg .Add (2 )
103
- go func () {
104
- defer wg .Done ()
105
- keys := etcdclient .NewKeysAPI (d .V2Client )
106
- w := keys .Watcher ("/" , & etcdclient.WatcherOptions {Recursive : true })
107
- for {
108
- evt , err := w .Next (ctx )
109
- if err != nil {
110
- if err != context .DeadlineExceeded {
111
- r .Error ("DEw2001" , err , fmt .Sprintf ("Unable to get a v2 watch event, stopping early: %v" , err ))
112
- }
113
- return
114
- }
115
- node := evt .Node
116
- if node == nil {
117
- node = evt .PrevNode
118
- }
119
- if node == nil {
95
+ ch := d .V3Client .Watch (ctx , "/" , clientv3 .WithKeysOnly (), clientv3 .WithPrefix ())
96
+ for resource := range ch {
97
+ for _ , evt := range resource .Events {
98
+ if evt .Kv == nil {
120
99
continue
121
100
}
122
- action := fmt .Sprintf ("v2 :%s" , evt .Action )
123
- stats .Inc (strings .Split (action + "/" + strings .TrimPrefix (evt .Node .Key , "/" ), "/" ))
101
+ action := fmt .Sprintf ("v3 :%s" , evt .Type )
102
+ stats .Inc (strings .Split (action + "/" + strings .TrimPrefix (string ( evt .Kv .Key ) , "/" ), "/" ))
124
103
}
125
- }()
126
- go func () {
127
- defer wg .Done ()
128
- ch := d .V3Client .Watch (ctx , "/" , clientv3 .WithKeysOnly (), clientv3 .WithPrefix ())
129
- for resource := range ch {
130
- for _ , evt := range resource .Events {
131
- if evt .Kv == nil {
132
- continue
133
- }
134
- action := fmt .Sprintf ("v3:%s" , evt .Type )
135
- stats .Inc (strings .Split (action + "/" + strings .TrimPrefix (string (evt .Kv .Key ), "/" ), "/" ))
136
- }
137
- }
138
- }()
139
- wg .Wait ()
104
+ }
140
105
141
106
bins := keyStats .Bins ("" , "/" )
142
107
sort .Sort (DescendingBins (bins ))
@@ -154,38 +119,32 @@ func (d *EtcdWriteVolume) Check() types.DiagnosticResult {
154
119
}
155
120
156
121
// findEtcdClients finds and loads etcd clients
157
- func findEtcdClients (configFile string , logger * log.Logger ) (etcdclient. Client , * clientv3.Client , bool , error ) {
122
+ func findEtcdClients (configFile string , logger * log.Logger ) (* clientv3.Client , bool , error ) {
158
123
masterConfig , err := GetMasterConfig (configFile , logger )
159
124
if err != nil {
160
125
configErr := fmt .Errorf ("Unreadable master config; skipping this diagnostic." )
161
126
logger .Error ("DE2001" , configErr .Error ())
162
- return nil , nil , false , configErr
127
+ return nil , false , configErr
163
128
}
164
129
if len (masterConfig .EtcdClientInfo .URLs ) == 0 {
165
130
configErr := fmt .Errorf ("No etcdClientInfo.urls defined; can't contact etcd" )
166
131
logger .Error ("DE2002" , configErr .Error ())
167
- return nil , nil , false , configErr
168
- }
169
- v2Client , err := etcd .MakeEtcdClient (masterConfig .EtcdClientInfo )
170
- if err != nil {
171
- configErr := fmt .Errorf ("Unable to create an etcd v2 client: %v" , err )
172
- logger .Error ("DE2003" , configErr .Error ())
173
- return nil , nil , false , configErr
132
+ return nil , false , configErr
174
133
}
175
134
config , err := etcd .MakeEtcdClientV3Config (masterConfig .EtcdClientInfo )
176
135
if err != nil {
177
136
configErr := fmt .Errorf ("Unable to create an etcd v3 client config: %v" , err )
178
137
logger .Error ("DE2004" , configErr .Error ())
179
- return nil , nil , false , configErr
138
+ return nil , false , configErr
180
139
}
181
140
config .DialTimeout = 5 * time .Second
182
141
v3Client , err := clientv3 .New (* config )
183
142
if err != nil {
184
143
configErr := fmt .Errorf ("Unable to create an etcd v3 client: %v" , err )
185
144
logger .Error ("DE2005" , configErr .Error ())
186
- return nil , nil , false , configErr
145
+ return nil , false , configErr
187
146
}
188
- return v2Client , v3Client , true , nil
147
+ return v3Client , true , nil
189
148
}
190
149
191
150
type KeyCounter interface {
0 commit comments