Skip to content

Commit de55992

Browse files
authored
Tarball migration utility (#3563)
* Tarball migration utility (PBENCH-1279): a utility to bulk-migrate Pbench results tarballs to a Pbench Server using the `PUT` API. It supports checkpointing, to retry more efficiently after a failure, and filtering by file-system modification date.
1 parent 5ae25c9 commit de55992

File tree

1 file changed

+285
-0
lines changed

1 file changed

+285
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
#!/usr/bin/env python3.9
2+
3+
"""Upload results tarballs to a Pbench Server
4+
5+
This utility supports bulk migration of results tarballs to a Pbench Server.
6+
7+
It's a standalone utility (with no dependency on any Pbench packages) that can
8+
be used to send a specific tarball or a potentially large set of tarballs to a
9+
server. It supports filtering by file modification date, and checkpointing to
10+
allow restarting in case of a failure. It can be run in a "preview" mode to
11+
identify the selected tarballs without uploading.
12+
13+
`pbench-upload-results.py https://<server> /srv/pbench/backup` will upload all
14+
result tarballs nested under the directory `/srv/pbench/backup`. You can use
15+
the `--since` and `--before` timestamp selectors to include only results files
16+
modified "since" (>=) and/or "before" (<) the specified datetimes. Specify a
17+
checkpoint file with `--checkpoint <file>` to record all results successfully
18+
uploaded; after unexpected termination (e.g., crash or ^C), rerunning the same
19+
command will skip the already uploaded files.
20+
21+
Uploaded files are marked with metadata consistent with the behavior of the
22+
0.69-11 passthrough server. If the script is run on a Pbench server, metadata
23+
will record the version string and SHA1 of the Pbench installation. We also
24+
record the hostname, and a marker that it was "migrated" by this utility.
25+
"""
26+
from argparse import ArgumentParser
27+
import datetime
28+
from http import HTTPStatus
29+
from io import TextIOWrapper
30+
from pathlib import Path
31+
import socket
32+
import sys
33+
from typing import Optional
34+
35+
import dateutil.parser
36+
import requests
37+
38+
39+
def get_md5(tarball: Path) -> str:
    """Read the tarball MD5 from the tarball's companion file

    Args:
        tarball: Path to a tarball file with a {name}.md5 companion

    Returns:
        The tarball's MD5 value
    """
    companion = tarball.parent / (tarball.name + ".md5")
    # The companion is in "md5sum" format ("<hash>  <filename>"); only the
    # hash field is wanted.
    return companion.read_text().split()[0]
50+
51+
52+
def upload(
    server: str, token: str, tarball: Path, metadata: Optional[list[str]] = None
) -> requests.Response:
    """PUT a single results tarball to a Pbench Server.

    Args:
        server: Pbench Server base address (e.g., "https://host")
        token: Pbench Server API token for the "authorization" header
        tarball: Path of the tarball to upload (with a {name}.md5 companion)
        metadata: Additional "key:value" metadata strings to attach

    Returns:
        The `requests.Response` from the server's upload API
    """
    checksum = get_md5(tarball)
    # Use the tarball's filesystem mtime (as UTC) to record when the result
    # was originally created on the legacy server.
    mtime = datetime.datetime.fromtimestamp(
        tarball.stat().st_mtime, tz=datetime.timezone.utc
    )
    meta = [f"global.server.legacy.migrated:{mtime:%Y-%m-%dT%H:%M}"]

    # A parent directory named "<satellite>::<controller>" identifies a
    # satellite-synced result; record the satellite prefix as the origin.
    parent = tarball.parent.name
    if "::" in parent:
        meta.append(f"server.origin:{parent.split('::', 1)[0]}")
    if metadata:
        meta.extend(metadata)

    with tarball.open("rb") as stream:
        return requests.put(
            f"{server}/api/v1/upload/{tarball.name}",
            headers={
                "Content-MD5": checksum,
                "content-type": "application/octet-stream",
                "authorization": f"bearer {token}",
            },
            params={"metadata": meta, "access": "public"},
            data=stream,
        )
77+
78+
79+
def main() -> int:
    """Command-line driver: select and upload results tarballs.

    Parses the command line, determines the set of candidate tarballs
    (applying `--since`/`--before` mtime filters and skipping anything
    recorded in the checkpoint file), then uploads each one, checkpointing
    each success.

    Returns:
        0 if all selected tarballs were uploaded (or previewed) successfully;
        1 on an argument error or if any upload failed.
    """
    prog = Path(sys.argv[0])
    parser = ArgumentParser(prog=prog.name, description="Upload tarballs")
    parser.add_argument("server", help="Specify the Pbench Server address")
    parser.add_argument(
        "tarball", type=Path, help="Specify a tarball or directory path"
    )
    parser.add_argument("-b", "--before", help="Select tarballs older than this")
    parser.add_argument(
        "-c",
        "--checkpoint",
        type=Path,
        dest="checkpoint",
        help="Checkpoint file for restart",
    )
    parser.add_argument(
        "-m",
        "--metadata",
        action="append",
        dest="metadata",
        help="Set metadata on dataset upload",
    )
    parser.add_argument(
        "-p",
        "--preview",
        action="store_true",
        help="Report actions but make no changes",
    )
    parser.add_argument(
        "-s", "--since", action="store", help="Select tarballs no older than this"
    )
    parser.add_argument(
        "-t", "--token", action="store", dest="token", help="Pbench Server API token"
    )
    parser.add_argument(
        "--verify", "-v", dest="verify", action="store_true", help="Show progress"
    )
    parsed = parser.parse_args()

    # Get basic configuration: if this host has a Pbench Server installation,
    # capture its version and SHA1 for the migration metadata.
    host = socket.gethostname()
    v = Path("/opt/pbench-server/VERSION")
    s = Path("/opt/pbench-server/SHA1")
    version = v.read_text().strip() if v.exists() else "unknown"
    sha1 = s.read_text().strip() if s.exists() else "unknown"

    # The standard metadata keys here are modeled on those provided by the
    # 0.69-11 passthrough server.
    metadata = [
        f"global.server.legacy.hostname:{host}",
        f"global.server.legacy.sha1:{sha1}",
        f"global.server.legacy.version:{version}",
    ]
    if parsed.metadata:
        metadata.extend(parsed.metadata)

    # Process date range filtering arguments. We keep both the parsed
    # datetimes (for messages) and their POSIX timestamps (to compare
    # directly against st_mtime).
    since: Optional[datetime.datetime] = None
    before: Optional[datetime.datetime] = None

    since_ts: Optional[float] = None
    before_ts: Optional[float] = None

    if parsed.since:
        since = dateutil.parser.parse(parsed.since)
        since_ts = since.timestamp()

    if parsed.before:
        before = dateutil.parser.parse(parsed.before)
        before_ts = before.timestamp()

    if since and before:
        if before <= since:
            print(
                f"SINCE ({since}) must be earlier than BEFORE ({before})",
                file=sys.stderr,
            )
            return 1

        when = f" (from {since:%Y-%m-%d %H:%M} to {before:%Y-%m-%d %H:%M})"
    elif since:
        when = f" (since {since:%Y-%m-%d %H:%M})"
    elif before:
        when = f" (before {before:%Y-%m-%d %H:%M})"
    else:
        when = ""

    if parsed.tarball.is_dir():
        what = f"DIRECTORY {parsed.tarball}"
    else:
        what = f"TARBALL {parsed.tarball}"

    print(f"{what}{when} -> SERVER {parsed.server}")

    # If a checkpoint file is specified, and already exists, load the list of
    # files already uploaded. Use a set so the per-tarball membership test
    # below is O(1) even with a large checkpoint file.
    checkpoint: Optional[Path] = None
    processed: set[str] = set()
    if parsed.checkpoint:
        checkpoint = parsed.checkpoint
        if checkpoint.exists():
            if parsed.verify:
                print(f"Processing checkpoint state from {checkpoint}...")
            processed = set(checkpoint.read_text().splitlines())
            if parsed.verify:
                print(
                    f"Finished processing checkpoint data: {len(processed)} checkpointed files"
                )

    # Using the specified filters and checkpoint data, determine the set of
    # tarballs we'll try to upload.
    if parsed.verify:
        print("Identifying target tarballs")
    skipped = 0
    early = 0
    late = 0
    if parsed.tarball.is_dir():
        pool = parsed.tarball.glob("**/*.tar.xz")
        which = []
        for t in pool:
            if not t.is_file():
                continue
            date = t.stat().st_mtime
            if since_ts and date < since_ts:
                early += 1
                continue
            if before_ts and date >= before_ts:
                late += 1
                continue
            if str(t) in processed:
                skipped += 1
                if parsed.verify:
                    print(f"[checkpoint] skip {t}")
                continue
            which.append(t)
    elif parsed.tarball.is_file():
        which = [parsed.tarball]
    else:
        print(f"Path {parsed.tarball} isn't a directory or file", file=sys.stderr)
        return 1
    if parsed.verify:
        print(
            f"Identified {len(which)} target tarballs: skipped {skipped}, {early} too old, {late} too new"
        )

    # We'll append successful uploads to the checkpoint file.
    checkwriter: Optional[TextIOWrapper] = None
    if checkpoint:
        checkwriter = checkpoint.open(mode="a")

    # Now start the upload, checkpointing each file after a successful upload
    #
    # We maintain the checkpoint file for --preview to assist in testing: if
    # you want to test with --preview and then do a real upload, delete the
    # checkpoint file! Also, note that using --preview after a failure will
    # change the checkpoint file; if you want to do that, you should use a
    # copy of the checkpoint file.
    if parsed.verify:
        print("Uploading target files...")
    success = 0
    failure = 0
    failures = set()
    duplicate = 0
    for t in which:
        try:
            if parsed.preview:
                print(f"UPLOAD {t}")
                success += 1
                if checkwriter:
                    print(t, file=checkwriter, flush=True)
            else:
                response = upload(parsed.server, parsed.token, t, metadata=metadata)
                if response.ok:
                    # An OK (200) status means the server already has this
                    # tarball; CREATED (201) is a new upload.
                    if response.status_code == HTTPStatus.OK:
                        duplicate += 1
                    else:
                        success += 1
                    if parsed.verify:
                        print(f"UPLOAD {t}: {response.status_code}")
                    if checkwriter:
                        print(t, file=checkwriter, flush=True)
                else:
                    failure += 1
                    failures.add(response.status_code)
                    try:
                        message = response.json()
                    except Exception:
                        message = response.text
                    print(
                        f"Upload of {t} failed: {response.status_code} ({message})",
                        file=sys.stderr,
                    )
        except Exception as e:
            # Best-effort: report the failure and continue with the rest of
            # the pool rather than aborting the migration.
            failure += 1
            print(f"Failed uploading {t}: {str(e)!r}", file=sys.stderr)

    if checkwriter:
        checkwriter.close()

    print(
        f"Uploaded {success} successfully; {duplicate} duplicates, {failure} failures: {failures}"
    )
    # Exit nonzero if anything failed so scripted callers can detect that a
    # retry (using the checkpoint file) is needed.
    return 1 if failure else 0
282+
283+
284+
# Script entry point: propagate main()'s return value as the process exit
# status.
if __name__ == "__main__":
    sys.exit(main())

0 commit comments

Comments
 (0)