@@ -716,8 +716,8 @@ def _get_entity_df_event_timestamp_range(
716
716
717
717
MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN = """
718
718
/*
719
- Compute a deterministic hash for the `left_table_query_string` that will be used throughout
720
- all the logic as the field to GROUP BY the data
719
+ 0. Compute a deterministic hash for the `left_table_query_string` that will be used throughout
720
+ all the logic as the field to GROUP BY the data.
721
721
*/
722
722
WITH "entity_dataframe" AS (
723
723
SELECT *,
@@ -739,6 +739,10 @@ def _get_entity_df_event_timestamp_range(
739
739
740
740
{% for featureview in featureviews %}
741
741
742
+ /*
743
+ 1. Only select the required columns with entities of the featureview.
744
+ */
745
+
742
746
"{{ featureview.name }}__entity_dataframe" AS (
743
747
SELECT
744
748
{{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
@@ -752,20 +756,7 @@ def _get_entity_df_event_timestamp_range(
752
756
),
753
757
754
758
/*
755
- This query template performs the point-in-time correctness join for a single feature set table
756
- to the provided entity table.
757
-
758
- 1. We first join the current feature_view to the entity dataframe that has been passed.
759
- This JOIN has the following logic:
760
- - For each row of the entity dataframe, only keep the rows where the `timestamp_field`
761
- is less than the one provided in the entity dataframe
762
- - If there a TTL for the current feature_view, also keep the rows where the `timestamp_field`
763
- is higher the the one provided minus the TTL
764
- - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
765
- computed previously
766
-
767
- The output of this CTE will contain all the necessary information and already filtered out most
768
- of the data that is not relevant.
759
+ 2. Use subquery to prepare event_timestamp, created_timestamp, entity columns and feature columns.
769
760
*/
770
761
771
762
"{{ featureview.name }}__subquery" AS (
@@ -777,94 +768,61 @@ def _get_entity_df_event_timestamp_range(
777
768
"{{ feature }}" as {% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if loop.last %}{% else %}, {% endif %}
778
769
{% endfor %}
779
770
FROM {{ featureview.table_subquery }}
780
- WHERE "{{ featureview.timestamp_field }}" <= '{{ featureview.max_event_timestamp }}'
781
- {% if featureview.ttl == 0 %}{% else %}
782
- AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}'
783
- {% endif %}
784
- ),
785
-
786
- "{{ featureview.name }}__base" AS (
787
- SELECT
788
- "subquery".*,
789
- "entity_dataframe"."entity_timestamp",
790
- "entity_dataframe"."{{featureview.name}}__entity_row_unique_id"
791
- FROM "{{ featureview.name }}__subquery" AS "subquery"
792
- INNER JOIN "{{ featureview.name }}__entity_dataframe" AS "entity_dataframe"
793
- ON TRUE
794
- AND "subquery"."event_timestamp" <= "entity_dataframe"."entity_timestamp"
795
-
796
- {% if featureview.ttl == 0 %}{% else %}
797
- AND "subquery"."event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_dataframe"."entity_timestamp")
798
- {% endif %}
799
-
800
- {% for entity in featureview.entities %}
801
- AND "subquery"."{{ entity }}" = "entity_dataframe"."{{ entity }}"
802
- {% endfor %}
803
771
),
804
772
805
773
/*
806
- 2. If the `created_timestamp_column` has been set, we need to
807
- deduplicate the data first. This is done by calculating the
808
- `MAX(created_at_timestamp)` for each event_timestamp.
809
- We then join the data on the next CTE
774
+ 3. If the `created_timestamp_column` has been set, we need to
775
+ deduplicate the data first. This is done by calculating the
776
+ `MAX(created_at_timestamp)` for each event_timestamp and joining back on the subquery.
777
+ Otherwise, the ASOF JOIN can have unstable side effects
778
+ https://docs.snowflake.com/en/sql-reference/constructs/asof-join#expected-behavior-when-ties-exist-in-the-right-table
810
779
*/
780
+
811
781
{% if featureview.created_timestamp_column %}
812
782
"{{ featureview.name }}__dedup" AS (
813
- SELECT
814
- "{{featureview.name}}__entity_row_unique_id",
815
- "event_timestamp",
816
- MAX("created_timestamp") AS "created_timestamp"
817
- FROM "{{ featureview.name }}__base"
818
- GROUP BY "{{featureview.name}}__entity_row_unique_id", "event_timestamp"
783
+ SELECT *
784
+ FROM "{{ featureview.name }}__subquery"
785
+ INNER JOIN (
786
+ SELECT
787
+ {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
788
+ "event_timestamp",
789
+ MAX("created_timestamp") AS "created_timestamp"
790
+ FROM "{{ featureview.name }}__subquery"
791
+ GROUP BY {{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp"
792
+ )
793
+ USING({{ featureview.entities | map('tojson') | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} "event_timestamp", "created_timestamp")
819
794
),
820
795
{% endif %}
821
796
822
797
/*
823
- 3. The data has been filtered during the first CTE "*__base"
824
- Thus we only need to compute the latest timestamp of each feature.
798
+ 4. Make ASOF JOIN of deduplicated feature CTE on reduced entity dataframe.
825
799
*/
826
- "{{ featureview.name }}__latest" AS (
800
+
801
+ "{{ featureview.name }}__asof_join" AS (
827
802
SELECT
828
- "event_timestamp",
829
- {% if featureview.created_timestamp_column %}"created_timestamp",{% endif %}
830
- "{{featureview.name}}__entity_row_unique_id"
831
- FROM
832
- (
833
- SELECT *,
834
- ROW_NUMBER() OVER(
835
- PARTITION BY "{{featureview.name}}__entity_row_unique_id"
836
- ORDER BY "event_timestamp" DESC{% if featureview.created_timestamp_column %},"created_timestamp" DESC{% endif %}
837
- ) AS "row_number"
838
- FROM "{{ featureview.name }}__base"
839
- {% if featureview.created_timestamp_column %}
840
- INNER JOIN "{{ featureview.name }}__dedup"
841
- USING ("{{featureview.name}}__entity_row_unique_id", "event_timestamp", "created_timestamp")
842
- {% endif %}
843
- )
844
- WHERE "row_number" = 1
803
+ e.*,
804
+ v.*
805
+ FROM "{{ featureview.name }}__entity_dataframe" e
806
+ ASOF JOIN {% if featureview.created_timestamp_column %}"{{ featureview.name }}__dedup"{% else %}"{{ featureview.name }}__subquery"{% endif %} v
807
+ MATCH_CONDITION (e."entity_timestamp" >= v."event_timestamp")
808
+ {% if featureview.entities %} USING({{ featureview.entities | map('tojson') | join(', ')}}) {% endif %}
845
809
),
846
810
847
811
/*
848
- 4. Once we know the latest value of each feature for a given timestamp,
849
- we can join again the data back to the original "base" dataset
812
+ 5. If TTL is configured filter the CTE to remove rows where the feature values are older than the configured ttl.
850
813
*/
851
- "{{ featureview.name }}__cleaned" AS (
852
- SELECT "base".*
853
- FROM "{{ featureview.name }}__base" AS "base"
854
- INNER JOIN "{{ featureview.name }}__latest"
855
- USING(
856
- "{{featureview.name}}__entity_row_unique_id",
857
- "event_timestamp"
858
- {% if featureview.created_timestamp_column %}
859
- ,"created_timestamp"
860
- {% endif %}
861
- )
862
- ){% if loop.last %}{% else %}, {% endif %}
863
814
815
+ "{{ featureview.name }}__ttl" AS (
816
+ SELECT *
817
+ FROM "{{ featureview.name }}__asof_join"
818
+ {% if featureview.ttl == 0 %}{% else %}
819
+ WHERE "event_timestamp" >= TIMESTAMPADD(second,-{{ featureview.ttl }},"entity_timestamp")
820
+ {% endif %}
821
+ ){% if loop.last %}{% else %}, {% endif %}
864
822
865
823
{% endfor %}
866
824
/*
867
- Joins the outputs of multiple time travel joins to a single table.
825
+ Join the outputs of multiple time travel joins to a single table.
868
826
The entity_dataframe dataset being our source of truth here.
869
827
*/
870
828
@@ -877,7 +835,7 @@ def _get_entity_df_event_timestamp_range(
877
835
{% for feature in featureview.features %}
878
836
,{% if full_feature_names %}"{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}
879
837
{% endfor %}
880
- FROM "{{ featureview.name }}__cleaned "
881
- ) "{{ featureview.name }}__cleaned " USING ("{{featureview.name}}__entity_row_unique_id")
838
+ FROM "{{ featureview.name }}__ttl "
839
+ ) "{{ featureview.name }}__ttl " USING ("{{featureview.name}}__entity_row_unique_id")
882
840
{% endfor %}
883
841
"""
0 commit comments