parquet reader: move pruning predicate creation from ParquetSource to ParquetOpener #15561
Conversation
cc @alamb
}
.build();
builder = builder.set_bloom_filter_enabled(true);
Since I'm now using the actual explain plan to check that bloom filters are used, it's easiest to just enable them and check that they pruned. It's also a more realistic test in that it's what I, as a user, would do to check whether bloom filters are working.
let analyze_exec = Arc::new(AnalyzeExec::new(
    false,
    false,
    // use a new ParquetSource to avoid sharing execution metrics
    self.build_parquet_exec(
        file_schema.clone(),
        file_group.clone(),
        self.build_file_source(file_schema.clone()),
    ),
    Arc::new(Schema::new(vec![
        Field::new("plan_type", DataType::Utf8, true),
        Field::new("plan", DataType::Utf8, true),
    ])),
));
The reason I'm refactoring RoundTrip is that I need to create the file source twice: ParquetSource has internal Metrics, so if we run the query against the same ParquetSource twice (once for the data and once for the explain analyze plan) then we end up with duplicate metrics. Cloning it doesn't help because the metrics themselves are Arc'ed.
I think generally asserting against the explain plan, and not against a handle to the ParquetSource, is more in line with how real-world users use DataFusion to debug whether the page index, stats, etc. are working / pruning.
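As a toy illustration (not DataFusion's actual types), cloning a struct whose metrics sit behind an Arc still shares the same underlying counters, which is why a fresh source is needed for the second run:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for a source whose metrics are Arc'ed internally.
#[derive(Clone)]
struct Source {
    metrics: Arc<Mutex<u64>>,
}

fn main() {
    let original = Source {
        metrics: Arc::new(Mutex::new(0)),
    };
    let cloned = original.clone(); // clones the Arc handle, not the counter

    *cloned.metrics.lock().unwrap() += 1;

    // The "independent" clone updated the original's metrics too.
    assert_eq!(*original.metrics.lock().unwrap(), 1);
}
```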
I agree this sounds good to me
#[tokio::test]
async fn parquet_exec_display_deterministic() {
I canned this whole test because it seems the point was to check that required_guarantees= doesn't change from run to run, but that is no longer part of the output, so I don't see what this test would be testing.
I think the guarantees were in a HashSet and printed to the explain plan so they could change from run to run
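As a standalone illustration (not DataFusion code), HashSet iteration order is unspecified, so any text rendered from one can differ between runs:

```rust
use std::collections::HashSet;

fn main() {
    let guarantees: HashSet<&str> = ["a", "b", "c"].into_iter().collect();
    // Iteration order is unspecified and can change from run to run,
    // so text rendered from the set is not deterministic.
    let rendered: Vec<_> = guarantees.iter().collect();
    println!("{rendered:?}");
}
```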
Right I guess the point is: now that they are no longer printed to the explain plan, is this test needed or can we bin it?
// When both matched and pruned are 0, it means that the pruning predicate
// was not used at all.
Not super intuitive (I'd prefer it was null or none, or that the whole row_groups_matched_statistics row was just not included), but I've left a comment to clarify for future readers.
@@ -498,6 +473,7 @@ impl FileSource for ParquetSource {
    reorder_filters: self.reorder_filters(),
    enable_page_index: self.enable_page_index(),
    enable_bloom_filter: self.bloom_filter_on_read(),
    enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
This is just for consistency with the global option: before this, I don't think this option was being referenced anywhere / it was broken. Maybe it's being decided in ListingTable? I feel like there are a couple of options that you'd think apply to ParquetSource but only get handled via ListingTable.
async fn load_page_index<T: AsyncFileReader>(
    arrow_reader: ArrowReaderMetadata,
    input: &mut T,
    options: ArrowReaderOptions,
) -> Result<ArrowReaderMetadata> {
    let parquet_metadata = arrow_reader.metadata();
    let missing_column_index = parquet_metadata.column_index().is_none();
    let missing_offset_index = parquet_metadata.offset_index().is_none();
    if missing_column_index || missing_offset_index {
        // Reload the metadata with the page indexes included, reusing the
        // existing ParquetMetaData instead of re-reading the footer.
        let m = Arc::try_unwrap(Arc::clone(&parquet_metadata))
            .unwrap_or_else(|e| e.as_ref().clone());
        let mut reader =
            ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true);
        reader.load_page_index(input).await?;
        let new_parquet_metadata = reader.finish()?;
        let new_arrow_reader =
            ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
        Ok(new_arrow_reader)
    } else {
        // Page indexes are already present, return the original metadata
        Ok(arrow_reader)
    }
}
I feel like this should be in arrow alongside load_async, but putting it here for now.
let missing_column_index = parquet_metadata.column_index().is_none();
let missing_offset_index = parquet_metadata.offset_index().is_none();
if missing_column_index || missing_offset_index {
This is different from the original logic: the original check was if missing_column_index && missing_offset_index, which is probably fine in practice (I think they always get loaded at the same time), but in theory you need to re-load if either is missing.
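A tiny standalone example of that boundary case (hypothetical helper names, not the PR's code): if only one of the two indexes is missing, the old && check skips the reload while the new || check reloads.

```rust
// Old check: only reload when *both* indexes are missing.
fn needs_reload_and(missing_column_index: bool, missing_offset_index: bool) -> bool {
    missing_column_index && missing_offset_index
}

// New check: reload when *either* index is missing.
fn needs_reload_or(missing_column_index: bool, missing_offset_index: bool) -> bool {
    missing_column_index || missing_offset_index
}

fn main() {
    // Column index already present, offset index missing:
    assert!(!needs_reload_and(false, true)); // old logic would not reload
    assert!(needs_reload_or(false, true)); // new logic reloads
}
```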
// Note about schemas: we are actually dealing with **3 different schemas** here:
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
I'm not actually changing anything here, just documenting the current status quo because it's somewhat confusing.
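For illustration, a minimal sketch of how the three schemas relate (hypothetical column names, not from the PR):

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // 1. Table schema as defined by the TableProvider: what `SELECT *` returns,
    //    including the hive partition column `date`.
    let table_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
        Field::new("date", DataType::Utf8, false), // partition column
    ]);

    // 2. "Virtual" file schema: the table schema minus the partition columns;
    //    this is what each file's schema is coerced to.
    let virtual_file_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]);

    // 3. Physical file schema: what one particular parquet file actually
    //    contains, e.g. an older file written before `value` was added.
    let physical_file_schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
    ]);

    assert_eq!(table_schema.fields().len(), 3);
    assert_eq!(virtual_file_schema.fields().len(), 2);
    assert_eq!(physical_file_schema.fields().len(), 1);
}
```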
Thank you @adriangb -- I think this PR makes a lot of sense to me
I do think it would be good to restore the explain plan display of pruning predicates
Also, I want to run some performance benchmarks on this branch to ensure there isn't a slowdown. Otherwise looks great to me
{
    // (bloom filters use `pruning_predicate` too).
    // Because filter pushdown may happen dynamically as long as there is a predicate
    // if we have *any* predicate applied, we can't guarantee the statistics are exact.
I think this is the correct check and makes sense to me
@@ -625,7 +625,7 @@ physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: column1@0 LIKE f%
03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%
this is basically my only concern with the entire PR -- that the explain plans now do not show pruning predicates / literal guarantees anymore.
This will make it harder to understand when optimizations are happening or not (because a particular predicate can't be converted for example)
Would it be possible to add this part of the explain back in (by trying to create a pruning predicate for only the explain plan, when needed)?
I realize that approach has the downside that the pruning predicate used for each file may be different than what is shown in the explain plan
Yes I think we can do that. I feared that it would be more confusing because the pruning predicate you see is not what you get in the end...
Is there any way we can inject this information at runtime? Metrics already kind of do that. It'd be nice to record the per-file pruning predicates, per file schema mappings and per-file filters once those exist.
I've reverted this part. Do you think maybe we can rename pruning_predicate= to table_schema_pruning_predicate=, estimated_pruning_predicate=, or something to give some indication that it may not match what actually happens/happened at runtime?
Thank you so much @adriangb -- I am going to run the benchmarks on this PR to make sure we didn't mess anything up, but if that goes well I think this is ready to merge
// Didn't we explicitly *not* load it above?
// Well it's possible that a custom implementation of `AsyncFileReader` gives you
// the page index even if you didn't ask for it (e.g. because it's cached)
// so it's important to check that here to avoid extra work.
👍
🤖
Crossing my fingers that the overhead of building the predicates is not an issue. If it is... we could always create it once upfront and only re-create it if the predicate or schema changes. But I'm hoping that's not necessary.
🤖: Benchmark completed
Hmm, the benchmark results suggest something else is going on -- I will try and reproduce them
🤖
That's weird. I would not have been too surprised by a 1.01x slowdown or something. 1.4x seems really high unless (1) building pruning predicates was a significant portion of query time to begin with or (2) I did something wrong with the IO and it's now making 2 FS calls instead of 1 (even then was reading metadata that slow???)
I can reproduce the slowdown locally using this command:
datafusion-cli -c "SELECT * FROM hits WHERE \"URL\" LIKE '%google%' ORDER BY \"EventTime\" LIMIT 10;"
Huh okay maybe generating the pruning predicates is just that slow. Is that surprising to you? I'll investigate a bit later.
I bet it is something silly -- I am doing some profiling now
Finally off my calls. Me as well. Feel free to ping me for a call if you think I can be of any help.
I think I see the issue -- I am just verifying now
Here is a fix that restores the performance on my local tests: (thanks to @zhuqi-lucas for spotting the problem)
🤖
Seems like the script died 😢
🤖: Benchmark completed
Nice! @alamb are you worried about the I am happy to implement some basic caching here (compute a predicate in
I think it is noise. I'll run
This would be great -- maybe you can file a ticket to track it
Looking at the numbers I think it's noise, e.g. the biggest slowdown in the most recent bench is:
But in the previous run, which included the bug causing other very large slowdowns:
I mean my point is: if we think the impact on performance is minimal / not measurable in a real world scenario, is it worth having the added code / complexity to do this sort of caching?
A caching implementation also adds overhead; if we can't measure it, how do we know the caching version is not slower, etc.?
If the performance impact is not measurable then no, we shouldn't add the code, in my opinion
Agreed, then let's just proceed as is. Thanks for your help with this; sorry I was a bit MIA this morning!
🤖
🤖: Benchmark completed
LGTM Thanks!
I plan to merge this tomorrow unless other people would like a chance to comment
Onwards towards topk pushdown
This definitely is an API change -- I hit it in the delta-rs upgrade: I'll make a note to add it to the upgrade guide
Needed for #15301 and #15057.
Closes #15534
Additionally I think this will make predicate evaluation slightly more performant for files with missing columns.
There are actually 3 file schemas going around:
1. The table schema as defined by the TableProvider, etc.
2. The file_schema passed into FileScanConfig, which is not the physical file schema; rather it's the table schema minus partition columns.
3. The physical file schema: the schema the parquet file actually contains.

Currently we build predicates against (2), which means that a predicate may reference columns not found in the actual file. I believe this would result in null stats being created on the fly (some minimal work) and pointless evaluation of predicates (some more work). I'm not sure how this stacks up with the extra work of creating the predicates multiple times, which also has a cost. But that should be easier to cache, and it is O(number of files) instead of O(number of row pages), so I think it should be better.
At the very least this is more correct in my mind.
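A rough sketch of the direction (illustrative only, not the PR's exact code; module paths and behavior may differ between DataFusion versions): build the pruning predicate per file, against that file's physical schema, and skip row-group pruning when the predicate can't be converted or prunes nothing.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;

/// Hypothetical helper: try to build a pruning predicate against the physical
/// schema of one particular file, rather than against the table-level
/// file_schema. If conversion fails or the predicate can't prune anything,
/// fall back to reading all row groups for this file.
fn build_file_pruning_predicate(
    predicate: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
) -> Option<Arc<PruningPredicate>> {
    match PruningPredicate::try_new(
        Arc::clone(predicate),
        Arc::clone(physical_file_schema),
    ) {
        Ok(p) if !p.always_true() => Some(Arc::new(p)),
        // Either the predicate can't be converted for this file's schema or
        // it prunes nothing; skip statistics pruning in that case.
        _ => None,
    }
}
```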