 import sys
 from typing import Optional
 
-from dateutil import parser as date_parser
+import dateutil.parser
 import requests
-from requests import Response
 
 
 def get_md5(tarball: Path) -> str:
@@ -52,32 +51,27 @@ def get_md5(tarball: Path) -> str:
 
 def upload(
     server: str, token: str, tarball: Path, metadata: Optional[list[str]] = None
-) -> Response:
-    query_parameters = {}
-
+) -> requests.Response:
     md5 = get_md5(tarball)
-    query_parameters["access"] = "public"
-    dataset = tarball.name
-    satellite, _ = tarball.parent.name.split("::")
-    meta = [
-        f"global.server.legacy.migrated:{datetime.datetime.now(tz=datetime.timezone.utc):%Y-%m-%dT%H:%M}"
-    ]
-    if satellite:
+    uploaded = datetime.datetime.fromtimestamp(
+        tarball.stat().st_mtime, tz=datetime.timezone.utc
+    )
+    meta = [f"global.server.legacy.migrated:{uploaded:%Y-%m-%dT%H:%M}"]
+    if "::" in tarball.parent.name:
+        satellite, _ = tarball.parent.name.split("::")
         meta.append(f"server.origin:{satellite}")
     if metadata:
         meta.extend(metadata)
-    query_parameters["metadata"] = meta
-    headers = {
-        "Content-MD5": md5,
-        "content-type": "application/octet-stream",
-        "authorization": f"bearer {token}",
-    }
 
     with tarball.open("rb") as f:
         return requests.put(
-            f"{server}/api/v1/upload/{dataset}",
-            headers=headers,
-            params=query_parameters,
+            f"{server}/api/v1/upload/{tarball.name}",
+            headers={
+                "Content-MD5": md5,
+                "content-type": "application/octet-stream",
+                "authorization": f"bearer {token}",
+            },
+            params={"metadata": meta, "access": "public"},
             data=f,
         )
 
@@ -127,6 +121,9 @@ def main() -> int:
     s = Path("/opt/pbench-server/SHA1")
     version = v.read_text().strip() if v.exists() else "unknown"
     sha1 = s.read_text().strip() if s.exists() else "unknown"
+
+    # The standard metadata keys here are modeled on those provided by the
+    # 0.69-11 passthrough server.
     metadata = [
         f"global.server.legacy.hostname:{host}",
         f"global.server.legacy.sha1:{sha1}",
@@ -135,21 +132,29 @@ def main() -> int:
     if parsed.metadata:
         metadata.extend(parsed.metadata)
 
+    # Process date range filtering arguments
     since: Optional[datetime.datetime] = None
     before: Optional[datetime.datetime] = None
 
     since_ts: Optional[float] = None
     before_ts: Optional[float] = None
 
     if parsed.since:
-        since = date_parser.parse(parsed.since)
+        since = dateutil.parser.parse(parsed.since)
         since_ts = since.timestamp()
 
     if parsed.before:
-        before = date_parser.parse(parsed.before)
+        before = dateutil.parser.parse(parsed.before)
         before_ts = before.timestamp()
 
     if since and before:
+        if before <= since:
+            print(
+                f"SINCE ({since}) must be earlier than BEFORE ({before})",
+                file=sys.stderr,
+            )
+            return 1
+
         when = f" (from {since:%Y-%m-%d %H:%M} to {before:%Y-%m-%d %H:%M})"
     elif since:
         when = f" (since {since:%Y-%m-%d %H:%M})"
@@ -165,21 +170,23 @@ def main() -> int:
 
     print(f"{what}{when} -> SERVER {parsed.server}")
 
+    # If a checkpoint file is specified, and already exists, load the list of
+    # files already uploaded.
     checkpoint: Optional[Path] = None
     processed: list[str] = []
     if parsed.checkpoint:
         checkpoint = parsed.checkpoint
-        if parsed.verify:
-            print(f"Processing checkpoint state from {checkpoint}...")
         if checkpoint.exists():
+            if parsed.verify:
+                print(f"Processing checkpoint state from {checkpoint}...")
             processed = checkpoint.read_text().splitlines()
             if parsed.verify:
-                print(f"[CPT] done {len(processed)}")
-        if parsed.verify:
-            print(
-                f"Finished processing checkpoint data: {len(processed)} checkpointed files"
-            )
+                print(
+                    f"Finished processing checkpoint data: {len(processed)} checkpointed files"
+                )
 
+    # Using the specified filters and checkpoint data, determine the set of
+    # tarballs we'll try to upload.
     if parsed.verify:
         print("Identifying target tarballs")
     skipped = 0
@@ -201,23 +208,29 @@ def main() -> int:
             if str(t) in processed:
                 skipped += 1
                 if parsed.verify:
-                    print(f"[CPT] skip {t}")
+                    print(f"[checkpoint] skip {t}")
                 continue
             which.append(t)
     elif parsed.tarball.is_file():
         which = [parsed.tarball]
     else:
-        print(f"Path {parsed.tarball} doesn't exist", file=sys.stderr)
+        print(f"Path {parsed.tarball} isn't a directory or file", file=sys.stderr)
         return 1
     if parsed.verify:
         print(
             f"Identified {len(which)} target tarballs: skipped {skipped}, {early} too old, {late} too new"
         )
 
+    # We'll append successful uploads to the checkpoint file.
     checkwriter: Optional[TextIOWrapper] = None
     if checkpoint:
         checkwriter = checkpoint.open(mode="a")
 
+    # Now start the upload, checkpointing each file after a successful upload
+    #
+    # We maintain the checkpoint file for --preview to assist in testing: if
+    # you want to test with --preview and then do a real upload, delete the
+    # checkpoint file!
     if parsed.verify:
         print("Uploading target files...")
     success = 0
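
As a usage illustration (not part of the change): a minimal sketch of driving the revised upload() helper. The server URL, token, and tarball path below are invented; the diff only establishes that a parent directory named with a "::" separator yields a server.origin metadata tag and that the function returns a requests.Response.

    # Hypothetical driver for upload(); all concrete values here are invented.
    from pathlib import Path

    # A tarball under a "satellite::fqdn" directory picks up server.origin.
    result = upload(
        server="https://pbench.example.com",  # assumed server base URL
        token="example-api-token",            # assumed bearer token
        tarball=Path("/archive/sat::host.example.com/run__2023.01.01T00.00.00.tar.xz"),
        metadata=["global.server.legacy.version:0.69"],
    )
    result.raise_for_status()  # requests.Response; 2xx means the upload landed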
0 commit comments