PySpark自定义Transformer

使用 Python 实现自定义 Transformer，以增强 PySpark Pipeline 的功能。

一、示例

from typing import Optional

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class SelectColsTransformer(
    Transformer, DefaultParamsReadable, DefaultParamsWritable
):
    """Transformer that projects a DataFrame onto a configured list of columns.

    The column list is held in the ``cols`` Param, so the DefaultParams
    mixins can persist it. When ``cols`` is never set, it defaults to
    ``['*']``, which selects every column.
    """

    # Class-level Param declared on a dummy parent: the standard PySpark
    # pattern for declaring Params on a custom stage.
    cols = Param(
        Params._dummy(),
        "cols",
        "cols to select",
    )

    @keyword_only
    def __init__(self, cols: Optional[list[str]] = None):
        """Create the transformer.

        :param cols: column names (or expressions such as ``'*'``) that are
            handed to ``DataFrame.select``. Must be passed by keyword. If it
            is omitted, the Param default ``['*']`` applies.
        """
        super().__init__()
        self._setDefault(cols=['*'])
        # keyword_only records only the arguments the caller actually passed,
        # so the ``None`` in the signature is never forwarded. The real
        # default is the one registered via _setDefault above.
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, cols: Optional[list[str]] = None):
        """Set Params from keyword arguments; only explicitly passed ones apply.

        :param cols: see ``__init__``.
        :return: this transformer, to allow chaining.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setCols(self, value: list[str]) -> "SelectColsTransformer":
        """Set the columns to select and return this transformer."""
        return self._set(cols=value)

    def getCols(self) -> list[str]:
        """Return the configured columns, falling back to the default ``['*']``."""
        return self.getOrDefault(self.cols)

    def _transform(self, dataset):
        """Return ``dataset`` restricted to the configured columns."""
        return dataset.select(self.getCols())

PySpark自定义Transformer
https://linshenkx.github.io/pyspark-transformer/
作者：林泽浩
发布于：2022年3月16日
许可协议