案例系列:客户流失预测

文章目录

  • 介绍:使用Spark进行特征工程
    • 使用PySpark的Spark
    • 设置Spark
    • 测试Spark
      • Spark 仪表盘
    • 数据存储
  • 特征工程
      • 一个分区的特征
    • 自定义基元
        • 运行深度特征合成
  • 将分区转换为特征矩阵的函数
      • 测试函数
  • 使用Spark运行
    • 合并数据
  • 结论
    • 下一步

介绍:使用Spark进行特征工程

问题:在“特征工程”中,我们开发了一个自动化特征工程的流水线,使用客户交易和标签时间的数据集。在单个客户分区上运行此流水线需要大约15分钟,这意味着如果一个一个地完成所有功能,则需要几天时间。

解决方案:将数据集分成独立的客户分区,并并行运行多个子集。这可以使用单个机器上的多个处理器或机器集群来完成。

使用PySpark的Spark

[Apache Spark](http://spark.apache.org)是用于分布式计算和大数据处理的流行框架。它允许我们在单个机器上或分布在机器集群上并行运行计算。在这个笔记本中,我们将使用[PySpark库](http://spark.apache.org/docs/2.2.0/api/python/pyspark.html)在[Featuretools](https://github.com/Featuretools/featuretools)中运行自动化特征工程。

第一步是初始化Spark。我们可以使用“findspark”库确保“pyspark”可以在Jupyter Notebook中找到Spark。本笔记本假定Spark集群已经运行。要开始使用Spark集群,请参阅[此指南](https://data-flair.training/blogs/install-apache-spark-multi-node-cluster/)。

(我们将跳过本笔记本中的Featuretools详细信息,但是有关介绍,请参见[本文](https://towardsdatascience.com/automated-feature-engineering-in-python-99baf11cc219)。有关手动和自动特征工程的比较,请参见[本文](https://towardsdatascience.com/why-automated-feature-engineering-will-change-the-way-you-do-machine-learning-5c15bf188b96)。)

# 导入findspark模块
import findspark
# 根据你的安装情况进行更新
findspark.init()

设置Spark

SparkContext是与正在运行的Spark集群交互的接口。我们使用SparkConf对象向SparkContext传递许多参数。具体来说,我们将打开日志记录,告诉Spark在我们的机器上使用12个核心,并将Spark指向主节点(父节点)的位置。

根据您的集群设置调整参数。我发现这个指南在选择参数方面很有帮助。

# 导入pyspark模块
import pyspark

# 根据你的安装更新配置
conf = pyspark.SparkConf()

# 启用日志记录
conf.set('spark.eventLog.enabled', True);
conf.set('spark.eventLog.dir', './data/tmp/');

# 使用所有机器上的所有核心
conf.set('spark.num.executors', 1)
conf.set('spark.executor.memory', '12g')
conf.set('spark.executor.cores', 12)

# 设置父节点
conf.set('spark.master', 'spark://AMB-R09BLVCJ:7077')

# 获取所有配置信息
conf.getAll()

测试Spark

在进行特征工程之前,我们想要测试我们的集群是否正常运行。我们将实例化一个Spark集群,并运行一个简单的程序来计算pi的值。

sc = pyspark.SparkContext(appName="pi_calc", 
                          conf = conf)
sc
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/usr/local/Cellar/apache-spark/3.2.1/libexec/jars/spark-unsafe_2.12-3.2.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/09 16:33:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

案例系列:客户流失预测

num_samples = 100000000
import random

def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1

# Parallelize counting samples inside circle using Spark
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()
                                                                                
3.14170524

Spark 仪表盘

在从命令行启动 Spark 集群之后,在运行笔记本中的任何代码之前,您可以在 localhost:8080 上查看集群的仪表盘。这显示基本信息,如工作节点的数量以及当前正在运行或已完成的作业。

案例系列:客户流失预测

一旦初始化了 SparkContext,可以在 localhost:4040 上查看作业。这显示特定的细节,如已完成的任务数量和操作的有向无环图。

案例系列:客户流失预测

使用 Web 仪表盘可以帮助调试集群。

一旦集群正常运行,我们可以开始进行特征工程。

数据存储

在之前的笔记本中,我们对数据进行了分区,并为前50个分区创建了特征矩阵。通常情况下,运行Spark的所有读写都会通过S3进行,但是在这个示例中,我们将使用本地文件。

# 导入必要的库
import os
import pandas as pd
import featuretools as ft
import warnings

# 忽略警告信息
warnings.simplefilter('ignore')

# 设置分区数和当前工作目录
partition = 20
CWD = os.getcwd()

# 设置数据文件目录和截断时间文件名
directory = f'{CWD}/data/partitions/p' + str(partition)
cutoff_times_file = 'MS-31_labels.csv'

# 读取数据文件
members = pd.read_csv(f'{directory}/members.csv', 
                  parse_dates=['registration_init_time'], 
                  infer_datetime_format = True, 
                  dtype = {'gender': 'category'})

trans = pd.read_csv(f'{directory}/transactions.csv',
                   parse_dates=['transaction_date', 'membership_expire_date'], 
                    infer_datetime_format = True)

logs = pd.read_csv(f'{directory}/logs.csv', parse_dates = ['date'])

cutoff_times = pd.read_csv(f'{directory}/{cutoff_times_file}', parse_dates = ['time'])
cutoff_times = cutoff_times.drop_duplicates(subset = ['msno', 'time'])
/Users/nate.parsons/dev/open_source_demos/env/lib/python3.8/site-packages/statsmodels/compat/pandas.py:65: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
  from pandas import Int64Index as NumericIndex

特征工程

首先,我们将使用单个分区来创建特征集,这样我们就不必为每个分区重新计算它们。这也确保了对于每个客户子集都创建了完全相同的特征。(还可以从磁盘加载计算好的特征。)再次,我跳过了这里正在进行的操作的解释,所以请查看Featuretools文档或一些在线教程。

一个分区的特征

# 创建一个空的实体集
es = ft.EntitySet(id = 'customers')

# 添加成员父表
es.add_dataframe(dataframe_name='members', dataframe=members,
                 index = 'msno', time_index = 'registration_init_time', 
                 logical_types = {'city': 'Categorical', 'bd': 'Categorical',
                                  'registered_via': 'Categorical'})

# 在交易中创建新的特征
trans['price_difference'] = trans['plan_list_price'] - trans['actual_amount_paid']
trans['planned_daily_price'] = trans['plan_list_price'] / trans['payment_plan_days']
trans['daily_price'] = trans['actual_amount_paid'] / trans['payment_plan_days']

# 添加交易子表
es.add_dataframe(dataframe_name='transactions', dataframe=trans,
                 index = 'transactions_index', make_index = True,
                 time_index = 'transaction_date', 
                 logical_types = {'payment_method_id': 'Categorical', 
                                  'is_auto_renew': 'Boolean', 'is_cancel': 'Boolean'})

# 添加交易有趣的值
es.add_interesting_values(dataframe_name='transactions',
                          values={'is_cancel': [False, True],
                                  'is_auto_renew': [False, True]})

# 在日志中创建新的特征
logs['total'] = logs[['num_25', 'num_50', 'num_75', 'num_985', 'num_100']].sum(axis = 1)
logs['percent_100'] = logs['num_100'] / logs['total']
logs['percent_unique'] = logs['num_unq'] / logs['total']

# 添加日志子表
es.add_dataframe(dataframe_name='logs', dataframe=logs,
                 index = 'logs_index', make_index = True,
                 time_index = 'date')

# 添加关系
r_member_transactions = ft.Relationship(es, 'members', 'msno', 'transactions', 'msno')
r_member_logs = ft.Relationship(es, 'members', 'msno', 'logs', 'msno')
es.add_relationships([r_member_transactions, r_member_logs])

# 返回实体集
es

自定义基元

以下是我们为此数据集编写的自定义基元(请参见“特征工程”笔记本)。它计算上个月某种数量的总量。

def total_previous_month(numeric, datetime, time):
    """返回`time`之前一个月的`numeric`列的总和。"""
    df = pd.DataFrame({'value': numeric, 'date': datetime})
    previous_month = time.month - 1
    year = time.year
   
    # 处理一月份
    if previous_month == 0:
        previous_month = 12
        year = time.year - 1
        
    # 过滤数据并计算总和
    df = df[(df['date'].dt.month == previous_month) & (df['date'].dt.year == year)]
    total = df['value'].sum()
    
    return total
# 导入所需的库
from featuretools.primitives import make_agg_primitive
from woodwork.column_schema import ColumnSchema
from woodwork.logical_types import Datetime

# 创建一个数值类型的列模式
numeric = ColumnSchema(semantic_tags={'numeric'})

# 创建一个日期时间类型的列模式
datetime = ColumnSchema(logical_type=Datetime)

# 创建一个聚合原语,接受一个数字和一个日期时间作为输入,返回一个数字作为输出
total_previous = make_agg_primitive(total_previous_month, input_types=[numeric, datetime],
                                    return_type=numeric, 
                                    uses_calc_time=True)
运行深度特征合成

第一次创建特征时,我们使用ft.dfs函数,传入所选的基元、目标数据帧、关键的cutoff_time、要堆叠的特征的深度以及其他几个参数。

# 指定聚合特征
agg_primitives = ['sum', 'time_since_last', 'avg_time_between', 'num_unique', 'min', 'last', 
                  'percent_true', 'max', 'count']

# 指定转换特征
trans_primitives = ['is_weekend', 'cum_sum', 'day', 'month', 'time_since_previous']

# 指定where特征
where_primitives = ['sum', 'mean', 'percent_true']
# 运行深度特征合成
# feature_matrix:特征矩阵,包含生成的特征
# feature_defs:特征定义,包含生成的特征的详细信息
feature_matrix, feature_defs = ft.dfs(entityset=es, target_dataframe_name='members', 
                                      cutoff_time = cutoff_times, 
                                      agg_primitives = agg_primitives,
                                      trans_primitives = trans_primitives,
                                      where_primitives = where_primitives,
                                      max_depth = 2, features_only = False,
                                      chunk_size = 100, n_jobs = 1, verbose = 1)
Built 316 features
Elapsed: 24:05 | Progress: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████

这些特征定义可以保存在磁盘上。每次我们想要生成完全相同的特征时,只需将它们传递给ft.calculate_feature_matrix函数。

# 将特征定义保存到文件中
ft.save_features(feature_defs, f'{CWD}/data/features.txt')
# 加载特征定义文件
feature_defs = ft.load_features(f'{CWD}/data/features.txt')
# 打印特征数量
print(f'There are {len(feature_defs)} features.')
There are 316 features.

将分区转换为特征矩阵的函数

本笔记本的主要功能是从单个分区中生成特征。

这个函数 partition_to_feature_matrix 执行以下操作:

  1. 接收一个分区的名称
  2. 从分区目录中读取数据
  3. 从数据中创建一个实体集
  4. 计算该分区的特征矩阵
  5. 将特征矩阵保存到分区目录中

请注意,由于时间和磁盘空间的限制,我们只会在前20个分区上运行此函数,而不是在第一个笔记本中创建的全部1000个分区上运行。

# 定义一个常量,表示分区数量
N_PARTITIONS = 20
# 定义一个基础目录,用于存储分区数据
BASE_DIR = f'{CWD}/data/partitions/'

def partition_to_feature_matrix(partition, feature_defs = feature_defs, 
                                cutoff_time_name = 'MS-31_labels.csv', write = True):
    """输入一个分区号,创建特征矩阵,并保存到磁盘上
    
    参数
    --------
        partition (int): 分区号
        feature_defs (list of ft features): 分区要创建的特征
        cutoff_time_name (str): 截止时间文件的名称
        write: (boolean): 是否将数据写入磁盘。默认为True
        
    返回
    --------
        None: 将特征矩阵保存到磁盘上
    
    """
    
    # 拼接分区目录
    partition_dir = BASE_DIR + 'p' + str(partition)
    
    # 读取数据文件
    members = pd.read_csv(f'{partition_dir}/members.csv', 
                      parse_dates=['registration_init_time'], 
                      infer_datetime_format = True, 
                      dtype = {'gender': 'category'})

    trans = pd.read_csv(f'{partition_dir}/transactions.csv',
                       parse_dates=['transaction_date', 'membership_expire_date'], 
                        infer_datetime_format = True)
    logs = pd.read_csv(f'{partition_dir}/logs.csv', parse_dates = ['date'])
    
    # 确保删除重复项
    cutoff_times = pd.read_csv(f'{partition_dir}/{cutoff_time_name}', parse_dates = ['time'])
    cutoff_times = cutoff_times.drop_duplicates(subset = ['msno', 'time'])
    
    # 保存所需的截止时间
    cutoff_spec = cutoff_time_name.split('_')[0]
    
    # 创建空实体集
    es = ft.EntitySet(id = 'customers')

    # 添加父表members
    es.add_dataframe(dataframe_name='members', dataframe=members,
                     index = 'msno', time_index = 'registration_init_time', 
                     logical_types = {'city': 'Categorical',
                                      'registered_via': 'Categorical'})
    # 在transactions中创建新特征
    trans['price_difference'] = trans['plan_list_price'] - trans['actual_amount_paid']
    trans['planned_daily_price'] = trans['plan_list_price'] / trans['payment_plan_days']
    trans['daily_price'] = trans['actual_amount_paid'] / trans['payment_plan_days']

    # 添加子表transactions
    es.add_dataframe(dataframe_name='transactions', dataframe=trans,
                     index = 'transactions_index', make_index = True,
                     time_index = 'transaction_date', 
                     logical_types = {'payment_method_id': 'Categorical', 
                                      'is_auto_renew': 'Boolean', 'is_cancel': 'Boolean'})

    # 添加transactions的有趣值
    es.add_interesting_values(dataframe_name='transactions',
                              values={'is_cancel': [False, True],
                                      'is_auto_renew': [False, True]})
    
    # 在logs中创建新特征
    logs['total'] = logs[['num_25', 'num_50', 'num_75', 'num_985', 'num_100']].sum(axis = 1)
    logs['percent_100'] = logs['num_100'] / logs['total']
    logs['percent_unique'] = logs['num_unq'] / logs['total']
    logs['seconds_per_song'] = logs['total_secs'] / logs['total'] 
    
    # 添加子表logs
    es.add_dataframe(dataframe_name='logs', dataframe=logs,
                     index = 'logs_index', make_index = True,
                     time_index = 'date')

    # 添加关系
    r_member_transactions = ft.Relationship(es, 'members', 'msno', 'transactions', 'msno')
    r_member_logs = ft.Relationship(es, 'members', 'msno', 'logs', 'msno')
    es.add_relationships([r_member_transactions, r_member_logs])
    
    # 使用预先计算的特征计算特征矩阵
    feature_matrix = ft.calculate_feature_matrix(entityset=es, features=feature_defs, 
                                                 cutoff_time=cutoff_times, cutoff_time_in_index = True,
                                                 chunk_size = 1000)

    if write:
        # 将特征矩阵写入磁盘
        bytes_to_write = feature_matrix.to_csv(None).encode()

        with open(f'{partition_dir}/{cutoff_spec}_feature_matrix.csv', 'wb') as f:
            f.write(bytes_to_write)

测试函数

让我们使用2个不同的分区对函数进行测试。

# 导入计时器模块
from timeit import default_timer as timer

# 记录开始时间
start = timer()

# 调用函数partition_to_feature_matrix,传入参数10、feature_defs、'MS-31_labels.csv',并设置write参数为True
partition_to_feature_matrix(10, feature_defs, 'MS-31_labels.csv', write=True)

# 记录结束时间
end = timer()

# 打印程序运行时间
print(f'{round(end - start)} seconds elapsed.')
227 seconds elapsed.

# 读取csv文件并存储在feature_matrix变量中
# f'{BASE_DIR}/p10/MS-31_feature_matrix.csv'是文件的路径,BASE_DIR是文件所在的目录
# low_memory=False表示不对数据类型进行自动推断,以节省内存
feature_matrix = pd.read_csv(f'{BASE_DIR}/p10/MS-31_feature_matrix.csv', low_memory=False)

# 显示feature_matrix的前几行数据
feature_matrix.head()

案例系列:客户流失预测

start = timer()
partition_to_feature_matrix(19, feature_defs, 'MS-31_labels.csv', write=True)
end = timer()
print(f'{round(end - start)} seconds elapsed.')
269 seconds elapsed.

# 读取csv文件,并将结果存储在feature_matrix变量中
feature_matrix = pd.read_csv(f'{BASE_DIR}/p19/MS-31_feature_matrix.csv', low_memory = False)

# 显示feature_matrix的前几行数据
feature_matrix.head()

案例系列:客户流失预测

使用Spark运行

下一个单元格使用Spark并行化进行所有特征工程计算。我们希望将分区映射到函数,并让Spark在执行器之间分配工作,每个执行器都是一台机器上的一个核心。

# 创建分区列表
partitions = list(range(N_PARTITIONS))

# 创建Spark上下文 - 根据您的配置进行更新
sc = pyspark.SparkContext(master='spark://AMB-R09BLVCJ:7077',
                          appName='featuretools', conf=conf)

# 并行化特征工程
r = sc.parallelize(partitions, numSlices=N_PARTITIONS).\
    map(lambda x: partition_to_feature_matrix(x, feature_defs,
                                              'MS-31_labels.csv')).collect()
sc.stop()
                                                                                

以下是集群的整体状态。

案例系列:客户流失预测

以下是关于提交的作业的信息。

案例系列:客户流失预测

合并数据

从这里开始,我们可以读取所有分区的特征矩阵并构建一个单一的特征矩阵,或者如果我们有一个支持增量(也称为在线)学习的模型,我们可以一次训练一个分区。

# 读取特征矩阵数据文件
feature_matrix = pd.read_csv(f'{BASE_DIR}/p19/MS-31_feature_matrix.csv', low_memory = False)

# 显示特征矩阵的前几行数据
feature_matrix.head()

案例系列:客户流失预测

结论

在这个笔记本中,我们看到了如何使用Spark框架在Featuretools中分布式特征工程。这个大数据处理技术让我们使用多台计算机并行计算,即使在大型数据集上也能实现高效的数据科学工作流程。

基本方法是:

  1. 将数据分成独立的分区
  2. 使用不同的工作器并行运行每个子集
  3. 如有必要,将结果合并在一起

使用Dask和Spark等框架的好处是我们不必改变底层的Featuretools代码。我们在本机Python中编写代码,更改运行计算的后端,并将计算分布在一组机器上。使用这种方法,我们将能够扩展到任何大小的数据集,并解决更令人兴奋的数据科学和机器学习问题。

下一步

机器学习流程的最后一步是构建模型,以预测这些特征。这在“建模”笔记本中实现。

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/c47359d9c5.html