Streaming
lazy API 的另一个好处是它允许以流的方式执行查询。Polars 无需一次性处理所有数据,而是可以批量执行查询,从而能够处理内存无法容纳的数据集。除了内存压力之外,流式引擎的性能也比 Polars 的内存引擎更高。
想要在 Polars 中以流模式执行查询,需要在使用collect
时指定参数engine="streaming"
:
Python
1q1 = (
2 pl.scan_csv("polars\iris\iris.csv")
3 .filter(pl.col("sepal_length") > 5)
4 .group_by("species")
5 .agg(pl.col("sepal_width").mean())
6)
7df = q1.collect(engine="streaming")
1shape: (3, 2)
2┌─────────────────┬─────────────┐
3│ species ┆ sepal_width │
4│ --- ┆ --- │
5│ str ┆ f64 │
6╞═════════════════╪═════════════╡
7│ Iris-versicolor ┆ 2.804255 │
8│ Iris-setosa ┆ 3.713636 │
9│ Iris-virginica ┆ 2.983673 │
10└─────────────────┴─────────────┘
查看流式执行计划
Polar 可以以流式方式运行许多操作。有些操作本质上是非流式的,或者尚未以流式方式实现。在后一种情况下,Polar 将回退到内存引擎来执行这些操作。用户无需了解这一点,但它对于调试内存或性能问题可能很有用。
要检查流式查询的物理计划,可以绘制物理图。图例显示了操作的内存占用情况
需要先安装graphviz环境,下载地址
Python
1import polars as pl
2import matplotlib
3matplotlib.use('TkAgg') # 更换为TkAgg后端,处理Matplotlib与PyCharm集成存在的兼容性问题
4
5if __name__ == '__main__':
6
7 q1 = (
8 pl.scan_csv("polars\iris\iris.csv")
9 .filter(pl.col("sepal_length") > 5)
10 .group_by("species")
11 .agg(
12 mean_width=pl.col("sepal_width").mean(),
13 mean_width2=pl.col("sepal_width").sum() / pl.col("sepal_length").count(),
14 )
15 .show_graph(plan_stage="physical", engine="streaming")
16 )