Skip to content

Commit 0621544

Browse files
authored
Improve warning for insufficient file resources for PQ max_bytes (#16656)
This commit refactors the `PersistedQueueConfigValidator` class to provide a more detailed, accurate and actionable warning when pipeline's PQ configs are at risk of running out of disk space. See #14839 for design considerations. The highlights of the changes include accurately determining the free resources on a filesystem disk and then providing a breakdown of the usage for each of the paths configured for a queue.
1 parent 8c96913 commit 0621544

File tree

2 files changed

+222
-22
lines changed

2 files changed

+222
-22
lines changed

logstash-core/lib/logstash/persisted_queue_config_validator.rb

+63-12
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ def check(running_pipelines, pipeline_configs)
4242
warn_msg = []
4343
err_msg = []
4444
queue_path_file_system = Hash.new # (String: queue path, String: file system)
45-
required_free_bytes = Hash.new # (String: file system, Integer: size)
45+
required_free_bytes = Hash.new # (String: file system, Integer: size)
46+
current_usage_bytes = Hash.new # (String: file system, Integer: size)
4647

4748
pipeline_configs.select { |config| config.settings.get('queue.type') == 'persisted'}
4849
.select { |config| config.settings.get('queue.max_bytes').to_i != 0 }
@@ -60,12 +61,13 @@ def check(running_pipelines, pipeline_configs)
6061
check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)
6162

6263
queue_path_file_system[queue_path] = file_system
63-
if used_bytes < max_bytes
64-
required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes - used_bytes
65-
end
64+
# Add max_bytes to required total for this filesystem
65+
required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes
66+
# Track current usage separately
67+
current_usage_bytes[file_system] = current_usage_bytes.fetch(file_system, 0) + used_bytes
6668
end
6769

68-
check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
70+
check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes)
6971

7072
@last_check_pass = err_msg.empty? && warn_msg.empty?
7173

@@ -85,15 +87,64 @@ def check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)
8587
end
8688
end
8789

88-
# Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/ devices.
90+
# Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/devices.
8991
# It uses the filesystem of the path and count the required bytes by filesystem
90-
def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
91-
disk_warn_msg =
92-
queue_path_file_system
93-
.select { |queue_path, file_system| !FsUtil.hasFreeSpace(Paths.get(queue_path), required_free_bytes.fetch(file_system, 0)) }
94-
.map { |queue_path, file_system| "The persistent queue on path \"#{queue_path}\" won't fit in file system \"#{file_system}\" when full. Please free or allocate #{required_free_bytes.fetch(file_system, 0)} more bytes." }
92+
def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes)
93+
# Group paths by filesystem
94+
paths_by_filesystem = queue_path_file_system.group_by { |_, fs| fs }
95+
96+
# Only process filesystems that need more space
97+
filesystems_needing_space = paths_by_filesystem.select do |file_system, paths|
98+
additional_needed = required_free_bytes.fetch(file_system, 0) - current_usage_bytes.fetch(file_system, 0)
99+
!FsUtil.hasFreeSpace(Paths.get(paths.first.first), additional_needed)
100+
end
101+
102+
return if filesystems_needing_space.empty?
103+
104+
message_parts = [
105+
"Persistent queues require more disk space than is available on #{filesystems_needing_space.size > 1 ? 'multiple filesystems' : 'a filesystem'}:",
106+
""
107+
]
108+
109+
# Add filesystem-specific information
110+
filesystems_needing_space.each do |file_system, paths|
111+
total_required = required_free_bytes.fetch(file_system, 0)
112+
current_usage = current_usage_bytes.fetch(file_system, 0)
113+
additional_needed = total_required - current_usage
114+
fs_path = Paths.get(paths.first.first)
115+
free_space = Files.getFileStore(fs_path).getUsableSpace
116+
117+
message_parts.concat([
118+
"Filesystem '#{file_system}':",
119+
"- Total space required: #{LogStash::Util::ByteValue.human_readable(total_required)}",
120+
"- Currently free space: #{LogStash::Util::ByteValue.human_readable(free_space)}",
121+
"- Current PQ usage: #{LogStash::Util::ByteValue.human_readable(current_usage)}",
122+
"- Additional space needed: #{LogStash::Util::ByteValue.human_readable(additional_needed)}",
123+
"",
124+
"Individual queue requirements:",
125+
*paths.map { |path, _|
126+
used = get_page_size(::File.join(path, "page.*"))
127+
[
128+
" #{path}:",
129+
" Current size: #{LogStash::Util::ByteValue.human_readable(used)}",
130+
" Maximum size: #{LogStash::Util::ByteValue.human_readable(total_required / paths.size)}"
131+
]
132+
}.flatten,
133+
"" # Empty line between filesystems
134+
])
135+
end
95136

96-
warn_msg << disk_warn_msg unless disk_warn_msg.empty?
137+
# Add common footer
138+
message_parts.concat([
139+
"Please either:",
140+
"1. Free up disk space",
141+
"2. Reduce queue.max_bytes in your pipeline configurations",
142+
"3. Move PQ storage to a filesystem with more available space",
143+
"Note: Logstash may fail to start if this is not resolved.",
144+
""
145+
])
146+
147+
warn_msg << message_parts.join("\n")
97148
end
98149

99150
def get_file_system(queue_path)

logstash-core/spec/logstash/persisted_queue_config_validator_spec.rb

+159-10
Original file line numberDiff line numberDiff line change
@@ -80,24 +80,173 @@
8080
end
8181

8282
context("disk does not have sufficient space") do
83-
# two pq with different paths
84-
let(:settings1) { settings.dup.merge("queue.max_bytes" => "1000pb") }
85-
let(:settings2) { settings1.dup.merge("path.queue" => Stud::Temporary.directory) }
83+
let(:pipeline_id) { "main" }
84+
# Create a pipeline config double that matches what the class expects
85+
let(:pipeline_config) do
86+
double("PipelineConfig").tap do |config|
87+
allow(config).to receive(:pipeline_id).and_return(pipeline_id)
88+
allow(config).to receive(:settings).and_return(
89+
double("Settings").tap do |s|
90+
allow(s).to receive(:get).with("queue.type").and_return("persisted")
91+
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024) # 300GB
92+
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024) # 64MB
93+
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id)
94+
allow(s).to receive(:get).with("path.queue").and_return(queue_path)
95+
end
96+
)
97+
end
98+
end
99+
100+
before do
101+
allow(Dir).to receive(:glob).and_return(["page.1"])
102+
allow(File).to receive(:size).and_return(25 * 1024 * 1024 * 1024)
103+
allow(FsUtil).to receive(:hasFreeSpace).and_return(false)
104+
allow(Files).to receive(:exists).and_return(true)
86105

87-
let(:pipeline_configs) do
88-
LogStash::Config::Source::Local.new(settings1).pipeline_configs +
89-
LogStash::Config::Source::Local.new(settings2).pipeline_configs
106+
# Mock filesystem
107+
mock_file_store = double("FileStore",
108+
name: "disk1",
109+
getUsableSpace: 100 * 1024 * 1024 * 1024 # 100GB free
110+
)
111+
allow(Files).to receive(:getFileStore).and_return(mock_file_store)
90112
end
91113

92-
it "should throw" do
114+
it "reports detailed space information" do
93115
expect(pq_config_validator).to receive(:check_disk_space) do |_, _, required_free_bytes|
94116
expect(required_free_bytes.size).to eq(1)
95-
expect(required_free_bytes.values[0]).to eq(1024**5 * 1000 * 2) # require 2000pb
117+
expect(required_free_bytes.values[0]).to eq(300 * 1024 * 1024 * 1024)
96118
end.and_call_original
97119

98-
expect(pq_config_validator.logger).to receive(:warn).once.with(/won't fit in file system/)
120+
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
121+
expect(msg).to include("Total space required: 300gb")
122+
expect(msg).to include("Current PQ usage: 25gb")
123+
end
99124

100-
pq_config_validator.check({}, pipeline_configs)
125+
pq_config_validator.check({}, [pipeline_config])
126+
end
127+
128+
context "with multiple pipelines" do
129+
let(:pipeline_id1) { "main" }
130+
let(:pipeline_id2) { "secondary" }
131+
let(:pipeline_id3) { "third" }
132+
133+
let(:base_queue_path) { queue_path }
134+
let(:queue_path1) { ::File.join(base_queue_path, pipeline_id1) }
135+
let(:queue_path2) { ::File.join(base_queue_path, pipeline_id2) }
136+
let(:queue_path3) { ::File.join(Stud::Temporary.directory, pipeline_id3) }
137+
138+
let(:pipeline_config1) do
139+
double("PipelineConfig").tap do |config|
140+
allow(config).to receive(:pipeline_id).and_return(pipeline_id1)
141+
allow(config).to receive(:settings).and_return(
142+
double("Settings").tap do |s|
143+
allow(s).to receive(:get).with("queue.type").and_return("persisted")
144+
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
145+
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
146+
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id1)
147+
allow(s).to receive(:get).with("path.queue").and_return(base_queue_path)
148+
end
149+
)
150+
end
151+
end
152+
153+
let(:pipeline_config2) do
154+
double("PipelineConfig").tap do |config|
155+
allow(config).to receive(:pipeline_id).and_return(pipeline_id2)
156+
allow(config).to receive(:settings).and_return(
157+
double("Settings").tap do |s|
158+
allow(s).to receive(:get).with("queue.type").and_return("persisted")
159+
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
160+
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
161+
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id2)
162+
allow(s).to receive(:get).with("path.queue").and_return(base_queue_path)
163+
end
164+
)
165+
end
166+
end
167+
168+
let(:pipeline_config3) do
169+
double("PipelineConfig").tap do |config|
170+
allow(config).to receive(:pipeline_id).and_return(pipeline_id3)
171+
allow(config).to receive(:settings).and_return(
172+
double("Settings").tap do |s|
173+
allow(s).to receive(:get).with("queue.type").and_return("persisted")
174+
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
175+
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
176+
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id3)
177+
allow(s).to receive(:get).with("path.queue").and_return(::File.dirname(queue_path3))
178+
end
179+
)
180+
end
181+
end
182+
183+
let(:mock_file_store1) { double("FileStore", name: "disk1", getUsableSpace: 100 * 1024 * 1024 * 1024) }
184+
let(:mock_file_store2) { double("FileStore", name: "disk2", getUsableSpace: 50 * 1024 * 1024 * 1024) }
185+
186+
before do
187+
# Precise path matching for Dir.glob
188+
allow(Dir).to receive(:glob) do |pattern|
189+
case pattern
190+
when /#{pipeline_id1}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"]
191+
when /#{pipeline_id2}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1", "#{::File.dirname(pattern)}/page.2"]
192+
when /#{pipeline_id3}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"]
193+
else []
194+
end
195+
end
196+
197+
# Set up file size matching with full paths
198+
allow(File).to receive(:size) do |path|
199+
case
200+
when path.include?(pipeline_id1) then 30 * 1024 * 1024 * 1024 # 30GB for main
201+
when path.include?(pipeline_id2) then 25 * 1024 * 1024 * 1024 # 25GB for secondary
202+
when path.include?(pipeline_id3) then 25 * 1024 * 1024 * 1024 # 25GB for third
203+
else 0
204+
end
205+
end
206+
207+
allow(Files).to receive(:getFileStore) do |path|
208+
case path.toString
209+
when /#{pipeline_id3}/ then mock_file_store2
210+
else mock_file_store1
211+
end
212+
end
213+
214+
allow(FsUtil).to receive(:hasFreeSpace).and_return(false)
215+
allow(Files).to receive(:exists).and_return(true)
216+
end
217+
218+
context "with multiple queues on same filesystem" do
219+
it "reports consolidated information for same filesystem" do
220+
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
221+
expect(msg).to match(/Persistent queues require more disk space than is available on a filesystem:/)
222+
expect(msg).to match(/Filesystem 'disk1':/)
223+
expect(msg).to match(/Total space required: 600gb/) # 300GB * 2
224+
expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB)
225+
expect(msg).to match(/Current size: 30gb/) # First queue
226+
expect(msg).to match(/Current size: 50gb/) # Second queue (2 files * 25GB)
227+
end
228+
229+
pq_config_validator.check({}, [pipeline_config1, pipeline_config2])
230+
end
231+
end
232+
233+
context "with queues across multiple filesystems" do
234+
it "reports separate information for each filesystem" do
235+
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
236+
# First filesystem
237+
expect(msg).to match(/Filesystem 'disk1':/)
238+
expect(msg).to match(/Total space required: 600gb/) # 300GB * 2
239+
expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB)
240+
241+
# Second filesystem
242+
expect(msg).to match(/Filesystem 'disk2':/)
243+
expect(msg).to match(/Total space required: 300gb/) # 300GB
244+
expect(msg).to match(/Current PQ usage: 25gb/) # 25GB
245+
end
246+
247+
pq_config_validator.check({}, [pipeline_config1, pipeline_config2, pipeline_config3])
248+
end
249+
end
101250
end
102251
end
103252

0 commit comments

Comments
 (0)