
Commit 788a897

WIP > XPack, Update Core and X-Pack test setup
* Do not delete X-Pack templates
* Use context timeouts
* Check pending tasks

// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

ensureNoInitializingShards();
wipeCluster();
waitForClusterStateUpdatesToFinish();
logIfThereAreRunningTasks();

wipeCluster() {
    if (hasXPack) {
        wipeRollupJobs();
        waitForPendingRollupTasks();
    }
    wipeSnapshots();
    adminClient().performRequest(new Request("DELETE", "*"));
    if (hasXPack) {
        Request request = new Request("GET", "_cat/templates");
        request.addParameter("h", "name");
        if (isXPackTemplate(template)) continue;
        adminClient().performRequest(new Request("DELETE", "_template/" + template));
    } else {
        adminClient().performRequest(new Request("DELETE", "_template/*"));
    }
    wipeClusterSettings();
    if (hasXPack) {
        deleteAllPolicies();
    }

wipeRollupJobs() {
    Response response = adminClient().performRequest(new Request("GET", "/_rollup/job/_all"));
    for (Map<String, Object> jobConfig : jobConfigs) {
        String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
        Request request = new Request("POST", "/_rollup/job/" + jobId + "/_stop");
        request.addParameter("ignore", "404");
        request.addParameter("wait_for_completion", "true");
        request.addParameter("timeout", "10s");
    }
    for (Map<String, Object> jobConfig : jobConfigs) {
        Request request = new Request("DELETE", "/_rollup/job/" + jobId);
        request.addParameter("ignore", "404");
    }

waitForPendingRollupTasks() {
    waitForPendingTasks(adminClient(), taskName -> taskName.startsWith("xpack/rollup/job") == false);

ensureNoInitializingShards() {
    Request request = new Request("GET", "/_cluster/health");
    request.addParameter("wait_for_no_initializing_shards", "true");
    request.addParameter("timeout", "70s");
    request.addParameter("level", "shards");
    adminClient().performRequest(request);

waitForClusterStateUpdatesToFinish() {
    assertBusy(() -> {
        Response response = adminClient().performRequest(new Request("GET", "/_cluster/pending_tasks"));
        List<?> tasks = (List<?>) entityAsMap(response).get("tasks");
        if (false == tasks.isEmpty()) {
            fail(message.toString());
    }, 30, TimeUnit.SECONDS);

// curl -s -k -X POST 'https://elastic:elastic@localhost:9200/_all/_ilm/remove'
deleteAllPolicies() {
    Response response = adminClient().performRequest(new Request("GET", "/_ilm/policy"));
    for (String policyName : policies.keySet()) {
        adminClient().performRequest(new Request("DELETE", "/_ilm/policy/" + policyName));
    }

// elastic/elasticsearch#31642
//
// > At the end of every ESRestTestCase we clean the cluster which includes
// > deleting all of the templates. If xpack is installed it'll automatically
// > recreate a few templates every time they are removed. Which is slow.
//
isXPackTemplate(String name) {
    if (name.startsWith(".monitoring-")) { return true; }
    if (name.startsWith(".watch") || name.startsWith(".triggered_watches")) { return true; }
    if (name.startsWith(".ml-")) { return true; }
    switch (name) {
        case ".triggered_watches":
        case ".watches":
        case "logstash-index-template":
        case "security_audit_log":
            return true;
        default:
            return false;
    }

// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/src/test/java/org/elasticsearch/xpack/test/rest/XPackRestIT.java

setupForTests() {
    waitForTemplates();
    waitForWatcher();
    enableMonitoring();

cleanup()
    disableMonitoring();
    clearMlState();
    if (isWaitForPendingTasks()) {
        // This waits for pending tasks to complete, so must go last (otherwise
        // it could be waiting for pending tasks while monitoring is still running).
        ESRestTestCase.waitForPendingTasks(adminClient(), task -> {
            // Don't check rollup jobs because we clear them in the superclass.
            return task.contains(RollupJob.NAME);
        });

waitForTemplates() {
    for (String template : templates) {
        awaitCallApi("indices.exists_template", singletonMap("name", template), emptyList(),
            response -> true,
            () -> "Exception when waiting for [" + template + "] template to be created");
    }

waitForWatcher() {
    if (isWatcherTest()) {
        // ensure watcher is started, so that a test can stop watcher and everything still works fine

enableMonitoring() {
    if (isMonitoringTest()) {
        // Enable monitoring and waits for monitoring documents to be collected and indexed

clearMlState() {
    if (isMachineLearningTest()) {
        new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();
    }

isMonitoringTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=monitoring/") || testName.contains("=monitoring\\"));
}

isWatcherTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=watcher/") || testName.contains("=watcher\\"));
}

isMachineLearningTest() {
    String testName = getTestName();
    return testName != null && (testName.contains("=ml/") || testName.contains("=ml\\"));
}

// ----------------------------------------------------------------------------------------------------
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/integration/MlRestTestStateCleaner.java

class MlRestTestStateCleaner { ... }

clearMlMetadata() {
    deleteAllDatafeeds();
    deleteAllJobs();
    deleteAllDataFrameAnalytics();
    // indices will be deleted by the ESRestTestCase class

deleteAllDatafeeds() {
    Request datafeedsRequest = new Request("GET", "/_ml/datafeeds");
    datafeedsRequest.addParameter("filter_path", "datafeeds");
    datafeeds = (List<Map<String, Object>>) XContentMapValues.extractValue("datafeeds", ESRestTestCase.entityAsMap(datafeedsResponse));
    try {
        adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop"));
    } catch (Exception e1) {
        logger.warn("failed to stop all datafeeds. Forcing stop", e1);
        try {
            adminClient.performRequest(new Request("POST", "/_ml/datafeeds/_all/_stop?force=true"));
        } catch (Exception e2) {
            logger.warn("Force-closing all data feeds failed", e2);
        }
        throw new RuntimeException("Had to resort to force-stopping datafeeds, something went wrong?", e1);
    }
    for (Map<String, Object> datafeed : datafeeds) {
        String datafeedId = (String) datafeed.get("datafeed_id");
        adminClient.performRequest(new Request("DELETE", "/_ml/datafeeds/" + datafeedId));
    }

deleteAllJobs() {
    Request jobsRequest = new Request("GET", "/_ml/anomaly_detectors");
    jobsRequest.addParameter("filter_path", "jobs");
    jobConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("jobs", ESRestTestCase.entityAsMap(response));
    if (jobConfigs == null) {
        return;
    }
    adminClient.performRequest(new Request("POST", "/_ml/anomaly_detectors/_all/_close"));
    for (Map<String, Object> jobConfig : jobConfigs) {
        String jobId = (String) jobConfig.get("job_id");
        adminClient.performRequest(new Request("DELETE", "/_ml/anomaly_detectors/" + jobId));
    }

deleteAllDataFrameAnalytics() {
    Request analyticsRequest = new Request("GET", "/_ml/data_frame/analytics?size=10000");
    analyticsRequest.addParameter("filter_path", "data_frame_analytics");
    analytics = (List<Map<String, Object>>) XContentMapValues.extractValue("data_frame_analytics", ESRestTestCase.entityAsMap(analyticsResponse));
    if (analytics == null) {
        return;
    }
    for (Map<String, Object> config : analytics) {
        String id = (String) config.get("id");
        adminClient.performRequest(new Request("DELETE", "/_ml/data_frame/analytics/" + id));
    }
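
For reference, the two patterns the bullets describe (a bounded context for X-Pack calls and a check of pending cluster tasks) can also be exercised on their own with the Go client. This is a minimal standalone sketch, not part of the commit: the package main wrapper, elasticsearch.NewDefaultClient(), the v8 module path, and a cluster reachable at the default local address are assumptions; the StopDatafeed and PendingTasks calls mirror the generated setup code in the diff below.

package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	// Assumption: a local cluster and the go-elasticsearch client (module path may differ by version).
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// Stop all ML datafeeds, but bound the call with a 30s context instead of blocking the suite.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	res, _ := es.ML.StopDatafeed("_all", es.ML.StopDatafeed.WithContext(ctx))
	if res != nil && res.Body != nil {
		res.Body.Close()
	}

	// Poll the pending cluster state updates until the queue is empty, giving up after ~30 attempts.
	for i := 0; i < 30; i++ {
		res, err := es.Cluster.PendingTasks()
		if err != nil || res == nil || res.Body == nil {
			break
		}
		var r map[string]interface{}
		json.NewDecoder(res.Body).Decode(&r)
		res.Body.Close()
		if tasks, ok := r["tasks"].([]interface{}); ok && len(tasks) < 1 {
			break
		}
		time.Sleep(time.Second)
	}
}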
1 parent 8f98624 commit 788a897

File tree

1 file changed: +113 -10 lines changed


internal/cmd/generate/commands/gentests/generator.go

Lines changed: 113 additions & 10 deletions
@@ -193,6 +193,7 @@ func (g *Generator) genFileHeader() {
 	import (
 		encjson "encoding/json"
 		encyaml "gopkg.in/yaml.v2"
+		"context"
 		"crypto/tls"
 		"testing"
 		"time"
@@ -289,14 +290,37 @@ func (g *Generator) genCommonSetup() {
 	commonSetup := func() {
 		var res *esapi.Response
 
+		{
+			res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForNoInitializingShards(true))
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+			}
+		}
+
 		{
 			res, _ = es.Indices.Delete([]string{"_all"})
 			if res != nil && res.Body != nil { defer res.Body.Close() }
 		}
 
 		{
-			res, _ = es.Indices.DeleteTemplate("*")
-			if res != nil && res.Body != nil { defer res.Body.Close() }
+			var r map[string]interface{}
+			res, _ = es.Indices.GetTemplate()
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+				json.NewDecoder(res.Body).Decode(&r)
+				for templateName, _ := range r {
+					if strings.HasPrefix(templateName, ".") {
+						continue
+					}
+					if templateName == "security_audit_log" {
+						continue
+					}
+					if templateName == "logstash-index-template" {
+						continue
+					}
+					es.Indices.DeleteTemplate(templateName)
+				}
+			}
 		}
 
 		{
@@ -327,6 +351,13 @@ func (g *Generator) genCommonSetup() {
 				}
 			}
 		}
+
+		{
+			res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForStatus("yellow"))
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+			}
+		}
 	}
 
 `)
@@ -338,6 +369,21 @@ func (g *Generator) genXPackSetup() {
 	xpackSetup := func() {
 		var res *esapi.Response
 
+		{
+			var r map[string]interface{}
+			res, _ = es.Indices.GetTemplate()
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+				json.NewDecoder(res.Body).Decode(&r)
+				for templateName, _ := range r {
+					if strings.HasPrefix(templateName, ".") {
+						continue
+					}
+					es.Indices.DeleteTemplate(templateName)
+				}
+			}
+		}
+
 		{
 			res, _ = es.Watcher.DeleteWatch("my_watch")
 			if res != nil && res.Body != nil {
@@ -395,8 +441,10 @@ func (g *Generator) genXPackSetup() {
 
 		{
 			var r map[string]interface{}
-			es.ML.StopDatafeed("_all")
-			res, _ = es.ML.GetDatafeeds(es.ML.GetDatafeeds.WithAllowNoDatafeeds(true))
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			defer cancel()
+			es.ML.StopDatafeed("_all", es.ML.StopDatafeed.WithContext(ctx))
+			res, _ = es.ML.GetDatafeeds()
 			if res != nil && res.Body != nil {
 				defer res.Body.Close()
 				json.NewDecoder(res.Body).Decode(&r)
@@ -412,13 +460,15 @@ func (g *Generator) genXPackSetup() {
 
 		{
 			var r map[string]interface{}
-			es.ML.CloseJob("_all", es.ML.CloseJob.WithForce(true))
-			res, _ = es.ML.GetJobs(es.ML.GetJobs.WithAllowNoJobs(true))
+			ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			defer cancel()
+			es.ML.CloseJob("_all", es.ML.CloseJob.WithContext(ctx))
+			res, _ = es.ML.GetJobs()
 			if res != nil && res.Body != nil {
 				defer res.Body.Close()
 				json.NewDecoder(res.Body).Decode(&r)
 				for _, v := range r["jobs"].([]interface{}) {
-					jobID, ok := v.(map[string]interface{})["datafeed_id"]
+					jobID, ok := v.(map[string]interface{})["job_id"]
 					if !ok {
 						continue
 					}
@@ -438,7 +488,7 @@ func (g *Generator) genXPackSetup() {
 					if !ok {
 						continue
 					}
-					es.Rollup.StopJob(jobID.(string))
+					es.Rollup.StopJob(jobID.(string), es.Rollup.StopJob.WithWaitForCompletion(true))
 					es.Rollup.DeleteJob(jobID.(string))
 				}
 			}
@@ -457,12 +507,45 @@ func (g *Generator) genXPackSetup() {
 						continue
 					}
 					taskID := fmt.Sprintf("%v:%v", v.(map[string]interface{})["node"], v.(map[string]interface{})["id"])
-					es.Tasks.Cancel(es.Tasks.Cancel.WithTaskID(taskID))
+					ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+					defer cancel()
+					es.Tasks.Cancel(es.Tasks.Cancel.WithTaskID(taskID), es.Tasks.Cancel.WithContext(ctx))
+				}
+			}
+		}
+
+		{
+			var r map[string]interface{}
+			res, _ = es.Snapshot.GetRepository()
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+				json.NewDecoder(res.Body).Decode(&r)
+				for repositoryID, _ := range r {
+					var r map[string]interface{}
+					res, _ = es.Snapshot.Get(repositoryID, []string{"_all"})
+					json.NewDecoder(res.Body).Decode(&r)
+					for _, vv := range r["responses"].([]interface{}) {
+						for _, v := range vv.(map[string]interface{})["snapshots"].([]interface{}) {
+							snapshotID, ok := v.(map[string]interface{})["snapshot"]
+							if !ok {
+								continue
+							}
+							es.Snapshot.Delete(repositoryID, fmt.Sprintf("%s", snapshotID))
+						}
 					}
+					es.Snapshot.DeleteRepository([]string{fmt.Sprintf("%s", repositoryID)})
 				}
 			}
 		}
 
+		{
+			res, _ = es.ILM.RemovePolicy(es.ILM.RemovePolicy.WithIndex("_all"))
+			if res != nil && res.Body != nil {
+				defer res.Body.Close()
+			}
+		}
+
 		{
 			res, _ = es.Cluster.Health(es.Cluster.Health.WithWaitForStatus("yellow"))
 			if res != nil && res.Body != nil {
@@ -478,7 +561,7 @@ func (g *Generator) genXPackSetup() {
 		}
 
 		{
-			res, _ = es.Indices.Refresh(es.Indices.Refresh.WithIndex(".security*"))
+			res, _ = es.Indices.Refresh(es.Indices.Refresh.WithIndex("_all"))
 			if res != nil && res.Body != nil {
 				defer res.Body.Close()
 			}
@@ -490,6 +573,26 @@ func (g *Generator) genXPackSetup() {
 				defer res.Body.Close()
 			}
 		}
+
+		{
+			var i int
+			for {
+				i++
+				var r map[string]interface{}
+				res, _ = es.Cluster.PendingTasks()
+				if res != nil && res.Body != nil {
+					defer res.Body.Close()
+					json.NewDecoder(res.Body).Decode(&r)
+					if len(r["tasks"].([]interface{})) < 1 {
+						break
+					}
+				}
+				if i > 30 {
+					break
+				}
+				time.Sleep(time.Second)
+			}
+		}
 	}
 
 `)
