用户自定义的Python函数
尽管Polars表达式非常强大灵活, 但有时我们依然需要自定义函数来解决问题. 本章中我们介绍两个API来实现此目的:
map_elements
: 对Series中的每个值调用函数
map_batches
: 始终将完整Series
传给函数
逐个处理: map_elements
1import polars as pl
2import hashlib
3df = pl.DataFrame({
4 "keys": ["a","a","b","b"],
5 "values": [1,2,3,4]
6})
7def func(x: int)-> str:
8 print(f"handle {x}")
9 return hashlib.md5(str(x).encode()).hexdigest()
10
11res = df.select(
12 pl.all(),
13 pl.col("values").map_elements(
14 function=func,
15 return_dtype=pl.String
16 ).alias("hash")
17)
18print(res)
可以看map_elements
会依次调用函数, 一共调用了4次
1handle 1
2handle 2
3handle 3
4handle 4
5shape: (4, 3)
6┌──────┬────────┬─────────────────────────────────┐
7│ keys ┆ values ┆ hash │
8│ --- ┆ --- ┆ --- │
9│ str ┆ i64 ┆ str │
10╞══════╪════════╪═════════════════════════════════╡
11│ a ┆ 1 ┆ c4ca4238a0b923820dcc509a6f7584… │
12│ a ┆ 2 ┆ c81e728d9d4c2f636f067f89cc1486… │
13│ b ┆ 3 ┆ eccbc87e4b5ce2fe28308fd9f2a7ba… │
14│ b ┆ 4 ┆ a87ff679a2f3e71d9181a67b754212… │
15└──────┴────────┴─────────────────────────────────┘
WARNING
如果我们的的函数可以使用表达式就能完成, 那么Polars会提示我们使用表达式来代替, 看下面代码
1import polars as pl
2
3df = pl.DataFrame({
4 "keys": ["a","a","b","b"],
5 "values": [1,2,3,4]
6})
7
8res = df.select(
9 pl.all(),
10 pl.col("values").map_elements(
11 function=lambda x: x+1,
12 return_dtype=pl.Int32
13 ).alias("add")
14)
15print(res)
1shape: (4, 3)
2┌──────┬────────┬─────┐
3│ keys ┆ values ┆ add │
4│ --- ┆ --- ┆ --- │
5│ str ┆ i64 ┆ i32 │
6╞══════╪════════╪═════╡
7│ a ┆ 1 ┆ 2 │
8│ a ┆ 2 ┆ 3 │
9│ b ┆ 3 ┆ 4 │
10│ b ┆ 4 ┆ 5 │
11└──────┴────────┴─────┘
12PolarsInefficientMapWarning:
13Expr.map_elements is significantly slower than the native expressions API.
14Only use if you absolutely CANNOT implement your logic otherwise.
15Replace this expression...
16 - pl.col("values").map_elements(lambda x: ...)
17with this one instead:
18 + pl.col("values") + 1
19
20 pl.col("values").map_elements(
map_elements()
存在两个问题:
- 仅限于单个值: 通常需要对
Series
整体计算, 而不是针对单个项目进行计算
- 性能开销: 为每个值单独调用函数会增加很多开销
整体处理: map_batches
下面的代码计算Series
中每个值和整个Series
平均值的差值
1import polars as pl
2
3df = pl.DataFrame({
4 "keys": ["a","a","b","b"],
5 "values": [1,2,3,4]
6})
7
8
9def diff_from_mean(series: pl.Series) -> pl.Series:
10 """
11 计算序列中每个值和整个序列平均值的差值
12 :param series: 需要计算的Series
13 :return: 返回差值Series
14 """
15 total = 0
16 for value in series:
17 total += value
18 mean = total / series.len()
19 print("mean: ", mean)
20 diff = []
21 for value in series:
22 diff.append(value-mean)
23 return pl.Series(diff)
24
25res = df.select(
26 pl.all(),
27 pl.col("values").map_batches(diff_from_mean).alias("diff")
28)
29print(res)
1mean: 2.5
2shape: (4, 3)
3┌──────┬────────┬──────┐
4│ keys ┆ values ┆ diff │
5│ --- ┆ --- ┆ --- │
6│ str ┆ i64 ┆ f64 │
7╞══════╪════════╪══════╡
8│ a ┆ 1 ┆ -1.5 │
9│ a ┆ 2 ┆ -0.5 │
10│ b ┆ 3 ┆ 0.5 │
11│ b ┆ 4 ┆ 1.5 │
12└──────┴────────┴──────┘
使用用户自定义函数进行快速操作
纯Python实现的问题在于速度慢, 如果想要快速获得结果, 需要尽量减少Python代码的调用
为了最大限度地提高速度, Polars支持Numpy定义的一对接口, 称作: ufuncs
和通用ufuncs
. 前者对每个项目单独执行, 后者接受整个Numpy数组
来看一个使用numpy函数的例子
1import numpy as np
2import polars as pl
3df = pl.DataFrame({
4 "keys": ["a","a","b","b"],
5 "values": [1,2,3,4]
6})
7out = df.select(pl.col("values").map_batches(np.log))
8print(out)
1shape: (4, 1)
2┌──────────┐
3│ values │
4│ --- │
5│ f64 │
6╞══════════╡
7│ 0.0 │
8│ 0.693147 │
9│ 1.098612 │
10│ 1.386294 │
11└──────────┘
示例: 使用Numba
的快速自定义函数
Numpy提供的函数很有用, 但我们的目标是编写自己的函数, 如果我们想要一个上面示例中diff_from_mean
的快速版本,
在Python中编写此代码的最简单方法是Numba, 允许我们使用Python编写自定义函数,
同时享受编译代码的优势
Numba提供了一个名为@guvectorize
的装饰器, 通过将Python函数编译为快速的机器代码来创建通用的ufunc
函数, 以便Polars可以使用它
在以下示例中, diff_from_mean_numba()
将在导入时编译为快速的机器码, 这将花费一些时间.
之后,对该函数的所有调用都将快速运行. Series在传递给函数之前, 它将被转换为NumPy数组
1from numba import float64, guvectorize, int64
2
3
4# This will be compiled to machine code, so it will be fast. The Series is
5# converted to a NumPy array before being passed to the function. See the
6# Numba documentation for more details:
7# https://numba.readthedocs.io/en/stable/user/vectorize.html
8@guvectorize([(int64[:], float64[:])], "(n)->(n)")
9def diff_from_mean_numba(arr, result):
10 total = 0
11 for value in arr:
12 total += value
13 mean = total / len(arr)
14 for i, value in enumerate(arr):
15 result[i] = value - mean
16
17
18out = df.select(pl.col("values").map_batches(diff_from_mean_numba))
19print(out)
1shape: (4, 1)
2┌────────┐
3│ values │
4│ --- │
5│ f64 │
6╞════════╡
7│ -1.5 │
8│ -0.5 │
9│ 0.5 │
10│ 1.5 │
11└────────┘
调用通用ufunc时不允许缺少数据
传递用户自定义函数之前, 一个Series会被转换为Numpy数组, 由于Numpy数组没有缺失数据的概念, 所以如果原始Series中缺失数据,
那就意味着生成的Numpy数组与实际不匹配
在调用自定义函数之前需要填充缺失数据或者删除缺失数据
组合多个列值
如果要将多列传递给用户自定义函数, 可以使用Struct
, 其基本思想是将多列组合成Struct
, 然后函数可以提取这些列
1# Add two arrays together:
2@guvectorize([(int64[:], int64[:], float64[:])], "(n),(n)->(n)")
3def add(arr, arr2, result):
4 for i in range(len(arr)):
5 result[i] = arr[i] + arr2[i]
6
7
8df3 = pl.DataFrame({"values1": [1, 2, 3], "values2": [10, 20, 30]})
9
10out = df3.select(
11 # Create a struct that has two columns in it:
12 pl.struct(["values1", "values2"])
13 # Pass the struct to a lambda that then passes the individual columns to
14 # the add() function:
15 .map_batches(
16 lambda combined: add(
17 combined.struct.field("values1"), combined.struct.field("values2")
18 )
19 )
20 .alias("add_columns")
21)
22print(out)
1shape: (3, 1)
2┌─────────────┐
3│ add_columns │
4│ --- │
5│ f64 │
6╞═════════════╡
7│ 11.0 │
8│ 22.0 │
9│ 33.0 │
10└─────────────┘
返回类型
自定义的Python函数通常是黑盒的, Polars不知道我们的函数要干什么, 也不知道会返回什么. 因此返回值类型会被自动推断, 通过等待第一个非空值来实现这点,
然后这个值用于确定结果的类型
Python类型到Polars类型的映射如下
int
-> Int64
float
-> Float64
bool
-> Boolean
str
-> String
list[tp]
-> List[tp]
dict[str,[tp]]
-> Struct
Any
-> object
(尽量避免这种)