Closing streams to avoid testing issues #6128
Conversation
@@ -111,6 +114,8 @@ def _datapipe(self, resource_dps: List[IterDataPipe]) -> IterDataPipe[Dict[str,
             drop_none=True,
         )
         if self._split == "train_noval":
+            for i in split_dp:
+                StreamWrapper.cleanup_structure(i)
Noob question: what is the functionality of cleanup_structure?
As a topic for discussion: when GC is not helping, we have to close streams manually. I have some prototypes/ideas for how we can add debug info to find such leftovers.
Just dropping in to add some context. Note that the spurious errors were not visible on Python 3.7, which is currently the only version our CI tests against. Either merge #6065 first or at least activate the other versions temporarily to see if this PR actually fixes them.
@@ -107,7 +107,9 @@ def _prepare_sample(
         ann_path, ann_buffer = ann_data

         image = EncodedImage.from_file(image_buffer)
+        image_buffer.close()
The errors we have seen in our test suite have never been with these files, but only with archives.
The tests complain that the archive stream is not closed. This happens because the child (the unpacked file stream) also remains open and keeps a reference to its parent. In pytorch/pytorch#78952 and pytorch/data#560 we introduced a mechanism to close parent streams once every child is closed.
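To illustrate the idea, here is a toy sketch of that parent/child bookkeeping (not the actual torchdata code; all names below are made up):

class Stream:
    """Toy stand-in for StreamWrapper's parent/child tracking."""

    def __init__(self, parent=None):
        self.parent = parent
        self.closed = False
        self.open_children = 0
        if parent is not None:
            parent.open_children += 1

    def close(self):
        if self.closed:
            return
        self.closed = True
        if self.parent is not None:
            self.parent.open_children -= 1
            # Once the last unpacked child is closed, the archive
            # stream itself can be released as well.
            if self.parent.open_children == 0:
                self.parent.close()

archive = Stream()               # e.g. the tar/zip handle
member = Stream(parent=archive)  # a file unpacked from the archive
member.close()                   # closing the last child closes the parent
assert archive.closed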
The GC workaround does not work inside test environments, as they keep references to the file handles. So we have to close the file handles manually.
Rebased; it would be nice to land this to clean up torchdata's dependency tests.
Thanks for the effort @VitalyFedyunin! I left some questions and suggestions inline. If we have reached consensus on everything, I can take over and implement it if you want me to.
        image_buffer.close()
        ann = read_mat(ann_buffer)
        ann_buffer.close()
Instead of doing that in every dataset individually, can't we just do it in

def fromfile(

and

def read_mat(buffer: BinaryIO, **kwargs: Any) -> Any:

? I think so far we don't have a case where we need to read from the same file handle twice. Plus, reading twice would only work if the stream is seekable, which I am not sure we can guarantee.
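If we went down that road, a self-closing read_mat would be a small change; a minimal sketch, assuming the helper wraps scipy.io.loadmat (the body below is a guess, not the current implementation):

from typing import Any, BinaryIO

import scipy.io

def read_mat(buffer: BinaryIO, **kwargs: Any) -> Any:
    try:
        # Parse the .mat payload while the handle is still open.
        return scipy.io.loadmat(buffer, **kwargs)
    finally:
        # Close unconditionally so no dataset has to remember to do it.
        buffer.close()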
@@ -29,8 +29,8 @@ def __init__(
         self.fieldnames = fieldnames

     def __iter__(self) -> Iterator[Tuple[str, Dict[str, str]]]:
-        for _, file in self.datapipe:
-            file = (line.decode() for line in file)
+        for _, fh in self.datapipe:
I'm ok with the closing here, but why the rename? Can you revert that?
        for i in scenes_dp:
            janitor(i)
- Can we make the loop variable more expressive?
- Can we use torchdata.janitor instead, to make it clearer where this is coming from?
-for i in scenes_dp:
-    janitor(i)
+for _, file in scenes_dp:
+    janitor(file)
Plus, do we even need to use torchdata.janitor here? Would just .close() be sufficient?
-for i in scenes_dp:
-    janitor(i)
+for _, file in scenes_dp:
+    file.close()
@@ -182,9 +184,11 @@ def _prepare_sample(
         anns, image_meta = ann_data

         sample = self._prepare_image(image_data)
Could you revert the formatting changes?
@@ -169,9 +170,10 @@ def _classify_meta(self, data: Tuple[str, Any]) -> Optional[int]:

     def _prepare_image(self, data: Tuple[str, BinaryIO]) -> Dict[str, Any]:
         path, buffer = data
         image = close_buffer(EncodedImage.from_file, buffer)
If EncodedImage.from_file closes automatically, we also don't need this wrapper.
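To make that concrete, a hypothetical auto-closing from_file could look like this (a sketch only, not torchvision's actual class):

from typing import BinaryIO

class EncodedImage:
    def __init__(self, data: bytes) -> None:
        self.data = data

    @classmethod
    def from_file(cls, buffer: BinaryIO) -> "EncodedImage":
        try:
            # Consume the stream eagerly so the handle is no longer needed.
            return cls(buffer.read())
        finally:
            buffer.close()

With that behavior the call site would shrink to image = EncodedImage.from_file(buffer), and the close_buffer wrapper becomes unnecessary.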
        for i in split_dp:
            janitor(i)
Same as above.
Plus, don't we need to do the same on extra_split_dp in the else branch?
def close_buffer(fn: Callable, buffer: IO) -> Any:
I think this was only used once and can be superseded if our read functions clean up after themselves. So this can probably be removed.
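For reference, given its signature and the call site above, the helper presumably amounts to something like this (the body is an assumption):

from typing import IO, Any, Callable

def close_buffer(fn: Callable, buffer: IO) -> Any:
    # Apply the reader, then release the handle even if reading raises.
    try:
        return fn(buffer)
    finally:
        buffer.close()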
     try:
-        sample = next(iter(dataset))
+        iterator = iter(dataset)
Instead of sticking with the iterator pattern here, can't we simply do
samples = list(dataset)
if not samples:
raise AssertionError(...)
sample = samples[0]
...
        iterator = iter(dataset)
        one_element = next(iterator)
Same as above.
        if len(StreamWrapper.session_streams) > 0:
            raise Exception(StreamWrapper.session_streams)
Could you explain what this does? Is StreamWrapper.session_streams just a counter for open streams? If yes, why are we only testing this here and not in the other tests? If this is something we should check in general, we can use a decorator like
import functools

import pytest

def check_unclosed_streams(test_fn):
    @functools.wraps(test_fn)
    def wrapper(*args, **kwargs):
        if len(StreamWrapper.session_streams) > 0:
            raise pytest.UsageError("Some previous test didn't clean up")
        test_fn(*args, **kwargs)
        if len(StreamWrapper.session_streams) > 0:
            raise AssertionError("This test didn't clean up")
    return wrapper
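Alternatively, if this should run around every test anyway, a pytest autouse fixture would avoid decorating each function individually. A sketch, assuming StreamWrapper.session_streams is a registry of still-open streams (the import path is my guess for the version in use here):

import pytest
from torch.utils.data.datapipes.utils.common import StreamWrapper

@pytest.fixture(autouse=True)
def guard_unclosed_streams():
    # Fail fast if an earlier test leaked streams, run the test,
    # then verify that the test itself cleaned up.
    if StreamWrapper.session_streams:
        raise pytest.UsageError("Some previous test didn't clean up")
    yield
    assert not StreamWrapper.session_streams, "This test didn't clean up"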
@VitalyFedyunin In #6647 I redid this PR with all my suggested changes. We can take the discussion there if you want.
Hi @VitalyFedyunin! Thank you for your pull request. We require contributors to sign our Contributor License Agreement, and yours needs attention. You currently have a record in our system, but the CLA is no longer valid and will need to be resubmitted.

Process

In order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with CLA signed. If you have received this in error or have any questions, please contact us at [email protected]. Thanks!
Superseded by #6647. |
Stack from ghstack (oldest at bottom):
The GC workaround does not work inside test environments, as they keep references to the file handles, so we have to close the file handles manually.