0%

367. 有效的完全平方数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bool isPerfectSquare(int num){
long long left=1;
long long right=num;
long long middle;
long long temp=0;
while(left<=right){
middle=(left+right)/2;
if(middle*middle<=num){
if(middle*middle==num){
temp=middle;
}
left=middle+1;
}
else if(middle*middle>num){
right=middle-1;
}
}
if(temp==0) return false;
else return temp;
}

69. x 的平方根

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int mySqrt(int x){
long long temp;
long long left = 1;
long long right = x;
long long middle;
if (x==0)temp=0;
else if (x==1) temp=1;
else if(x!=0&&x!=1){
while(left<=right){
middle=(left+right)/2;
if(middle<=x/middle){
temp=middle;
left=middle+1;
}
else if(middle>x/middle){
right=middle-1;
}
}

}
return temp;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include<iostream>
#include<vector>
using namespace std;
int main(){
int temp;
int n;
cin>>n;
// vector<int> x;
// for(int i=0;i<n;i++)
// {
// x.push_back(i);
// }
int left=1;
int right=n;
int middle;
// int target=2;

while(left<=right){
middle=(left+right)/2;
if(middle>n/middle){
right=middle-1;
}
else if(middle<=n/middle){ //middle^2<n
temp=middle; //因为2^2<8<2^3 所以直接取这边的middle
left=middle+1;
}

}
cout<<temp;


return 0;
}

35. 搜索插入位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Solution {
public:
int searchInsert(vector<int>& nums, int target) {
int left=0;
int right=nums.size()-1;
int middle;
while(left<=right){
middle=(left+right)/2;
if (nums[middle]>target){
right=middle-1;
}
else if (nums[middle]<target){
left=middle+1;
}
else{
return middle;
}
}
return right+1;

}
};

使用方法:

1.binary_search:查找某个元素是否出现

a.函数模板:

binary_search(arr[],arr[]+size,indx)

b.参数说明:

arr[]:数组首地址
size:数组元素个数
indx:需要查找的值

c.函数功能:

在数组中以二分法检索的方式查找,若在数组中查找到indx元素则真,若查找不到则返回值是假

2.lower_bound:查找第一个大于或等于某个元素的位置

a.函数模板:

lower_bound(arr[],arr[]+size,indx)

b.参数说明:

arr[] : 数组首地址
size : 数组元素的个数
indx : 需要查找的值

c.函数功能:

函数lower_bound()在first和last的前闭后开区间进行二分查找,返回大于或等于val的第一个元素的位置,如果所有元素都小于val,则返回last的位置。
举例:
一个数组number序列为:4,10,11,30,69,70,96,100.设要插入数字3,9,111.pos为要插入的位置的下标,则
 注意因为返回值是一个指针,所以减去数组的指针就是int变量了
  pos = lower_bound( number, number + 8, 3) - number,pos = 0.即number数组的下标为0的位置。
  pos = lower_bound( number, number + 8, 9) - number, pos = 1,即number数组的下标为1的位置(即10所在的位置)。
  pos = lower_bound( number, number + 8, 111) - number, pos = 8,即number数组的下标为8的位置(但下标上限为7,所以返回最后一个元素的下一个元素)。
e.注意:函数lower_bound()在first和last中的前闭后开区间进行二分查找,返回大于或等于val的第一个元素位置。如果所有元素都小于val,则返回last的位置,且last的位置是越界的!

返回查找元素的第一个可安插位置,也就是“元素值>=查找值”的第一个元素的位置

3.upper_bound : 查找第一个大于某个元素的位置

a. 函数模板 : upper_bound(arr[] , arr[]+size , indx)
b. 参数说明:

arr[] : 数组首地址
size : 数组元素个数
indx : 需要查找的值

c. 函数功能 :

函数upper_bound()返回的在前闭后开区间查找的关键字的上界,返回大于val的第一个元素位置
  例如:一个数组number序列1,2,2,4.upper_bound(2)后,返回的位置是3(下标)也就是4所在的位置,同样,如果插入元素大于数组中全部元素,返回的是last。(注意:数组下标越界)
  返回查找元素的最后一个可安插位置,也就是“元素值>查找值”的第一个元素的位置 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include<iostream>
#include<algorithm>
using namespace std;
int main()
{
int a[100]= {5,9,11,30,69,70,96,100};
int b=binary_search(a,a+9,5);//查找成功,返回1
cout<<"在数组中查找元素5,结果为:"<<b<<endl;
int c=binary_search(a,a+9,99);//查找失败,返回0
cout<<"在数组中查找元素99,结果为:"<<b<<endl;
int d=lower_bound(a,a+9,9)-a;
cout<<"在数组中查找第一个大于等于9的元素位置,结果为:"<<d<<endl;
int e=lower_bound(a,a+9,101)-a;
cout<<"在数组中查找第一个大于等于101的元素位置,结果为:"<<e<<endl;
int f=upper_bound(a,a+9,10)-a;
cout<<"在数组中查找第一个大于10的元素位置,结果为:"<<f<<endl;
int g=upper_bound(a,a+9,101)-a;
cout<<"在数组中查找第一个大于101的元素位置,结果为:"<<g<<endl;
}

704. 二分查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Solution {
public:
int search(vector<int>& nums, int target) {
int left=0;
int right=nums.size()-1;
while(left<=right){
int middle=(left+right)/2;
if (nums[middle]>target){
right=middle-1;
}
else if (nums[middle]<target){
left=middle+1;
}
else{
return middle;
}

}
return -1;
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int search(int* nums, int numsSize, int target){

int left=0;
int right=numsSize-1;
while(left<=right){
int middle=(left+right)/2;
if (nums[middle]>target){
right=middle-1;
}
else if (nums[middle]<target){
left=middle+1;
}
else{
return middle;
}

}
return -1;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include<iostream>
#include<vector>
using namespace std;
int main(){
vector <int> nums;
int n;
cin>>n;
int target;
cin>>target;
while(n--){
int temp;
cin>>temp;
nums.push_back(temp);
}
int left=0;
int right=nums.size()-1;
while(left<=right){
int middle=(left+right)/2;
if(nums[middle]>target){
right=middle-1;
}
else if(nums[middle]<target){
left=middle+1;
}
else {
cout<<middle;
break;
}


}

return 0;
}

1. vector:

1.1 vector 说明

  • vector是向量类型,可以容纳许多类型的数据,因此也被称为容器
  • (可以理解为动态数组,是封装好了的类)
  • 进行vector操作前应添加头文件#include <vector>

1.2 vector初始化:

方式1.

1
2
3
//定义具有10个整型元素的向量(尖括号为元素类型名,它可以是任何合法的数据类型),不具有初值,其值不确定
vector<int>a(10);
12

方式2.

1
2
//定义具有10个整型元素的向量,且给出的每个元素初值为1
vector<int>a(10,1);

方式3.

1
2
//用向量b给向量a赋值,a的值完全等价于b的值
vector<int>a(b);

方式4.

1
2
//将向量b中从0-2(共三个)的元素赋值给a,a的类型为int型
vector<int>a(b.begin(),b.begin+3);

方式5.

1
2
3
 //从数组中获得初值
int b[7]={1,2,3,4,5,6,7};
vector<int> a(b,b+7);

1.3 vector对象的常用内置函数使用(举例说明)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include<vector>
vector<int> a,b;
//b为向量,将b的0-2个元素赋值给向量a
a.assign(b.begin(),b.begin()+3);
//a含有4个值为2的元素
a.assign(4,2);
//返回a的最后一个元素
a.back();
//返回a的第一个元素
a.front();
//返回a的第i元素,当且仅当a存在
a[i];
//清空a中的元素
a.clear();
//判断a是否为空,空则返回true,非空则返回false
a.empty();
//删除a向量的最后一个元素
a.pop_back();
//删除a中第一个(从第0个算起)到第二个元素,也就是说删除的元素从a.begin()+1算起(包括它)一直到a.begin()+3(不包括它)结束
a.erase(a.begin()+1,a.begin()+3);
//在a的最后一个向量后插入一个元素,其值为5
a.push_back(5);
//在a的第一个元素(从第0个算起)位置插入数值5,
a.insert(a.begin()+1,5);
//在a的第一个元素(从第0个算起)位置插入3个数,其值都为5
a.insert(a.begin()+1,3,5);
//b为数组,在a的第一个元素(从第0个元素算起)的位置插入b的第三个元素到第5个元素(不包括b+6)
a.insert(a.begin()+1,b+3,b+6);
//返回a中元素的个数
a.size();
//返回a在内存中总共可以容纳的元素个数
a.capacity();
//将a的现有元素个数调整至10个,多则删,少则补,其值随机
a.resize(10);
//将a的现有元素个数调整至10个,多则删,少则补,其值为2
a.resize(10,2);
//将a的容量扩充至100,
a.reserve(100);
//b为向量,将a中的元素和b中的元素整体交换
a.swap(b);
//b为向量,向量的比较操作还有 != >= > <= <
a==b;

2. 顺序访问vector的几种方式,举例说明

2.1. 对向量a添加元素的几种方式

1.向向量a中添加元素

1
2
3
4
vector<int>a;
for(int i=0;i<10;++i){
a.push_back(i);
}

2.从数组中选择元素向向量中添加

1
2
3
4
5
int a[6]={1,2,3,4,5,6};
vector<int> b;
for(int i=0;i<=4;++i)
{b.push_back(a[i]);}

3.从现有向量中选择元素向向量中添加

1
2
3
4
5
6
7
int a[6]={1,2,3,4,5,6};
vector<int>b;
vector<int>c(a,a+4);
for(vector<int>::iterator it=c.begin();it<c.end();++it)
{
b.push_back(*it);
}

4.从文件中读取元素向向量中添加

1
2
3
ifstream in("data.txt");
vector<int>a;
for(int i;in>>i){a.push_back(i);}

5.常见错误赋值方式

1
2
vector<int>a;
for(int i=0;i<10;++i){a[i]=i;}//下标只能用来获取已经存在的元素

2.2 从向量中读取元素

1.通过下标方式获取

1
2
3
int a[6]={1,2,3,4,5,6};
vector<int>b(a,a+4);
for(int i=0;i<=b.size()-1;++i){cout<<b[i]<<endl;}

2.通过迭代器方式读取

1
2
3
int a[6]={1,2,3,4,5,6};
vector<int>b(a,a+4);
for(vector<int>::iterator it=b.begin();it!=b.end();it++){cout<<*it<<" ";}

3.几个常用的算法

1
2
3
4
5
6
7
8
9
10
#include<algorithm>
//对a中的从a.begin()(包括它)到a.end()(不包括它)的元素进行从小到大排列
sort(a.begin(),a.end());
//对a中的从a.begin()(包括它)到a.end()(不包括它)的元素倒置,但不排列,如a中元素为1,3,2,4,倒置后为4,2,3,1
reverse(a.begin(),a.end());
//把a中的从a.begin()(包括它)到a.end()(不包括它)的元素复制到b中,从b.begin()+1的位置(包括它)开始复制,覆盖掉原有元素
copy(a.begin(),a.end(),b.begin()+1);
//在a中的从a.begin()(包括它)到a.end()(不包括它)的元素中查找10,若存在返回其在向量中的位置
find(a.begin(),a.end(),10);

1.[left,right]

IMG_0057

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
left=0;
right=num.size()-1; //-1是因为]的原因
while(left<=right) //为什么= 因为[1,1]合法
{
middle=(left+right)/2;
if(num[middle]>target)
{
right=middle-1; //为什么是middle-1 因为']'右区间去了闭,所以要减一避免重复
}
else if(num[middle]<target){
left=middle+1; //为什么+1,和上面同理
}
else{
return middle;
}

}

B4B0D0DA4054146C02000B88684BE541

2.[left,right)

IMG_0057

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
left=0;
right=num.size(); //)的原因
while(left<right) //为什么是<,因为[),[1,1)开区间的取不到
{
middle=(left+right)/2;
if (num[middle]>target){
right=middle; //为什么不加1,因为)开区间取不到
}
else if(num[middle]<target){
left=middle+1; //闭区间取得到,所以+1
}
else{
return middle;
}
}

B4B0D0DA4054146C02000B88684BE541

题目

模板题704. 二分查找

其他:

35 搜索插入位置

hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

一、安装hive

1. 下载并解压hive源程序

Hive下载地址

1
2
3
4
sudo tar -zxvf ./apache-hive-1.2.1-bin.tar.gz -C /usr/local   # 解压到/usr/local中
cd /usr/local/
sudo mv apache-hive-1.2.1-bin hive # 将文件夹名改为hive
sudo chown -R dblab:dblab hive # 修改文件权限

2. 配置环境变量
为了方便使用,我们把hive命令加入到环境变量中去,编辑~/.bashrc文件vim ~/.bashrc,在最前面一行添加:

1
2
export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin

保存退出后,运行source ~/.bashrc使配置立即生效。

3. 修改/usr/local/hive/conf下的hive-site.xml
将hive-default.xml.template重命名为hive-default.xml;新建一个文件touch hive-site.xml,并在hive-site.xml中粘贴如下配置信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
<description>password to use against metastore database</description>
</property>
</configuration>

二、安装并配置mysql

1.Ubuntu下mysql的安装请参考Ubuntu安装MySQL
2.下载mysql jdbc 包,下载地址

https://downloads.mysql.com/archives/c-j/

1
2
tar -zxvf mysql-connector-java-5.1.40.tar.gz   #解压
cp mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar /usr/local/hive/lib #将mysql-connector-java-5.1.40-bin.jar拷贝到/usr/local/hive/lib目录下

3. 启动并登陆mysql shell

1
2
service mysql start #启动mysql服务
mysql -u root -p #登陆shell界面

4. 新建hive数据库

1
mysql> create database hive;    #这个hive数据库与hive-site.xml中localhost:3306/hive的hive对应,用来保存hive元数据

5. 配置mysql允许hive接入:

1
2
3
4
5
6
7
8
9
10
11
这条MySQL语句是为用户'hive'授予所有数据库的所有权限,并设置'hive'用户的密码为'hive'。这个命令通常用于在授权给用户访问数据库之前,先为该用户创建一个数据库账户并为其授权。

在这个命令中,'identified by'语句用于设置用户的密码。'hive'是数据库账户的用户名和密码,它将用于在hive-site.xml配置文件中连接数据库。

请注意,为了安全起见,建议不要在授权语句中明文指定密码。相反,可以使用以下语句来创建用户并设置其密码:

CREATE USER 'hive'@'localhost' IDENTIFIED BY 'password';
然后使用以下命令来授予用户所有数据库的所有权限:

GRANT ALL ON *.* TO 'hive'@'localhost';
这样做可以保护您的数据库免受潜在的安全威胁。
1
2
mysql> grant all on *.* to hive@localhost identified by 'hive';   #将所有数据库的所有表的所有权限赋给hive用户,后面的hive是配置hive-site.xml中配置的连接密码
mysql> flush privileges; #刷新mysql系统权限关系表

6. 启动hive
启动hive之前,请先启动hadoop集群。

1
2
start-all.sh #启动hadoop
hive #启动hive

解决Hive启动,Hive metastore database is not initialized的错误。出错原因:重新安装Hive和MySQL,导致版本、配置不一致。在终端执行如下命令:

1
schematool -dbType mysql -initSchema

Hive 分布现在包含一个用于 Hive Metastore 架构操控的脱机工具,名为 schematool.此工具可用于初始化当前 Hive 版本的 Metastore 架构。此外,其还可处理从较旧版本到新版本的架构升级。

https://dblab.xmu.edu.cn/blog/996/

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,目前属于 Oracle 旗下产品。MySQL 最流行的关系型数据库管理系统,在 WEB 应用方面MySQL是最好的 RDBMS (Relational Database Management System,关系数据库管理系统) 应用软件之一。

一、安装MySQL

使用以下命令即可进行mysql安装,注意安装前先更新一下软件源以获得最新版本:

1
2
sudo apt-get update  #更新软件源
sudo apt-get install mysql-server #安装mysql

上述命令会安装以下包:
apparmor
mysql-client-5.7
mysql-common
mysql-server
mysql-server-5.7
mysql-server-core-5.7
因此无需再安装mysql-client等。安装过程会提示设置mysql root用户的密码,设置完成后等待自动安装即可。默认安装完成就启动了mysql。

  • 启动和关闭mysql服务器:

    1
    2
    service mysql start
    service mysql stop

1680850535731

  • 确认是否启动成功,mysql节点处于LISTEN状态表示启动成功:

    1
    sudo netstat -tap | grep mysql

    1680850589215

  • 进入mysql shell界面:

1
mysql -u root -p

1680850651984

解决利用sqoop导入MySQL中文乱码的问题(可以插入中文,但不能用sqoop导入中文)
导致导入时中文乱码的原因是character_set_server默认设置是latin1,如下图。

1
show variables like "char%";

未修改server 编码

可以单个设置修改编码方式set character_set_server=utf8;但是重启会失效,建议按以下方式修改编码方式。
(1)编辑配置文件。sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf
(2)在[mysqld]下添加一行character_set_server=utf8。如下图

未修改server 编码

(3)重启MySQL服务。service mysql restart
(4)登陆MySQL,并查看MySQL目前设置的编码。show variables like "char%";

未修改server 编码

但是我的就直接是这样子的了:

1680850651984

二、MySQL常用操作

注意:MySQL中每个命令后都要以英文分号;结尾。
1、显示数据库
mysql> show databases;
MySql刚安装完有两个数据库:mysql和test。mysql库非常重要,它里面有MySQL的系统信息,我们改密码和新增用户,实际上就是用这个库中的相关表进行操作。

2、显示数据库中的表
mysql> use mysql; (打开库,对每个库进行操作就要打开此库)
Database changed
mysql> show tables;

3、显示数据表的结构:
describe 表名;

4、显示表中的记录:
select * from 表名;
例如:显示mysql库中user表中的纪录。所有能对MySQL用户操作的用户都在此表中。
select * from user;

5、建库:
create database 库名;
例如:创建一个名字位aaa的库
mysql> create database aaa;

6、建表:
use 库名;
create table 表名 (字段设定列表);
例如:在刚创建的aaa库中建立表person,表中有id(序号,自动增长),xm(姓名),xb(性别),csny(出身年月)四个字段
use aaa;
mysql> create table person (id int(3) auto_increment not null primary key, xm varchar(10),xb varchar(2),csny date);
可以用describe命令察看刚建立的表结构。
mysql> describe person;

未修改server 编码

7、增加记录
例如:增加几条相关纪录。
mysql>insert into person values(null,’张三’,’男’,’1997-01-02’);
mysql>insert into person values(null,’李四’,’女’,’1996-12-02’);
注意,字段的值(’张三’,’男’,’1997-01-02’)是使用两个英文的单撇号包围起来,后面也是如此。
因为在创建表时设置了id自增,因此无需插入id字段,用null代替即可。
可用select命令来验证结果。
mysql> select * from person;

未修改server 编码

8、修改纪录
例如:将张三的出生年月改为1971-01-10
mysql> update person set csny=’1971-01-10’ where xm=’张三’;

9、删除纪录
例如:删除张三的纪录。
mysql> delete from person where xm=’张三’;

10、删库和删表
drop database 库名;
drop table 表名;

11、查看mysql版本
在mysql5.0中命令如下:
show variables like ‘version’;
或者:select version();

https://dblab.xmu.edu.cn/blog/1002/

1.Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

为employee.json创建DataFrame,并写出Python语句完成下列操作:

(1) 查询所有数据;

(2) 查询所有数据,并去除重复的数据;

(3) 查询所有数据,打印时去除id字段;

(4) 筛选出age>30的记录;

(5) 将数据按age分组;

(6) 将数据按name升序排列;

(7) 取出前3行数据;

(8) 查询所有记录的name列,并为其取别名为username;

(9) 查询年龄age的平均值;

(10) 查询年龄age的最小值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
首先为employee.json创建DataFrame,并写出Python语句完成下列操作:
创建DataFrame
答案:
>>> spark=SparkSession.builder().getOrCreate()
>>> df = spark.read.json("file:///usr/local/spark/employee.json")
(1) 查询DataFrame的所有数据
答案:>>> df.show()
(2) 查询所有数据,并去除重复的数据
答案:>>> df.distinct().show()
(3) 查询所有数据,打印时去除id字段
答案:>>> df.drop("id").show()
(4) 筛选age>20的记录
答案:>>> df.filter(df.age > 30 ).show()
(5) 将数据按name分组
答案:>>> df.groupBy("name").count().show()
(6) 将数据按name升序排列
答案:>>> df.sort(df.name.asc()).show()
(7)取出前3行数据
答案:>>> df.take(3) 或python> df.head(3)
(8)查询所有记录的name列,并为其取别名为username
答案:>>> df.select(df.name.alias("username")).show()
(9)查询年龄age的平均值
答案:>>> df.agg({"age": "mean"}).show()
(10)查询年龄age的最大值
答案:>>> df.agg({"age": "max"}).show()

2.编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

​ 请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

假设当前目录为/usr/local/spark/mycode/rddtodf,在当前目录下新建一个目录mkdir -p src/main/python,然后在目录/usr/local/spark/mycode/rddtodf/src/main/python下新建一个rddtodf.py,复制下面代码;(下列两种方式任选其一)

方法一:利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
if __name__ == "__main__":
sc = SparkContext("local","Simple App")
peopleRDD = sc.textFile("file:///usr/local/spark/employee.txt")
rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
rowRDD.createOrReplaceTempView("employee")
personsDF = spark.sql("select * from employee")
personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

方法二:使用编程接口,构造一个schema并将其应用在已知的RDD上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
if __name__ == "__main__":
sc = SparkContext("local","Simple App")
peopleRDD = sc.textFile("file:///usr/local/spark/employee.txt")
schemaString = "id name age"
fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
schema = StructType(fields)
rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2])))
employeeDF = spark.createDataFrame(rowRDD, schema)
employeeDF.createOrReplaceTempView("employee")
results = spark.sql("SELECT * FROM employee")
results.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

1
`python3 ./ rddtodf.py`

3. 编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表5-2所示的两行数据。

5-2 employee表原有数据

idnamegenderAge
1AliceF22
2JohnM25
1
2
3
4
5
6
mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

5-3 employee表新增数据

idnamegenderage
3MaryF26
4TomM23
1
答案:假设当前目录为/usr/local/spark/mycode/testmysql,在当前目录下新建一个目录mkdir -p src/main/python,然后在目录/usr/local/spark/mycode/testmysql/src/main/python下新建一个testmysql.py,复制下面代码;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
if __name__ == "__main__":

sc = SparkContext( 'local', 'test')
spark=SQLContext(sc)
jdbcDF=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user", "root").option("password", "123").load()
jdbcDF.filter(jdbcDF.age>20).collect()//检测是否连接成功
studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" "))
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3])))
employeeDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = '123'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest",'employee','append', prop)
jdbcDF.collect()
jdbcDF.agg({"age": "max"}).show()
jdbcDF.agg({"age": "sum"}).show()


然后我们,执行以下指令

1
2
3
 
python3 ./ rddtodf.py

在终端下,我们就可以看到结果了。