
[feature] Table Scan should take into account the table's sort order #1637


Open
kevinjqliu opened this issue Feb 10, 2025 · 7 comments

@kevinjqliu
Contributor

Feature Request / Improvement

From slack: https://apache-iceberg.slack.com/archives/C029EE6HQ5D/p1739184019493269

This should probably be in `plan_files`:

```python
def plan_files(self) -> Iterable[FileScanTask]:
    """Plans the relevant files by filtering on the PartitionSpecs.

    Returns:
        List of FileScanTasks that contain both data and delete files.
    """
    snapshot = self.snapshot()
    if not snapshot:
        return iter([])

    # step 1: filter manifests using partition summaries
    # the filter depends on the partition spec used to write the manifest file,
    # so create a cache of filters for each spec id
    manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

    manifests = [
        manifest_file
        for manifest_file in snapshot.manifests(self.io)
        if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
    ]

    # step 2: filter the data files in each manifest
    # this filter depends on the partition spec used to write the manifest file
    partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

    metrics_evaluator = _InclusiveMetricsEvaluator(
        self.table_metadata.schema(),
        self.row_filter,
        self.case_sensitive,
        strtobool(self.options.get("include_empty_files", "false")),
    ).eval

    min_sequence_number = _min_sequence_number(manifests)

    data_entries: List[ManifestEntry] = []
    positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

    executor = ExecutorFactory.get_or_create()
    for manifest_entry in chain(
        *executor.map(
            lambda args: _open_manifest(*args),
            [
                (
                    self.io,
                    manifest,
                    partition_evaluators[manifest.partition_spec_id],
                    metrics_evaluator,
                )
                for manifest in manifests
                if self._check_sequence_number(min_sequence_number, manifest)
            ],
        )
    ):
        data_file = manifest_entry.data_file
        if data_file.content == DataFileContent.DATA:
            data_entries.append(manifest_entry)
        elif data_file.content == DataFileContent.POSITION_DELETES:
            positional_delete_entries.add(manifest_entry)
        elif data_file.content == DataFileContent.EQUALITY_DELETES:
            raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
        else:
            raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")
```

@iyad-f

iyad-f commented Feb 11, 2025

@kevinjqliu I would like to work on this, but I need a bit of clarification on what exactly to do.

@kevinjqliu
Contributor Author

Hey @iyad-f, sure thing.

Iceberg has the concept of a sort order: https://iceberg.apache.org/spec/#sorting
An Iceberg table can declare that its data is sorted in a certain way so that the engine can read the data more efficiently.
Write support for sort order is in #271.
In this issue, I want to explore read support. Given an Iceberg table that is sorted, can we efficiently leverage the sort order when reading?

I think there are two components to this.

  1. Pruning manifests: use the table's sort order to efficiently skip manifests based on their min/max values (I think this should be part of _InclusiveMetricsEvaluator above).
  2. Pruning data: push the sort order down to the data file scan (we should investigate whether this is supported in PyArrow).

Let me know if that's clear. Happy to chat more.
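To make the first point concrete, here is a minimal sketch of the kind of inclusive min/max overlap check described above. The `FileStats` type and every name in it are hypothetical, purely for illustration; PyIceberg's real logic lives in `_InclusiveMetricsEvaluator`.

```python
# Hypothetical sketch of inclusive min/max pruning; not PyIceberg's API.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FileStats:
    """Stand-in for a data file's per-column metrics (all names made up)."""
    path: str
    lower: Optional[int]  # column lower bound recorded for this file
    upper: Optional[int]  # column upper bound recorded for this file


def may_contain(stats: FileStats, lo: int, hi: int) -> bool:
    """Inclusive check: keep the file unless its bounds *prove* the query
    range [lo, hi] cannot overlap it. Missing bounds mean we cannot prune."""
    if stats.lower is None or stats.upper is None:
        return True
    return stats.upper >= lo and stats.lower <= hi


def prune(files: List[FileStats], lo: int, hi: int) -> List[FileStats]:
    return [f for f in files if may_contain(f, lo, hi)]


files = [
    FileStats("a.parquet", 0, 9),
    FileStats("b.parquet", 10, 19),
    FileStats("c.parquet", None, None),  # no metrics: must be kept
]
kept = prune(files, 12, 15)  # "a.parquet" is provably out of range
```

This is the overlap test an inclusive evaluator performs per file today; the open question in this issue is whether a declared sort order lets us avoid even running it file-by-file.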

@Fokko
Contributor

Fokko commented Feb 12, 2025

Keep in mind that the sort order is not a global sort order but applies at the file level. This is very nice when you do joins (using the sort-merge strategy), since it can avoid an additional sort. It also plays a huge role in compression: with run-length encoding, sorted values can be encoded very efficiently.

Since we have all the transforms in, I think it would be a good time to check whether we can implement sort order on the write side of things.
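The compression point above can be illustrated with a rough stdlib proxy: sorting a low-cardinality column produces long runs of identical values, which is exactly what run-length and dictionary encodings exploit. This sketch uses zlib's deflate only as a stand-in for Parquet's encodings; it is an illustration, not Iceberg code.

```python
# Illustrative only: sorted low-cardinality data compresses far better
# than the same data in random order, because sorting creates long runs.
import random
import zlib

random.seed(0)
values = [random.randrange(100) for _ in range(100_000)]  # many repeats

unsorted_bytes = b"".join(v.to_bytes(4, "little") for v in values)
sorted_bytes = b"".join(v.to_bytes(4, "little") for v in sorted(values))

unsorted_size = len(zlib.compress(unsorted_bytes))
sorted_size = len(zlib.compress(sorted_bytes))

# Sorting the column shrinks the compressed size dramatically.
assert sorted_size < unsorted_size
```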

@gabeiglio
Contributor

If this is still unassigned, I'll take it. Seems fun :)

@kevinjqliu
Contributor Author

Sure @gabeiglio, feel free to tag me for review :)

@gabeiglio
Contributor

> pruning manifests. use the table's sort order to efficiently skip manifests based on their min/max values (i think this should be part of _InclusiveMetricsEvaluator above)

IIUC this is already happening, since _InclusiveMetricsEvaluator does check the lower and upper bounds of each file. So would we only be looking for a way to optimize the data scan at the Arrow level? @kevinjqliu

@kevinjqliu
Copy link
Contributor Author

> _InclusiveMetricsEvaluator does check for the lower and upper bounds of the file

It does, but I don't think sort order is currently applied on the read side at the metadata level. I haven't dug into the process yet, so bear with me. I would assume that setting a sort order on a table allows us to skip even more data files. Currently the evaluator looks at each data file and evaluates its lower/upper bounds, but if the column is sorted, we could apply a binary search instead. I'm not sure how this is done on the Spark/Java side, but I'm definitely interested to learn more.

> looking for a way to optimize the data scan at the arrow level?

Yeah, I wonder whether PyArrow already allows us to pass in a sort order.
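The binary-search idea from the previous comment can be sketched with stdlib `bisect`, under the assumption that the table is globally sorted on the column, so the data files' value ranges are non-overlapping and ordered. All names here are hypothetical; this is not PyIceberg's actual API.

```python
# Hedged sketch: with files non-overlapping and sorted on a column,
# the files touched by a range predicate can be found with two
# bisections instead of testing every file's bounds individually.
from bisect import bisect_left, bisect_right
from typing import List, Tuple


def files_for_range(
    files: List[Tuple[str, int, int]],  # (path, lower, upper), sorted, non-overlapping
    lo: int,
    hi: int,
) -> List[str]:
    lowers = [lower for _, lower, _ in files]
    uppers = [upper for _, _, upper in files]
    # First file whose upper bound reaches lo, and one past the last
    # file whose lower bound is still <= hi.
    start = bisect_left(uppers, lo)
    end = bisect_right(lowers, hi)
    return [path for path, _, _ in files[start:end]]


files = [
    ("a.parquet", 0, 9),
    ("b.parquet", 10, 19),
    ("c.parquet", 20, 29),
    ("d.parquet", 30, 39),
]
matched = files_for_range(files, 12, 25)  # only b and c can contain rows
```

This goes from O(n) bound checks to O(log n) per predicate, which is the kind of win a declared sort order could unlock at the metadata level.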
