This repository was archived by the owner on May 7, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 69
fix: add df snapshots lookup for read_gbq
#229
Merged
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
e4efcf7
feat: add the recent api method for ML component
ashleyxuu 2d5a85b
fix: fix unit test failure
ashleyxuu 862a2af
fix: use the oldest timestamp for same table lookup
ashleyxuu 29b511e
Merge remote-tracking branch 'origin/main' into ashleyxu-df-snapshots-lookup
ashleyxuu be8837e
Merge branch 'main' into ashleyxu-df-snapshots-lookup
ashleyxuu 915d7d3
Give the users the option to clear the table cached snapshot
ashleyxuu c55686a
Merge branch 'main' into ashleyxu-df-snapshots-lookup
ashleyxuu 2db606f
Merge remote-tracking branch 'origin/ashleyxu-df-snapshots-lookup' into ashleyxu-df-snapshots-lookup
ashleyxuu 995eb46
fix: add docstring to pandas
ashleyxuu 6a07ed0
add use_cache param for read_gbq_table
ashleyxuu 39e1624
reinforce running
ashleyxuu 68da750
allow empty to trigger run again
ashleyxuu 76df8fb
address comments
ashleyxuu 79f090f
fix the mypy failure
ashleyxuu fe61d97
Merge branch 'main' into ashleyxu-df-snapshots-lookup
ashleyxuu df79818
address comments
ashleyxuu 59b65e5
Merge branch 'main' into ashleyxu-df-snapshots-lookup
ashleyxuu 6ea975b
Merge branch 'main' into ashleyxu-df-snapshots-lookup
ashleyxuu File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -177,6 +177,7 @@ def __init__( | |
| # Now that we're starting the session, don't allow the options to be | ||
| # changed. | ||
| context._session_started = True | ||
| self._df_snapshot: Dict[bigquery.TableReference, datetime.datetime] = {} | ||
|
|
||
| @property | ||
| def bqclient(self): | ||
|
|
@@ -232,6 +233,7 @@ def read_gbq( | |
| index_col: Iterable[str] | str = (), | ||
| col_order: Iterable[str] = (), | ||
| max_results: Optional[int] = None, | ||
| use_cache: bool = True, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we pass this to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unresolving. You didn't populate the |
||
| # Add a verify index argument that fails if the index is not unique. | ||
| ) -> dataframe.DataFrame: | ||
| # TODO(b/281571214): Generate prompt to show the progress of read_gbq. | ||
|
|
@@ -242,6 +244,7 @@ def read_gbq( | |
| col_order=col_order, | ||
| max_results=max_results, | ||
| api_name="read_gbq", | ||
| use_cache=use_cache, | ||
| ) | ||
| else: | ||
| # TODO(swast): Query the snapshot table but mark it as a | ||
|
|
@@ -253,13 +256,15 @@ def read_gbq( | |
| col_order=col_order, | ||
| max_results=max_results, | ||
| api_name="read_gbq", | ||
| use_cache=use_cache, | ||
| ) | ||
|
|
||
| def _query_to_destination( | ||
| self, | ||
| query: str, | ||
| index_cols: List[str], | ||
| api_name: str, | ||
| use_cache: bool = True, | ||
| ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]: | ||
| # If a dry_run indicates this is not a query type job, then don't | ||
| # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement. | ||
|
|
@@ -284,6 +289,7 @@ def _query_to_destination( | |
| job_config = bigquery.QueryJobConfig() | ||
| job_config.labels["bigframes-api"] = api_name | ||
| job_config.destination = temp_table | ||
| job_config.use_query_cache = use_cache | ||
|
|
||
| try: | ||
| # Write to temp table to workaround BigQuery 10 GB query results | ||
|
|
@@ -305,6 +311,7 @@ def read_gbq_query( | |
| index_col: Iterable[str] | str = (), | ||
| col_order: Iterable[str] = (), | ||
| max_results: Optional[int] = None, | ||
| use_cache: bool = True, | ||
| ) -> dataframe.DataFrame: | ||
| """Turn a SQL query into a DataFrame. | ||
|
|
||
|
|
@@ -362,6 +369,7 @@ def read_gbq_query( | |
| col_order=col_order, | ||
| max_results=max_results, | ||
| api_name="read_gbq_query", | ||
| use_cache=use_cache, | ||
| ) | ||
|
|
||
| def _read_gbq_query( | ||
|
|
@@ -372,14 +380,18 @@ def _read_gbq_query( | |
| col_order: Iterable[str] = (), | ||
| max_results: Optional[int] = None, | ||
| api_name: str = "read_gbq_query", | ||
| use_cache: bool = True, | ||
| ) -> dataframe.DataFrame: | ||
| if isinstance(index_col, str): | ||
| index_cols = [index_col] | ||
| else: | ||
| index_cols = list(index_col) | ||
|
|
||
| destination, query_job = self._query_to_destination( | ||
| query, index_cols, api_name=api_name | ||
| query, | ||
| index_cols, | ||
| api_name=api_name, | ||
| use_cache=use_cache, | ||
| ) | ||
|
|
||
| # If there was no destination table, that means the query must have | ||
|
|
@@ -403,6 +415,7 @@ def _read_gbq_query( | |
| index_col=index_cols, | ||
| col_order=col_order, | ||
| max_results=max_results, | ||
| use_cache=use_cache, | ||
| ) | ||
|
|
||
| def read_gbq_table( | ||
|
|
@@ -412,6 +425,7 @@ def read_gbq_table( | |
| index_col: Iterable[str] | str = (), | ||
| col_order: Iterable[str] = (), | ||
| max_results: Optional[int] = None, | ||
| use_cache: bool = True, | ||
| ) -> dataframe.DataFrame: | ||
| """Turn a BigQuery table into a DataFrame. | ||
|
|
||
|
|
@@ -434,33 +448,22 @@ def read_gbq_table( | |
| col_order=col_order, | ||
| max_results=max_results, | ||
| api_name="read_gbq_table", | ||
| use_cache=use_cache, | ||
| ) | ||
|
|
||
| def _get_snapshot_sql_and_primary_key( | ||
| self, | ||
| table_ref: bigquery.table.TableReference, | ||
| *, | ||
| api_name: str, | ||
| use_cache: bool = True, | ||
| ) -> Tuple[ibis_types.Table, Optional[Sequence[str]]]: | ||
| """Create a read-only Ibis table expression representing a table. | ||
|
|
||
| If we can get a total ordering from the table, such as via primary key | ||
| column(s), then return those too so that ordering generation can be | ||
| avoided. | ||
| """ | ||
| if table_ref.dataset_id.upper() == "_SESSION": | ||
| # _SESSION tables aren't supported by the tables.get REST API. | ||
| return ( | ||
| self.ibis_client.sql( | ||
| f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`" | ||
| ), | ||
| None, | ||
| ) | ||
| table_expression = self.ibis_client.table( | ||
| table_ref.table_id, | ||
| database=f"{table_ref.project}.{table_ref.dataset_id}", | ||
| ) | ||
|
|
||
| # If there are primary keys defined, the query engine assumes these | ||
| # columns are unique, even if the constraint is not enforced. We make | ||
| # the same assumption and use these columns as the total ordering keys. | ||
|
|
@@ -481,14 +484,18 @@ def _get_snapshot_sql_and_primary_key( | |
|
|
||
| job_config = bigquery.QueryJobConfig() | ||
| job_config.labels["bigframes-api"] = api_name | ||
| current_timestamp = list( | ||
| self.bqclient.query( | ||
| "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", | ||
| job_config=job_config, | ||
| ).result() | ||
| )[0][0] | ||
| if use_cache and table_ref in self._df_snapshot.keys(): | ||
| snapshot_timestamp = self._df_snapshot[table_ref] | ||
| else: | ||
| snapshot_timestamp = list( | ||
| self.bqclient.query( | ||
| "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", | ||
| job_config=job_config, | ||
| ).result() | ||
| )[0][0] | ||
|
ashleyxuu marked this conversation as resolved.
|
||
| self._df_snapshot[table_ref] = snapshot_timestamp | ||
| table_expression = self.ibis_client.sql( | ||
| bigframes_io.create_snapshot_sql(table_ref, current_timestamp) | ||
| bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp) | ||
| ) | ||
| return table_expression, primary_keys | ||
|
|
||
|
|
@@ -500,20 +507,21 @@ def _read_gbq_table( | |
| col_order: Iterable[str] = (), | ||
| max_results: Optional[int] = None, | ||
| api_name: str, | ||
| use_cache: bool = True, | ||
| ) -> dataframe.DataFrame: | ||
| if max_results and max_results <= 0: | ||
| raise ValueError("`max_results` should be a positive number.") | ||
|
|
||
| # TODO(swast): Can we re-use the temp table from other reads in the | ||
| # session, if the original table wasn't modified? | ||
| table_ref = bigquery.table.TableReference.from_string( | ||
| query, default_project=self.bqclient.project | ||
| ) | ||
|
|
||
| ( | ||
| table_expression, | ||
| total_ordering_cols, | ||
| ) = self._get_snapshot_sql_and_primary_key(table_ref, api_name=api_name) | ||
| ) = self._get_snapshot_sql_and_primary_key( | ||
| table_ref, api_name=api_name, use_cache=use_cache | ||
| ) | ||
|
|
||
| for key in col_order: | ||
| if key not in table_expression.columns: | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.