@@ -364,6 +364,9 @@ def to_arrow(self):
364364 return pyarrow .Table .from_batches (record_batches )
365365
366366 # No data, return an empty Table.
367+ if self ._stream_parser is None :
368+ return pyarrow .Table .from_batches ([], schema = pyarrow .schema ([]))
369+
367370 self ._stream_parser ._parse_arrow_schema ()
368371 return pyarrow .Table .from_batches ([], schema = self ._stream_parser ._schema )
369372
@@ -396,38 +399,72 @@ def to_dataframe(self, dtypes=None):
396399 if dtypes is None :
397400 dtypes = {}
398401
399- # If it's an Arrow stream, calling to_arrow, then converting to a
400- # pandas dataframe is about 2x faster. This is because pandas.concat is
401- # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
402- # usually no-copy.
402+ # Fetch the first page up front so an empty stream can be detected early;
403+ # the remaining pages are read from the same `pages` iterator below.
404+ pages = self .pages
403405 try :
404- record_batch = self .to_arrow ()
405- except NotImplementedError :
406- pass
407- else :
408- df = record_batch .to_pandas ()
406+ first_page = next (pages )
407+ except StopIteration :
408+ return self ._empty_dataframe (dtypes )
409+
410+ first_batch = None
411+ try :
412+ # Optimization: If it's an Arrow stream, calling to_arrow, then converting to a
413+ # pandas dataframe is about 2x faster. This is because pandas.concat is
414+ # rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
415+ # usually no-copy.
416+ first_batch = first_page .to_arrow ()
417+ record_batches = [first_batch ] + [p .to_arrow () for p in pages ]
418+
419+ table = pyarrow .Table .from_batches (record_batches )
420+ df = table .to_pandas ()
409421 for column in dtypes :
410422 df [column ] = pandas .Series (df [column ], dtype = dtypes [column ])
411423 return df
424+ except NotImplementedError as e :
425+ if first_batch is not None :
426+ # Unexpected state: if Arrow parsing fails mid-stream,
427+ # raise exception to prevent unreported data loss.
428+ raise RuntimeError ("Stream format changed mid-stream" ) from e
429+ # Not an Arrow stream; use generic parser.
430+ first_batch = first_page .to_dataframe (dtypes = dtypes )
431+ frames = [first_batch ] + [p .to_dataframe (dtypes = dtypes ) for p in pages ]
432+ return pandas .concat (frames )
412433
413- frames = [page .to_dataframe (dtypes = dtypes ) for page in self .pages ]
def _empty_dataframe(self, dtypes):
    """Create an empty DataFrame whose columns follow the stream schema.

    Used when the stream yields no pages but the caller still expects a
    DataFrame with the right columns and dtypes.

    Args:
        dtypes: Optional mapping of column name to pandas dtype. Entries
            override (or add to) the dtypes derived from the schema.
            ``None`` is treated as an empty mapping.

    Returns:
        A zero-row ``pandas.DataFrame``. With no stream parser the columns
        come only from ``dtypes``; with an Arrow parser they come from the
        parsed Arrow schema; otherwise they come from the Avro schema
        fields via ``_dtypes_from_avro``.
    """
    # Direct callers may pass None; copy so the caller's mapping is never
    # mutated by the schema merge below.
    overrides = {} if dtypes is None else dict(dtypes)
    parser = self._stream_parser

    if parser is None:
        # No schema is available: the overrides are the only column info.
        column_dtypes = overrides
    elif isinstance(parser, _ArrowStreamParser):
        parser._parse_arrow_schema()
        df = parser._schema.empty_table().to_pandas()
        # Re-type overridden columns; names missing from the schema are
        # added as new empty columns (df.get falls back to []).
        for column, dtype in overrides.items():
            df[column] = pandas.Series(df.get(column, []), dtype=dtype)
        return df
    else:
        parser._parse_avro_schema()
        column_dtypes = self._dtypes_from_avro(parser._avro_schema_json["fields"])
        # Caller-supplied dtypes take precedence over schema-derived ones.
        column_dtypes.update(overrides)

    # Shared construction for the no-parser and Avro cases.
    df = pandas.DataFrame(columns=list(column_dtypes))
    for column in df:
        df[column] = pandas.Series([], dtype=column_dtypes[column])
    return df
431468
432469 def _dtypes_from_avro (self , avro_fields ):
433470 """Determine Pandas dtypes for columns in Avro schema.
@@ -447,11 +484,13 @@ def _dtypes_from_avro(self, avro_fields):
447484 for field_info in avro_fields :
448485 # If a type is an union of multiple types, pick the first type
449486 # that is not "null".
450- if isinstance (field_info ["type" ], list ):
451- type_info = next (item for item in field_info ["type" ] if item != "null" )
487+ type_info = field_info ["type" ]
488+ if isinstance (type_info , list ):
489+ type_info = next (item for item in type_info if item != "null" )
452490
453491 if isinstance (type_info , str ):
454492 field_dtype = type_map .get (type_info , "object" )
493+
455494 else :
456495 logical_type = type_info .get ("logicalType" )
457496 if logical_type == "timestamp-micros" :
0 commit comments