Polars (dagster-polars)
This library provides Dagster integration with Polars. It allows using Polars eager or lazy DataFrames as inputs and outputs with Dagster's @asset and @op. Type annotations control whether an eager or lazy DataFrame is loaded, and lazy DataFrames can be sunk as outputs. Multiple storage formats (Parquet, Delta Lake, BigQuery) and filesystems (local, S3, GCS, …) are supported.
A comprehensive list of dagster-polars behavior for supported type annotations can be found in the Type Annotations section.
Note: This is a community-supported integration. For support, see the Dagster Community Integrations repository.
Installation
pip install dagster-polars
Some IOManagers (like PolarsDeltaIOManager) may require additional dependencies, which are provided via extras such as dagster-polars[delta]. Check the documentation for each IOManager for details.
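For example, Delta Lake support can be installed via the extra mentioned above (quoted so the brackets survive shells like zsh):

    pip install 'dagster-polars[delta]'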
Quickstart
Highlights of the features shared by the filesystem-based IOManagers, using PolarsParquetIOManager as an example (see BasePolarsUPathIOManager for the full list of features provided by dagster-polars):

Type annotations are not required. By default, an eager pl.DataFrame will be loaded:
from dagster import asset
import polars as pl


@asset(io_manager_key="polars_parquet_io_manager")
def upstream():
    return pl.DataFrame({"foo": [1, 2, 3]})


@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream) -> pl.LazyFrame:
    assert isinstance(upstream, pl.DataFrame)
    return upstream.lazy()  # LazyFrame will be sunk
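The io_manager_key used above must be bound to an actual IO manager in your Definitions. A minimal sketch, assuming local Parquet storage (the base_dir value here is illustrative):

from dagster import Definitions
from dagster_polars import PolarsParquetIOManager

defs = Definitions(
    assets=[upstream, downstream],
    resources={
        # the key must match the io_manager_key used on the assets
        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="/tmp/polars_assets"),
    },
)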
A pl.LazyFrame can be scanned lazily by annotating the input with pl.LazyFrame, and returning a pl.LazyFrame will sink it:
@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream: pl.LazyFrame) -> pl.LazyFrame:
    assert isinstance(upstream, pl.LazyFrame)
    return upstream
The same logic applies to partitioned assets:
from typing import Dict


@asset
def downstream(partitioned_upstream: Dict[str, pl.LazyFrame]):
    assert isinstance(partitioned_upstream, dict)
    assert isinstance(partitioned_upstream["my_partition"], pl.LazyFrame)
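For context, a minimal sketch of how such a partitioned dependency could be wired up; the partitions definition and asset bodies here are illustrative, not prescribed by dagster-polars:

from typing import Dict

import polars as pl
from dagster import StaticPartitionsDefinition, asset

parts = StaticPartitionsDefinition(["my_partition", "another_partition"])


@asset(partitions_def=parts, io_manager_key="polars_parquet_io_manager")
def partitioned_upstream(context) -> pl.DataFrame:
    # one DataFrame is stored per partition key
    return pl.DataFrame({"partition": [context.partition_key]})


@asset(io_manager_key="polars_parquet_io_manager")
def downstream(partitioned_upstream: Dict[str, pl.LazyFrame]) -> pl.DataFrame:
    # all stored partitions are loaded into a dict keyed by partition key
    return pl.concat([lf.collect() for lf in partitioned_upstream.values()])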
Optional inputs and outputs are supported:
from typing import Optional


@asset
def upstream() -> Optional[pl.DataFrame]:
    if has_data:
        return pl.DataFrame({"foo": [1, 2, 3]})  # type check will pass
    else:
        return None  # type check will pass and dagster_polars will skip writing the output completely


@asset
def downstream(upstream: Optional[pl.LazyFrame]):  # upstream will be None if it doesn't exist in storage
    ...
By default, all the IOManagers store separate partitions in physically separate locations, such as:
- /my/asset/key/partition_0.extension
- /my/asset/key/partition_1.extension
This mode is useful for e.g. snapshotting.
Some IOManagers (like PolarsDeltaIOManager) support reading and writing partitions in storage-native format in the same location. This mode can typically be enabled by setting the "partition_by" metadata value. For example, PolarsDeltaIOManager would store all partitions in the same /my/asset/key.delta directory, properly partitioned inside it. This mode should be preferred for true partitioning.
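A minimal sketch of this second mode, assuming a PolarsDeltaIOManager (from the dagster-polars[delta] extra) is bound to the polars_delta_io_manager key; the asset, partitions definition, and "date" column are illustrative:

import polars as pl
from dagster import DailyPartitionsDefinition, asset


@asset(
    io_manager_key="polars_delta_io_manager",
    partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"),
    metadata={"partition_by": "date"},  # enable Delta-native partitioning on the "date" column
)
def events(context) -> pl.DataFrame:
    # every partition is written into the same Delta table, partitioned by "date"
    return pl.DataFrame({"date": [context.partition_key], "value": [1]})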
Type Annotations
Type aliases like DataFrameWithPartitions are provided by dagster_polars.types for convenience.
Supported type annotations and dagster-polars behavior
Type annotation | Type Alias | Behavior |
---|---|---|
DataFrame | | read/write a DataFrame |
LazyFrame | | read/sink a LazyFrame |
Optional[DataFrame] | | read/write a DataFrame. Do nothing if no data is found in storage or the output is None |
Optional[LazyFrame] | | read a LazyFrame. Do nothing if no data is found in storage |
Dict[str, DataFrame] | DataFrameWithPartitions | read multiple DataFrames as Dict[str, DataFrame]. Raises an error for missing partitions, unless "allow_missing_partitions" input metadata is set to True |
Dict[str, LazyFrame] | LazyFramePartitions | read multiple LazyFrames as Dict[str, LazyFrame]. Raises an error for missing partitions, unless "allow_missing_partitions" input metadata is set to True |
Generic builtins (like tuple[…] instead of Tuple[…]) are supported for Python >= 3.9.
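For example, on Python 3.9+ the partitioned input above could be annotated with the builtin dict (a sketch equivalent to the Dict[str, pl.LazyFrame] form):

@asset
def downstream(partitioned_upstream: dict[str, pl.LazyFrame]) -> pl.DataFrame:
    return pl.concat([lf.collect() for lf in partitioned_upstream.values()])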