用户自定义的Python函数

尽管Polars表达式非常强大灵活, 但有时我们依然需要自定义函数来解决问题. 本章中我们介绍两个API来实现此目的:

  • map_elements: 对Series中的每个值调用函数
  • map_batches: 始终将完整Series传给函数

逐个处理: map_elements

1import polars as pl
2import hashlib
3df = pl.DataFrame({
4	"keys": ["a","a","b","b"],
5	"values": [1,2,3,4]
6})
7def func(x: int)-> str:
8    print(f"handle {x}")
9    return hashlib.md5(str(x).encode()).hexdigest()
10
11res = df.select(
12	pl.all(),
13	pl.col("values").map_elements(
14		function=func,
15		return_dtype=pl.String
16    ).alias("hash")
17)
18print(res)

可以看map_elements会依次调用函数, 一共调用了4次

1handle 1
2handle 2
3handle 3
4handle 4
5shape: (4, 3)
6┌──────┬────────┬─────────────────────────────────┐
7│ keys ┆ values ┆ hash                            │
8│ ---  ┆ ---    ┆ ---                             │
9│ str  ┆ i64    ┆ str                             │
10╞══════╪════════╪═════════════════════════════════╡
11│ a    ┆ 1      ┆ c4ca4238a0b923820dcc509a6f7584… │
12│ a    ┆ 2      ┆ c81e728d9d4c2f636f067f89cc1486… │
13│ b    ┆ 3      ┆ eccbc87e4b5ce2fe28308fd9f2a7ba… │
14│ b    ┆ 4      ┆ a87ff679a2f3e71d9181a67b754212… │
15└──────┴────────┴─────────────────────────────────┘
WARNING

如果我们的的函数可以使用表达式就能完成, 那么Polars会提示我们使用表达式来代替, 看下面代码

1import polars as pl
2
3df = pl.DataFrame({
4	"keys": ["a","a","b","b"],
5	"values": [1,2,3,4]
6})
7
8res = df.select(
9	pl.all(),
10	pl.col("values").map_elements(
11		function=lambda x: x+1,
12		return_dtype=pl.Int32
13    ).alias("add")
14)
15print(res)
1shape: (4, 3)
2┌──────┬────────┬─────┐
3│ keys ┆ values ┆ add │
4│ ---  ┆ ---    ┆ --- │
5│ str  ┆ i64    ┆ i32 │
6╞══════╪════════╪═════╡
7│ a    ┆ 1      ┆ 2   │
8│ a    ┆ 2      ┆ 3   │
9│ b    ┆ 3      ┆ 4   │
10│ b    ┆ 4      ┆ 5   │
11└──────┴────────┴─────┘
12PolarsInefficientMapWarning:
13Expr.map_elements is significantly slower than the native expressions API.
14Only use if you absolutely CANNOT implement your logic otherwise.
15Replace this expression...
16  - pl.col("values").map_elements(lambda x: ...)
17with this one instead:
18  + pl.col("values") + 1
19
20  pl.col("values").map_elements(

map_elements()存在两个问题:

  1. 仅限于单个值: 通常需要对Series整体计算, 而不是针对单个项目进行计算
  2. 性能开销: 为每个值单独调用函数会增加很多开销

整体处理: map_batches

下面的代码计算Series中每个值和整个Series平均值的差值

1import polars as pl
2
3df = pl.DataFrame({
4	"keys": ["a","a","b","b"],
5	"values": [1,2,3,4]
6})
7
8
9def diff_from_mean(series: pl.Series) -> pl.Series:
10	"""
11	计算序列中每个值和整个序列平均值的差值
12	:param series: 需要计算的Series
13	:return: 返回差值Series
14	"""
15	total = 0
16	for value in series:
17		total += value
18	mean = total / series.len()
19	print("mean: ", mean)
20	diff = []
21	for value in series:
22		diff.append(value-mean)
23	return pl.Series(diff)
24
25res = df.select(
26	pl.all(),
27	pl.col("values").map_batches(diff_from_mean).alias("diff")
28)
29print(res)
1mean:  2.5
2shape: (4, 3)
3┌──────┬────────┬──────┐
4│ keys ┆ values ┆ diff │
5│ ---  ┆ ---    ┆ ---  │
6│ str  ┆ i64    ┆ f64  │
7╞══════╪════════╪══════╡
8│ a    ┆ 1      ┆ -1.5 │
9│ a    ┆ 2      ┆ -0.5 │
10│ b    ┆ 3      ┆ 0.5  │
11│ b    ┆ 4      ┆ 1.5  │
12└──────┴────────┴──────┘

使用用户自定义函数进行快速操作

纯Python实现的问题在于速度慢, 如果想要快速获得结果, 需要尽量减少Python代码的调用

为了最大限度地提高速度, Polars支持Numpy定义的一对接口, 称作: ufuncs通用ufuncs. 前者对每个项目单独执行, 后者接受整个Numpy数组

来看一个使用numpy函数的例子

1import numpy as np
2import polars as pl
3df = pl.DataFrame({
4	"keys": ["a","a","b","b"],
5	"values": [1,2,3,4]
6})
7out = df.select(pl.col("values").map_batches(np.log))
8print(out)
1shape: (4, 1)
2┌──────────┐
3│ values   │
4│ ---      │
5│ f64      │
6╞══════════╡
7│ 0.0      │
8│ 0.693147 │
9│ 1.098612 │
10│ 1.386294 │
11└──────────┘

示例: 使用Numba的快速自定义函数

Numpy提供的函数很有用, 但我们的目标是编写自己的函数, 如果我们想要一个上面示例中diff_from_mean的快速版本, 在Python中编写此代码的最简单方法是Numba, 允许我们使用Python编写自定义函数, 同时享受编译代码的优势

Numba提供了一个名为@guvectorize的装饰器, 通过将Python函数编译为快速的机器代码来创建通用的ufunc函数, 以便Polars可以使用它

在以下示例中, diff_from_mean_numba()将在导入时编译为快速的机器码, 这将花费一些时间. 之后,对该函数的所有调用都将快速运行. Series在传递给函数之前, 它将被转换为NumPy数组

1from numba import float64, guvectorize, int64
2
3
4# This will be compiled to machine code, so it will be fast. The Series is
5# converted to a NumPy array before being passed to the function. See the
6# Numba documentation for more details:
7# https://numba.readthedocs.io/en/stable/user/vectorize.html
8@guvectorize([(int64[:], float64[:])], "(n)->(n)")
9def diff_from_mean_numba(arr, result):
10    total = 0
11    for value in arr:
12        total += value
13    mean = total / len(arr)
14    for i, value in enumerate(arr):
15        result[i] = value - mean
16
17
18out = df.select(pl.col("values").map_batches(diff_from_mean_numba))
19print(out)
1shape: (4, 1)
2┌────────┐
3│ values │
4│ ---    │
5│ f64    │
6╞════════╡
7│ -1.5   │
8│ -0.5   │
9│ 0.5    │
10│ 1.5    │
11└────────┘

调用通用ufunc时不允许缺少数据

传递用户自定义函数之前, 一个Series会被转换为Numpy数组, 由于Numpy数组没有缺失数据的概念, 所以如果原始Series中缺失数据, 那就意味着生成的Numpy数组与实际不匹配

在调用自定义函数之前需要填充缺失数据或者删除缺失数据

组合多个列值

如果要将多列传递给用户自定义函数, 可以使用Struct, 其基本思想是将多列组合成Struct, 然后函数可以提取这些列

1# Add two arrays together:
2@guvectorize([(int64[:], int64[:], float64[:])], "(n),(n)->(n)")
3def add(arr, arr2, result):
4    for i in range(len(arr)):
5        result[i] = arr[i] + arr2[i]
6
7
8df3 = pl.DataFrame({"values1": [1, 2, 3], "values2": [10, 20, 30]})
9
10out = df3.select(
11    # Create a struct that has two columns in it:
12    pl.struct(["values1", "values2"])
13    # Pass the struct to a lambda that then passes the individual columns to
14    # the add() function:
15    .map_batches(
16        lambda combined: add(
17            combined.struct.field("values1"), combined.struct.field("values2")
18        )
19    )
20    .alias("add_columns")
21)
22print(out)
1shape: (3, 1)
2┌─────────────┐
3│ add_columns │
4│ ---         │
5│ f64         │
6╞═════════════╡
7│ 11.0        │
8│ 22.0        │
9│ 33.0        │
10└─────────────┘

返回类型

自定义的Python函数通常是黑盒的, Polars不知道我们的函数要干什么, 也不知道会返回什么. 因此返回值类型会被自动推断, 通过等待第一个非空值来实现这点, 然后这个值用于确定结果的类型

Python类型到Polars类型的映射如下

  • int -> Int64
  • float -> Float64
  • bool -> Boolean
  • str -> String
  • list[tp] -> List[tp]
  • dict[str,[tp]] -> Struct
  • Any -> object (尽量避免这种)