メインコンテンツへスキップ
paste
bin
.ca
type · paste · share
⌘
K
ドキュメント
サインイン
?
← ペーストに戻る
›
編集 / フォーク
無題のペースト
#56cT6ShEu9
public / public
新しいバージョン
匿名
作成日 2 days ago
失効まで 5 days
9.5 KB
構文:
text
変更により、このペーストにリンクされた新しいペーストが作成されます — 元のペーストは変更されません。
新しいバージョン
変更により、このペーストにリンクされた新しいペーストが作成されます — 元のペーストは変更されません。
タイトル(任意)
ファイル名
構文
text
text
bash
c
cpp
css
diff
dockerfile
go
html
ini
java
javascript
json
kotlin
lua
makefile
markdown
nginx
php
python
ruby
rust
shellscript
sql
swift
toml
typescript
xml
yaml
可視性
パブリックフィード
アクセス
public
失効
7日
10分
1時間
1日
7日
30日
90日
カスタム…
カスタム失効
変更メモ
(任意)
このペーストはパブリックフィードに表示されます。リンクを知っている人だけに見せたい場合は可視性を変更してください。
新しいバージョンを作成
キャンセル
貼り付けるか入力…
# -*- coding: UTF-8 -*- import sys import os os.environ['PYSPARK_PYTHON']='/home/zhangyu/anaconda3/bin/python3' from time import time import pandas as pd import matplotlib.pyplot as plt from pyspark import SparkConf, SparkContext from pyspark.mllib.tree import DecisionTree from pyspark.mllib.regression import LabeledPoint import numpy as np from pyspark.mllib.evaluation import RegressionMetrics import math def SetLogger( sc ): logger = sc._jvm.org.apache.log4j logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR ) logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR ) logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR) def extract_label(record): label=(record[-1]) return float(label) def convert_float(x): return (0 if x=="?" else float(x)) def extract_features(record,featureEnd): featureSeason=[convert_float(field) for field in record[2]] features=[convert_float(field) for field in record[4: featureEnd-2]] return np.concatenate( (featureSeason, features)) def PrepareData(sc): #----------------------1.导入并转换数据------------- print("开始导入数据...") rawDataWithHeader = sc.textFile("hdfs://127.0.0.1:9000/hour.csv") header = rawDataWithHeader.first() rawData = rawDataWithHeader.filter(lambda x:x !=header) lines = rawData.map(lambda x: x.split(",")) print (lines.first()) print("共计:" + str(lines.count()) + "项") #----------------------2.建立训练评估所需数据 RDD[LabeledPoint]------------- labelpointRDD = lines.map(lambda r:LabeledPoint( extract_label(r), extract_features(r,len(r) - 1))) print(labelpointRDD.first()) #----------------------3.以随机方式将数据分为3个部分并且返回------------- (trainData, validationData, testData) = labelpointRDD.randomSplit([8, 1, 1]) print("将数据分trainData:" + str(trainData.count()) + " validationData:" + str(validationData.count()) + " testData:" + str(testData.count())) #print(labelpointRDD.first()) return (trainData, validationData, testData) #返回数据 def PredictData(sc,model): #----------------------1.导入并转换数据------------- print("开始导入数据...") rawDataWithHeader = sc.textFile("hdfs://127.0.0.1:9000/hour.csv") header = rawDataWithHeader.first() rawData = rawDataWithHeader.filter(lambda x:x !=header) lines = rawData.map(lambda x: x.split(",")) #print (lines.first()) print("共计:" + str(lines.count()) + "项") #----------------------2.建立训练评估所需数据 LabeledPoint RDD------------- labelpointRDD = lines.map(lambda r: LabeledPoint( extract_label(r), extract_features(r,len(r) - 1))) #----------------------3.定义字典---------------- SeasonDict = { 1 : "春", 2 : "夏", 3 :"秋", 4 : "冬" } HoildayDict={ 0 : "非假日", 1 : "假日" } WeekDict = {0:"一",1:"二",2:"三",3:"四",4 :"五",5:"六",6:"日"} WorkDayDict={ 1 : "工作日", 0 : "非工作日" } WeatherDict={ 1 : "晴", 2 : "阴", 3 : "小雨", 4 : "大雨" } #----------------------4.进行预测并显示结果-------------- for lp in labelpointRDD.take(100): predict = int(model.predict(lp.features)) label=lp.label features=lp.features result = ("正确" if (label == predict) else "错误") error = math.fabs(label - predict) dataDesc=" 特征: "+SeasonDict[features[0]] +"季,"+\ str(features[1]) + "月," +\ str(features[2]) + "时,"+ \ HoildayDict[features[3]] +","+\ "星期"+WeekDict[features[4]]+","+ \ WorkDayDict[features[5]]+","+\ WeatherDict[features[6]]+","+\ str(features[7] * 41)+ "度,"+\ "体感" + str(features[8] * 50) + "度," +\ "湿度" + str(features[9] * 100) + ","+\ "风速" + str(features[10] * 67) +\ " ==> 预测结果:" + str(predict )+\ " , 实际:" + str(label) + result +", 误差:" + str(error) print(dataDesc) def evaluateModel(model, validationData): score = model.predict(validationData.map(lambda p: p.features)) scoreAndLabels=score.zip(validationData.map(lambda p: p.label)) metrics = RegressionMetrics(scoreAndLabels) RMSE=metrics.rootMeanSquaredError return( RMSE) def trainEvaluateModel(trainData,validationData, impurityParm, maxDepthParm, maxBinsParm): startTime = time() model = DecisionTree.trainRegressor(trainData, categoricalFeaturesInfo={}, \ impurity=impurityParm, maxDepth=maxDepthParm, maxBins=maxBinsParm) RMSE = evaluateModel(model, validationData) duration = time() - startTime print ("训练评估:使用参数" + \ " impurityParm= %s"%impurityParm+ \ " maxDepthParm= %s"%maxDepthParm+ \ " maxBinsParm = %d."%maxBinsParm + \ " 所需时间=%d"%duration + \ " 结果RMSE = %f " % RMSE ) return (RMSE,duration, impurityParm, maxDepthParm, maxBinsParm,model) def evalParameter(trainData, validationData, evaparm,impurityList, maxDepthList, maxBinsList): metrics = [trainEvaluateModel(trainData, validationData, impurity,maxdepth, maxBins ) for impurity in impurityList for maxdepth in maxDepthList for maxBins in maxBinsList ] if evaparm=="impurity": IndexList=impurityList[:] elif evaparm=="maxDepth": IndexList=maxDepthList[:] elif evaparm=="maxBins": IndexList=maxBinsList[:] df = pd.DataFrame(metrics,index=IndexList, columns=['RMSE', 'duration','impurityParm', 'maxDepthParm', 'maxBinsParm','model']) showchart(df,evaparm,'RMSE','duration',0,200 ) def showchart(df,evalparm ,barData,lineData,yMin,yMax): ax = df[barData].plot(kind='bar', title =evalparm,figsize=(10,6),legend=True, fontsize=12) ax.set_xlabel(evalparm,fontsize=12) ax.set_ylim([yMin,yMax]) ax.set_ylabel(barData,fontsize=12) ax2 = ax.twinx() ax2.plot(df[[lineData ]].values, linestyle='-', marker='o', linewidth=2.0,color='r') plt.show() def evalAllParameter(training_RDD, validation_RDD, impurityList, maxDepthList, maxBinsList): metrics = [trainEvaluateModel(trainData, validationData, impurity,maxdepth, maxBins ) for impurity in impurityList for maxdepth in maxDepthList for maxBins in maxBinsList ] Smetrics = sorted(metrics, key=lambda k: k[0]) bestParameter=Smetrics[0] print("调校后最佳参数:impurity:" + str(bestParameter[2]) + " ,maxDepth:" + str(bestParameter[3]) + " ,maxBins:" + str(bestParameter[4]) + " ,结果RMSE = " + str(bestParameter[0])) return bestParameter[5] def parametersEval(training_RDD, validation_RDD): print("----- 评估maxDepth参数使用 ---------") evalParameter(training_RDD, validation_RDD,"maxDepth", impurityList=["variance"], maxDepthList =[3, 5, 10, 15, 20, 25] , maxBinsList=[10]) print("----- 评估maxBins参数使用 ---------") evalParameter(training_RDD, validation_RDD,"maxBins", impurityList=["variance"], maxDepthList=[10], maxBinsList=[3, 5, 10, 50, 100, 200 ]) def CreateSparkContext(): sparkConf = SparkConf() \ .setAppName("RunDecisionTreeRegression") \ .set("spark.ui.showConsoleProgress", "false") sc = SparkContext(conf = sparkConf) print ("master="+sc.master) SetLogger(sc) return (sc) if __name__ == "__main__": print("RunDecisionTreeRegression") sc=CreateSparkContext() print("==========数据准备阶段===============") (trainData, validationData, testData) =PrepareData(sc) trainData.persist(); validationData.persist(); testData.persist() print("==========训练评估阶段===============") (AUC,duration, impurityParm, maxDepthParm, maxBinsParm,model)= \ trainEvaluateModel(trainData, validationData, "variance", 10, 100) if (len(sys.argv) == 2) and (sys.argv[1]=="-e"): parametersEval(trainData, validationData) elif (len(sys.argv) == 2) and (sys.argv[1]=="-a"): print("-----所有参数训练评估找出最好的参数组合---------") model=evalAllParameter(trainData, validationData, ["variance"], [3, 5, 10, 15, 20, 25], [3, 5, 10, 50, 100, 200 ]) print("==========测试阶段===============") RMSE = evaluateModel(model, testData) print("使用test Data测试最佳模型,结果 RMSE:" + str(RMSE)) print("==========预测数据===============") PredictData(sc, model) #print(model.toDebugString())