Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit 4caf74c

Browse files
feat: Add cloud_function_cpus option to remote_function (#2475)
1 parent cb00daa commit 4caf74c

5 files changed

Lines changed: 120 additions & 17 deletions

File tree

bigframes/functions/_function_client.py

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
# BQ managed functions (@udf) currently only support Python 3.11.
6666
_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"
6767

68+
_DEFAULT_FUNCTION_MEMORY_MIB = 1024
69+
6870

6971
class FunctionClient:
7072
# Wait time (in seconds) for an IAM binding to take effect after creation.
@@ -402,8 +404,12 @@ def create_cloud_function(
402404
is_row_processor=False,
403405
vpc_connector=None,
404406
vpc_connector_egress_settings="private-ranges-only",
405-
memory_mib=1024,
407+
memory_mib=None,
408+
cpus=None,
406409
ingress_settings="internal-only",
410+
workers=None,
411+
threads=None,
412+
concurrency=None,
407413
):
408414
"""Create a cloud function from the given user defined function."""
409415

@@ -486,6 +492,8 @@ def create_cloud_function(
486492
function.service_config = functions_v2.ServiceConfig()
487493
if memory_mib is not None:
488494
function.service_config.available_memory = f"{memory_mib}Mi"
495+
if cpus is not None:
496+
function.service_config.available_cpu = str(cpus)
489497
if timeout_seconds is not None:
490498
if timeout_seconds > 1200:
491499
raise bf_formatting.create_exception_with_feedback_link(
@@ -517,6 +525,20 @@ def create_cloud_function(
517525
function.service_config.service_account_email = (
518526
self._cloud_function_service_account
519527
)
528+
if concurrency:
529+
function.service_config.max_instance_request_concurrency = concurrency
530+
531+
# The Functions Framework uses environment variables to pass config to gunicorn.
532+
# See https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241
533+
# Code: https://github.com/GoogleCloudPlatform/functions-framework-python/blob/v3.10.1/src/functions_framework/_http/gunicorn.py#L37-L43
534+
env_vars = {}
535+
if workers:
536+
env_vars["WORKERS"] = str(workers)
537+
if threads:
538+
env_vars["THREADS"] = str(threads)
539+
if env_vars:
540+
function.service_config.environment_variables = env_vars
541+
520542
if ingress_settings not in _INGRESS_SETTINGS_MAP:
521543
raise bf_formatting.create_exception_with_feedback_link(
522544
ValueError,
@@ -581,6 +603,7 @@ def provision_bq_remote_function(
581603
cloud_function_vpc_connector,
582604
cloud_function_vpc_connector_egress_settings,
583605
cloud_function_memory_mib,
606+
cloud_function_cpus,
584607
cloud_function_ingress_settings,
585608
bq_metadata,
586609
):
@@ -616,6 +639,21 @@ def provision_bq_remote_function(
616639
)
617640
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)
618641

642+
if cloud_function_memory_mib is None:
643+
cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB
644+
645+
# Assumption: most bigframes functions are CPU-bound and single-threaded, and many won't release the GIL.
646+
# Therefore, we want to allocate a worker for each CPU, and allow a concurrent request per worker.
647+
expected_milli_cpus = (
648+
int(cloud_function_cpus * 1000)
649+
if (cloud_function_cpus is not None)
650+
else _infer_milli_cpus_from_memory(cloud_function_memory_mib)
651+
)
652+
workers = -(expected_milli_cpus // -1000) # ceil(cpus) without invoking floats
653+
threads = 4 # (per worker)
654+
# Cloud Run imposes a hard limit of max concurrency == 1 when vCPUs < 1.
655+
concurrency = (workers * threads) if (expected_milli_cpus >= 1000) else 1
656+
619657
# Create the cloud function if it does not exist
620658
if not cf_endpoint:
621659
cf_endpoint = self.create_cloud_function(
@@ -630,7 +668,11 @@ def provision_bq_remote_function(
630668
vpc_connector=cloud_function_vpc_connector,
631669
vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
632670
memory_mib=cloud_function_memory_mib,
671+
cpus=cloud_function_cpus,
633672
ingress_settings=cloud_function_ingress_settings,
673+
workers=workers,
674+
threads=threads,
675+
concurrency=concurrency,
634676
)
635677
else:
636678
logger.info(f"Cloud function {cloud_function_name} already exists.")
@@ -696,3 +738,27 @@ def get_remote_function_specs(self, remote_function_name):
696738
# Note: list_routines doesn't make an API request until we iterate on the response object.
697739
pass
698740
return (http_endpoint, bq_connection)
741+
742+
743+
def _infer_milli_cpus_from_memory(memory_mib: int) -> int:
744+
# Observed values; not formally documented by Cloud Run functions.
745+
if memory_mib < 128:
746+
raise ValueError("Cloud run supports at minimum 128MiB per instance")
747+
elif memory_mib == 128:
748+
return 83
749+
elif memory_mib <= 256:
750+
return 167
751+
elif memory_mib <= 512:
752+
return 333
753+
elif memory_mib <= 1024:
754+
return 583
755+
elif memory_mib <= 2048: