Skip to content

Commit 9cea4d1

Browse files
feat(core): error-resilience in workflow migrations (#2481)
1 parent 5943f5f commit 9cea4d1

File tree

7 files changed

+79
-27
lines changed

7 files changed

+79
-27
lines changed

renku/core/management/migrations/m_0005__2_cwl.py

+21-12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import glob
2121
import os
22+
import traceback
2223
import uuid
2324
from collections import defaultdict
2425
from functools import cmp_to_key
@@ -58,10 +59,10 @@ def migrate(migration_context):
5859
"""Migration function."""
5960
if MigrationType.WORKFLOWS not in migration_context.options.type:
6061
return
61-
_migrate_old_workflows(migration_context.client)
62+
_migrate_old_workflows(client=migration_context.client, strict=migration_context.options.strict)
6263

6364

64-
def _migrate_old_workflows(client):
65+
def _migrate_old_workflows(client, strict):
6566
"""Migrates old cwl workflows to new jsonld format."""
6667

6768
def sort_cwl_commits(e1, e2):
@@ -86,17 +87,25 @@ def sort_cwl_commits(e1, e2):
8687
communication.echo(f"Processing commit {n}/{len(cwl_paths)}", end="\r")
8788

8889
cwl_file, commit = element
89-
if not Path(cwl_file).exists():
90-
continue
91-
path = _migrate_cwl(client, cwl_file, commit)
92-
os.remove(cwl_file)
93-
94-
client.repository.add(cwl_file, path)
9590

96-
if client.repository.is_dirty():
97-
commit_msg = "renku migrate: committing migrated workflow"
98-
committer = Actor(name=f"renku {__version__}", email=version_url)
99-
client.repository.commit(commit_msg, committer=committer, no_verify=True)
91+
try:
92+
if not Path(cwl_file).exists():
93+
continue
94+
95+
path = _migrate_cwl(client, cwl_file, commit)
96+
os.remove(cwl_file)
97+
98+
client.repository.add(cwl_file, path)
99+
100+
if client.repository.is_dirty():
101+
commit_msg = "renku migrate: committing migrated workflow"
102+
committer = Actor(name=f"renku {__version__}", email=version_url)
103+
client.repository.commit(commit_msg, committer=committer, no_verify=True)
104+
except Exception:
105+
if strict:
106+
raise
107+
communication.echo("")
108+
communication.warn(f"Cannot process commit '{commit.hexsha}' - Exception: {traceback.format_exc()}")
100109

101110

102111
def _migrate_cwl(client, path, commit):

renku/core/management/migrations/m_0009__new_metadata_storage.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def generate_new_metadata(
183183
n_commits = len(commits)
184184

185185
for n, commit in enumerate(commits, start=1):
186-
communication.echo(f"Processing commits {n}/{n_commits} {commit.hexsha}", end="\n")
186+
communication.echo(f"Processing commits {n}/{n_commits} {commit.hexsha}", end="\r")
187187

188188
# NOTE: Treat the last commit differently if it was done by this migration
189189
is_last_commit = committed and n == n_commits

renku/core/management/migrations/models/v9.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ def from_revision(cls, client, path, revision="HEAD", parent=None, find_previous
316316
client, member_path, commit, parent=entity, find_previous=find_previous, **kwargs
317317
)
318318
)
319-
except KeyError:
319+
except errors.GitCommitNotFoundError:
320320
pass
321321

322322
else:

renku/core/metadata/repository.py

+20-2
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,19 @@ def get_object_hash(self, path: Union[Path, str], revision: str = None) -> Optio
438438
# NOTE: If object does not exist anymore, hash-object doesn't work, fall back to rev-parse
439439
revision = "HEAD"
440440

441+
def get_staged_directory_hash() -> Optional[str]:
442+
if not os.path.isdir(absolute_path):
443+
return
444+
445+
stashed_revision = self.run_git_command("stash", "create")
446+
if not stashed_revision:
447+
return
448+
449+
try:
450+
return self.run_git_command("rev-parse", f"{stashed_revision}:{relative_path}")
451+
except errors.GitCommandError:
452+
return
453+
441454
def get_object_hash_from_submodules() -> Optional[str]:
442455
for submodule in self.submodules:
443456
try:
@@ -452,8 +465,13 @@ def get_object_hash_from_submodules() -> Optional[str]:
452465
try:
453466
return self.run_git_command("rev-parse", f"{revision}:{relative_path}")
454467
except errors.GitCommandError:
455-
# NOTE: The file can be in a submodule or it was not there when the command ran but was there when workflows
456-
# were migrated (this can happen only for Usage); the project might be broken too.
468+
# NOTE: The file can be in a submodule or it can be a directory which is staged but not committed yet.
469+
# It's also possible that the file was not there when the command ran but was there when workflows were
470+
# migrated (this can happen only for Usage); the project might be broken too.
471+
staged_directory_hash = get_staged_directory_hash()
472+
if staged_directory_hash:
473+
return staged_directory_hash
474+
457475
return get_object_hash_from_submodules()
458476

459477
def get_user(self) -> "Actor":

renku/core/utils/git.py

+6-10
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,12 @@ def get_directory_members(absolute_path: Path) -> List[Entity]:
228228

229229
member_path = member.relative_to(repository.path)
230230

231-
try:
232-
assert all(member_path != m.path for m in members)
231+
assert all(member_path != m.path for m in members)
233232

234-
entity = get_entity_from_revision(repository, member_path, revision)
235-
# NOTE: If a path is not found at a revision we assume that it didn't exist at that revision
236-
if entity:
237-
members.append(entity)
238-
except KeyError:
239-
pass
233+
entity = get_entity_from_revision(repository, member_path, revision)
234+
# NOTE: If a path is not found at a revision we assume that it didn't exist at that revision
235+
if entity:
236+
members.append(entity)
240237

241238
return members
242239

@@ -246,12 +243,11 @@ def get_directory_members(absolute_path: Path) -> List[Entity]:
246243
if cached_entry:
247244
return cached_entry
248245

249-
# TODO: What checksum we get at "HEAD" if object is staged but not committed
246+
# NOTE: For an untracked directory the hash is None; make sure to stage it first before calling this function.
250247
checksum = repository.get_object_hash(revision=revision, path=path)
251248
# NOTE: If object was not found at a revision it's either removed or exists in a different revision; keep the
252249
# entity and use revision as checksum
253250
checksum = checksum or revision
254-
# TODO: What would be checksum for a directory if it's not committed yet.
255251
id = Entity.generate_id(checksum=checksum, path=path)
256252

257253
absolute_path = repository.path / path

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def run(self):
137137
"Jinja2>=2.11.3,<3.0.2",
138138
"renku-sphinx-theme>=0.2.0",
139139
"sphinx-rtd-theme>=0.5.0,<1.1",
140-
"sphinxcontrib-spelling==7.*"
140+
"sphinxcontrib-spelling==7.*",
141141
],
142142
"runner": ["cwlref-runner==1.0"],
143143
"notebook": [],

tests/core/metadata/test_repository.py

+29
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,32 @@ def test_hash_deleted_objects(git_repository):
198198

199199
with pytest.raises(errors.GitCommandError):
200200
Repository.hash_object("B")
201+
202+
203+
def test_hash_directories(git_repository):
204+
"""Test hashing tree objects."""
205+
(git_repository.path / "X").mkdir()
206+
(git_repository.path / "X" / "A").write_text("modified")
207+
208+
assert git_repository.get_object_hash("X", revision="HEAD") is None
209+
assert git_repository.get_object_hash("X") is None
210+
211+
with pytest.raises(errors.GitCommandError):
212+
Repository.hash_object("X")
213+
214+
# NOTE: Once a directory is staged, its hash can be calculated
215+
git_repository.add("X")
216+
217+
directory_hash = git_repository.get_object_hash("X", revision="HEAD")
218+
219+
assert directory_hash is not None
220+
assert directory_hash == git_repository.get_object_hash("X")
221+
222+
# NOTE: Hash of the committed directory is the same as the staged hash
223+
git_repository.commit("Committed X")
224+
225+
assert directory_hash == git_repository.get_object_hash("X", revision="HEAD")
226+
assert directory_hash == git_repository.get_object_hash("X")
227+
228+
with pytest.raises(errors.GitCommandError):
229+
Repository.hash_object("X")

0 commit comments

Comments
 (0)