fix: Fix issue with stream upload batch size upload limit (#2290) · googleapis/python-bigquery-dataframes@6cdf64b · GitHub
Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.

Commit 6cdf64b

Browse files
fix: Fix issue with stream upload batch size upload limit (#2290)
1 parent c4cb39d commit 6cdf64b

3 files changed

Lines changed: 86 additions & 12 deletions

File tree

bigframes/core/local_data.py

Lines changed: 2 additions & 1 deletion

bigframes/session/loader.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import datetime
2020
import io
2121
import itertools
22+
import math
2223
import os
2324
import typing
2425
from typing import (
@@ -397,6 +398,15 @@ def stream_data(
397398
offsets_col: str,
398399
) -> bq_data.BigqueryDataSource:
399400
"""Load managed data into bigquery"""
401+
MAX_BYTES = 10000000 # streaming api has 10MB limit
402+
SAFETY_MARGIN = (
403+
40 # Perf seems bad for large chunks, so do 40x smaller than max
404+
)
405+
batch_count = math.ceil(
406+
data.metadata.total_bytes / (MAX_BYTES // SAFETY_MARGIN)
407+
)
408+
rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
409+
400410
schema_w_offsets = data.schema.append(
401411
schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
402412
)
@@ -410,16 +420,24 @@ def stream_data(
410420
)
411421
rows_w_offsets = ((*row, offset) for offset, row in enumerate(rows))
412422

413-
for errors in self._bqclient.insert_rows(
414-
load_table_destination,
415-
rows_w_offsets,
416-
selected_fields=bq_schema,
417-
row_ids=map(str, itertools.count()), # used to ensure only-once insertion
418-
):
419-
if errors:
420-
raise ValueError(
421-
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
422-
)
423+
# TODO: don't use batched
424+
batches = _batched(rows_w_offsets, rows_per_batch)
425+
ids_iter = map(str, itertools.count())
426+
427+
for batch in batches:
428+
batch_rows = list(batch)
429+
row_ids = itertools.islice(ids_iter, len(batch_rows))
430+
431+
for errors in self._bqclient.insert_rows(
432+
load_table_destination,
433+
batch_rows,
434+
selected_fields=bq_schema,
435+
row_ids=row_ids, # used to ensure only-once insertion
436+
):
437+
if errors:
438+
raise ValueError(
439+
f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
440+
)
423441
destination_table = self._bqclient.get_table(load_table_destination)
424442
return bq_data.BigqueryDataSource(
425443
bq_data.GbqTable.from_table(destination_table),
@@ -434,6 +452,15 @@ def write_data(
434452
offsets_col: str,
435453
) -> bq_data.BigqueryDataSource:
436454
"""Load managed data into bigquery"""
455+
MAX_BYTES = 10000000 # streaming api has 10MB limit
456+
SAFETY_MARGIN = (
457+
4 # aim for 2.5mb to account for row variance, format differences, etc.
458+
)
459+
batch_count = math.ceil(
460+
data.metadata.total_bytes / (MAX_BYTES // SAFETY_MARGIN)
461+
)
462+
rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
463+
437464
schema_w_offsets = data.schema.append(
438465
schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
439466
)
@@ -450,7 +477,9 @@ def write_data(
450477

451478
def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
452479
schema, batches = data.to_arrow(
453-
offsets_col=offsets_col, duration_type="int"
480+
offsets_col=offsets_col,
481+
duration_type="int",
482+
max_chunksize=rows_per_batch,
454483
)
455484
offset = 0
456485
for batch in batches:
@@ -1332,3 +1361,10 @@ def _validate_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype):
13321361
f"Nested JSON types, found in column `{name}`: `{column_type}`', "
13331362
f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
13341363
)
1364+
1365+
1366+
# itertools.batched not available in python <3.12, so we use this instead
1367+
def _batched(iterator: Iterable, n: int) -> Iterable:
1368+
assert n > 0
1369+
while batch := tuple(itertools.islice(iterator, n)):
1370+
yield batch

tests/system/large/test_session.py

Lines changed: 37 additions & 0 deletions

0 commit comments

Comments
 (0)