Skip to content

Commit 6a728fe

Browse files
authored
feat: Add support for table_create_disposition in bigquery job for offline store (feast-dev#3762)
* Add bigquery table create disposition to offline store Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com> * linting Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com> --------- Signed-off-by: Nick Zeolla <nick.zeolla@starlingbank.com>
1 parent fa600fe commit 6a728fe

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

sdk/python/feast/infra/offline_stores/bigquery.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import pandas as pd
2020
import pyarrow
2121
import pyarrow.parquet
22-
from pydantic import StrictStr, validator
22+
from pydantic import ConstrainedStr, StrictStr, validator
2323
from pydantic.typing import Literal
2424
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
2525

@@ -72,6 +72,13 @@ def get_http_client_info():
7272
return http_client_info.ClientInfo(user_agent=get_user_agent())
7373

7474

75+
class BigQueryTableCreateDisposition(ConstrainedStr):
76+
"""Custom constraint for table_create_disposition. To understand more, see:
77+
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad.FIELDS.create_disposition"""
78+
79+
values = {"CREATE_NEVER", "CREATE_IF_NEEDED"}
80+
81+
7582
class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
7683
"""Offline store config for GCP BigQuery"""
7784

@@ -95,6 +102,9 @@ class BigQueryOfflineStoreConfig(FeastConfigBaseModel):
95102
gcs_staging_location: Optional[str] = None
96103
""" (optional) GCS location used for offloading BigQuery results as parquet files."""
97104

105+
table_create_disposition: Optional[BigQueryTableCreateDisposition] = None
106+
""" (optional) Specifies whether the job is allowed to create new tables. The default value is CREATE_IF_NEEDED."""
107+
98108
@validator("billing_project_id")
99109
def project_id_exists(cls, v, values, **kwargs):
100110
if v and not values["project_id"]:
@@ -324,6 +334,7 @@ def write_logged_features(
324334
job_config = bigquery.LoadJobConfig(
325335
source_format=bigquery.SourceFormat.PARQUET,
326336
schema=arrow_schema_to_bq_schema(source.get_schema(registry)),
337+
create_disposition=config.offline_store.table_create_disposition,
327338
time_partitioning=bigquery.TimePartitioning(
328339
type_=bigquery.TimePartitioningType.DAY,
329340
field=source.get_log_timestamp_column(),
@@ -384,6 +395,7 @@ def offline_write_batch(
384395
job_config = bigquery.LoadJobConfig(
385396
source_format=bigquery.SourceFormat.PARQUET,
386397
schema=arrow_schema_to_bq_schema(pa_schema),
398+
create_disposition=config.offline_store.table_create_disposition,
387399
write_disposition="WRITE_APPEND", # Default but included for clarity
388400
)
389401

0 commit comments

Comments
 (0)