From 156538e95c8f0f031a10a5bd6b67abcf0100a2e5 Mon Sep 17 00:00:00 2001 From: Brendan Fahy Date: Tue, 24 Oct 2023 16:02:02 -0400 Subject: [PATCH] Modeled after fsspec.generic.rsync this uses two fs explicitly Pretty much identical to the generic one but doesn't create the directories async. The generic one defines a make_many_dirs which tries to make them async if it can and falls back to serial, this just uses the simpler version of non async. Can do the same helper function though. This is needed for cases of syncing two different storage systems that might have the same protocol. An example would be two different s3 buckets that use different credentials. This allows you to make two different s3fs instances each passing different parameters and still sync them. The generic interface is great for giving a simple implicit way to sync things, but often we need fine grained control of what our filesystem parameters are so having an explicit method as well is useful. (Zen of Python line 2) --- fsspec/utils.py | 98 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/fsspec/utils.py b/fsspec/utils.py index c7e3e3ddc..ee9c5f28d 100644 --- a/fsspec/utils.py +++ b/fsspec/utils.py @@ -627,3 +627,101 @@ def atomic_write(path: str, mode: str = "wb"): raise else: os.replace(fn, path) + + +def rsync( + source, + destination, + source_fs: AbstractFileSystem, + target_fs: AbstractFileSystem, + delete_missing=False, + source_field="size", + dest_field="size", + update_cond="different", + inst_kwargs=None, +) -> None: + """Sync files between two directory trees + + (experimental) + + Parameters + ---------- + source: str + Root of the directory tree to take files from. This must be a directory, but + do not include any terminating "/" character + destination: str + Root path to copy into. The contents of this location should be + identical to the contents of ``source`` when done. This will be made a + directory, and the terminal "/" should not be included. + delete_missing: bool + If there are paths in the destination that don't exist in the + source and this is True, delete them. Otherwise, leave them alone. + source_field: str | callable + If ``update_field`` is "different", this is the key in the info + of source files to consider for difference. Maybe a function of the + info dict. + dest_field: str | callable + If ``update_field`` is "different", this is the key in the info + of destination files to consider for difference. May be a function of + the info dict. + update_cond: "different"|"always"|"never" + If "always", every file is copied, regardless of whether it exists in + the destination. If "never", files that exist in the destination are + not copied again. If "different" (default), only copy if the info + fields given by ``source_field`` and ``dest_field`` (usually "size") + are different. Other comparisons may be added in the future. + inst_kwargs: dict|None + If ``fs`` is None, use this set of keyword arguments to make a + GenericFileSystem instance + fs: GenericFileSystem|None + Instance to use if explicitly given. The instance defines how to + to make downstream file system instances from paths. + """ + logger = logging.getLogger("fsspec.rsync") + source = source_fs._strip_protocol(source) + destination = target_fs._strip_protocol(destination) + allfiles = source_fs.find(source, withdirs=True, detail=True) + if not source_fs.isdir(source): + raise ValueError("Can only rsync on a directory") + otherfiles = target_fs.find(destination, withdirs=True, detail=True) + dirs = [ + a + for a, v in allfiles.items() + if v["type"] == "directory" and a.replace(source, destination) not in otherfiles + ] + logger.debug(f"{len(dirs)} directories to create") + if dirs: + for dirn in dirs: + target_fs.makedirs(dirn.replace(source, destination), exist_ok=True) + allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"} + logger.debug(f"{len(allfiles)} files to consider for copy") + to_delete = [ + o + for o, v in otherfiles.items() + if o.replace(destination, source) not in allfiles and v["type"] == "file" + ] + for k, v in allfiles.copy().items(): + otherfile = k.replace(source, destination) + if otherfile in otherfiles: + if update_cond == "always": + allfiles[k] = otherfile + elif update_cond == "different": + inf1 = source_field(v) if callable(source_field) else v[source_field] + v2 = otherfiles[otherfile] + inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field] + if inf1 != inf2: + # details mismatch, make copy + allfiles[k] = otherfile + else: + # details match, don't copy + allfiles.pop(k) + else: + # file not in target yet + allfiles[k] = otherfile + logger.debug(f"{len(allfiles)} files to copy") + if allfiles: + source_files, target_files = zip(*allfiles.items()) + source_fs.cp(source_files, target_files, **kwargs) + logger.debug(f"{len(to_delete)} files to delete") + if delete_missing: + target_fs.rm(to_delete)