saltar al contenido principal
paste
bin
.ca
type · paste · share
⌘
K
Docs
Iniciar sesión
?
← volver a la publicación
›
Editar / bifurcar
Publicación sin título
#56cT6ShEu9
public / public
nueva versión
anónimo
creado 2 days ago
Caduca en 5 days
9.5 KB
sintaxis:
text
Tus cambios crean una nueva publicación enlazada a esta — la original no se toca.
nueva versión
Tus cambios crean una nueva publicación enlazada a esta — la original no se toca.
Título (opcional)
Nombre de archivo
Sintaxis
text
text
bash
c
cpp
css
diff
dockerfile
go
html
ini
java
javascript
json
kotlin
lua
makefile
markdown
nginx
php
python
ruby
rust
shellscript
sql
swift
toml
typescript
xml
yaml
Visibilidad
Feed público
Acceso
public
Caduca
7 días
10 min
1 hora
1 día
7 días
30 días
90 días
personalizada…
Caducidad personalizada
Nota de cambio
(opcional)
Esta publicación aparecerá en el feed público. Cambia Visibilidad si solo quieres compartirla por enlace.
Crear nueva versión
Cancelar
Pegar o escribir…
# -*- coding: UTF-8 -*- import sys import os os.environ['PYSPARK_PYTHON']='/home/zhangyu/anaconda3/bin/python3' from time import time import pandas as pd import matplotlib.pyplot as plt from pyspark import SparkConf, SparkContext from pyspark.mllib.tree import DecisionTree from pyspark.mllib.regression import LabeledPoint import numpy as np from pyspark.mllib.evaluation import RegressionMetrics import math def SetLogger( sc ): logger = sc._jvm.org.apache.log4j logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR ) logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR ) logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR) def extract_label(record): label=(record[-1]) return float(label) def convert_float(x): return (0 if x=="?" else float(x)) def extract_features(record,featureEnd): featureSeason=[convert_float(field) for field in record[2]] features=[convert_float(field) for field in record[4: featureEnd-2]] return np.concatenate( (featureSeason, features)) def PrepareData(sc): #----------------------1.导入并转换数据------------- print("开始导入数据...") rawDataWithHeader = sc.textFile("hdfs://127.0.0.1:9000/hour.csv") header = rawDataWithHeader.first() rawData = rawDataWithHeader.filter(lambda x:x !=header) lines = rawData.map(lambda x: x.split(",")) print (lines.first()) print("共计:" + str(lines.count()) + "项") #----------------------2.建立训练评估所需数据 RDD[LabeledPoint]------------- labelpointRDD = lines.map(lambda r:LabeledPoint( extract_label(r), extract_features(r,len(r) - 1))) print(labelpointRDD.first()) #----------------------3.以随机方式将数据分为3个部分并且返回------------- (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1]) print("将数据分trainData:" + str(trainData.count()) + " validationData:" + str(validationData.count()) + " testData:" + str(testData.count())) #print(labelpointRDD.first()) return (trainData, validationData, testData) #返回数据 def PredictData(sc,model): #----------------------1.导入并转换数据------------- print("开始导入数据...") rawDataWithHeader = sc.textFile("hdfs://127.0.0.1:9000/hour.csv") header = rawDataWithHeader.first() rawData = rawDataWithHeader.filter(lambda x:x !=header) lines = rawData.map(lambda x: x.split(",")) #print (lines.first()) print("共计:" + str(lines.count()) + "项") #----------------------2.建立训练评估所需数据 LabeledPoint RDD------------- labelpointRDD = lines.map(lambda r: LabeledPoint( extract_label(r), extract_features(r,len(r) - 1))) #----------------------3.定义字典---------------- SeasonDict = { 1 : "春", 2 : "夏", 3 :"秋", 4 : "冬" } HoildayDict={ 0 : "非假日", 1 : "假日" } WeekDict = {0:"一",1:"二",2:"三",3:"四",4 :"五",5:"六",6:"日"} WorkDayDict={ 1 : "工作日", 0 : "非工作日" } WeatherDict={ 1 : "晴", 2 : "阴", 3 : "小雨", 4 : "大雨" } #----------------------4.进行预测并显示结果-------------- for lp in labelpointRDD.take(100): predict = int(model.predict(lp.features)) label=lp.label features=lp.features result = ("正确" if (label == predict) else "错误") error = math.fabs(label - predict) dataDesc=" 特征: "+SeasonDict[features[0]] +"季,"+\ str(features[1]) + "月," +\ str(features[2]) + "时,"+ \ HoildayDict[features[3]] +","+\ "星期"+WeekDict[features[4]]+","+ \ WorkDayDict[features[5]]+","+\ WeatherDict[features[6]]+","+\ str(features[7] * 41)+ "度,"+\ "体感" + str(features[8] * 50) + "度," +\ "湿度" + str(features[9] * 100) + ","+\ "风速" + str(features[10] * 67) +\ " ==> 预测结果:" + str(predict )+\ " , 实际:" + str(label) + result +", 误差:" + str(error) print(dataDesc) def evaluateModel(model, validationData): score = model.predict(validationData.map(lambda p: p.features)) scoreAndLabels=score.zip(validationData.map(lambda p: p.label)) metrics = RegressionMetrics(scoreAndLabels) RMSE=metrics.rootMeanSquaredError return( RMSE) def trainEvaluateModel(trainData,validationData, impurityParm, maxDepthParm, maxBinsParm): startTime = time() model = DecisionTree.trainRegressor(trainData, categoricalFeaturesInfo={}, \ impurity=impurityParm, maxDepth=maxDepthParm, maxBins=maxBinsParm) RMSE = evaluateModel(model, validationData) duration = time() - startTime print ("训练评估:使用参数" + \ " impurityParm= %s"%impurityParm+ \ " maxDepthParm= %s"%maxDepthParm+ \ " maxBinsParm = %d."%maxBinsParm + \ " 所需时间=%d"%duration + \ " 结果RMSE = %f " % RMSE ) return (RMSE,duration, impurityParm, maxDepthParm, maxBinsParm,model) def evalParameter(trainData, validationData, evaparm,impurityList, maxDepthList, maxBinsList): metrics = [trainEvaluateModel(trainData, validationData, impurity,maxdepth, maxBins ) for impurity in impurityList for maxdepth in maxDepthList for maxBins in maxBinsList ] if evaparm=="impurity": IndexList=impurityList[:] elif evaparm=="maxDepth": IndexList=maxDepthList[:] elif evaparm=="maxBins": IndexList=maxBinsList[:] df = pd.DataFrame(metrics,index=IndexList, columns=['RMSE', 'duration','impurityParm', 'maxDepthParm', 'maxBinsParm','model']) showchart(df,evaparm,'RMSE','duration',0,200 ) def showchart(df,evalparm ,barData,lineData,yMin,yMax): ax = df[barData].plot(kind='bar', title =evalparm,figsize=(10,6),legend=True, fontsize=12) ax.set_xlabel(evalparm,fontsize=12) ax.set_ylim([yMin,yMax]) ax.set_ylabel(barData,fontsize=12) ax2 = ax.twinx() ax2.plot(df[[lineData ]].values, linestyle='-', marker='o', linewidth=2.0,color='r') plt.show() def evalAllParameter(training_RDD, validation_RDD, impurityList, maxDepthList, maxBinsList): metrics = [trainEvaluateModel(trainData, validationData, impurity,maxdepth, maxBins ) for impurity in impurityList for maxdepth in maxDepthList for maxBins in maxBinsList ] Smetrics = sorted(metrics, key=lambda k: k[0]) bestParameter=Smetrics[0] print("调校后最佳参数:impurity:" + str(bestParameter[2]) + " ,maxDepth:" + str(bestParameter[3]) + " ,maxBins:" + str(bestParameter[4]) + " ,结果RMSE = " + str(bestParameter[0])) return bestParameter[5] def parametersEval(training_RDD, validation_RDD): print("----- 评估maxDepth参数使用 ---------") evalParameter(training_RDD, validation_RDD,"maxDepth", impurityList=["variance"], maxDepthList =[3, 5, 10, 15, 20, 25] , maxBinsList=[10]) print("----- 评估maxBins参数使用 ---------") evalParameter(training_RDD, validation_RDD,"maxBins", impurityList=["variance"], maxDepthList=[10], maxBinsList=[3, 5, 10, 50, 100, 200 ]) def CreateSparkContext(): sparkConf = SparkConf() \ .setAppName("RunDecisionTreeRegression") \ .set("spark.ui.showConsoleProgress", "false") sc = SparkContext(conf = sparkConf) print ("master="+sc.master) SetLogger(sc) return (sc) if __name__ == "__main__": print("RunDecisionTreeRegression") sc=CreateSparkContext() print("==========数据准备阶段===============") (trainData, validationData, testData) =PrepareData(sc) trainData.persist(); validationData.persist(); testData.persist() print("==========训练评估阶段===============") (AUC,duration, impurityParm, maxDepthParm, maxBinsParm,model)= \ trainEvaluateModel(trainData, validationData, "variance", 10, 100) if (len(sys.argv) == 2) and (sys.argv[1]=="-e"): parametersEval(trainData, validationData) elif (len(sys.argv) == 2) and (sys.argv[1]=="-a"): print("-----所有参数训练评估找出最好的参数组合---------") model=evalAllParameter(trainData, validationData, ["variance"], [3, 5, 10, 15, 20, 25], [3, 5, 10, 50, 100, 200 ]) print("==========测试阶段===============") RMSE = evaluateModel(model, testData) print("使用test Data测试最佳模型,结果 RMSE:" + str(RMSE)) print("==========预测数据===============") PredictData(sc, model) #print(model.toDebugString())