黎明晨光

生活从一点一滴开始


  • 首页

  • 标签

  • 分类

  • 归档

  • 搜索

MapReduce

发表于 2019-08-16 | 分类于 大数据平台 , Hadoop

Map/Reduce 是一个分布式运算程序的编程框架,Map/Reduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 hadoop 集群上。每个Map/Reduce任务都被初始化为一个Job,每个Job又可以分为两种阶段:map阶段和reduce阶段。这两个阶段分别用两个函数表示,即map函数和reduce函数。一个Map/Reduce作业的输入和输出类型如下所示:

(input) <k1, v1> -> map-> <k2, v2> -> Shuff-> <k2, list[v2]> -> reduce -> <k3, v3> (output)

简单在于其编程模型只包含map和reduce两个过程,map的主要输入是一对<key , value>值,经过map计算后输出一对<key , value>值;然后将相同key合并,形成<key , value集合>;再将这个<key , value集合>输入reduce,经过计算输出零个或多个<key , value>对。

img

原理

img

系统中有两类主要的进程节点:master(单点),worker(多个)。其中,worker根据不同的计算任务,又分为map worker(对应上图中的Map phase)、reduce worker(对应上图中的Reduce phase)。

  • master是系统的中心节点,负责为worker节点分配计算任务,同时监控worker节点的状态。如果某个worker计算太慢,或者宕机,master会将该worker进程负责的计算任务转移到其他进程。
  • worker包括Map与Reduce
  1. 首先将文件分割成<key,value>的键值对。Map通过 RecordReader 读取Input的<key,value>对,map根据用户自定义的任务,运行完毕后,输出另外一系列<key,value>,
  2. Shuffle 阶段需要从所有map主机上把相同的key 的 key value对组合在一起,(也就是这里省去的Combine阶段)。
  3. Partitioner组件会把 key放进一个 hash函数里,然后得到结果。如果两个 key 的哈希值 一样,他们的 <key,value>对就被放到同一个 reduce 函数里。我们也把分配到同一个 reduce函数里的<key,value>对叫做一个reduce partition。
  4. reduce() 函数以 key 及对应的 value 列表作为输入,按照用户自己的程序逻辑,经合并 key 相同的 value 值后,产 生另外一系列 <key,value> 对作为最终输出写入HDFS。

应用

HDFS

发表于 2019-08-16 | 分类于 大数据平台 , Hadoop

HDFS框架

img

  • 主从结构:
    1. 主节点,只有一个 : namenode
    2. 从节点,有很多个 :datanodes
  • namenode负责:
    1. 接收用户请求操作
    2. 维护文件信息系统的目录结构
    3. 管理文件与block之间的关系,block与datanade之间的关系
      注意:namenode归根结底要放在磁盘上的以保证数据的持久性,但是为了提高效率,一般在hadoop运行时时放在内存中的
  • datanode负责:
      1. 存储文件
         2. 文件被分成`block`存储字磁盘上
         3. 为保证数据安全,文件会有多个副本
    

HDFS shell命令

命令格式: bin/hadoop fs <args> URI

                     URI:scheme://authority/path—HDFS文件系统
                                file://authority/path—本地文件系统

命令 说明
cat 使用方法:hadoop fs -cat URI [URI …]
将路径指定文件的内容输出到stdout,成功返回0,失败返回-1。
eg:hadoop fs -cat hdfs://host1:port1/file1 hdfs://host2:port2/file2
   hadoop fs -cat file:///file3 /user/hadoop/file4
chgrp 使用方法:hadoop fs -chgrp [-R] GROUP URI [URI …]
                   -R:递归改变文件的目录结构
改变文件所属的组。
chmod 使用方法:hadoop fs -chmod [-R] <MODE[,MODE]... OCTALMODE> URI [URI …]
       -R:递归改变文件的目录结构
改变文件的权限。
chown 使用方法:hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]
     -R:递归改变文件的目录结构
改变文件的拥有者。
copyFromLocal 使用方法:hadoop fs -copyFromLocal <localsrc> URI
除了限定源路径是一个本地文件外,和put命令相似
copyToLocal 使用方法:hadoop fs -copyToLocal [-ignorecrc] [-crc] URI <localdst>
除了限定目标路径是一个本地文件外,和get命令类似。
cp 使用方法:hadoop fs -cp URI [URI …] <dest>
将文件从源路径复制到目标路径。这个命令允许有多个源路径,此时目标路径必须是一个目录。成功返回0,失败返回-1。
eg:hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir
du 使用方法:hadoop fs -du URI [URI …]
显示目录中所有文件的大小,或者当只指定一个文件时,显示此文件的大小。成功返回0,失败返回-1。
eg:hadoop fs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://host:port/user/hadoop/dir1
dus 使用方法:hadoop fs -dus <args>
显示文件的大小。
expunge 使用方法:hadoop fs -expunge
清空回收站。
get 使用方法:hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>
复制文件到本地文件系统。可用-ignorecrc选项复制CRC校验失败的文件。使用-crc选项复制文件以及CRC信息。
eg:hadoop fs -get /user/hadoop/file localfile
  hadoop fs -get hdfs://host:port/user/hadoop/file localfile
getmerge 使用方法:hadoop fs -getmerge <src> <localdst> [addnl]
接受一个源目录和一个目标文件作为输入,并且将源目录中所有的文件连接成本地目标文件。addnl是可选的,用于指定在每个文件结尾添加一个换行符。
ls 使用方法:hadoop fs -ls <args>
如果是文件,则按照如下格式返回文件信息:文件名 <副本数> 文件大小 修改日期 修改时间 权限 用户ID 组ID
如果是目录,则返回它直接子文件的一个列表,就像在Unix中一样。目录返回列表的信息如下目录名 <dir> 修改日期 修改时间 权限 用户ID 组ID
成功返回0,失败返回-1。
lsr ls命令的递归版本。类似于Unix中的ls -R。
mkdir 使用方法:hadoop fs -mkdir <paths>
接受路径指定的uri作为参数,创建这些目录。其行为类似于Unix的mkdir -p,它会创建路径中的各级父目录。成功返回0,失败返回-1。
eg:hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
movefromLocal 使用方法:dfs -moveFromLocal <src> <dst>
mv 使用方法:hadoop fs -mv URI [URI …] <dest>
将文件从源路径移动到目标路径。这个命令允许有多个源路径,此时目标路径必须是一个目录。不允许在不同的文件系统间移动文件。
eg:hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
put 使用方法:hadoop fs -put <localsrc> ... <dst>
从本地文件系统中复制单个或多个源路径到目标文件系统。也支持从标准输入中读取输入写入目标文件系统。成功返回0,失败返回-1。
eg:hadoop fs -put localfile /user/hadoop/hadoopfile
rm 使用方法:hadoop fs -rm URI [URI …]
删除指定的文件。只删除非空目录和文件。请参考rmr命令了解递归删除。成功返回0,失败返回-1。
eg:hadoop fs -rm hdfs://host:port/file /user/hadoop/emptydir
rmr 使用方法:hadoop fs -rmr URI [URI …]
delete的递归版本。
hadoop fs -rmr /user/hadoop/dir
setrep 使用方法:hadoop fs -setrep [-R] <path>
改变一个文件的副本系数。-R选项用于递归改变目录下所有文件的副本系数。成功返回0,失败返回-1。
hadoop fs -setrep -w 3 -R /user/hadoop/dir1
stat 使用方法:hadoop fs -stat URI [URI …]
返回指定路径的统计信息。
hadoop fs -stat path
tail 使用方法:hadoop fs -tail [-f] URI
将文件尾部1K字节的内容输出到stdout。支持-f选项,行为和Unix中一致。成功返回0,失败返回-1。
test 使用方法:hadoop fs -test -[ezd] URI
     -e 检查文件是否存在。如果存在则返回0。
     -z 检查文件是否是0字节。如果是则返回0。
     -d如果路径是个目录,则返回1,否则返回0。
text 使用方法:hadoop fs -text <src>
将源文件输出为文本格式。允许的格式是zip和TextRecordInputStream。
touchz 使用方法:hadoop fs -touchz URI [URI …]
创建一个0字节的空文件。成功返回0,失败返回-1。

python

发表于 2019-08-15 | 分类于 编程语言 , python

条件语句

1
print('True') if condition else print('False')

基本

命令 说明
max(num,key=func) 按照func返回num中的最大值

sys

getsizeof()

os

environ()函数

os.environ[‘USER’] 当前使用用户
os.environ[‘SHELL’] 使用shell的类型
os.environ[‘LAN’] 使用的语言

numpy

命令 说明
random.randint(start,ene,size) start与end:取值范围
size:数组的容量
np.logical_and(x1, x2, *args, **kwargs) 逻辑与
np.logical_or(x1, x2, *args, **kwargs) 逻辑或
np.logical_not(x, *args, **kwargs) 逻辑非

pandas

Dataframe

命令 说明
df.iloc[num] 指定行。df.iloc[0]指定第一行
df(df.b.isin([5,13])) 删除b列包含5、13的列
df['add_column']=1 df添加列add_column
df.drop() 删除行或者列
labels:删除的行或者列名
axis:行->0;列->1
index :直接指定要删除的行
columns:直接指定要删除的列
inplace=False:默认该删除操作不改变原数据,而是返回一个执行删除操作后的新dataframe;
inplace=True:则会直接在原数据上进行删除操作,删除后无法返回。
df.dropna() 删除空行
axis:0-行操作(默认),1-列操作
how:any-只要有空值就删除(默认),all-全部为空值才删除
inplace:False-返回新的数据集(默认),True-在愿数据集上操作
df.rename() df.rename(columns={'A':'a', 'B':'b', 'C':'c'}, inplace = True)
将列名修改为`[‘a’,’b’,’c’]
df.columns= ['a','b','c'] 将列名修改为['a','b','c']
df.reset_index(列名,inplace) 索引转化为列
列名:可以有,可以没有。多索引时,可以指定索引
inplace:替换原DataFrame
df.set_index(列名) 列转化为索引
df.T DataFrame转置
df.merge() 类似于数据库的join,返回DataFrame
df['列名'].tolist() 将列转换为list

matplotlib

https://serverpoolauth.ops.ctripcorp.com

sklearn

特征工程

1
from sklearn.preprocessing import *
命令 说明
KBinsDiscretizer - 对数据进行分箱(离散化) ,不能处理空值
- 返回np.array数据
- 参数
  n_bins:分箱的数量,默认值是5,也可以是列表,指定各个特征的分箱数量,例如,[feature1_bins,feature2_bins,…]

n_bins:分箱的数量,默认值是5,也可以是列表,指定各个特征的分箱数量,例如,[feature1_bins,feature2_bins,…]

encode:编码方式,{‘onehot’, ‘onehot-dense’, ‘ordinal’}, (default=’onehot’)

  • onehot:以onehot方式编码,返回稀疏矩阵
  • onehot-dense:以onehot方式编码,返回密集矩阵
  • ordinal:以ordinal方式编码,返回分箱的序号

strategy:定义分箱宽度的策略,{‘uniform’, ‘quantile’, ‘kmeans’}, (default=’quantile’)

  • uniform:每个分箱等宽
  • quantile:每个分箱中拥有相同数量的数据点
  • kmeans:每个箱中的值具有与1D k均值簇最近的中心

其他

断言 assert

hadoop

发表于 2019-08-14 | 分类于 大数据平台 , Hadoop

1565782138575

Apache Hadoop软件库是一个框架,允许使用简单的编程模型跨计算机集群分布式处理大型数据集。它旨在从单个服务器扩展到数千台计算机,每台计算机都提供本地计算和存储。该库本身不是依靠硬件来提供高可用性,而是设计用于检测和处理应用层的故障,从而在计算机集群之上提供高可用性服务,每个计算机都可能容易出现故障。Hadoop的命令众多,本文重点介绍HDFS与MapReduce

阅读全文 »

SQL-spark

发表于 2019-08-13 | 分类于 大数据平台 , spark

星火SQL架构

getting started

SparkSession

Spark 中所有功能的入口是 SparkSession 类。要创建一个基本的SparkSession 对象, 只需要使用 SparkSession.builder():

1
2
3
4
5
6
7
8
// scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
1
2
3
4
5
6
7
8
// java
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
1
2
3
4
5
6
7
8
# python
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

创建 DataFrame

应用程序可以使用 SparkSession从一个现有的 RDD,Hive表或 Spark 数据源创建 DataFrame。下面基于一个 JSON 文件的内容创建一个 DataFrame:

1
2
3
4
5
6
7
val df = spark.read.json("examples/src/main/resources/people.json")           //scala

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json"); //java

df = spark.read.json("examples/src/main/resources/people.json") //python

DataFrame 操作

DataFrame为 Scala,Java, Python 以及 R 语言中的结构化数据操作提供了一种领域特定语言。

scala java python
df.printSchema() df.printSchema(); df.printSchema()
df.select("name") df.select("name").show(); df.select("name").show();
df.select($"name", $"age" + 1).show() df.select(col("name"), col("age").plus(1)).show(); df.select(df['name'], df['age'] + 1).show()
df.filter($"age" > 21).show() df.filter(col("age").gt(21)).show(); df.filter(df['age'] > 21).show()
df.groupBy("age").count().show() df.groupBy("age").count().show() df.groupBy("age").count().show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/************************** scala*************************/

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/************************** java*************************/

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#####################             python                ###################

# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# | name|
# +-------+
# |Michael|
# | Andy|
# | Justin|
# +-------+

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# | name|(age + 1)|
# +-------+---------+
# |Michael| null|
# | Andy| 31|
# | Justin| 20|
# +-------+---------+

# Select people older than 21
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Count people by age
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# | 19| 1|
# |null| 1|
# | 30| 1|
# +----+-----+

SQL查询

通过SQL访问数据库,返回一个DataFrame的类型

1
2
3
4
5
6
/***************************   scala   *********************************/

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
1
2
3
4
5
6
7
8
/***************************   java   *********************************/
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
1
2
3
4
5
6
#############################   python    ###############################

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

全局临时视图

Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,它将消失。如果您希望拥有一个在所有会话之间共享的临时视图并保持活动状态,直到Spark应用程序终止,您可以创建一个全局临时视图。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*********************************  scala  *************************************/

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*********************************  java  *************************************/

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
###############################   python   ################################

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
# +----+-------+
# | age| name|
# +----+-------+
# |null|Michael|
# | 30| Andy|
# | 19| Justin|
# +----+-------+

创建dataset

Dataset是特定域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作。每个Dataset都有一个称为DataFrame的非类型化的视图,这个视图是行的数据集。上面的定义看起来和RDD的定义类似,DataSet和RDD主要的区别是:DataSet是特定域的对象集合;然而RDD是任何对象的集合。DataSet的API总是强类型的;而且可以利用这些模式进行优化,然而RDD却不行。DataFrame是特殊的Dataset,它在编译时不会对模式进行检测。在未来版本的Spark,Dataset将会替代RDD成为我们开发编程使用的API(注意,RDD并不是会被取消,而是会作为底层的API提供给用户使用)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//   scala
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// java

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
private String name;
private int age;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

与RDD互相转换

Spark SQL 支持两种不同的方法将RDDs 转换成 Datasets.

??????????????????????????????????????????

聚合函数

DataFrame可以使用聚合函数,例如count()、max()等

自定义聚合函数

使用UserDefinedAggregateFunction a自定义聚合函数。

1
2


hive

发表于 2019-08-13 | 分类于 大数据平台 , Hadoop

查看表结构

命令 说明
show tables like '*name*' hive模糊搜索表
desc formatted table_name
desc table_name
查看表结构信息
show partitions table_name 查看分区信息
select table_coulm from table_name where partition_name = '2014-02-25'; 根据分区查询数据
dfs -ls /user/hive/warehouse/table02; 查看hdfs文件信息
dfs -tail /user/hive/warehouse/table02; 显示hdfs文件的一行信息

创建表格

命令 说明
create table as 选中表格创建表格

create table as

create创建部分可以指定表的存储格式等属性,比如指定新表的行列分隔符,存储格式等。

1
2
3
4
5
6
7
CREATE TABLE temp_copy
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n';
STORED AS textfile
AS
SELECT * FROM temp WHERE start_date IS NOT NULL;
  1. hive中用CTAS创建表,所创建的表统一都是非分区表,不管源表是否是分区表。所以对于分区表的创建使用create table ..as一定要注意分区功能的丢失。当然创建表以后可以添加分区,成为分区表。

  2. 如果使用create table as select *创建表时源表是分区表,则新建的表会多字段,具体多的字段个数和名称就是源表分区的个数和名称。当然如果select选择的是指定的列则不会有这种问题。

  3. 如果源表的存储格式不是textfile。则使用CTAS创建的表存储格式会变成默认的格式textfile。比如这里源表是RCFILE。而新建的表则是textfile。当然可以在使用create table ....as创建表时指定存储格式和解析格式,甚至是列的名称等属性。具体参考博客:hive使用create as创建表指定存储格式等属性
  4. 使用CTAS方式创建的表不能是外部表。
  5. 使用CTAS创建的表不能分桶表。对于每一个表或者是分区,Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。

RDD-Spark

发表于 2019-08-12 | 分类于 大数据平台 , spark

Actions

命令 说明
reduce(func) 通过函数func聚集数据集中的所有元素。func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
collect() 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
count() 统计RDD中元素的个数
take(n) 取RDD中的前n个元素。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)
first() 返回数据集的第一个元素(类似于take(1) )
reduce(func) 按照指定规则聚合RDD中的元素
countByValue() 统计出RDD中每个元素的个数
countByKey() 统计出KV格式的RDD中相同的K的个数
foreach(func) 以元素为单位,遍历RDD,运行func函数。
foreachPartition(func) 以分区为单位,遍历RDD,运行func函数。
saveAsTextFile(path) 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本
saveAsSequenceFile(path) 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//collect 把运行结果拉回到Driver端
val rdd = sc.makeRDD(Array(
(5,"Tom"),(10,"Jed"),(3,"Tony"),(2,"Jack")
))
val resultRDD = rdd.sortByKey()
val list = resultRDD.collect()
list.foreach(println)

val rdd = sc.makeRDD(Array(
(5,"Tom"),(10,"Jed"),(3,"Tony"),(2,"Jack")
))
val resultRDD = rdd.sortByKey()
val list = resultRDD.collect()
list.foreach(println)

//take(n):取RDD中的前n个元素
val rdd = sc.makeRDD(Array("hello","hello","hello","world"))
rdd.take(2).foreach(println)

//first 相当于take(1)
val rdd = sc.makeRDD(Array("hello","hello","hello","world"))
println(rdd.first)

// count 统计RDD中元素的个数
val rdd = sc.makeRDD(Array("hello","hello","hello","world"))
val num = rdd.count()

// reduce 按照指定规则聚合RDD中的元素
val numArr = Array(1,2,3,4,5)
val rdd = sc.parallelize(numArr)
val sum = rdd.reduce(_+_)
println(sum)

//countByValue 统计出RDD中每个元素的个数
val rdd = sc.parallelize(Array(
"Tom","Jed","Tom",
"Tom","Jed","Jed",
"Tom","Tony","Jed"
))
val result = rdd.countByValue();
result.foreach(println)

// countByKey 统计出KV格式的RDD中相同的K的个数
val rdd = sc.parallelize(Array(
("销售部","Tom"), ("销售部","Jack"),("销售部","Bob"),("销售部","Terry"),
("后勤部","Jack"),("后勤部","Selina"),("后勤部","Hebe"),
("人力部","Ella"),("人力部","Harry"),
("开发部","Allen")
))
val result = rdd.countByKey();
result.foreach(println)

// foreach 遍历RDD中的元素
val rdd = sc.makeRDD(Array("hello","hello","hello","world"))
rdd.foreach(println)

//foreachPartition 以分区为单位遍历RDD
val rdd = sc.parallelize(1 to 6, 2)
rdd.foreachPartition(x => {
println("data from a partition:")
while(x.hasNext) {
println(x.next())
}
})

Transformations

命令 说明
filter(func) 返回一个新的数据集,由经过func函数后返回值为true的原元素组成
map(func) 返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
flatMap(func) 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
sample(withReplacement, frac, seed) 随机抽样
withReplacement:是否是放回式抽样
frac::抽样的比例
seed:随机种子
reduceByKey(func, [numTasks]) 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
groupByKey([numTasks]) 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
sortByKey(func, [numTasks]) 按key值进行排序
union(otherDataset) 返回一个新的数据集,由原数据集和参数联合而成
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
cartesian(otherDataset) 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U) 对的数据集,所有元素交互进行笛卡尔积。
map(func) 以元素为单位,遍历RDD,运行func函数。
mapPartitions(func) 以分区为单位,遍历RDD,运行func函数。
mapPartitionsWithIndex 与mapPartitions基本相同,只是在处理函数的参数是一个二元元组,元组的第一个元素是当前处理的分区的index,元组的第二个元素是当前处理的分区元素组成的Iterator
coalesce(n,shuffle) 改变RDD的分区数
n:新的分区数目
shuffle:false:不产生shuffle;true:产生shuffle,如果重分区的数量大于原来的分区数量,必须设置为true,否则分区数不变
repartition(n) 改变RDD分区数。repartition(int n) = coalesce(int n, true)
partitionBy 通过自定义分区器改变RDD分区数
randomSplit(Array) 根据传入的 Array中每个元素的权重将rdd拆分成Array.size个RDD拆分后的RDD中元素数量由权重来决定,
groupWith(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup

map和flatMap

img

filter、sample、sortByKey、sortBy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//filter:过滤
val rdd = sc.makeRDD(Array("hello","hello","hello","world"))
rdd.filter(!_.contains("hello")).foreach(println)

/********** map和flatMap**************
*map: 输入一条记录,输出一条记录
*flatmap:输入一条记录,输出多头记录
*************************************/

//sample :随机抽样
val rdd = sc.makeRDD(Array(
"hello1","hello2","hello3","hello4","hello5","hello6",
"world1","world2","world3","world4"
))
rdd.sample(false, 0.3).foreach(println)

//sortByKey:按key进行排序
val rdd = sc.makeRDD(Array(
(5,"Tom"),(10,"Jed"),(3,"Tony"),(2,"Jack")
))
rdd.sortByKey().foreach(println)

//sortBy:自定义排序规则
object SortByOperator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TestSortBy").setMaster("local")
val sc = new SparkContext(conf)
val arr = Array(
Tuple3(190,100,"Jed"),
Tuple3(100,202,"Tom"),
Tuple3(90,111,"Tony")
)
val rdd = sc.parallelize(arr)
rdd.sortBy(_._1).foreach(println) //按照190、100、90排序

groupByKey和reduceByKey

img

distinct:去掉重复数据

img

1
2
3
4
5
6
7
8
9
10
11
val rdd = sc.makeRDD(Array(
"hello",
"hello",
"hello",
"world"
))
val distinctRDD = rdd
.map {(_,1)}
.reduceByKey(_+_)
.map(_._1)
distinctRDD.foreach {println} //等价于:rdd.distinct().foreach {println}

map和mapPartitions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
val arr = Array("Tom","Bob","Tony","Jerry")
//把4条数据分到两个分区中
val rdd = sc.parallelize(arr,2)

/*
* 模拟把RDD中的元素写入数据库的过程
*/
rdd.map(x => {
println("创建数据库连接...")
println("写入数据库...")
println("关闭数据库连接...")
println()
}).count()
结果:
创建数据库连接...
写入数据库...
关闭数据库连接...
创建数据库连接...
写入数据库...
关闭数据库连接...
创建数据库连接...
写入数据库...
关闭数据库连接...
创建数据库连接...
写入数据库...
关闭数据库连接...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
* 将RDD中的数据写入到数据库中,绝大部分使用mapPartitions算子来实现
*/
rdd.mapPartitions(x => {
println("创建数据库")
val list = new ListBuffer[String]()
while(x.hasNext){
//写入数据库
list += x.next()+":写入数据库"
}
//执行SQL语句 批量插入
list.iterator
})foreach(println)
结果:
创建数据库
Tom:写入数据库
Bob:写入数据库
创建数据库
Tony:写入数据库
Jerry:写入数据库

mapPartitionsWithIndex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val dataArr = Array("Tom01","Tom02","Tom03"
,"Tom04","Tom05","Tom06"
,"Tom07","Tom08","Tom09"
,"Tom10","Tom11","Tom12")
val rdd = sc.parallelize(dataArr, 3);
val result = rdd.mapPartitionsWithIndex((index,x) => {
val list = ListBuffer[String]()
while (x.hasNext) {
list += "partition:"+ index + " content:" + x.next
}
list.iterator
})
println("分区数量:" + result.partitions.size)
val resultArr = result.collect()
for(x <- resultArr){
println(x)
}

coalesce:改变RDD的分区数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* false:不产生shuffle
* true:产生shuffle
* 如果重分区的数量大于原来的分区数量,必须设置为true,否则分区数不变
* 增加分区会把原来的分区中的数据随机分配给设置的分区个数
*/
val coalesceRdd = result.coalesce(6,true)

val results = coalesceRdd.mapPartitionsWithIndex((index,x) => {
val list = ListBuffer[String]()
while (x.hasNext) {
list += "partition:"+ index + " content:[" + x.next + "]"
}
list.iterator
})

println("分区数量:" + results.partitions.size)
val resultArr = results.collect()
for(x <- resultArr){
println(x)
}

img

randomSplit:拆分RDD

1
2
3
4
5
6
7
/**
* randomSplit:
* 根据传入的 Array中每个元素的权重将rdd拆分成Array.size个RDD
* 拆分后的RDD中元素数量由权重来决定,数据量不大时不一定准确
*/
val rdd = sc.parallelize(1 to 10)
rdd.randomSplit(Array(0.1,0.2,0.3,0.4)).foreach(x => {println(x.count)})

zip

img

spark

发表于 2019-08-09 | 分类于 大数据平台 , spark

spark简介

Spark是开发通用的大数据处理框架。Spark应用程序可以使用R语言、Java、Scala和Python进行编写,极少使用R语言编写Spark程序,Java和Scala语言编写的Spark程序的执行效率是相同的,但Java语言写的代码量多,Scala简洁优雅,但可读性不如Java,Python语言编写的Spark程序的执行效率不如Java和Scala。

img

阅读全文 »

评估模型

发表于 2019-08-07 | 分类于 人工智能 , 评估
真实类别 预测为正例 预测为反例
正例 TP(真正例) FN(假反例)
反例 FP(假正例) TN(真反例)
1
2
3
4
# 混淆矩阵
from sklearn.metrics import confusion_matrix

confusion = confusion_matrix(y_test,y_prob)
阅读全文 »

linux子系统-win10

发表于 2019-08-05 | 分类于 linux , wsl

Windows10安装Ubuntu子系统

一、开启开发模式

“设置 - 更新和安全 - 针对开发人员”设置页面,选中“开发人员模式”。

tempsnip

阅读全文 »
1234…10
路西法

路西法

不忘初心,方得始终

95 日志
25 分类
59 标签
GitHub E-Mail
© 2019 路西法
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4
本站访客数 人次 本站总访问量 次