Streaming

lazy API 的另一个好处是它允许以流的方式执行查询。Polars 无需一次性处理所有数据,而是可以批量执行查询,从而能够处理内存无法容纳的数据集。除了内存压力之外,流式引擎的性能也比 Polars 的内存引擎更高。

想要在 Polars 中以流模式执行查询,需要在使用collect时指定参数engine="streaming"

Python
1q1 = (
2    pl.scan_csv("polars\iris\iris.csv")
3    .filter(pl.col("sepal_length") > 5)
4    .group_by("species")
5    .agg(pl.col("sepal_width").mean())
6)
7df = q1.collect(engine="streaming")
1shape: (3, 2)
2┌─────────────────┬─────────────┐
3│ species         ┆ sepal_width │
4│ ---             ┆ ---         │
5│ str             ┆ f64         │
6╞═════════════════╪═════════════╡
7│ Iris-versicolor ┆ 2.804255    │
8│ Iris-setosa     ┆ 3.713636    │
9│ Iris-virginica  ┆ 2.983673    │
10└─────────────────┴─────────────┘

查看流式执行计划

Polar 可以以流式方式运行许多操作。有些操作本质上是非流式的,或者尚未以流式方式实现。在后一种情况下,Polar 将回退到内存引擎来执行这些操作。用户无需了解这一点,但它对于调试内存或性能问题可能很有用。

要检查流式查询的物理计划,可以绘制物理图。图例显示了操作的内存占用情况

需要先安装graphviz环境,下载地址

Python
1import polars as pl
2import matplotlib
3matplotlib.use('TkAgg')  # 更换为TkAgg后端,处理Matplotlib与PyCharm集成存在的兼容性问题
4
5if __name__ == '__main__':
6
7    q1 = (
8        pl.scan_csv("polars\iris\iris.csv")
9        .filter(pl.col("sepal_length") > 5)
10        .group_by("species")
11        .agg(
12            mean_width=pl.col("sepal_width").mean(),
13            mean_width2=pl.col("sepal_width").sum() / pl.col("sepal_length").count(),
14        )
15        .show_graph(plan_stage="physical", engine="streaming")
16    )