Skip to content

Implement tpc benchmarks as tests #8125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
4c3676c
Intial from arcadia night launches
vladl2802 Aug 21, 2024
bb9cbb5
Revert "Intial from arcadia night launches"
vladl2802 Aug 22, 2024
a1e2644
Initial from runner
vladl2802 Aug 22, 2024
2c65657
Merge branch 'ydb-platform:main' into implement-tpc-benchmarks-as-tests
vladl2802 Aug 26, 2024
44e0a8e
(Broken) made test medium and run only 1 query in test
vladl2802 Aug 26, 2024
c0db3dd
(Broken) making test properly in-progress
vladl2802 Aug 27, 2024
3ab5595
complete making proper test: add downloaders and flame-graph as data
vladl2802 Aug 27, 2024
c840f9b
Change posix_spawn to posix_spawnp
vladl2802 Aug 28, 2024
a55aa83
turn off perf
vladl2802 Aug 28, 2024
a3e9cb3
Clean up
vladl2802 Aug 28, 2024
f07ac0b
change DATA to DATA_FILES
vladl2802 Aug 28, 2024
32bb492
change direct call to run_tests with executing it
vladl2802 Aug 28, 2024
fd59a00
make flake happy
vladl2802 Aug 28, 2024
044493f
uploading results to s3 in-progress
vladl2802 Aug 28, 2024
f8de959
Break test in order to debug s3 upload
vladl2802 Aug 29, 2024
3ca7b3e
Fix envv
vladl2802 Aug 29, 2024
f7b2d56
collect stdout hash from runs
vladl2802 Aug 29, 2024
7a85e6d
complete uploading to ydb
vladl2802 Aug 29, 2024
0dc027d
Merge branch 'ydb-platform:main' into implement-tpc-benchmarks-as-tests
vladl2802 Aug 29, 2024
1e6545c
fix upload
vladl2802 Aug 29, 2024
ca6cb32
add math and unicode_base to udfs deps
vladl2802 Aug 30, 2024
c64be54
improve run_tests for local runs
vladl2802 Aug 30, 2024
767d8ad
add ydb to peerdir
vladl2802 Aug 30, 2024
69641ec
make flake happy
vladl2802 Aug 30, 2024
910d974
init cred environ
vladl2802 Aug 30, 2024
bf245b3
pip installing something required for login in ydb
vladl2802 Aug 30, 2024
3c74c82
another try
vladl2802 Aug 30, 2024
1338d59
another try
vladl2802 Aug 30, 2024
d8d6bf5
yet another try
vladl2802 Aug 30, 2024
9799dee
missing update of ya.make for previous commit
vladl2802 Aug 30, 2024
160ebdd
add creds to environ
vladl2802 Aug 30, 2024
2c91529
revert try with building ydb
vladl2802 Aug 30, 2024
8a5d6e8
move upload_results to separate file and run it with env python
vladl2802 Sep 2, 2024
47798ac
nano fix
vladl2802 Sep 2, 2024
970bf12
Turn off perf and nano fix
vladl2802 Sep 2, 2024
5b2d3a7
another nano fix
vladl2802 Sep 2, 2024
cabe578
fix printing string value in query
vladl2802 Sep 2, 2024
595f8fd
change none to null for empty optinal value
vladl2802 Sep 2, 2024
785ef10
remove into table-name from qury
vladl2802 Sep 2, 2024
4bf9bb7
get back into table-name and fix table path
vladl2802 Sep 2, 2024
bfee6e1
another fix to query
vladl2802 Sep 2, 2024
6305d68
nano fix
vladl2802 Sep 2, 2024
cb6efda
make test large and clean up
vladl2802 Sep 2, 2024
3f537c4
return to medium size for testing and removed exit(1) at the end of test
vladl2802 Sep 2, 2024
75b8c12
final fixes and make test large and fat
vladl2802 Sep 3, 2024
e8e3e75
remove kirpich
vladl2802 Sep 3, 2024
aab6fd4
debugging
vladl2802 Sep 3, 2024
0789805
make test medium again
vladl2802 Sep 3, 2024
6e14f85
cd in working downloader dir before dowloading
vladl2802 Sep 3, 2024
db6f2b9
change cd to cwd in run and clean old
vladl2802 Sep 3, 2024
0e065d9
return creating symlink to tpc
vladl2802 Sep 3, 2024
5483b11
clean up and make test large once again
vladl2802 Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions ydb/library/benchmarks/runner/run_tests/run_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import argparse
import subprocess
import pathlib
import os
from sys import stderr


def variant(string):
if string not in ["h", "ds"]:
raise ValueError("variant must be h or ds")
return string


def paths(string):
return list(map(pathlib.Path, string.split(";")))


def parse_args():
subparser = argparse.ArgumentParser()

subparser.add_argument('--is-test', action="store_true", default=False)

subparser.add_argument('--datasize', type=int, default=1)
subparser.add_argument('--variant', type=variant, default='h')
subparser.add_argument('--tasks', type=int, default=1)

subparser.add_argument('-o', '--output', default="./results")
subparser.add_argument('--clean-old', action="store_true", default=False)
subparser.add_argument('--query-filter', action="append", default=[])

args, argv = subparser.parse_known_args()

if args.is_test:
parser = argparse.ArgumentParser()

parser.add_argument('--dqrun', type=pathlib.Path)
parser.add_argument('--gen-queries', type=pathlib.Path)
parser.add_argument('--downloaders-dir', type=pathlib.Path)
parser.add_argument('--udfs-dir', type=paths)
parser.add_argument('--fs-cfg', type=pathlib.Path)
parser.add_argument('--flame-graph', type=pathlib.Path)
parser.add_argument('--result-compare', type=pathlib.Path)
parser.add_argument('--gateways-cfg', type=pathlib.Path)
parser.add_argument('--runner-path', type=pathlib.Path)

return parser.parse_args(argv, namespace=args)
else:
parser = argparse.ArgumentParser()

parser.add_argument('--ydb-root', type=lambda path: pathlib.Path(path).resolve(), default="../../../../")

args = parser.parse_args(argv, namespace=args)

args.dqrun = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "dqrun"
args.gen_queries = args.ydb_root / "ydb" / "library" / "benchmarks" / "gen_queries" / "gen_queries"
args.downloaders_dir = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner"
args.fs_cfg = args.ydb_root / "ydb" / "library" / "yql" / "tools" / "dqrun" / "examples" / "fs.conf"
args.flame_graph = args.ydb_root / "contrib" / "tools" / "flame-graph"
args.result_compare = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "result_compare" / "result_compare"
args.gateways_cfg = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "test-gateways.conf"
args.runner_path = args.ydb_root / "ydb" / "library" / "benchmarks" / "runner" / "runner" / "runner"

udfs_prefix = args.ydb_root / "ydb" / "library" / "yql" / "udfs" / "common"
args.udfs_dir = [udfs_prefix / name for name in ["set", "url_base", "datetime2", "re2", "math", "unicode_base"]]

return args


class Runner:
def prepare_queries_dir(self, custom_pragmas):
print("Preparing queries...", file=stderr)
self.queries_dir.mkdir(parents=True, exist_ok=True)
cmd = [str(self.args.gen_queries)]
cmd += ["--output", f"{self.queries_dir}"]
cmd += ["--variant", f"{self.args.variant}"]
cmd += ["--syntax", "yql"]
cmd += ["--dataset-size", f"{self.args.datasize}"]
for it in custom_pragmas:
cmd += ["--pragma", it]
res = subprocess.run(cmd)
if res.returncode != 0:
raise OSError("Failed to prepare queries")

def prepare_tpc_dir(self):
print("Preparing tpc...", file=stderr)
cmd = [f"./download_files_{self.args.variant}_{self.args.datasize}.sh"]
res = subprocess.run(cmd, cwd=self.args.downloaders_dir)
if res.returncode != 0:
raise OSError("Failed to prepare tpc")

def __init__(self, args, enable_spilling):
self.args = args
self.enable_spilling = enable_spilling

self.queries_dir = pathlib.Path(f"queries{"+" if self.enable_spilling else "-"}spilling-{args.datasize}-{args.tasks}")
if self.args.clean_old or not self.queries_dir.exists():
self.prepare_queries_dir([
f"dq.MaxTasksPerStage={self.args.tasks}",
"dq.OptLLVM=ON"
] + [
"dq.UseFinalizeByKey=true",
"dq.EnableSpillingNodes=All",
] if self.enable_spilling else [])
self.tpc_dir = pathlib.Path(f"{self.args.downloaders_dir}/tpc/{self.args.variant}/{self.args.datasize}")
if self.args.clean_old or not self.tpc_dir.exists():
self.prepare_tpc_dir()
if not pathlib.Path("./tpc").exists():
os.symlink(f"{self.args.downloaders_dir}/tpc", f"{pathlib.Path("./tpc")}", target_is_directory=True)

self.result_dir = pathlib.Path(f"{self.args.output}/{"with" if self.enable_spilling else "no"}-spilling/{args.variant}-{args.datasize}-{args.tasks}").resolve()
self.result_dir.mkdir(parents=True, exist_ok=True)

def run(self):
cmd = ["/usr/bin/time", f"{str(self.args.runner_path)}"]
# cmd += ["--perf"]
for it in self.args.query_filter:
cmd += ["--include-q", it]
cmd += ["--query-dir", f"{str(self.queries_dir)}/{self.args.variant}"]
cmd += ["--bindings", f"{str(self.queries_dir)}/{self.args.variant}/bindings.json"]
cmd += ["--result-dir", str(self.result_dir)]
cmd += ["--flame-graph", str(self.args.flame_graph)]
cmd += [f"{self.args.dqrun}", "-s"]
cmd += ["--enable-spilling"] if self.enable_spilling else []
cmd += ["--udfs-dir", ";".join(map(str, self.args.udfs_dir))]
cmd += ["--fs-cfg", f"{str(self.args.fs_cfg)}"]
cmd += ["--gateways-cfg", f"{str(self.args.gateways_cfg)}"]
print("Running runner...", file=stderr)
subprocess.run(cmd)

print("Run results at: ", self.result_dir)
return self.result_dir


def result_compare(args, to_compare):
print("Comparing...")
cmd = [f"{args.result_compare}"]
cmd += ["-v"]
cmd += to_compare
with open(f"{args.output}/result-{args.variant}-{args.datasize}-{args.tasks}.htm", "w") as result_table:
res = subprocess.run(cmd, stdout=result_table)
if res.returncode != 0:
raise OSError("Failed to compare result")


def main():
args = parse_args()

results = []
print("With spilling...", file=stderr)
results.append(Runner(args, True).run())
print("No spilling...", file=stderr)
results.append(Runner(args, False).run())

if not args.is_test:
result_compare(args, results)


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions ydb/library/benchmarks/runner/run_tests/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
PY3_PROGRAM()

PY_SRCS(
MAIN run_tests.py
)

PEERDIR(
)

END()
40 changes: 22 additions & 18 deletions ydb/library/benchmarks/runner/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5):
oldmask = signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGCHLD})
try:
start_time = time_ns()
pid = os.posix_spawn(argv[0], argv, os.environ, setsigmask=oldmask, file_actions=(
pid = os.posix_spawnp(argv[0], argv, os.environ, setsigmask=oldmask, file_actions=(
([(os.POSIX_SPAWN_OPEN, 1, out, os.O_WRONLY | os.O_CREAT, 0o666)] if out else []) +
([(os.POSIX_SPAWN_OPEN, 2, err, os.O_WRONLY | os.O_CREAT, 0o666)] if err else [])
))
Expand Down Expand Up @@ -65,25 +65,25 @@ def run(argv, out, err, timeout=30*60, hard_timeout=5):


def main():

parser = argparse.ArgumentParser()
parser.add_argument('--query-dir', type=str, default='q/scalar')
parser.add_argument('--bindings', type=str, default='bindings.json')
parser.add_argument('--result-dir', type=str, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
parser.add_argument('--result-dir', type=Path, default="result-{:%Y%m%dT%H%M%S}".format(datetime.datetime.now()))
parser.add_argument('--timeout', type=int, default=30*60)
parser.add_argument('--perf', action='store_true')
parser.add_argument('--arc-path', type=str, default='{}/arcadia'.format(os.environ['HOME']))
parser.add_argument('--flame-graph', type=Path, default=None)
parser.add_argument('--include-q', default=[], action='append')
parser.add_argument('--exclude-q', default=[], action='append')

args, argv = parser.parse_known_intermixed_args()

qdir = args.query_dir
bindings = args.bindings
outdir = args.result_dir
assert len(argv)
querydir = Path(qdir)
os.makedirs(outdir + '/' + qdir, exist_ok=True)
with open(outdir + '/' + qdir + "/summary.tsv", "w") as outf, \
open(outdir + '/' + qdir + "/summary.json", "w") as outj:
with open(outdir / "summary.tsv", "w") as outf, \
open(outdir / "summary.json", "w") as outj:
print(' '.join(argv + ['-p', qdir, '--bindings-file', bindings]), file=outf)
print(json.dumps({
'cmdline': argv,
Expand All @@ -92,26 +92,28 @@ def main():
'version': 100
}), file=outj)
for query in sorted(querydir.glob('**/*.sql'), key=lambda x: tuple(map(lambda y: int(y) if re.match(RE_DIGITS, y) else y, re.split(RE_DIGITS, str(x))))):
q = str(query)
name = outdir + '/' + q
q = str(query.stem)
print(f"{q}", end="", flush=True)
name = str(outdir / q)
if len(args.include_q):
include = False
for r in args.include_q:
if re.search(r, name):
if re.search(r, str(query)):
include = True
break
if not include:
continue
if len(args.exclude_q):
include = True
for r in args.exclude_q:
if re.search(r, name):
if re.search(r, str(query)):
include = False
break
if not include:
continue
print(q, end='\t', file=outf)
outname = name + '-result.yson'
print(".", end="", flush=True)
exitcode, rusage, elapsed, iostat = run(
argv + [
'--result-file', outname,
Expand All @@ -120,7 +122,7 @@ def main():
'--err-file', name + '-err.txt',
'--expr-file', name + '-expr.txt',
'--stat', name + '-stat.yson',
'-p', q
'-p', str(query)
],
name + '-stdout.txt',
name + '-stderr.txt',
Expand Down Expand Up @@ -164,25 +166,27 @@ def main():
}
}), file=outj)
outj.flush()
print(".", end="", flush=True)
if args.perf:
exitcode, rusage, elapsed, iostat = run(
['{}/ya'.format(args.arc_path), 'tool', 'perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
['perf', 'record', '-F250', '-g', '--call-graph', 'dwarf', '-o', '{}/perf.data'.format(outdir), '--'] +
argv + [
'--result-file', '/dev/null',
'--bindings-file', bindings,
'--plan-file', '/dev/null',
'--err-file', '/dev/null',
'--expr-file', '/dev/null',
'-p', q
'-p', str(query)
],
name + '-stdout-perf.txt',
name + '-stderr-perf.txt',
timeout=args.timeout)
os.system('''
{0}/ya tool perf script -i {2}/perf.data --header |
{0}/contrib/tools/flame-graph/stackcollapse-perf.pl |
{0}/contrib/tools/flame-graph/flamegraph.pl > {1}.svg
'''.format(args.arc_path, name, outdir))
perf script -i {2}/perf.data --header |
{0}/stackcollapse-perf.pl |
{0}/flamegraph.pl > {1}.svg
'''.format(args.flame_graph, name, outdir))
print(".", flush=True)


if __name__ == "__main__":
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/benchmarks/runner/runner/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,4 @@ PY_SRCS(
MAIN runner.py
)

PEERDIR(
)

END()
80 changes: 80 additions & 0 deletions ydb/library/benchmarks/runner/tpc_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import yatest.common
import pathlib
import sys
import os


class Runner:
DEPS = {
"run_tests" : "ydb/library/benchmarks/runner/run_tests",
"dqrun" : "ydb/library/yql/tools/dqrun",
"gen-queries" : "ydb/library/benchmarks/gen_queries",
"result-compare" : "ydb/library/benchmarks/runner/result_compare",
"runner" : "ydb/library/benchmarks/runner/runner"
}

DATA = {
"fs-cfg" : "ydb/library/yql/tools/dqrun/examples/fs.conf",
"gateways-cfg" : "ydb/library/benchmarks/runner/runner/test-gateways.conf",
"flame-graph" : "contrib/tools/flame-graph",
"downloaders-dir" : "ydb/library/benchmarks/runner",
}

UDFS = [
"ydb/library/yql/udfs/common/set",
"ydb/library/yql/udfs/common/url_base",
"ydb/library/yql/udfs/common/datetime2",
"ydb/library/yql/udfs/common/re2"
]

def __init__(self):
self.deps = {name : pathlib.Path(yatest.common.binary_path(path)) for name, path in self.DEPS.items()}
self.udfs = [pathlib.Path(yatest.common.binary_path(path)) for path in self.UDFS]
self.data = {name : pathlib.Path(yatest.common.source_path(path)) for name, path in self.DATA.items() if name}
self.output = pathlib.Path(yatest.common.output_path())
self.results_path = self.output / "results"
self.results_path.mkdir()

self.cmd = [str(self.deps["run_tests"]) + "/run_tests", "--is-test"]
self.cmd += ["--dqrun", str(self.deps["dqrun"]) + "/dqrun"]
self.cmd += ["--gen-queries", str(self.deps["gen-queries"]) + "/gen_queries"]
self.cmd += ["--result-compare", str(self.deps["result-compare"]) + "/result_compare"]
self.cmd += ["--downloaders-dir", str(self.data["downloaders-dir"])]
self.cmd += ["--runner", str(self.deps["runner"]) + "/runner"]
self.cmd += ["--flame-graph", str(self.data["flame-graph"])]
self.cmd += ["--udfs-dir", ";".join(map(str, self.udfs))]
self.cmd += ["--fs-cfg", str(self.data["fs-cfg"])]
self.cmd += ["--gateways-cfg", str(self.data["gateways-cfg"])]
self.cmd += ["-o", str(self.results_path)]

def wrapped_run(self, variant, datasize, tasks, query_filter):
cmd = self.cmd
cmd += ["--variant", f"{variant}"]
cmd += ["--datasize", f"{datasize}"]
cmd += ["--tasks", f"{tasks}"]
cmd += ["--clean-old"]
if query_filter:
cmd += ["--query-filter", f"{query_filter}"]
yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)


def upload(result_path, s3_folder):
uploader = pathlib.Path(yatest.common.source_path("ydb/library/benchmarks/runner/upload_results.py")).resolve()
cmd = ["python3", str(uploader)]
cmd += ["--result-path", str(result_path)]
cmd += ["--s3-folder", str(s3_folder)]
yatest.common.execute(cmd, stdout=sys.stdout, stderr=sys.stderr)


def test_tpc():
is_ci = os.environ.get("PUBLIC_DIR") is not None

runner = Runner()
runner.wrapped_run("h", 1, 1, None)
result_path = runner.results_path.resolve()
print("Results path: ", result_path, file=sys.stderr)

if is_ci:
s3_folder = pathlib.Path(os.environ["PUBLIC_DIR"]).resolve()

upload(result_path, s3_folder)
Loading