I. Creating the virtual machine
1. Create a new virtual machine.
2. Download the ISO file from the official website.
3. Pick whatever name you like (here: maduit, password: 123).
4. Choose the custom setup.
5. At least 4 GB of memory is needed; 4 CPU cores are enough.
6. Automatic initialization.
7. When initialization finishes, click the bigdata user.
That completes the virtual machine installation.
Hmm, but the book says we should
create a hadoop user:
sudo useradd -m hadoop -s /bin/bash
Then set a password for it, switch to the new user, and switch the download (apt) sources, as sketched below.
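The book only names these three steps; a minimal sketch of the commands, assuming the user is called hadoop and that you edit the apt source list by hand (which mirror you pick is up to you):
sudo passwd hadoop                 # set a password for the new user
su - hadoop                        # switch to the hadoop user
sudo vim /etc/apt/sources.list     # switch to a faster mirror, then refresh
sudo apt-get update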
II. Installing vim
sudo apt-get install vim
This has to be installed from the user created at the beginning (bigdata); otherwise you get this error:
hadoop is not in the sudoers file. This incident will be reported.
That message means the hadoop user has no sudo privileges.
So, as the bigdata user, open the sudoers file (sudo visudo) and add on the last line:
hadoop ALL=(ALL:ALL) ALL
After saving and closing, switch back to the hadoop user and run the install command again.
If it still fails, run sudo visudo again as the hadoop user, then save and re-run (I don't know whether the spacing matters; just keep the same spacing as the line above). After that the hadoop user can use sudo.
III. Installing the JDK
Move the files from the Baidu Netdisk share straight into Downloads.
If that doesn't work, look up how to set up VMware Tools yourself.
Copy finished.
Switching to administrator (root) mode saves a lot of trouble.
sudo mkdir /usr/lib/jvm                     # create the jvm directory to hold the JDK
cd /home/hadoop/Downloads
tar -zxvf ./jdk-8u162-linux-x64.tar.gz      # extract
sudo mv jdk1.8.0_162 /usr/lib/jvm           # move it into jvm
Go back to the jvm directory and you will find a new folder there.
cd ~              # switch to the current user's home directory
vim ~/.bashrc
Append the following at the end (either of the two variants works):
export JAVA_HOME="/usr/lib/jvm/jdk1.8.0_162"
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH
or
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
Then:
source ~/.bashrc
java -version
If this prints the Java version, everything is fine.
IV. Downloading Hadoop
Install SSH:
sudo apt-get install openssh-server
ssh localhost
Set up passwordless login:
cd ~/.ssh/
ssh-keygen -t rsa                      # press Enter at every prompt
cat ./id_rsa.pub >> ./authorized_keys
Install Hadoop. Go to the folder where the Hadoop package was downloaded and extract it:
tar -zxvf hadoop-2.7.1.tar.gz
sudo mv hadoop-2.7.1 /usr/local        # move
cd /usr/local
sudo mv ./hadoop-2.7.1 ./hadoop        # rename
sudo chown -R hadoop ./hadoop          # fix the directory ownership
Check the standalone Hadoop version info:
cd /usr/local/hadoop
./bin/hadoop version
Since we need pseudo-distributed mode, continue with the pseudo-distributed configuration.
core-site.xml
cd /usr/local/hadoop/etc/hadoop
vim core-site.xml
Copy this in:
<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
        <description>Abase for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>
hdfs-site.xml
cd /usr/local/hadoop/etc/hadoop
vim hdfs-site.xml
Copy this in:
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    </property>
</configuration>
hadoop-env.sh
cd /usr/local/hadoop/etc/hadoop
vim hadoop-env.sh
Find this part and add a line below it:
# The java implementation to use.
export JAVA_HOME=${JAVA_HOME}
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
yarn-env.sh
cd /usr/local/hadoop/etc/hadoop
vim yarn-env.sh
Insert the following line:
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
mapred-site.xml configuration
cd /usr/local/hadoop/etc/hadoop
vim mapred-site.xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
yarn-site.xml configuration
cd /usr/local/hadoop/etc/hadoop
vim yarn-site.xml
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>192.168.2.10:8099</value>
        <description>address of the MapReduce management web UI</description>
    </property>
</configuration>
Format the NameNode:
cd /usr/local/hadoop
./bin/hdfs namenode -format
If you see the expected output, it succeeded.
P.S.: if it complains that java cannot be found or similar, re-check that the entries in ~/.bashrc (vim ~/.bashrc) are still there, then source it again.
Start Hadoop:
cd /usr/local/hadoop/
./sbin/start-dfs.sh
Type yes at the prompt, then continue. Success.
But there is one more thing to do that the book did not explain and I did not quite understand:
We configured some directory paths in the config files; now we create them. As the hadoop user, under /usr/local/hadoop/, create the tmp, hdfs/name and hdfs/data directories:
mkdir -p /usr/local/hadoop/tmp
mkdir /usr/local/hadoop/hdfs
mkdir /usr/local/hadoop/hdfs/data
mkdir /usr/local/hadoop/hdfs/name
Add Hadoop to the environment variables. Switch to the root user, edit /etc/profile, and insert the HADOOP_HOME entries (the full set is sketched below):
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
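The PATH line above assumes HADOOP_HOME is already defined; a minimal sketch of the full entries, assuming Hadoop lives in /usr/local/hadoop:
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin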
Make it take effect, and make sure the bin directory is on the PATH environment variable:
export PATH=/usr/local/hadoop/bin:$PATH
source /etc/profile
Verification. The configuration is now basically done; all that is left is to 1. format HDFS, 2. start Hadoop, and 3. verify Hadoop.
Formatting. Before using Hadoop we need to format some of its basic metadata, using the command sketched below.
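The command itself was only shown as a screenshot here; it is the same NameNode format command used earlier:
cd /usr/local/hadoop
./bin/hdfs namenode -format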
Output like the following means it succeeded.
Starting Hadoop. Next we start Hadoop, as sketched below.
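The start command was also only shown as a screenshot; assuming the scripts under /usr/local/hadoop/sbin, it is:
cd /usr/local/hadoop
./sbin/start-dfs.sh      # or ./sbin/start-all.sh to start YARN as well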
If this shows that startup did not succeed, it is because the root user cannot start Hadoop yet; a small tweak fixes it.
Go to the sbin directory: cd /usr/local/hadoop/sbin.
Add the following parameters at the top of both start-dfs.sh and stop-dfs.sh:
#!/usr/bin/env bash
HDFS_DATANODE_USER=root
HADOOP_SECURE_DN_USER=hdfs
HDFS_NAMENODE_USER=root
HDFS_SECONDARYNAMENODE_USER=root
Likewise, add the following at the top of start-yarn.sh and stop-yarn.sh:
#!/usr/bin/env bash
YARN_RESOURCEMANAGER_USER=root
HADOOP_SECURE_DN_USER=yarn
YARN_NODEMANAGER_USER=root
Run start-dfs.sh again, and finally run jps to verify; output like the following means startup succeeded.
On later startups, do NOT run the namenode -format command again!
V. Common HDFS shell commands
1.1 Start the HDFS service
1. Start it with start-dfs.sh.
2. Use the jps command to check whether it started successfully.
1.2 The help command (see the sketch below)
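No command was shown for this step; a minimal sketch of asking HDFS for help (the ls subcommand is just an example):
hdfs dfs -help        # list all dfs subcommands
hdfs dfs -help ls     # help for a single subcommand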
1.3 The ls command. Function: list directory contents (see the sketch below).
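The listing command was shown only as a screenshot; assuming you list the HDFS root, it would be:
hdfs dfs -ls /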
You can see there is a /user directory.
1.4 The mkdir command. Function: create a directory on HDFS.
1. Create a hadoop folder under the user folder in the root directory:
cd /usr/local/hadoop
hdfs dfs -mkdir /user/hadoop
2. Check with the ls command; you can see the hadoop folder has appeared.
1.5 The put command. Function: upload a file from the local Linux system to a given HDFS directory.
1. First create a data file locally with vi. Enter the command: vi stu01.txt
vi stu01.txt
Press i to enter insert mode; once –INSERT– appears at the bottom left, type the following:
234
5678
Hadoop
Then press ESC; after –INSERT– disappears, type :wq (in English input mode) and press Enter to save and quit.
2. Upload the stu01.txt file you just created to HDFS:
hdfs dfs -put stu01.txt /user/hadoop
3. Check with the ls command; you can see the stu01.txt file has appeared:
hdfs dfs -ls /user/hadoop
1.6 The cat command. Function: display file contents.
hdfs dfs -cat /user/hadoop/stu01.txt
1.7 The text command. Function: print a file's contents as characters.
hdfs dfs -text /user/hadoop/stu01.txt
1.8 The cp command. Function: copy from one HDFS path to another HDFS path.
1. View the contents of the HDFS root directory:
hdfs dfs -ls /
2. View the contents of /user/hadoop on HDFS: hdfs dfs -ls /user/hadoop
3. Copy stu01.txt from /user/hadoop to the root directory:
hdfs dfs -cp /user/hadoop/stu01.txt /
4. View the HDFS root directory again:
hdfs dfs -ls /
The output shows the file has been copied to the HDFS root directory.
1.9 The mv command. Function: move a file from one HDFS path to another HDFS path.
1. Create a new directory on HDFS: hdfs dfs -mkdir /user/hadoop2, then hdfs dfs -ls /user
2. View the HDFS root directory: hdfs dfs -ls /
3. Move stu01.txt from the root directory to /user/hadoop2:
hdfs dfs -mv /stu01.txt /user/hadoop2
4. View the HDFS root directory again:
hdfs dfs -ls /
5. View the contents of /user/hadoop2: hdfs dfs -ls /user/hadoop2
The output shows the file has been moved into /user/hadoop2.
1.10 The rm command. Function: delete an HDFS file or directory.
1. Delete /user/hadoop/stu01.txt on HDFS:
hdfs dfs -rm /user/hadoop/stu01.txt
hdfs dfs -ls /user/hadoop shows no stu01.txt any more, so it worked.
2. Delete the /user/hadoop/input/ directory on HDFS: hdfs dfs -rm -r /user/hadoop/input
hdfs dfs -ls /user/hadoop
Deleted successfully.
2.1 The moveFromLocal command. Function: cut a file from local and paste it into HDFS.
1. Use vi to create a local data file first, stu01-2.txt:
vi stu01-2.txt, press i to enter insert mode, and type the content:
hadoop
hive
Then press ESC, type :wq, and save and quit.
2. List the files in the local directory:
ls
3. Cut-and-paste the file from local into HDFS:
hdfs dfs -mkdir /user/hadoop/
hdfs dfs -moveFromLocal stu01-2.txt /user/hadoop/
4. List the local files:
ls
5. List the files on HDFS:
hdfs dfs -ls /user/hadoop/
6. View the file contents:
hdfs dfs -cat /user/hadoop/stu01-2.txt
The content matches what we wrote, and the file is gone from the local directory; it has been cut over to HDFS.
2.2 The appendToFile command. Function: append one file to the end of an existing file.
1. Create the local stu01.txt file content:
vi stu01.txt
Press i to enter insert mode; once –INSERT– appears at the bottom left, enter the following:
234
5678
Hadoop
When done, press ESC; after –INSERT– disappears, type :wq and press Enter to save and quit.
2. View the contents of stu01-2.txt on HDFS:
hdfs dfs -cat /user/hadoop/stu01-2.txt
234
5678
hadoop
3. Append the local stu01.txt contents to stu01-2.txt on HDFS:
hdfs dfs -appendToFile stu01.txt /user/hadoop/stu01-2.txt
4. View stu01-2.txt on HDFS again:
hdfs dfs -cat /user/hadoop/stu01-2.txt
2.3 The get command. Function: same as copyToLocal, i.e. download a file from HDFS to local.
1. List the files in the local directory:
ls
2. Download stu01-2.txt to local:
hdfs dfs -get /user/hadoop/stu01-2.txt
3. List the files in the local directory:
ls
4. View the contents of stu01-2.txt:
cat stu01-2.txt
VI. Downloading Spark
Go into the Downloads directory:
cd /home/hadoop/Downloads
tar -zxvf spark-2.1.0-bin-without-hadoop.tgz
mv spark-2.1.0-bin-without-hadoop spark    # rename
sudo mv spark /usr/local                   # move into /usr/local
Grant permissions (inside /usr/local):
sudo chown -R hadoop:hadoop ./spark
Make a copy of the config template (inside /usr/local/spark):
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
Edit spark-env.sh and add:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
Verify the installation:
cd /usr/local/spark
bin/run-example SparkPi
Filter the output:
bin/run-example SparkPi 2>&1 | grep "Pi is roughly"
VII. Running in spark-shell
cd /usr/local/spark
./bin/spark-shell
VIII. Installing MySQL
sudo apt-get install mysql-server
Start the MySQL service:
service mysql stop     # stop it by hand first
service mysql start    # then start it by hand
Check whether it is running:
sudo netstat -tap | grep mysql
If logging in as root is denied for lack of permissions, enter MySQL with root privileges (e.g. via sudo), then run the following; the password set here is '123':
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '123';
FLUSH PRIVILEGES;
Done.
Fix the garbled-character problem. The character set is currently latin1:
show variables like "char%";
Modify the config file:
cd /etc/mysql/mysql.conf.d/
vim mysqld.cnf
Add to mysqld.cnf:
character_set_server=utf8
Restart MySQL, enter MySQL again, and check the encoding again:
show variables like "char%";
Done.
Common MySQL operations (the statements for these steps are sketched below)
1.1 Show the databases
2.2 Show the tables in a database
3.3 Show the structure of a table
3.4 Query the records in a table
3.5 Create a database: create one named aaa
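The individual statements for these operations were only shown as screenshots; a minimal sketch, run here through mysql -e for brevity (inside the interactive mysql shell you would just type the SQL; aaa and person are the example names used below):
mysql -u root -p123 -e "show databases;"                  # 1.1 show databases
mysql -u root -p123 -e "create database aaa;"             # 3.5 create a database named aaa
mysql -u root -p123 -e "use aaa; show tables;"            # 2.2 show the tables in a database
mysql -u root -p123 -e "use aaa; desc person;"            # 3.3 show a table's structure (once it exists)
mysql -u root -p123 -e "use aaa; select * from person;"   # 3.4 query a table's records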
3.6 Create a table
use <database>;
create table <table>(<column definitions>);
For example:
use aaa;
create table person (id int(3) auto_increment not null primary key, xm varchar(10), xb varchar(2), csny date);
Show the structure of the person table.
3.7 Insert records
insert into person values(null,'张三','男','1997-01-02');
insert into person values(null,'李四','女','1998-05-04');
Use select to query the records in the person table.
3.8 Update a record
update person set csny='1971-01-10' where xm='张三';
3.9 Delete a record
delete from person where xm='张三';
3.10 Drop a database or table
drop database <database>;
drop table <table>;
3.11 Check the MySQL version
show variables like 'version';
IX. Installing Scala
cd /home/hadoop/Downloads
tar -zxvf scala-2.11.8.tgz
mv scala-2.11.8 scala       # rename
sudo mv scala /usr/local    # move into /usr/local
vim ~/.bashrc
Add to ~/.bashrc:
export PATH=$PATH:/usr/local/scala/bin
Check whether Scala installed successfully, as sketched below.
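The check was only shown as a screenshot; a minimal sketch after reloading the shell configuration:
source ~/.bashrc
scala -version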
X. Installing and using the IntelliJ IDEA development tool
cd /home/hadoop/Downloads
tar -zxvf ideaIU-2017.3.5.tar.gz
mv idea-IU-173.4674.33 idea    # rename
sudo mv idea /usr/local
XI. Starting IDEA
cd /usr/local/idea
./bin/idea.sh
A window appears (I forgot to screenshot one of the screens). Then I chose "Evaluate for free".
1. Install the Scala plugin for IDEA: click Plugins.
2. Configure the JDK.
3. Create a new project, WordCount.
4. Set up the directories.
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    val inputFile = "file:///usr/local/spark/mycode/wordcount/word.txt"
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(inputFile)
    val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCount.foreach(println)
  }
}
Remember to create a wordcount directory under mycode (with the word.txt file in it).
5. Configure the pom.xml file.
Enter:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > dblab</groupId > <artifactId > WordCount</artifactId > <version > 1.0-SNAPSHOT</version > <properties > <spark.version > 2.1.0</spark.version > <scala.version > 2.11</scala.version > </properties > <dependencies > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-core_${scala.version}</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-streaming_${scala.version}</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_${scala.version}</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-hive_${scala.version}</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-mllib_${scala.version}</artifactId > <version > ${spark.version}</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > org.scala-tools</groupId > <artifactId > maven-scala-plugin</artifactId > <version > 2.15.2</version > <executions > <execution > <goals > <goal > compile</goal > <goal > testCompile</goal > </goals > </execution > </executions > </plugin > <plugin > <artifactId > maven-compiler-plugin</artifactId > <version > 3.6.0</version > <configuration > <source > 1.8</source > <target > 1.8</target > </configuration > </plugin > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-surefire-plugin</artifactId > <version > 2.19</version > <configuration > <skip > true</skip > </configuration > </plugin > </plugins > </build > </project >
Hmm... because Maven was not installed earlier, the dependencies simply cannot be resolved automatically.
First install Maven:
sudo apt-get install maven
Check whether Maven is installed, as sketched below.
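The check command was not shown; a minimal sketch:
mvn -v     # prints the Maven and Java versions if Maven is installed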
Then enter the command in the terminal,
and the dependency environment is installed automatically; it takes a while.
Then it's done.
6. Package the WordCount program into a JAR.
Keep only these two entries.
The generated JAR is at ~/IdeaProjects/WordCount/out/artifacts/WordCount_jar/WordCount.jar.
7. Submit the JAR to Spark and run it:
cd /usr/local/spark
./bin/spark-submit --class "WordCount" ~/IdeaProjects/WordCount/out/artifacts/WordCount_jar/WordCount.jar
XII. Installing Kettle
cd /usr/local
sudo mkdir kettle
sudo chown -R hadoop ./kettle    # give the user permissions on the kettle directory
Extract the data-integration.zip package into the install directory:
cd ~
sudo unzip ~/Downloads/data-integration.zip
sudo mv data-integration /usr/local/kettle
1. Copy the MySQL JDBC driver JAR:
cd ~/Downloads
sudo unzip mysql-connector-java-5.1.40.zip
sudo cp ./mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar /usr/local/kettle/data-integration/lib
2. Start Spoon in Kettle:
cd /usr/local/kettle/data-integration
chmod +x spoon.sh    # set permissions
sudo ./spoon.sh
3. Use Kettle to load data into HDFS. First create a word.txt file
and type a bit of English into it.
Then:
cd /usr/local/kettle/data-integration/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh55/
On the last line add:
authentication.superuser.provider=NO_AUTH
mv core-site.xml ./core-site.xml.bak
mv mapred-site.xml ./mapred-site.xml.bak
mv yarn-site.xml ./yarn-site.xml.bak
cp /usr/local/hadoop/etc/hadoop/mapred-site.xml ./mapred-site.xml
cp /usr/local/hadoop/etc/hadoop/yarn-site.xml ./
cp /usr/local/hadoop/etc/hadoop/core-site.xml ./
The port here must match what was configured in core-site.xml earlier; the other parameters can be left alone.
After clicking Test, only the three ×'s shown below should remain; everything else must be a check mark. Hadoop has to be running.
Drag a START step onto the right-hand canvas.
Then create the input_spark directory on HDFS:
cd /usr/local/hadoop
hdfs dfs -mkdir /input_spark
Create the hop between the steps (hold Shift and drag with the left mouse button).
Double-click Hadoop Copy Files and configure it.
Execution succeeded.
Then:
cd /usr/local/hadoop
hdfs dfs -ls /input_spark
and you can see the file listed.
The point of all this is to use the Kettle tool to load data from a local file into HDFS.
XIII. Reading and writing a MySQL database with Spark SQL
1. Create the MySQL database. Start the database:
service mysql start
mysql -u root -p
Create the database and table:
create database spark;
use spark;
create table student (id int(4), name char(20), gender char(4), age int(4));
insert into student values(1,'Xueqian','F',23);
insert into student values(2,'Weiliang','M',24);
select * from student;
2. Read and write the MySQL database from spark-shell:
cd ~/Downloads
cp ./mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar /usr/local/spark/jars/
When starting spark-shell, the MySQL JDBC driver JAR must be specified:
cd /usr/local/spark
./bin/spark-shell --jars /usr/local/spark/jars/mysql-connector-java-5.1.40-bin.jar --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40-bin.jar
scala> val jdbcDF = spark.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/spark").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "student").
  option("user", "root").
  option("password", "123").
  option("useSSL", "false").
  load()
If the following appears, don't worry about it; it does not seem to affect the later steps much:
Sun May 28 22:52:36 PDT 2023 WARN: Establishing SSL connection without server’s identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn’t set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to ‘false’. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification. jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string … 2 more fields]
Enter the command (e.g. jdbcDF.show()) to display the result.
3.向MySQL数据库写入数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.Properties import org.apache.spark.sql.types._import org.apache.spark.sql.Row val studentRDD = spark.sparkContext.parallelize(Array ("3 Rongcheng M 26" ,"4 Guanhua M 27" )).map(_.split(" " )) val schema = StructType (List (StructField ("id" , IntegerType , true ),StructField ("name" , StringType , true ),StructField ("gender" , StringType , true ),StructField ("age" , IntegerType , true ))) val rowRDD = studentRDD.map(p => Row (p(0 ).toInt, p(1 ).trim, p(2 ).trim, p(3 ).toInt)) val studentDF = spark.createDataFrame(rowRDD, schema) val prop = new Properties ()prop.put("user" ,"root" ) prop.put("password" ,"123" ) prop.put("driver" ,"com.mysql.jdbc.Driver" ) studentDF.write.mode("append" ).jdbc("jdbc:mysql://localhost:3306/spark" ,"spark.student" ,prop)
Then:
4. Write a standalone application to read and write the MySQL database.
Write the following in SparkOperateMySQL:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import java.util.Properties import org.apache.spark.sql.types._import org.apache.spark.sql.{Row , SparkSession }import org.apache.spark.{SparkConf , SparkContext }object SparkOperateMySQL { def main (args: Array [String ]): Unit = { val spark= SparkSession .builder().appName("sqltest" ).master("local[2]" ).getOrCreate() val jdbcDF = spark. read.format("jdbc" ). option("url" , "jdbc:mysql://localhost:3306/spark" ). option("driver" ,"com.mysql.jdbc.Driver" ). option("dbtable" , "student" ). option("user" , "root" ). option("password" , "123" ).load() jdbcDF.show() val studentRDD = spark.sparkContext.parallelize(Array ("5 Chenglu F 22" ,"6 Linzhe M 23" )).map(_.split(" " )) val rowRDD = studentRDD.map(p => Row (p(0 ).toInt, p(1 ).trim, p(2 ).trim, p(3 ).toInt)) rowRDD.foreach(println) val schema = StructType (List ( StructField ("id" , IntegerType , true ), StructField ("name" , StringType , true ), StructField ("gender" , StringType , true ), StructField ("age" , IntegerType , true ))) val studentDF=spark.createDataFrame(rowRDD,schema) val prop = new Properties () prop.put("user" , "root" ) prop.put("password" , "123" ) prop.put("driver" ,"com.mysql.jdbc.Driver" ) studentDF.write.mode("append" ).jdbc("jdbc:mysql://localhost:3306/spark" , "spark.student" , prop) jdbcDF.show() } }
Add to pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > dblab</groupId > <artifactId > spark-mysql</artifactId > <version > 1.0-SNAPSHOT</version > <dependencies > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-core_2.11</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.11</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.40</version > </dependency > </dependencies > </project >
Seeing this at the bottom means the dependencies are being downloaded.
Run it.
5. Generate the application JAR.
Same as before.
Keep only these; delete the rest.
The output is at ~/IdeaProjects/SparkMySQL/out/artifacts/SparkMySQL_jar/SparkMySQL.jar.
6. Submit the JAR to Spark and run it:
cd /usr/local/spark/
./bin/spark-submit --class "SparkOperateMySQL" ~/IdeaProjects/SparkMySQL/out/artifacts/SparkMySQL_jar/SparkMySQL.jar
On success you will see output like the screenshot.
XIV. Implementing collaborative filtering with Spark MLlib
1. Run the ALS algorithm in spark-shell. Locate the MovieLens dataset that ships with Spark:
/usr/local/spark/data/mllib/als/sample_movielens_data.txt
We use ALS to build the model.
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
Create a Rating class and a parseRating function. parseRating reads each line of the MovieLens dataset and converts it into a Rating object:
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
val ratings = spark.sparkContext.textFile("file:///usr/local/spark/data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()
Split the MovieLens dataset into a training set (80%) and a test set (20%):
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
Here we build two models: one with explicit feedback, the other with implicit feedback.
val alsExplicit = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
val alsImplicit = new ALS().setMaxIter(5).setRegParam(0.01).setImplicitPrefs(true).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")
val modelExplicit = alsExplicit.fit(training)
val modelImplicit = alsImplicit.fit(training)
Predict ratings for the user-movie pairs in the test set, obtaining a dataset of predicted ratings:
val predictionsExplicit = modelExplicit.transform(test).na.drop()
val predictionsImplicit = modelImplicit.transform(test).na.drop()
predictionsExplicit.show()
predictionsImplicit.show()
Evaluate the models by computing the root-mean-square error; the smaller the RMSE, the more accurate the model:
val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
val rmseExplicit = evaluator.evaluate(predictionsExplicit)
val rmseImplicit = evaluator.evaluate(predictionsImplicit)
// print the RMSE of both models
println(s"Explicit:Root-mean-square error = $rmseExplicit")
println(s"Implicit:Root-mean-square error = $rmseImplicit")
2. Write a standalone application to run the ALS algorithm.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import org.apache.spark.sql.SparkSession import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.recommendation.ALS object MovieLensALS { case class Rating (userId: Int , movieId: Int , rating: Float , timestamp: Long ) ; def parseRating (str: String ): Rating = { val fields = str.split("::" ) Rating (fields(0 ).toInt, fields(1 ).toInt, fields(2 ).toFloat, fields(3 ).toLong) } def main (args: Array [String ]): Unit = { val spark=SparkSession .builder().appName("sparkmllibtest" ).master("local[2]" ).getOrCreate() import spark.implicits._ val ratings = spark.sparkContext.textFile("file:///usr/local/spark/data/mllib/als/sample_movielens_ratings.txt" ). map(parseRating).toDF() ratings.show() val Array (training,test) = ratings.randomSplit(Array (0.8 ,0.2 )) val alsExplicit = new ALS ().setMaxIter(5 ).setRegParam(0.01 ).setUserCol("userId" ).setItemCol("movieId" ).setRatingCol("rating" ) val alsImplicit = new ALS ().setMaxIter(5 ).setRegParam(0.01 ).setImplicitPrefs(true ).setUserCol("userId" ).setItemCol("movieId" ).setRatingCol("rating" ) val modelExplicit = alsExplicit.fit(training) val modelImplicit = alsImplicit.fit(training) val predictionsExplicit= modelExplicit.transform(test).na.drop() val predictionsImplicit= modelImplicit.transform(test).na.drop() predictionsExplicit.show() predictionsImplicit.show() val evaluator = new RegressionEvaluator ().setMetricName("rmse" ).setLabelCol("rating" ).setPredictionCol("prediction" ) val rmseExplicit = evaluator.evaluate(predictionsExplicit) val rmseImplicit = evaluator.evaluate(predictionsImplicit) println(s"Explicit:Root-mean-square error = $rmseExplicit " ) println(s"Implicit:Root-mean-square error = $rmseImplicit " ) } }
Then set up pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > dblab</groupId > <artifactId > SparkALS</artifactId > <version > 1.0-SNAPSHOT</version > <properties > <spark.version > 2.1.0</spark.version > </properties > <dependencies > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.11</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-mllib_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-core_2.11</artifactId > <version > ${spark.version}</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.2.0</version > <executions > <execution > <id > compile-scala</id > <phase > compile</phase > <goals > <goal > add-source</goal > <goal > compile</goal > </goals > </execution > <execution > <id > test-compile-scala</id > <phase > test-compile</phase > <goals > <goal > add-source</goal > <goal > testCompile</goal > </goals > </execution > </executions > <configuration > <scalaVersion > 2.11.8</scalaVersion > </configuration > </plugin > </plugins > </build > </project >
Run Maven yourself, following the earlier steps.
Then run that class file.
Success!
Export the JAR file, as described above.
Submit the JAR to Spark and run it:
cd /usr/local/spark
./bin/spark-submit --class "MovieLensALS" ~/IdeaProjects/SparkALS/out/artifacts/SparkALS_jar/SparkALS.jar
Done!
XV. Installing and using Node.js
1. Installation
cd ~/Downloads
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.33.8/install.sh | bash
You may need to configure the hosts file here, for reasons beyond our control, if you see:
Failed to connect to raw.githubusercontent.com port 443
Look up the real IP:
Query the real IP of raw.githubusercontent.com at https://www.ipaddress.com/.
Modify the hosts file, as sketched below.
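A minimal sketch of opening the hosts file for editing (assuming the system hosts file at /etc/hosts):
sudo vim /etc/hosts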
Add the following line:
199.232.96.133 raw.githubusercontent.com
Then,
once the curl download above has finished:
sudo vim ~/.bashrc
Write the following into it:
export NVM_DIR="$HOME/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh"
Then the next step:
nvm install --lts
node -v
This fails because this Ubuntu release is too old,
so take a different route:
nvm install v12.22.6
nvm use v12.22.6
node -v
2. Application
cd ~
mkdir mynodeapp
cd mynodeapp
vim server.js
Enter:
var http = require('http');
http.createServer(function (request, response) {
  response.writeHead(200, {'Content-Type': 'text/plain'});
  response.end('Hello World\n');
}).listen(3000);
console.log('Server running at http://127.0.0.1:3000/');
After saving, run it as sketched below.
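The run step was only shown as a screenshot; a minimal sketch, assuming the code above was saved as server.js:
node server.js
# then open http://127.0.0.1:3000/ in a browser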
3. Install Express and Jade
cd ~
mkdir expressjadeapp
cd expressjadeapp
npm init
cd expressjadeapp
npm install express --save
cd expressjadeapp
npm install jade --save
var express = require('express');
var http = require('http');
var app = express();
app.set('view engine', 'jade');
app.set('views', __dirname);
app.get('/', (req, res) => {
  res.render('test', { title: 'Jade test', message: 'Database Lab' });
});
var servser = http.createServer(app);
servser.listen(3000);
console.log('Server running at http://127.0.0.1:3000/');
Create the view file test.jade (the view rendered above):
html
  head
    title!= title
  body
    h1!= message
Then
Example 1: login and registration
create database userlogin;
use userlogin;
create table user (userid int(20) not null auto_increment, username char(50), password char(50), primary key(userid));
desc user;
select * from user;
Create the project directory:
cd ~
mkdir userloginapp
cd userloginapp
npm init
You need to install npm first: sudo apt install npm.
npm install express --save
npm install jade --save
npm install mysql --save
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 var express=require ('express' );var app=express ();var mysql=require ('mysql' ); var connection = mysql.createConnection ({ host : '127.0.0.1' , user : 'root' , password : '123' , database : 'userlogin' , port :'3306' }); connection.connect (); app.get ('/' ,function (req,res ) { res.sendfile (__dirname + "/" + "index.html" ); }) app.get ('/login' ,function (req,res ) { var name=req.query .name ; var pwd=req.query .pwd ; var selectSQL = "select * from user where username = '" +name+"' and password = '" +pwd+"'" ; connection.query (selectSQL,function (err,rs ) { if (err) throw err; console .log (rs); console .log ('OK' ); res.sendfile (__dirname + "/" + "ok.html" ); }) }) app.get ('/register.html' ,function (req,res ) { res.sendfile (__dirname+"/" +"register.html" ); }) app.get ('/register' ,function (req,res ) { var name=req.query .name ; var pwd=req.query .pwd ; var user={username :name,password :pwd}; connection.query ('insert into user set ?' ,user,function (err,rs ) { if (err) throw err; console .log ('ok' ); res.sendfile (__dirname + "/" + "index.html" ); }) }) var server=app.listen (3000 ,function ( ) { console .log ("userlogin server starts......" ); })
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <p > 用户登录 <form action ="http://127.0.0.1:3000/login" > User Name:<input type ="text" name ="name" /> Password:<input type ="text" name ="pwd" /> <input type ="submit" value ="提交" /> </form > <p > <a href ="register.html" > 注册</a > </body > </html >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <p > 用户注册<form action ="http://127.0.0.1:3000/register" > User Name:<input type ="text" name ="name" /> Password:<input type ="text" name ="pwd" /> <input type ="submit" value ="提交" /> </form > </body > </html >
1 2 3 4 5 6 7 8 9 10 11 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > login success! </body > </html >
Example 2: the same, implemented with Jade
cd ~
mkdir userloginjadeapp
cd userloginjadeapp
npm init
npm install express --save
npm install jade --save
npm install mysql --save
npm install body-parser --save
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 var express=require ('express' ); var bodyParser = require ('body-parser' ) var app=express (); var mysql=require ('mysql' ); app.set ('view engine' , 'jade' ); app.set ('views' , __dirname); app.use (bodyParser.urlencoded ({extended : false })) app.use (bodyParser.json ()) var connection = mysql.createConnection ({ host : '127.0.0.1' , user : 'root' , password : '123' , database : 'movierecommend' , port :'3306' }); connection.connect (); app.get ('/' ,function (req,res ) { res.render ('index' ); }) app.post ('/login' ,function (req,res ) { var name=req.body .username .trim (); var pwd=req.body .pwd .trim (); console .log ('username:' +name+'password:' +pwd); var selectSQL = "select * from user where username = '" +name+"' and password = '" +pwd+"'" ; connection.query (selectSQL,function (err,rs ) { if (err) throw err; console .log (rs); console .log ('ok' ); res.render ('ok' ,{title :'Welcome User' ,message :name}); }) }) app.get ('/registerpage' ,function (req,res ) { res.render ('registerpage' ,{title :'注册' }); }) app.post ('/register' ,function (req,res ) { var name=req.body .username .trim (); var pwd=req.body .pwd .trim (); var user={username :name,password :pwd}; connection.query ('insert into user set ?' ,user,function (err,rs ) { if (err) throw err; console .log ('ok' ); res.render ('ok' ,{title :'Welcome User' ,message :name}); }) }) var server=app.listen (3000 ,function ( ) { console .log ("userloginjade server start......" ); })
1 2 3 4 5 6 7 8 9 10 11 12 13 14 html head title!= title body form(action='/login', method='post') p 用户登录 input(type='text',name='username') input(type='text',name='pwd') input(type='submit',value='登录') br a(href='/registerpage', title='注册') 注册
1 2 3 4 5 6 7 8 9 10 11 12 13 14 html head title!= title body form(action='/register', method='post') p 用户注册 input(type='text',name='username') input(type='text',name='pwd') input(type='submit',value='注册')
1 2 3 4 5 6 html head title!= title body h1 热烈欢迎用户: #{message}
First create a movierecommend database in MySQL:
create database movierecommend;
use movierecommend;
create table user (userid int(20) not null auto_increment, username char(50), password char(50), primary key(userid));
desc user;
select * from user;
Then start the app with node and test it in the browser.
Calling the word-count program from a web page
cd ~
mkdir myapp
cd myapp
npm init
cd ~/myapp
npm install express --save
npm install jade --save
npm install body-parser --save
const express = require('express');
const app = express();

app.get('/', (req, res) => {
  res.send('Hello World!');
});

const servser = app.listen(3000, function () {
  const host = servser.address().address;
  const port = servser.address().port;
  console.log('Listening at http://%s:%s', host, port);
});
vim index.jade
html
  head
    title!= title
  body
    h1!= message
Modify the index.js file again:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 const express = require ('express' )const bodyParser = require ('body-parser' )const exec = require ('child_process' ).exec const app = express (); app.set ('views' ,'./views' ) app.set ('view engine' , 'jade' ) app.use (bodyParser.urlencoded ({extended : false })) app.use (bodyParser.json ()) app.get ('/' , function (req, res ) { res.render ('index' , {title : 'WordCount测试' , message : '厦门大学数据库实验室!' }) }) app.post ('/' ,function (req, res ){ const path = req.body .path .trim () const jarStr = '/usr/local/spark/bin/spark-submit --class WordCount ~/IdeaProjects/WordCount/out/artifacts/WordCount_jar/WordCount.jar ' +path+' /output' const rmrStr = '/usr/local/hadoop/bin/hdfs dfs -rmr /output' const catStr = '/usr/local/hadoop/bin/hdfs dfs -cat /output/*' exec (rmrStr,function (err, stdout, stderr ){ exec (jarStr, function (err, stdout, stderr ){ if (stderr){ res.render ('index' , {title : 'WordCount测试' , message : '厦门大学数据库实验室!' , result : stderr}) } exec (catStr,function (err, stdout, stderr ){ res.render ('index' , {title : 'WordCount测试' , message : '厦门大学数据库实验室!' , result : stdout}) }) }) }) }) const server = app.listen (3000 , function ( ) { const host = server.address ().address ; const port = server.address ().port ; console .log ('Example app listening at http://%s:%s' , host, port); });
Modify index.jade again, and create a views directory and move index.jade into it:
mkdir views
mv index.jade ~/myapp/views
html
  head
    title!= title
  body
    h1!= message
    form(action='/', method='post')
      p 请在下面输入所需要进行词频统计文件的路径
      p HDFS文件路径示例:hdfs://localhost:9000/user/hadoop/word.txt
      p 本地文件路径示例:file:///home/hadoop/word.txt
      br
      input(name='path')
      input(type='submit')
      br
      textarea(rows='40', cols='40')!=result
When that's done, start Hadoop and HDFS, then modify the WordCount program (rebuilding WordCount.jar):
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
  def main(args: Array[String]) {
    val inputFile = args(0)
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(inputFile)
    val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCount.foreach(println)
    wordCount.saveAsTextFile(args(1))
  }
}
Then:
cd ~/myapp
node index.js
Here I suggest tweaking the index.js code, because the error output also includes WARN messages, so it is very hard to get a run with a clean stderr:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 const express = require ('express' )const bodyParser = require ('body-parser' )const exec = require ('child_process' ).exec const app = express (); app.set ('views' ,'./views' ) app.set ('view engine' , 'jade' ) app.use (bodyParser.urlencoded ({extended : false })) app.use (bodyParser.json ()) app.get ('/' , function (req, res ) { res.render ('index' , {title : 'WordCount测试' , message : '厦门大学数据库实验室!' }) }) app.post ('/' ,function (req, res ){ const path = req.body .path .trim () const jarStr = '/usr/local/spark/bin/spark-submit --class WordCount ~/IdeaProjects/WordCount/out/artifacts/WordCount_jar/WordCount.jar ' +path+' /output' const rmrStr = '/usr/local/hadoop/bin/hdfs dfs -rmr /output' const catStr = '/usr/local/hadoop/bin/hdfs dfs -cat /output/*' exec (rmrStr,function (err, stdout, stderr ){ exec (jarStr, function (err, stdout ){ exec (catStr,function (err, stdout, stderr ){ res.render ('index' , {title : 'WordCount测试' , message : '厦门大学数据库实验室!' , result : stdout}) }) }) }) }) const server = app.listen (3000 , function ( ) { const host = server.address ().address ; const port = server.address ().port ; console .log ('Example app listening at http://%s:%s' , host, port); });
This tests connectivity between Spark and HDFS:
val data = Seq("Hello", "World", "Spark", "HDFS")
val rdd = spark.sparkContext.parallelize(data)
rdd.saveAsTextFile("hdfs://localhost:9000/usr/hadoop/my.txt")
val rdd2 = spark.sparkContext.textFile("hdfs://localhost:9000/usr/hadoop/my.txt")
rdd2.foreach(println)
If this prints output, they are connected.
Fixing a WARN message:
"WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform" warning
Configure HADOOP_HOME; check it with echo $HADOOP_HOME. A sketch of the usual fix follows.
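A common way to make this warning go away is to point the environment at Hadoop's native libraries; a minimal sketch, assuming Hadoop is installed in /usr/local/hadoop (add to ~/.bashrc and source it):
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
echo $HADOOP_HOME    # check that the variable is set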
XVI. Movie recommendation system (basic version)
cd ~/Downloads
sudo unzip movie_recommend.zip
1. Clean the data
cd /usr/local/hadoop
./sbin/start-dfs.sh        # start HDFS
# create an HDFS dataset directory, input_spark
hdfs dfs -mkdir /input_spark
Import the following two files into HDFS:
file:///home/hadoop/Downloads/movie_recommend/personalRatings.dat
file:///home/hadoop/Downloads/movie_recommend/ratings.dat
Clean movies.dat.
Add a "Strings cut" (剪切字符串) step.
I typed it wrong; it should be .dat.
good game
Check it:
hdfs dfs -cat /input_spark/movies.dat | head -5
The output is garbled and I don't know why; it doesn't really matter, since the cleaned file isn't used later anyway.
2. Write the Spark program
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 package recommendimport java.io.File import org.apache.log4j.{Level , Logger }import org.apache.spark.ml.recommendation.{ALS , ALSModel }import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame , Row , SparkSession }import org.apache.spark.{SparkConf , SparkContext }import scala.io.Source object MovieLensALS { case class Rating (user : Int , product : Int , rating : Double ) val spark=SparkSession .builder().appName("MovieLensALS" ).master("local[2]" ).getOrCreate() def main (args: Array [String ]) { Logger .getLogger("org.apache.spark" ).setLevel(Level .ERROR ) Logger .getLogger("org.eclipse.jetty.server" ).setLevel(Level .OFF ) if (args.length != 5 ) { println("Usage: /usr/local/spark/bin/spark-submit --class recommend.MovieLensALS " + "Spark_Recommend_Dataframe.jar movieLensHomeDir personalRatingsFile bestRank bestLambda bestNumiter" ) sys.exit(1 ) } import spark.implicits._ val myRatings = loadRatings(args(1 )) val myRatingsRDD = spark.sparkContext.parallelize(myRatings, 1 ) val movieLensHomeDir = args(0 ) val ratings = spark.sparkContext.textFile(new File (movieLensHomeDir, "ratings.dat" ).toString).map { line => val fields = line.split("::" ) (fields(3 ).toLong % 10 , Rating (fields(0 ).toInt, fields(1 ).toInt, fields(2 ).toDouble)) } val movies = spark.sparkContext.textFile(new File (movieLensHomeDir, "movies.dat" ).toString).map { line => val fields = line.split("::" ) (fields(0 ).toInt, fields(1 ).toString()) }.collect().toMap val numRatings = ratings.count() val numUsers = ratings.map(_._2.user).distinct().count() val numMovies = ratings.map(_._2.product).distinct().count() val numPartitions = 4 val trainingDF = ratings.filter(x => x._1 < 6 ) .values .union(myRatingsRDD) .toDF() .repartition(numPartitions) val validationDF = ratings.filter(x => x._1 >= 6 && x._1 < 8 ) .values .toDF() .repartition(numPartitions) val testDF = ratings.filter(x => x._1 >= 8 ).values.toDF() val numTraining = trainingDF.count() val numValidation = validationDF.count() val numTest = testDF.count() val ranks = List (8 , 12 ) val lambdas = List (0.1 , 10.0 ) val numIters = List (10 , 20 ) var bestModel: Option [ALSModel ] = None var bestValidationRmse = Double .MaxValue var bestRank = args(2 ).toInt var bestLambda = args(3 ).toDouble var bestNumIter = args(4 ).toInt for (rank <- ranks; lambda <- lambdas; numIter <- numIters) { val als = new ALS ().setMaxIter(numIter).setRank(rank).setRegParam(lambda).setUserCol("user" ).setItemCol("product" ).setRatingCol("rating" ) val model = als.fit(trainingDF) val validationRmse = computeRmse(model, validationDF, numValidation) if (validationRmse < bestValidationRmse) { bestModel = Some (model) bestValidationRmse = validationRmse bestRank = rank bestLambda = lambda bestNumIter = numIter } } val testRmse = computeRmse(bestModel.get, testDF, numTest) val meanRating = trainingDF.union(validationDF).select("rating" ).rdd.map{case Row (v : Double ) => v}.mean val baselineRmse = math.sqrt(testDF.select("rating" ).rdd.map{case Row (v : Double ) => 
v}.map(x => (meanRating - x) * (meanRating - x)).mean) val improvement = (baselineRmse - testRmse) / baselineRmse * 100 val myRatedMovieIds = myRatings.map(_.product).toSet val candidates = spark.sparkContext.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq).map(Rating (1 ,_,0.0 )) .toDF().select("user" ,"product" ) val recommendations = bestModel.get .transform(candidates).select("user" ,"product" ,"prediction" ).rdd .map(x => Rating (x(0 ).toString.toInt,x(1 ).toString.toInt,x(2 ).toString.toDouble)) .sortBy(-_.rating) .take(10 ) var i = 1 println("Movies recommended for you(用户 ID:推荐电影 ID:推荐分数:推荐电影名称):" ) recommendations.foreach { r => println(r.user + ":" + r.product + ":" + r.rating + ":" + movies(r.product)) i += 1 } spark.sparkContext.stop() } def computeRmse (model: ALSModel , df: DataFrame , n: Long ): Double = { import spark.implicits._ val predictions = model.transform(df.select("user" ,"product" )) val predictionsAndRatings = predictions.select("user" ,"product" ,"prediction" ).rdd.map(x => ((x(0 ),x(1 )),x(2 ))) .join(df.select("user" ,"product" ,"rating" ).rdd.map(x => ((x(0 ),x(1 )),x(2 )))) .values .take(10 ) math.sqrt(predictionsAndRatings.map(x => (x._1.toString.toDouble - x._2.toString.toDouble) * (x._1.toString.toDouble - x._2.toString.toDouble)).reduce(_ + _) / n) } def loadRatings (path: String ): Seq [Rating ] = { val lines = Source .fromFile(path).getLines() val ratings = lines.map { line => val fields = line.split("::" ) Rating (fields(0 ).toInt, fields(1 ).toInt, fields(2 ).toDouble) }.filter(_.rating > 0.0 ) if (ratings.isEmpty) { sys.error("No ratings provided." ) } else { ratings.toSeq } } }
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > Spark_Recommend_Dataframe</groupId > <artifactId > Spark_Recommend_Dataframe</artifactId > <version > 1.0-SNAPSHOT</version > <properties > <spark.version > 2.1.0</spark.version > </properties > <dependencies > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.11</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-mllib_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-core_2.11</artifactId > <version > ${spark.version}</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.2.0</version > <executions > <execution > <id > compile-scala</id > <phase > compile</phase > <goals > <goal > add-source</goal > <goal > compile</goal > </goals > </execution > <execution > <id > test-compile-scala</id > <phase > test-compile</phase > <goals > <goal > add-source</goal > <goal > testCompile</goal > </goals > </execution > </executions > <configuration > <scalaVersion > 2.11.8</scalaVersion > </configuration > </plugin > </plugins > </build > </project >
It runs successfully.
(Using an old screenshot to illustrate.)
Put the JAR into Spark and run it:
cd /usr/local/spark/
./bin/spark-submit --class recommend.MovieLensALS ~/IdeaProjects/Spark_Recommend_Dataframe/out/artifacts/Spark_Recommend_Dataframe_jar/Spark_Recommend_Dataframe.jar /input_spark ~/Downloads/personalRatings.dat 10 5 10
Run complete.
3. Display the results in Node.js
cd ~
mkdir mysparkapp
cd mysparkapp
npm init
npm install express --save
npm install jade --save
npm install body-parser --save
mkdir views
vim index.js
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 const express = require ('express' )const bodyParser = require ('body-parser' )const spawnSync = require ('child_process' ).spawnSync const app = express (); app.set ('views' ,'./views' ) app.set ('view engine' , 'jade' ) app.use (bodyParser.urlencoded ({extended : false })) app.use (bodyParser.json ()) app.get ('/' , function (req, res ) { res.render ('index' , {title : '电影推荐系统' , message : '厦门大学数据库实验室电影推荐系统' }) }) app.post ('/' ,function (req, res ){ const path = req.body .path .trim () || '/input_spark' const myRatings = req.body .myRatings .trim () || '~/Downloads/personalRatings.dat' const bestRank = req.body .bestRank .trim () || 10 const bestLambda = req.body .bestLambda .trim () || 5 const bestNumIter = req.body .bestNumIter .trim () || 10 let spark_submit = spawnSync ('/usr/local/spark/bin/spark-submit' ,['--class' , 'recommend.MovieLensALS' ,' ~/IdeaProjects/Spark_Recommend_Dataframe/out/artifacts/Spark_Recommend_Dataframe_jar/Spark_Recommend_Dataframe.jar' , path, myRatings, bestRank, bestLambda, bestNumIter],{ shell :true , encoding : 'utf8' }) res.render ('index' , {title : '电影推荐系统' , message : '厦门大学数据库实验室电影推荐系统' , result : spark_submit.stdout }) }) const server = app.listen (3000 , function ( ) { const host = server.address ().address ; const port = server.address ().port ; console .log ('Example app listening at http://%s:%s' , host, port); });
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 html head title!= title body h1!= message form (action='/' , method='post' ) p 请输入建模的相关信息 table (border=0 ) tr td 样本数据的路径(默认为/input_spark) td input (style='width:350px' ,placeholder='/input_spark' ,name='path' ) tr td 用户评分数据的路径(默认为~/Downloads/ personalRatings.dat ) td input (style='width:350px' ,placeholder='~/Downloads/personalRatings.dat ' ,name='myRatings' ) tr td 隐语义因子的个数: td input (placeholder='10' ,type='number' ,min='8' ,max='12' ,name='bestRank' ) tr td 正则化参数: td input (placeholder='5' ,type='number' ,min='0' ,max='10' ,step='0.1' ,name='bestLambda' ) tr td 迭代次数: td input (placeholder='10' ,type='number' ,min='10' ,max='20' ,name='bestNumIter' ) input (type='submit' ) br textarea (rows='20' , cols='40' )!=result
cd mysparkapp
node index.js
XVII. Update: database-backed version
service mysql start
mysql -u root -p
source ~/Downloads/MovieRecommendDatabase.sql
use movierecommend;
select count(*) from movieinfo;
IDEA
Same steps as before; at the end create a recommend package
and create four files inside it:
MovieLensALS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 package recommend import java.io.File import org.apache.log4j.{Level, Logger} import org.apache.spark.ml.recommendation.{ALS, ALSModel} import org.apache.spark.mllib.recommendation.Rating import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.{SparkConf, SparkContext} object MovieLensALS { case class Rating(user : Int, product : Int, rating : Double) val spark=SparkSession.builder().appName("MovieLensALS").master("local[2]").getOrCreate() def main(args: Array[String]) { // 屏蔽不必要的日志显示在终端上 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) if (args.length != 2) { println("Usage: /usr/local/spark/bin/spark-submit --class recommend.MovieLensALS " + "Spark_Recommend_Dataframe.jar movieLensHomeDir userid") sys.exit(1) } // 设置运行环境 import spark.implicits._ // 装载参数二,即用户评分,该评分由评分器生成 val userid=args(1).toInt; //删除该用户之前已经存在的电影推荐结果,为本次写入最新的推荐结果做准备 DeleteFromMySQL.delete(userid) //从关系数据库中读取该用户对一些电影的个性化评分数据 val personalRatingsLines:Array[String]=ReadFromMySQL.read(userid) val myRatings = loadRatings(personalRatingsLines) val myRatingsRDD = spark.sparkContext.parallelize(myRatings, 1) // 样本数据目录 val movieLensHomeDir = args(0) // 装载样本评分数据,其中最后一列 Timestamp 取除 10 的余数作为 key,Rating 为值,即(Int,Rating) //ratings.dat 原始数据:用户编号、电影编号、评分、评分时间戳 val ratings = spark.sparkContext.textFile(new File(movieLensHomeDir, "ratings.dat").toString).map { line => val fields = line.split("::") (fields(3).toLong % 10, Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)) } //装载电影目录对照表(电影 ID->电影标题) //movies.dat 原始数据:电影编号、电影名称、电影类别 val movies = spark.sparkContext.textFile(new File(movieLensHomeDir, "movies.dat").toString).map { line => val fields = line.split("::") (fields(0).toInt, fields(1).toString()) }.collect().toMap val numRatings = ratings.count() val numUsers = ratings.map(_._2.user).distinct().count() val numMovies = ratings.map(_._2.product).distinct().count() // 将样本评分表以 key 值切分成 3 个部分,分别用于训练 (60%,并加入用户评分), 校验 (20%), and 测试 (20%) // 该数据在计算过程中要多次应用到,所以 cache 到内存 val numPartitions = 4 // training 训练样本数据 val trainingDF = ratings.filter(x => x._1 < 6) //取评分时间除 10 的余数后值小于 6 的作为训练样本 .values .union(myRatingsRDD) //注意 ratings 是(Int,Rating),取 value 即可 .toDF() .repartition(numPartitions) // validation 校验样本数据 val validationDF = ratings.filter(x => x._1 >= 6 && x._1 < 8) //取评分时间除 10 的余数后值大于等于 6 且小于 8 分的作为校验样本 .values .toDF() .repartition(numPartitions) // test 测试样本数据 val testDF = ratings.filter(x => x._1 >= 8).values.toDF() //取评分时间除 10 的余数后值大于等于 8 分的作为测试样本 val numTraining = trainingDF.count() val numValidation = validationDF.count() val numTest = testDF.count() // 训练不同参数下的模型,并在校验集中验证,获取最佳参数下的模型 val ranks = List(8, 12) //模型中隐语义因子的个数 val lambdas = List(0.1, 10.0) //是 ALS 的正则化参数 val numIters = List(10, 20) //迭代次数 var bestModel: Option[ALSModel] = None //最好的模型 var bestValidationRmse = Double.MaxValue //最好的校验均方根误差 var bestRank = 0 //最好的隐语义因子的个数 var bestLambda = 
0.0 //最好的ALS正则化参数 var bestNumIter = 0 //最好的迭代次数 for (rank <- ranks; lambda <- lambdas; numIter <- numIters) { println("正在执行循环训练模型") val als = new ALS().setMaxIter(numIter).setRank(rank).setRegParam(lambda).setUserCol("user").setItemCol("product").setRatingCol("rating") val model = als.fit(trainingDF)//训练样本、隐语义因子的个数、迭代次数、ALS 的正则化参数 // model 训练模型 //输入训练模型、校验样本、校验个数 val validationRmse = computeRmse(model, validationDF, numValidation) // 校验模型结果 if (validationRmse < bestValidationRmse) { bestModel = Some(model) bestValidationRmse = validationRmse bestRank = rank bestLambda = lambda bestNumIter = numIter } } // 用最佳模型预测测试集的评分,并计算和实际评分之间的均方根误差 val testRmse = computeRmse(bestModel.get, testDF, numTest) //创建一个基准(Naïve Baseline),并把它和最好的模型进行比较 val meanRating = trainingDF.union(validationDF).select("rating").rdd.map{case Row(v : Double) => v}.mean val baselineRmse = math.sqrt(testDF.select("rating").rdd.map{case Row(v : Double) => v}.map(x => (meanRating - x) * (meanRating - x)).mean) //改进了基准的最佳模型 val improvement = (baselineRmse - testRmse) / baselineRmse * 100 // 推荐前十部最感兴趣的电影,注意要剔除用户已经评分的电影 val myRatedMovieIds = myRatings.map(_.product).toSet val candidates = spark.sparkContext.parallelize(movies.keys.filter(!myRatedMovieIds.contains(_)).toSeq).map(Rating(userid,_,0.0)) .toDF().select("user","product") //上面的Rating(userid,_,0.0)中,0.0是赋予的初始评分值 val recommendations = bestModel.get .transform(candidates).select("user","product","prediction").rdd .map(x => Rating(x(0).toString.toInt,x(1).toString.toInt,x(2).toString.toDouble)) .sortBy(-_.rating) .take(10) //把推荐结果写入数据库 val rddForMySQL=recommendations.map(r=>r.user + "::"+ r.product + "::"+ r.rating+"::" + movies(r.product)) InsertIntoMySQL.insert(rddForMySQL) var i = 1 println("Movies recommended for you(用户 ID:推荐电影 ID:推荐分数:推荐电影名称):") recommendations.foreach { r => println(r.user + ":" + r.product + ":" + r.rating + ":" + movies(r.product)) i += 1 } spark.sparkContext.stop() } /** 校验集预测数据和实际数据之间的均方根误差 **/ //输入训练模型、校验样本、校验个数 def computeRmse(model: ALSModel, df: DataFrame, n: Long): Double = { import spark.implicits._ val predictions = model.transform(df.select("user","product")) //调用预测的函数 // 输出 predictionsAndRatings 预测和评分 val predictionsAndRatings = predictions.select("user","product","prediction").rdd.map(x => ((x(0),x(1)),x(2))) .join(df.select("user","product","rating").rdd.map(x => ((x(0),x(1)),x(2)))) .values .take(10) math.sqrt(predictionsAndRatings.map(x => (x._1.toString.toDouble - x._2.toString.toDouble) * (x._1.toString.toDouble - x._2.toString.toDouble)).reduce(_ + _) / n) } /** 装载用户评分文件 **/ def loadRatings(lines: Array[String]): Seq[Rating] = { val ratings = lines.map { line => val fields = line.split("::") Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble) }.filter(_.rating > 0.0) if (ratings.isEmpty) { sys.error("No ratings provided.") } else { ratings.toSeq } } }
DeleteFromMySQL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package recommend import java.sql.{Connection, DriverManager, PreparedStatement} import org.apache.spark.sql.{DataFrame, Row, SQLContext} object DeleteFromMySQL { val url = "jdbc:mysql://localhost:3306/movierecommend?useUnicode=true&characterEncoding=UTF-8" val prop = new java.util.Properties prop.setProperty("user", "root") prop.setProperty("password", "123") def delete(userid:Int): Unit = { var conn: Connection = null var ps: PreparedStatement = null val sql = "delete from recommendresult where userid="+userid conn = DriverManager.getConnection(url,prop) ps = conn.prepareStatement(sql) ps.executeUpdate() if (ps != null) { ps.close() } if (conn != null) { conn.close() } } }
InsertIntoMySQL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package recommendimport java.util.Properties import org.apache.spark.sql.types._import org.apache.spark.sql.{Row , SparkSession }import org.apache.spark.{SparkConf , SparkContext }object InsertIntoMySQL { def insert (array:Array [String ]): Unit = { val spark= SparkSession .builder().appName("InsertIntoMySQL" ).master("local[2]" ).getOrCreate() val movieRDD = spark.sparkContext.parallelize(array).map(_.split("::" )) val rowRDD = movieRDD.map(p => Row (p(0 ).trim.toInt, p(1 ).trim.toInt, p(2 ).trim.toFloat, p(3 ).trim)) rowRDD.foreach(println) val schema = StructType (List ( StructField ("userid" , IntegerType , true ), StructField ("movieid" , IntegerType , true ), StructField ("rating" , FloatType , true ), StructField ("moviename" , StringType , true ))) val movieDF=spark.createDataFrame(rowRDD,schema) val prop = new Properties () prop.put("user" , "root" ) prop.put("password" , "123" ) prop.put("driver" ,"com.mysql.jdbc.Driver" ) movieDF.write.mode("append" ).jdbc("jdbc:mysql://localhost:3306/movierecommend" , "movierecommend.recommendresult" , prop) movieDF.show() } }
ReadFromMySQL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package recommendimport java.util.Properties import org.apache.spark.sql.types._import org.apache.spark.sql.{Row , SparkSession }import org.apache.spark.{SparkConf , SparkContext }import scala.collection.mutable.ArrayBuffer object ReadFromMySQL { def read (userid:Int ): Array [String ] = { val spark= SparkSession .builder().appName("ReadFromMySQL" ).master("local[2]" ).getOrCreate() import spark.implicits._ val personalRatingsDF = spark. read.format("jdbc" ). option("url" , "jdbc:mysql://localhost:3306/movierecommend" ). option("driver" ,"com.mysql.jdbc.Driver" ). option("dbtable" , "personalratings" ). option("user" , "root" ). option("password" , "123" ).load() personalRatingsDF.show() personalRatingsDF.createOrReplaceTempView("personalratings" ) val prDF=spark.sql("select * from personalratings where userid=" +userid) val myrdd=prDF.rdd.map(r=>{r(0 ).toString+"::" +r(1 ).toString+"::" +r(2 ).toString+"::" +r(3 ).toString}) val array=ArrayBuffer [String ]() array++=myrdd.collect(); println(array.length) val i=0 ; for (i <- 0 until array.length){println(array(i))} array.toArray } }
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > Film_Recommend_Dataframe</groupId > <artifactId > Film_Recommend_Dataframe</artifactId > <version > 1.0-SNAPSHOT</version > <properties > <spark.version > 2.1.0</spark.version > </properties > <dependencies > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-mllib_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-core_2.11</artifactId > <version > ${spark.version}</version > </dependency > <dependency > <groupId > org.apache.spark</groupId > <artifactId > spark-sql_2.11</artifactId > <version > 2.1.0</version > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.40</version > </dependency > <dependency > <groupId > org.apache.hadoop</groupId > <artifactId > hadoop-yarn-common</artifactId > <version > 2.7.4</version > </dependency > </dependencies > <build > <plugins > <plugin > <groupId > net.alchim31.maven</groupId > <artifactId > scala-maven-plugin</artifactId > <version > 3.2.0</version > <executions > <execution > <id > compile-scala</id > <phase > compile</phase > <goals > <goal > add-source</goal > <goal > compile</goal > </goals > </execution > <execution > <id > test-compile-scala</id > <phase > test-compile</phase > <goals > <goal > add-source</goal > <goal > testCompile</goal > </goals > </execution > </executions > <configuration > <scalaVersion > 2.11.8</scalaVersion > </configuration > </plugin > </plugins > </build > </project >
Package the jar (the spark-submit path below assumes an IDEA artifact built under out/artifacts).
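If you would rather build from the command line than through IDEA's artifact dialog, the scala-maven-plugin configured in the pom.xml above should let plain Maven produce a jar as well. This is only a sketch: the jar lands under target/ instead of out/artifacts, so the spark-submit path below would need to be adjusted, and dependencies such as the MySQL driver are not bundled unless you add a shade/assembly plugin.

cd ~/IdeaProjects/Film_Recommend_Dataframe
mvn clean package
ls target/    # expect something like Film_Recommend_Dataframe-1.0-SNAPSHOT.jar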
Run
cd /usr/local/spark/
./bin/spark-submit --class recommend.MovieLensALS ~/IdeaProjects/Film_Recommend_Dataframe/out/artifacts/Film_Recommend_Dataframe_jar/Film_Recommend_Dataframe.jar /input_spark 1
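The two trailing arguments are the HDFS input directory (/input_spark) and the userid to generate recommendations for, which matches how the Node.js app invokes spark-submit later. Once the job finishes, a quick sanity check is to query the recommendresult table; the database name, credentials, and userid here simply follow the values used in the code above and may differ in your setup:

mysql -u root -p -e "SELECT * FROM movierecommend.recommendresult WHERE userid = 1 LIMIT 10;"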
cd ~
mkdir movierecommendapp
cd movierecommendapp
npm install express --save
npm install jade --save
npm install body-parser --save
npm install mysql --save
vim movierecommend.js
var express = require('express');
var bodyParser = require('body-parser');
const spawnSync = require('child_process').spawnSync;
var app = express();
var mysql = require('mysql');
var http = require("http");

app.set('view engine', 'jade');
app.set('views', './views');
app.use(bodyParser.urlencoded({extended: false}));
app.use(bodyParser.json());

var connection = mysql.createConnection({
  host: '127.0.0.1',
  user: 'root',
  password: '123456',
  database: 'movierecommend',
  port: '3306'
});
connection.connect();

app.get('/', function (req, res) {
  res.render('index');
});

app.get('/loginpage', function (req, res) {
  res.render('loginpage', {title: '登录'});
});

app.post('/login', function (req, res) {
  var name = req.body.username.trim();
  var pwd = req.body.pwd.trim();
  console.log('username:' + name + 'password:' + pwd);
  var selectMovieInfoSQL = "select movieid,moviename,picture from movieinfo limit 1000";
  var movieinfolist = [];
  connection.query(selectMovieInfoSQL, function (err, rows, fields) {
    if (err) throw err;
    movieinfolist = rows;
  });
  var selectSQL = "select * from user where username = '" + name + "' and password = '" + pwd + "'";
  connection.query(selectSQL, function (err, rows, fields) {
    if (err) throw err;
    function randomFrom(lowerValue, upperValue) {
      return Math.floor(Math.random() * (upperValue - lowerValue + 1) + lowerValue);
    }
    var lowerValue = 0;
    var upperValue = movieinfolist.length;
    var index = randomFrom(lowerValue, upperValue);
    var movielist = [];
    var movieNumbers = 10;
    for (var i = 0; i < movieNumbers; i++) {
      index = randomFrom(lowerValue, upperValue);
      movielist.push({movieid: movieinfolist[index].movieid, moviename: movieinfolist[index].moviename, picture: movieinfolist[index].picture});
    }
    res.render('personalratings', {title: 'Welcome User', userid: rows[0].userid, username: rows[0].username, movieforpage: movielist});
  });
});

app.get('/registerpage', function (req, res) {
  res.render('registerpage', {title: '注册'});
});

app.post('/register', function (req, res) {
  var name = req.body.username.trim();
  var pwd = req.body.pwd.trim();
  var user = {username: name, password: pwd};
  connection.query('insert into user set ?', user, function (err, rs) {
    if (err) throw err;
    console.log('register success');
    res.render('registersuccess', {title: '注册成功', message: name});
  });
});

app.post('/submituserscore', function (req, res) {
  var userid = req.body.userid;
  var moviescores = [];
  var movieids = [];
  req.body.moviescore.forEach(function (score) {
    moviescores.push({moviescore: score});
  });
  req.body.movieid.forEach(function (id) {
    movieids.push({movieid: id});
  });
  connection.query('delete from personalratings where userid=' + userid, function (err, result) {
    if (err) throw err;
    console.log('deleted');
  });
  var mytimestamp = new Date().getTime().toString().slice(1, 10);
  for (var item in movieids) {
    var personalratings = {userid: userid, movieid: movieids[item].movieid, rating: moviescores[item].moviescore, timestamp: mytimestamp};
    connection.query('insert into personalratings set ?', personalratings, function (err, rs) {
      if (err) throw err;
      console.log('insert into personalrating success');
    });
  }
  var selectUserIdNameSQL = 'select userid,username from user where userid=' + userid;
  connection.query(selectUserIdNameSQL, function (err, rows, fields) {
    if (err) throw err;
    res.render('userscoresuccess', {title: 'Personal Rating Success', user: rows[0]});
  });
});

app.get('/recommendmovieforuser', function (req, res) {
  const userid = req.query.userid;
  const username = req.query.username;
  const path = '/input_spark';
  let spark_submit = spawnSync('/usr/local/spark/bin/spark-submit',
    ['--class', 'recommend.MovieLensALS', '~/IdeaProjects/Film_Recommend/out/artifacts/Film_Recommend_jar/Film_Recommend.jar', path, userid],
    {shell: true, encoding: 'utf8'});
  var selectRecommendResultSQL = "select recommendresult.userid,recommendresult.movieid,recommendresult.rating,recommendresult.moviename,movieinfo.picture from recommendresult inner join movieinfo on recommendresult.movieid=movieinfo.movieid where recommendresult.userid=" + userid;
  var movieinfolist = [];
  connection.query(selectRecommendResultSQL, function (err, rows, fields) {
    if (err) throw err;
    console.log('read recommend result from database');
    for (var i = 0; i < rows.length; i++) {
      console.log('forxunhuan:i=' + i);
      movieinfolist.push({userid: rows[i].userid, movieid: rows[i].movieid, rating: rows[i].rating, moviename: rows[i].moviename, picture: rows[i].picture});
    }
    res.render('recommendresult', {title: 'Recommend Result', message: 'this is recommend for you', username: username, movieinfo: movieinfolist});
  });
});

var server = app.listen(3000, function () {
  console.log("movierecommend server start......");
});
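The app above loads its templates from ./views (app.set('views', './views')), and index.jade includes css/index.css relative to that directory, so create the folders before writing the files that follow (the exact paths are an assumption based on those settings):

cd ~/movierecommendapp
mkdir -p views/css
cd views

All the .jade files below go into views/, and index.css goes into views/css/.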
vim index.jade
html
  head
    title!=title
    meta(charset='utf-8')
    meta(name='description')
    meta(name='keywords')
    meta(name='author')
    link(rel='shortcut icon', href='http://eduppp.cn/images/logo4.gif')
    link(rel='apple-touch-icon', href='http://eduppp.cn/images/logo.gif')
    style
      include css/index.css
    style(type='text/css').
      #frame {/* carousel frame container */
        position: absolute; /* absolute positioning makes child positioning easier */
        width: 1500px;
        height: 75%;
        overflow: hidden; /* acts as the frame: only one image is visible at a time */
        border-radius: 5px;
      }
      #dis {/* absolutely positioned so the li captions distribute automatically */
        position: absolute;
        left: -50px;
        top: -10px;
        opacity: 0.5;
      }
      #dis li {
        display: inline-block;
        width: 200px;
        height: 20px;
        margin: 0 650px;
        float: left;
        text-align: center;
        color: #fff;
        border-radius: 10px;
        background: #000;
      }
      #photos img {
        float: left;
        width: 1500px;
        height: 75%;
      }
      #photos {/* total strip width; the carousel effect comes from shifting margin-left */
        position: absolute;
        z-index: 9;
        width: calc(1500px * 5); /* if the number of images changes, adjust the keyframes below */
      }
      .play {
        animation: ma 20s ease-out infinite alternate;
      }
      @keyframes ma {/* each image has two phases: the slide and the hold */
        0%,20%   { margin-left: 0px; }
        25%,40%  { margin-left: -1500px; }
        45%,60%  { margin-left: -3000px; }
        65%,80%  { margin-left: -4500px; }
        85%,100% { margin-left: -6000px; }
      }
      .num {
        position: absolute;
        z-index: 10;
        display: inline-block;
        right: 10px;
        top: 550px;
        border-radius: 100%;
        background: #778899;
        width: 50px;
        height: 50px;
        line-height: 50px;
        cursor: pointer;
        color: #fff;
        background-color: rgba(0,0,0,0.5);
        text-align: center;
        opacity: 0.8;
      }
      .num:hover { background: #000; }
      .num:hover, #photos:hover { animation-play-state: paused; }
      .num:nth-child(2) { margin-right: 60px }
      .num:nth-child(3) { margin-right: 120px }
      .num:nth-child(4) { margin-right: 180px }
      .num:nth-child(5) { margin-right: 240px }
      #a1:hover ~ #photos { animation: ma1 .5s ease-out forwards; }
      #a2:hover ~ #photos { animation: ma2 .5s ease-out forwards; }
      #a3:hover ~ #photos { animation: ma3 .5s ease-out forwards; }
      #a4:hover ~ #photos { animation: ma4 .5s ease-out forwards; }
      #a5:hover ~ #photos { animation: ma5 .5s ease-out forwards; }
      @keyframes ma1 { 0% { margin-left: -1200px; } 100% { margin-left: -0px; } }
      @keyframes ma2 { 0% { margin-left: -1200px; } 100% { margin-left: -1500px; } }
      @keyframes ma3 { 100% { margin-left: -3000px; } }
      @keyframes ma4 { 100% { margin-left: -4500px; } }
      @keyframes ma5 { 100% { margin-left: -6000px; } }
  body
    div#navigation 欢迎访问厦门大学数据库实验室电影推荐系统
    div#logreg
      input(type='submit', value='登录', onclick="window.location='/loginpage'")
      input(type='submit', value='注册', onclick="window.location='/registerpage'")
    div#mid
      #frame
        a#a5.num 5
        a#a4.num 4
        a#a3.num 3
        a#a2.num 2
        a#a1.num 1
        #photos.play
          img(src='http://img05.tooopen.com/products/20150130/44128217.jpg')
          img(src='http://image.17173.com/bbs/v1/2012/11/14/1352873759491.jpg')
          img(src='http://t1.27270.com/uploads/tu/201502/103/5.jpg')
          img(src='http://img.doooor.com/img/forum/201507/15/171203xowepc3ju9n9br9z.jpg')
          img(src='http://4493bz.1985t.com/uploads/allimg/170503/5-1F503140J0.jpg')
        ul#dis
          li 魔戒:霍比特人
          li 魔境仙踪
          li 阿凡达
          li 大圣归来
          li 拆弹专家
vim loginpage.jade
html
  head
    title!= title
    style.
      body{
        background-image:url(https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1537261291133&di=c04553d39f158272a36be6e3ec0c8071&imgtype=0&src=http%3A%2F%2Fh.hiphotos.baidu.com%2Fzhidao%2Fpic%2Fitem%2Fc2fdfc039245d6885bc3be94a2c27d1ed21b2438.jpg);
      }
      #log{
        padding-top: 2px;
        margin-top: 10%;
        margin-left: 37%;
        background: white;
        width: 25%;
        height: 40%;
        text-align: center;
      }
  body
    div#log
      form(action='/login', method='post')
        h1 用户登录
        br
        span 帐号:
        input(type='text',name='username')
        br
        span 密码:
        input(type='password',name='pwd')
        br
        br
        input(type='submit',value='登录')
        br
        a(href='/registerpage', title='注册') 注册
        br
        a(href='/',title='主页') 返回主页
vim personalratings.jade
html
  head
    title!= title
  body
    h1 热烈欢迎用户: #{username},请您为以下电影打分:
    form(action='/submituserscore', method='post')
      input(type='text',style="visibility: hidden;width:0px; height:0px;",name='userid', value=userid)
      table( cellpadding="5" cellspacing="5")
        tr
        -for(var i=0;i<movieforpage.length;i++)
          td
            //p 电影名称:#{movieforpage[i].moviename}
            input(type='text',style="visibility: hidden;width:0px; height:0px;",name='movieid', value=movieforpage[i].movieid)
            img(src=movieforpage[i].picture)
            br
            select(name='moviescore')
              option(value=1) 1
              option(value=2) 2
              option(value=3) 3
              option(value=4) 4
              option(value=5) 5
          -if((i+1)%5==0)
            tr
      input(type='submit',value='提交')
vim recommendresult.jade
html
  head
    title!= title
  body
    h1 亲爱的用户:#{username},猜你喜欢电影:
    table( cellpadding="5" cellspacing="5")
      tr
      -for(var i=0;i<movieinfo.length;i++)
        td
          p #{movieinfo[i].moviename}
          img(src=movieinfo[i].picture)
        -if((i+1)%5==0)
          tr
vim registerpage.jade
html
  head
    title!= title
    style.
      body{
        background-image:url(https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1537261291133&di=c04553d39f158272a36be6e3ec0c8071&imgtype=0&src=http%3A%2F%2Fh.hiphotos.baidu.com%2Fzhidao%2Fpic%2Fitem%2Fc2fdfc039245d6885bc3be94a2c27d1ed21b2438.jpg);
      }
      #reg{
        padding-top: 2px;
        margin-top: 10%;
        margin-left: 37%;
        background: white;
        width: 25%;
        height: 40%;
        text-align: center;
      }
  body
    div#reg
      form(action='/register', method='post')
        h1 用户注册
        br
        span 帐号:
        input(type='text',name='username')
        br
        span 密码:
        input(type='password',name='pwd')
        br
        br
        input(type='submit',value='注册')
vim registersuccess.jade
html
  head
    title!= title
  body
    h1 热烈欢迎用户: #{message}
    a(href='loginpage', title='请点击这里登录') 请点击这里登录
vim userscoresuccess.jade
html
  head
    title!= title
  body
    h1 亲爱的用户:#{user.username},祝贺评分成功!
    a(href='/recommendmovieforuser?userid=#{user.userid}&username=#{user.username}') 点击这里为您推荐电影
mkdir css
cd css
vim index.css
body {
  background-image: url(https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1537261291133&di=c04553d39f158272a36be6e3ec0c8071&imgtype=0&src=http%3A%2F%2Fh.hiphotos.baidu.com%2Fzhidao%2Fpic%2Fitem%2Fc2fdfc039245d6885bc3be94a2c27d1ed21b2438.jpg);
}
#navigation {
  background: #888888;
  height: 10%;
  width: 100%;
  text-align: center;
  font-size: 50px;
  margin-bottom: 5px;
}
#logreg {
  float: right;
  padding-top: 2%;
  padding-right: 20px;
  color: #FFF;
  background-color: rgba(0,0,0,0);
}
#mid {
  padding-top: 50px;
  height: 75%;
  width: 1500px;
  margin: 0 auto;
}
#picchange {
  text-align: center;
  height: 75%;
}
#contain {
  position: relative;
  margin: auto;
  width: 600px;
  height: 200px;
  text-align: center;
  font-family: Arial;
  color: #FFF;
  overflow: hidden;
}
#contain ul {
  margin: 10px 0;
  padding: 0;
  width: 1800px;
  transition: all 0.5s;
}
#contain li {
  float: left;
  width: 600px;
  height: 200px;
  list-style: none;
  line-height: 200px;
  font-size: 36px;
}
#one { background: #9fa8ef; }
#two { background: #ef9fb1; }
#three { background: #9fefc3; }
@keyframes marginLeft {
  0%    { margin-left: 0; }
  28.5% { margin-left: 0; }
  33.3% { margin-left: -600px; }
  62%   { margin-left: -600px; }
  66.7% { margin-left: -1200px; }
  95.2% { margin-left: -1200px; }
  100%  { margin-left: 0; }
}
#slide-auto {
  animation: marginLeft 10.5s infinite;
}
Then start the server from the movierecommendapp directory:

node movierecommend.js
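An optional sanity check that the app came up on port 3000 (the port set in app.listen above):

curl -I http://localhost:3000/
# or open http://localhost:3000 in the browser inside the VM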
Done.