Skip to content

Commit a20c225

Browse files
authored
Implement tpc benchmarks as tests (#8125)
1 parent dfa0721 commit a20c225

File tree

7 files changed

+497
-21
lines changed

7 files changed

+497
-21
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import argparse
2+
import subprocess
3+
import pathlib
4+
import os
5+
from sys import stderr
6+
7+
8+
def variant(string):
9+
if string not in ["h", "ds"]:
10+
raise ValueError("variant must be h or ds")
11+
return string
12+
13+
14+
def paths(string):
15+
return list(map(pathlib.Path, string.split(";")))
16+
17+
18+
def parse_args():
19+
subparser = argparse.ArgumentParser()
20+
21+
subparser.add_argument('--is-test', action="store_true", default=False)
22+
23+
subparser.add_argument('--datasize', type=int, default=1)
24+
subparser.add_argument('--variant', type=variant, default='h')
25+
subparser.add_argument('--tasks', type=int, default=1)
26+
27+
subparser.add_argument('-o', '--output', default="./results")
28+
subparser.add_argument('--clean-old', action="store_true", default=False)
29+
subparser.add_argument('--query-filter', action="append", default=[])
30+
31+
args, argv = subparser.parse_known_args()
32+
33+
if args.is_test:
34+
parser = argparse.ArgumentParser()
35+
36+
parser.add_argument('--dqrun', type=pathlib.Path)
37+
parser.add_argument('--gen-queries', type=pathlib.Path)
38+
parser.add_argument('--downloaders-dir', type=pathlib.Path)
39+
parser.add_argument('--udfs-dir', type=paths)
40+
parser.add_argument('--fs-cfg', type=pathlib.Path)
41+
parser.add_argument('--flame-graph', type=pathlib.Path)
42+
parser.add_argument('--result-compare', type=pathlib.Path)
43+
parser.add_argument('--gateways-cfg', type=pathlib.Path)
44+
parser.add_argument('--runner-path', type=pathlib.Path)
45+
46+
return parser.parse_args(argv, namespace=args)
47+
else:
48+
parser = argparse.ArgumentParser()
49+
50+
parser.add_argument('--ydb-root', type=lambda path: pathlib.Path(path).resolve(), default="../../../../")
51+
52+
args = parser.parse_args(argv, namespace=args)
53+
54+
args.dqrun = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "dqrun"
55+
args.gen_queries = args.ydb_root / "ydb" / "library" / "benchmarks" / "gen_queries" / "gen_queries"
56+
args.downloaders_dir = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner"
57+
args.fs_cfg = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf"
58+
args.flame_graph = args.ydb_root / "contrib" / "tools" / "flame-graph"
59+
args.result_compare = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "result_compare" / "result_compare"
60+
args.gateways_cfg = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "test-gateways.conf"
61+
args.runner_path = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "runner"
62+
63+
udfs_prefix = args.ydb_root / "ydb" / "library" / "yql" / "udfs" / "common"
64+
args.udfs_dir = [udfs_prefix / name for name in ["set", "url_base", "datetime2", "re2", "math", "unicode_base"]]
65+
66+
return args
67+
68+
69+
class Runner:
70+
def prepare_queries_dir(self, custom_pragmas):
71+
print("Preparing queries...", file=stderr)
72+
self.queries_dir.mkdir(parents=True, exist_ok=True)
73+
cmd = [str(self.args.gen_queries)]
74+
cmd += ["--output", f"{self.queries_dir}"]
75+
cmd += ["--variant", f"{self.args.variant}"]
76+
cmd += ["--syntax", "yql"]
77+
cmd += ["--dataset-size", f"{self.args.datasize}"]
78+
for it in custom_pragmas:
79+
cmd += ["--pragma", it]
80+
res = subprocess.run(cmd)
81+
if res.returncode != 0:
82+
raise OSError("Failed to prepare queries")
83+
84+
def prepare_tpc_dir(self):
85+
print("Preparing tpc...", file=stderr)
86+
cmd = [f"./download_files_{self.args.variant}_{self.args.datasize}.sh"]
87+
res = subprocess.run(cmd, cwd=self.args.downloaders_dir)
88+
if res.returncode != 0:
89+
raise OSError("Failed to prepare tpc")
90+
91+
def __init__(self, args, enable_spilling):
92+
self.args = args
93+
self.enable_spilling = enable_spilling
94+
95+
self.queries_dir = pathlib.Path(f"queries{"+" if self.enable_spilling else "-"}spilling-{args.datasize}-{args.tasks}")
96+
if self.args.clean_old or not self.queries_dir.exists():
97+
self.prepare_queries_dir([
98+
f"dq.MaxTasksPerStage={self.args.tasks}",
99+
"dq.OptLLVM=ON"
100+
] + [
101+
"dq.UseFinalizeByKey=true",
102+
"dq.EnableSpillingNodes=All",
103+
] if self.enable_spilling else [])
104+
self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}")
105+
if self.args.clean_old or not self.tpc_dir.exists():
106+
self.prepare_tpc_dir()
107+
if not pathlib.Path("./tpc").exists():
108+
os.symlink(f"{self.args.downloaders_dir}/tpc", f"{pathlib.Path("./tpc")}", target_is_directory=True)
109+
110+
self.result_dir = pathlib.Path(f"{self.args.output}/{"with" if self.enable_spilling else "no"}-spilling/{args.variant}-{args.datasize}-{args.tasks}").resolve()
111+
self.result_dir.mkdir(parents=True, exist_ok=True)
112+
113+
def run(self):
114+
cmd = ["/usr/bin/time", f"{str(self.args.runner_path)}"]
115+
# cmd += ["--perf"]
116+
for it in self.args.query_filter:
117+
cmd += ["--include-q", it]
118+
cmd += ["--query-dir", f"{str(self.queries_dir)}/{self.args.variant}"]
119+
cmd += ["--bindings", f"{str(self.queries_dir)}/{self.args.variant}/bindings.json"]
120+
cmd += ["--result-dir", str(self.result_dir)]
121+
cmd += ["--flame-graph", str(self.args.flame_graph)]
122+
cmd += [f"{self.args.dqrun}", "-s"]
123+
cmd += ["--enable-spilling"] if self.enable_spilling else []
124+
cmd += ["--udfs-dir", ";".join(map(str, self.args.udfs_dir))]
125+
cmd += ["--fs-cfg", f"{str(self.args.fs_cfg)}"]
126+
cmd += ["--gateways-cfg", f"{str(self.args.gateways_cfg)}"]
127+
print("Running runner...", file=stderr)
128+
subprocess.run(cmd)
129+
130+
print("Run results at: ", self.result_dir)
131+
return self.result_dir
132+
133+
134+
def result_compare(args, to_compare):
135+
print("Comparing...")
136+
cmd = [f"{args.result_compare}"]
137+
cmd += ["-v"]
138+
cmd += to_compare
139+
with open(f"{args.output}/result-{args.variant}-{args.datasize}-{args.tasks}.htm", "w") as result_table:
140+
res = subprocess.run(cmd, stdout=result_table)
141+
if res.returncode != 0:
142+
raise OSError("Failed to compare result")
143+
144+
145+
def main():
146+
args = parse_args()
147+
148+
results = []
149+
print("With spilling...", file=stderr)
150+
results.append(Runner(args, True).run())
151+
print("No spilling...", file=stderr)
152+
results.append(Runner(args, False).run())
153+
154+
if not args.is_test:
155+
result_compare(args, results)
156+
157+
158+
if __name__ == "__main__":
159+
main()
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
PY3_PROGRAM()
2+
3+
PY_SRCS(
4+
MAIN run_tests.py
5+
)
6+
7+
PEERDIR(
8+
)
9+
10+
END()

ydb/library/benchmarks/runner/runner/runner.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5):
2727
oldmask = signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGCHLD})
2828
try:
2929
start_time = time_ns()
30-
pid = os.posix_spawn(argv[0], argv, os.environ, setsigmask=oldmask, file_actions=(
30+
pid = os.posix_spawnp(argv[0], argv, os.environ, setsigmask=oldmask, file_actions=(
3131
([(os.POSIX_SPAWN_OPEN, 1, out, os.O_WRONLY | os.O_CREAT, 0o666)] if out else []) +
3232
([(os.POSIX_SPAWN_OPEN, 2, err, os.O_WRONLY | os.O_CREAT, 0o666)] if err else [])
3333
))
@@ -65,25 +65,25 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5):
6565

6666

6767
def main():
68-
6968
parser = argparse.ArgumentParser()
7069
parser.add_argument('--query-dir', type=str, default='q/scalar')
7170
parser.add_argument('--bindings', type=str, default='bindings.json')
72-
parser.add_argument('--result-dir', type=str, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
71+
parser.add_argument('--result-dir', type=Path, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
7372
parser.add_argument('--timeout', type=int, default=30*60)
7473
parser.add_argument('--perf', action='store_true')
75-
parser.add_argument('--arc-path', type=str, default='{}/arcadia'.format(os.environ['HOME']))
74+
parser.add_argument('--flame-graph', type=Path, default=None)
7675
parser.add_argument('--include-q', default=[], action='append')
7776
parser.add_argument('--exclude-q', default=[], action='append')
77+
7878
args, argv = parser.parse_known_intermixed_args()
79+
7980
qdir = args.query_dir
8081
bindings = args.bindings
8182
outdir = args.result_dir
8283
assert len(argv)
8384
querydir = Path(qdir)
84-
os.makedirs(outdir + '/' + qdir, exist_ok=True)
85-
with open(outdir + '/' + qdir + "/summary.tsv", "w") as outf, \
86-
open(outdir + '/' + qdir + "/summary.json", "w") as outj:
85+
with open(outdir / "summary.tsv", "w") as outf, \
86+
open(outdir / "summary.json", "w") as outj:
8787
print(' '.join(argv + ['-p', qdir, '--bindings-file', bindings]), file=outf)
8888
print(json.dumps({
8989
'cmdline': argv,
@@ -92,26 +92,28 @@ def main():
9292
'version': 100
9393
}), file=outj)
9494
for query in sorted(querydir.glob('**/*.sql'), key=lambda x: tuple(map(lambda y: int(y) if re.match(RE_DIGITS, y) else y, re.split(RE_DIGITS, str(x))))):
95-
q = str(query)
96-
name = outdir + '/' + q
95+
q = str(query.stem)
96+
print(f"{q}", end="", flush=True)
97+
name = str(outdir / q)
9798
if len(args.include_q):
9899
include = False
99100
for r in args.include_q:
100-
if re.search(r, name):
101+
if re.search(r, str(query)):
101102
include = True
102103
break
103104
if not include:
104105
continue
105106
if len(args.exclude_q):
106107
include = True
107108
for r in args.exclude_q:
108-
if re.search(r, name):
109+
if re.search(r, str(query)):
109110
include = False
110111
break
111112
if not include:
112113
continue
113114
print(q, end='\t', file=outf)
114115
outname = name + '-result.yson'
116+
print(".", end="", flush=True)
115117
exitcode, rusage, elapsed, iostat = run(
116118
argv + [
117119
'--result-file', outname,
@@ -120,7 +122,7 @@ def main():
120122
'--err-file', name + '-err.txt',
121123
'--expr-file', name + '-expr.txt',
122124
'--stat', name + '-stat.yson',
123-
'-p', q
125+
'-p', str(query)
124126
],
125127
name + '-stdout.txt',
126128
name + '-stderr.txt',
@@ -164,25 +166,27 @@ def main():
164166
}
165167
}), file=outj)
166168
outj.flush()
169+
print(".", end="", flush=True)
167170
if args.perf:
168171
exitcode, rusage, elapsed, iostat = run(
169-
['{}/ya'.format(args.arc_path), 'tool', 'perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
172+
['perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
170173
argv + [
171174
'--result-file', '/dev/null',
172175
'--bindings-file', bindings,
173176
'--plan-file', '/dev/null',
174177
'--err-file', '/dev/null',
175178
'--expr-file', '/dev/null',
176-
'-p', q
179+
'-p', str(query)
177180
],
178181
name + '-stdout-perf.txt',
179182
name + '-stderr-perf.txt',
180183
timeout=args.timeout)
181184
os.system('''
182-
{0}/ya tool perf script -i {2}/perf.data --header |
183-
{0}/contrib/tools/flame-graph/stackcollapse-perf.pl |
184-
{0}/contrib/tools/flame-graph/flamegraph.pl > {1}.svg
185-
'''.format(args.arc_path, name, outdir))
185+
perf script -i {2}/perf.data --header |
186+
{0}/stackcollapse-perf.pl |
187+
{0}/flamegraph.pl > {1}.svg
188+
'''.format(args.flame_graph, name, outdir))
189+
print(".", flush=True)
186190

187191

188192
if __name__ == "__main__":

ydb/library/benchmarks/runner/runner/ya.make

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,4 @@ PY_SRCS(
44
MAIN runner.py
55
)
66

7-
PEERDIR(
8-
)
9-
107
END()
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import yatest.common
2+
import pathlib
3+
import sys
4+
import os
5+
6+
7+
class Runner:
8+
DEPS = {
9+
"run_tests" : "ydb/library/benchmarks/runner/run_tests",
10+
"dqrun" : "ydb/library/yql/tools/dqrun",
11+
"gen-queries" : "ydb/library/benchmarks/gen_queries",
12+
"result-compare" : "ydb/library/benchmarks/runner/result_compare",
13+
"runner" : "ydb/library/benchmarks/runner/runner"
14+
}
15+
16+
DATA = {
17+
"fs-cfg" : "ydb/library/yql/tools/dqrun/examples/fs.conf",
18+
"gateways-cfg" : "ydb/library/benchmarks/runner/runner/test-gateways.conf",
19+
"flame-graph" : "contrib/tools/flame-graph",
20+
"downloaders-dir" : "ydb/library/benchmarks/runner",
21+
}
22+
23+
UDFS = [
24+
"ydb/library/yql/udfs/common/set",
25+
"ydb/library/yql/udfs/common/url_base",
26+
"ydb/library/yql/udfs/common/datetime2",
27+
"ydb/library/yql/udfs/common/re2"
28+
]
29+
30+
def __init__(self):
31+
self.deps = {name : pathlib.Path(yatest.common.binary_path(path)) for name, path in self.DEPS.items()}
32+
self.udfs = [pathlib.Path(yatest.common.binary_path(path)) for path in self.UDFS]
33+
self.data = {name : pathlib.Path(yatest.common.source_path(path)) for name, path in self.DATA.items() if name}
34+
self.output = pathlib.Path(yatest.common.output_path())
35+
self.results_path = self.output / "results"
36+
self.results_path.mkdir()
37+
38+
self.cmd = [str(self.deps["run_tests"]) + "/run_tests", "--is-test"]
39+
self.cmd += ["--dqrun", str(self.deps["dqrun"]) + "/dqrun"]
40+
self.cmd += ["--gen-queries", str(self.deps["gen-queries"]) + "/gen_queries"]
41+
self.cmd += ["--result-compare", str(self.deps["result-compare"]) + "/result_compare"]
42+
self.cmd += ["--downloaders-dir", str(self.data["downloaders-dir"])]
43+
self.cmd += ["--runner", str(self.deps["runner"]) + "/runner"]
44+
self.cmd += ["--flame-graph", str(self.data["flame-graph"])]
45+
self.cmd += ["--udfs-dir", ";".join(map(str, self.udfs))]
46+
self.cmd += ["--fs-cfg", str(self.data["fs-cfg"])]
47+
self.cmd += ["--gateways-cfg", str(self.data["gateways-cfg"])]
48+
self.cmd += ["-o", str(self.results_path)]
49+
50+
def wrapped_run(self, variant, datasize, tasks, query_filter):
51+
cmd = self.cmd
52+
cmd += ["--variant", f"{variant}"]
53+
cmd += ["--datasize", f"{datasize}"]
54+
cmd += ["--tasks", f"{tasks}"]
55+
cmd += ["--clean-old"]
56+
if query_filter:
57+
cmd += ["--query-filter", f"{query_filter}"]
58+
yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)
59+
60+
61+
def upload(result_path, s3_folder):
62+
uploader = pathlib.Path(yatest.common.source_path("ydb/library/benchmarks/runner/upload_results.py")).resolve()
63+
cmd = ["python3", str(uploader)]
64+
cmd += ["--result-path", str(result_path)]
65+
cmd += ["--s3-folder", str(s3_folder)]
66+
yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)
67+
68+
69+
def test_tpc():
70+
is_ci = os.environ.get("PUBLIC_DIR") is not None
71+
72+
runner = Runner()
73+
runner.wrapped_run("h", 1, 1, None)
74+
result_path = runner.results_path.resolve()
75+
print("Results path: ", result_path, file=sys.stderr)
76+
77+
if is_ci:
78+
s3_folder = pathlib.Path(os.environ["PUBLIC_DIR"]).resolve()
79+
80+
upload(result_path, s3_folder)

0 commit comments

Comments
 (0)