6565# BQ managed functions (@udf) currently only support Python 3.11.
6666_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"
6767
68+ _DEFAULT_FUNCTION_MEMORY_MIB = 1024  # MiB; substituted when the requested cloud function memory is None
69+
6870
6971class FunctionClient :
7072 # Wait time (in seconds) for an IAM binding to take effect after creation.
@@ -402,8 +404,12 @@ def create_cloud_function(
402404 is_row_processor = False ,
403405 vpc_connector = None ,
404406 vpc_connector_egress_settings = "private-ranges-only" ,
405- memory_mib = 1024 ,
407+ memory_mib = None ,
408+ cpus = None ,
406409 ingress_settings = "internal-only" ,
410+ workers = None ,
411+ threads = None ,
412+ concurrency = None ,
407413 ):
408414 """Create a cloud function from the given user defined function."""
409415
@@ -486,6 +492,8 @@ def create_cloud_function(
486492 function .service_config = functions_v2 .ServiceConfig ()
487493 if memory_mib is not None :
488494 function .service_config .available_memory = f"{ memory_mib } Mi"
495+ if cpus is not None :
496+ function .service_config .available_cpu = str (cpus )
489497 if timeout_seconds is not None :
490498 if timeout_seconds > 1200 :
491499 raise bf_formatting .create_exception_with_feedback_link (
@@ -517,6 +525,20 @@ def create_cloud_function(
517525 function .service_config .service_account_email = (
518526 self ._cloud_function_service_account
519527 )
528+ if concurrency :
529+ function .service_config .max_instance_request_concurrency = concurrency
530+
531+ # Functions framework uses environment variables to pass config to gunicorn
532+ # See https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241
533+ # Code: https://github.com/GoogleCloudPlatform/functions-framework-python/blob/v3.10.1/src/functions_framework/_http/gunicorn.py#L37-L43
534+ env_vars = {}
535+ if workers :
536+ env_vars ["WORKERS" ] = str (workers )
537+ if threads :
538+ env_vars ["THREADS" ] = str (threads )
539+ if env_vars :
540+ function .service_config .environment_variables = env_vars
541+
520542 if ingress_settings not in _INGRESS_SETTINGS_MAP :
521543 raise bf_formatting .create_exception_with_feedback_link (
522544 ValueError ,
@@ -581,6 +603,7 @@ def provision_bq_remote_function(
581603 cloud_function_vpc_connector ,
582604 cloud_function_vpc_connector_egress_settings ,
583605 cloud_function_memory_mib ,
606+ cloud_function_cpus ,
584607 cloud_function_ingress_settings ,
585608 bq_metadata ,
586609 ):
@@ -616,6 +639,21 @@ def provision_bq_remote_function(
616639 )
617640 cf_endpoint = self .get_cloud_function_endpoint (cloud_function_name )
618641
642+ if cloud_function_memory_mib is None :
643+ cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB
644+
645+ # Assumption: most bigframes functions are CPU-bound and single-threaded, and many won't release the GIL.
646+ # Therefore, allocate a worker for each CPU and allow a concurrent request per worker.
647+ expected_milli_cpus = (
648+ int (cloud_function_cpus * 1000 )
649+ if (cloud_function_cpus is not None )
650+ else _infer_milli_cpus_from_memory (cloud_function_memory_mib )
651+ )
652+ workers = - (expected_milli_cpus // - 1000 ) # ceil(cpus) without invoking floats
653+ threads = 4 # (per worker)
654+ # Cloud Run hard limit: max concurrency must be 1 when vCPUs < 1
655+ concurrency = (workers * threads ) if (expected_milli_cpus >= 1000 ) else 1
656+
619657 # Create the cloud function if it does not exist
620658 if not cf_endpoint :
621659 cf_endpoint = self .create_cloud_function (
@@ -630,7 +668,11 @@ def provision_bq_remote_function(
630668 vpc_connector = cloud_function_vpc_connector ,
631669 vpc_connector_egress_settings = cloud_function_vpc_connector_egress_settings ,
632670 memory_mib = cloud_function_memory_mib ,
671+ cpus = cloud_function_cpus ,
633672 ingress_settings = cloud_function_ingress_settings ,
673+ workers = workers ,
674+ threads = threads ,
675+ concurrency = concurrency ,
634676 )
635677 else :
636678 logger .info (f"Cloud function { cloud_function_name } already exists." )
@@ -696,3 +738,27 @@ def get_remote_function_specs(self, remote_function_name):
696738 # Note: list_routines doesn't make an API request until we iterate on the response object.
697739 pass
698740 return (http_endpoint , bq_connection )
741+
742+
743+ def _infer_milli_cpus_from_memory (memory_mib : int ) -> int :
744+ # observed values, not formally documented by cloud run functions
745+ if memory_mib < 128 :
746+ raise ValueError ("Cloud run supports at minimum 128MiB per instance" )
747+ elif memory_mib == 128 :
748+ return 83
749+ elif memory_mib <= 256 :
750+ return 167
751+ elif memory_mib <= 512 :
752+ return 333
753+ elif memory_mib <= 1024 :
754+ return 583
755+ elif memory_mib <= 2048 :
756+ return 1000
757+ elif memory_mib <= 8192 :
758+ return 2000
759+ elif memory_mib <= 16384 :
760+ return 4000
761+ elif memory_mib <= 32768 :
762+ return 8000
763+ else :
764+ raise ValueError ("Cloud run supports at most 32768MiB per instance" )
0 commit comments