13
13
import com .intellij .openapi .diagnostic .Logger ;
14
14
import io .fabric8 .kubernetes .api .model .Config ;
15
15
import io .fabric8 .kubernetes .client .internal .KubeConfigUtils ;
16
- import org .jetbrains .annotations .NotNull ;
17
16
18
17
import java .io .IOException ;
19
18
import java .nio .file .FileSystems ;
24
23
import java .nio .file .WatchEvent ;
25
24
import java .nio .file .WatchKey ;
26
25
import java .nio .file .WatchService ;
26
+ import java .util .Collection ;
27
+ import java .util .List ;
27
28
import java .util .function .Consumer ;
29
+ import java .util .stream .Collectors ;
28
30
29
31
public class ConfigWatcher implements Runnable {
30
32
31
33
private static final Logger LOG = Logger .getInstance (ConfigWatcher .class );
32
34
33
- private final Path config ;
34
- protected Listener listener ;
35
+ private final List <Path > configs ;
36
+ protected final Listener listener ;
37
+ private final HighSensitivityRegistrar registrar ;
38
+ private WatchService service ;
35
39
36
40
public interface Listener {
37
41
void onUpdate (ConfigWatcher source , Config config );
@@ -42,94 +46,146 @@ public ConfigWatcher(String config, Listener listener) {
42
46
}
43
47
44
48
public ConfigWatcher (Path config , Listener listener ) {
45
- this .config = config ;
49
+ this (List .of (config ), listener );
50
+ }
51
+
52
+ public ConfigWatcher (List <Path > configs , Listener listener ) {
53
+ this (configs , listener , new HighSensitivityRegistrar ());
54
+ }
55
+
56
+ public ConfigWatcher (List <Path > configs , Listener listener , HighSensitivityRegistrar registrar ) {
57
+ this .configs = configs ;
46
58
this .listener = listener ;
59
+ this .registrar = registrar ;
47
60
}
48
61
49
62
@ Override
50
63
public void run () {
51
- runOnConfigChange ((Config config ) -> {
52
- listener .onUpdate (this , config );
53
- });
64
+ watch ((Config config ) -> listener .onUpdate (this , config ));
54
65
}
55
66
56
- protected Config loadConfig () {
57
- try {
58
- return ConfigHelper .loadKubeConfig (config .toAbsolutePath ().toString ());
59
- } catch (IOException e ) {
60
- return null ;
67
+ public void close () throws IOException {
68
+ if (service != null ) {
69
+ service .close ();
61
70
}
62
71
}
63
72
64
- private void runOnConfigChange (Consumer <Config > consumer ) {
65
- try (WatchService service = newWatchService ()) {
66
- registerWatchService ( service );
67
- WatchKey key ;
68
- while (( key = service . take ()) != null ) {
69
- key . pollEvents (). stream ()
70
- . forEach (( event ) -> consumer . accept ( loadConfig ( getPath ( event ))));
71
- key . reset ();
72
- }
73
- } catch ( IOException | InterruptedException e ) {
73
+ private void watch (Consumer <Config > consumer ) {
74
+ try (WatchService service = createWatchService ()) {
75
+ Collection < Path > watchedDirectories = getWatchedDirectories ( );
76
+ watchedDirectories . forEach ( directory ->
77
+ new ConfigDirectoryWatch ( directory , consumer , service , registrar ). start ()
78
+ );
79
+ } catch ( IOException e ) {
80
+ String configPaths = configs . stream ()
81
+ . map ( path -> path . toAbsolutePath (). toString ())
82
+ . collect ( Collectors . joining ());
74
83
Logger .getInstance (ConfigWatcher .class ).warn (
75
- "Could not watch kubernetes config file at " + config . toAbsolutePath () , e );
84
+ "Could not watch kubernetes config file at " + configPaths , e );
76
85
}
77
86
}
78
87
79
- protected WatchService newWatchService () throws IOException {
80
- return FileSystems .getDefault ().newWatchService ();
88
+ protected WatchService createWatchService () throws IOException {
89
+ return this . service = FileSystems .getDefault ().newWatchService ();
81
90
}
82
91
83
- @ NotNull
84
- private void registerWatchService (WatchService service ) throws IOException {
85
- HighSensitivityRegistrar modifier = new HighSensitivityRegistrar ();
86
- modifier .registerService (getWatchedPath (),
87
- new WatchEvent .Kind []{
88
- StandardWatchEventKinds .ENTRY_CREATE ,
89
- StandardWatchEventKinds .ENTRY_MODIFY ,
90
- StandardWatchEventKinds .ENTRY_DELETE },
91
- service );
92
+ private Collection <Path > getWatchedDirectories () {
93
+ return configs .stream ()
94
+ .filter (this ::isFileInDirectory )
95
+ .map (Path ::getParent )
96
+ .collect (Collectors .toSet ());
92
97
}
93
98
94
- protected boolean isConfigPath (Path path ) {
95
- return path .equals (config );
99
+ protected boolean isFileInDirectory (Path path ) {
100
+ return path != null
101
+ && Files .isRegularFile (path )
102
+ && Files .isDirectory (path .getParent ());
96
103
}
97
104
98
- /**
99
- * Returns {@link Config} for the given path if the kube config file
100
- * <ul>
101
- * <li>exists and</li>
102
- * <li>is not empty and</li>
103
- * <li>is valid yaml</li>
104
- * </ul>
105
- * Returns {@code null} otherwise.
106
- *
107
- * @param path the path to the kube config
108
- * @return returns true if the kube config that the event points to exists, is not empty and is valid yaml
109
- */
110
- private Config loadConfig (Path path ) {
111
- if (path == null ) {
112
- return null ;
105
+ private class ConfigDirectoryWatch {
106
+ private final Path directory ;
107
+ private final WatchService service ;
108
+ private final HighSensitivityRegistrar registrar ;
109
+ private final Consumer <Config > consumer ;
110
+
111
+ private ConfigDirectoryWatch (Path directory , Consumer <Config > consumer , WatchService service , HighSensitivityRegistrar registrar ) {
112
+ this .directory = directory ;
113
+ this .consumer = consumer ;
114
+ this .service = service ;
115
+ this .registrar = registrar ;
113
116
}
114
- try {
115
- if (Files .exists (path )
116
- && isConfigPath (path )
117
- && Files .size (path ) > 0 ) {
118
- return KubeConfigUtils .parseConfig (path .toFile ());
117
+
118
+ private void start () {
119
+ try {
120
+ register (directory , service , registrar );
121
+ watch (consumer , service );
122
+ } catch (InterruptedException e ) {
123
+ LOG .warn ("Watching " + directory + " was interrupted" , e );
124
+ } catch (IOException e ) {
125
+ LOG .warn ("Could not watch " + directory , e );
119
126
}
120
- } catch (Exception e ) {
121
- // only catch
122
- LOG .warn ("Could not load kube config at " + path .toAbsolutePath (), e );
123
127
}
124
- return null ;
125
- }
126
128
127
- private Path getPath (WatchEvent <?> event ) {
128
- return getWatchedPath ().resolve ((Path ) event .context ());
129
- }
129
+ private void register (Path path , WatchService service , HighSensitivityRegistrar registrar ) throws IOException {
130
+ registrar .registerService (path ,
131
+ new WatchEvent .Kind []{
132
+ StandardWatchEventKinds .ENTRY_CREATE ,
133
+ StandardWatchEventKinds .ENTRY_MODIFY ,
134
+ StandardWatchEventKinds .ENTRY_DELETE
135
+ },
136
+ service );
137
+ }
138
+
139
+ private void watch (Consumer <Config > consumer , WatchService service ) throws InterruptedException {
140
+ for (WatchKey key = service .take (); key != null ; key = service .take ()) {
141
+ key .pollEvents ().forEach ((event ) -> {
142
+ Path changed = getAbsolutePath (directory , (Path ) event .context ());
143
+ if (isConfigPath (changed )) {
144
+ consumer .accept (loadConfig (changed ));
145
+ }
146
+ });
147
+ key .reset ();
148
+ }
149
+ }
150
+
151
+ protected boolean isConfigPath (Path path ) {
152
+ return configs != null
153
+ && configs .contains (path );
154
+ }
155
+
156
+ /**
157
+ * Returns {@link Config} for the given path if the kube config file
158
+ * <ul>
159
+ * <li>exists and</li>
160
+ * <li>is not empty and</li>
161
+ * <li>is valid yaml</li>
162
+ * </ul>
163
+ * Returns {@code null} otherwise.
164
+ *
165
+ * @param path the path to the kube config
166
+ * @return returns true if the kube config that the event points to exists, is not empty and is valid yaml
167
+ */
168
+ private Config loadConfig (Path path ) {
169
+ // TODO: replace by Config#getKubeConfigFiles once kubernetes-client 7.0 is available
170
+ if (path == null ) {
171
+ return null ;
172
+ }
173
+ try {
174
+ if (Files .exists (path )
175
+ && Files .size (path ) > 0 ) {
176
+ return KubeConfigUtils .parseConfig (path .toFile ());
177
+ }
178
+ } catch (Exception e ) {
179
+ // only catch
180
+ LOG .warn ("Could not load kube config at " + path .toAbsolutePath (), e );
181
+ }
182
+ return null ;
183
+ }
184
+
185
+ private Path getAbsolutePath (Path directory , Path relativePath ) {
186
+ return directory .resolve (relativePath );
187
+ }
130
188
131
- private Path getWatchedPath () {
132
- return config .getParent ();
133
189
}
134
190
135
191
}
0 commit comments