[1] BigData-Notes/Hadoop集群环境搭建.md at master · heibaiying/BigData-Notes (github.com)

[2] Hadoop集群配置 - 简书 (jianshu.com)

安装方法参考官网 Apache Hadoop 3.3.3 – Hadoop: Setting up a Single Node Cluster.

原理

HDFS简明入门教程 (biancheng.net)

操作步骤如下:

  1. 客户端发起文件读取的请求。
  2. NameNode 将文件对应的数据块信息及每个块的位置信息,包括每个块的所有副本的位置信息(即每个副本所在的 DataNode 的地址信息)都传送给客户端。
  3. 客户端收到数据块信息后,直接和数据块所在的 DataNode 通信,并行地读取数据块。

在客户端获得 NameNode 关于每个数据块的信息后,客户端会根据网络拓扑选择与它最近的 DataNode 来读取每个数据块。当与 DataNode 通信失败时,它会选取另一个较近的 DataNode,同时会对出故障的 DataNode 做标记,避免与它重复通信,并发送 NameNode 故障节点的信息。

Prerequisite

设置环境变量:

  1. JAVA_HOME={path_to_jdk_dir}
  2. HADOOP_HOME={path_to_hadoop_home}

多台设备间能免密ssh

  1. 修改每台服务器的hostname
  2. :star:在\etc\hosts中绑定集群中每个设备的IP地址与域名(使用真实IP表示本机而不用localhost)
  3. ~/.ssh生成公密钥,用ssh-copy-id命令将公钥添加到其他设备
  4. 使用ssh {hostname}命令验证是否能免密登录其他主机

Configuration

修改hadoop\etc中的配置文件,如下。

1. core-site.xml

The fs.defaultFS makes HDFS a file abstraction over a cluster, so that its root is not the same as the local system’s.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<configuration>
<!-- 通过该参数配置集群的文件抽象 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop-master:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/hadoop/tmp</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
</configuration>

2. hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<configuration>
<property>
<!-- 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔 -->
<name>dfs.namenode.name.dir</name>
<value>/home/hadoop/namenode/data</value>
</property>
<property>
<!-- 节点数据(即数据块)的存放位置 -->
<name>dfs.datanode.data.dir</name>
<value>/home/hadoop/datanode/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
</configuration>

3. hadoop-env.sh

1
2
3
4
5
6
7
8
9
export JAVA_HOME=/opt/jdk
export HADOOP_HOME=/opt/hadoop
export PDSH_RCMD_TYPE=ssh

# export HDFS_NAMENODE_USER=hadoop
# export HDFS_DATANODE_USER=hadoop
# export HDFS_SECONDARYNAMENODE_USER=hadoop
# export YARN_RESOURCEMANAGER_USER=hadoop
# export YARN_NODEMANAGER_USER=hadoop

4. yarn-site.xml (可选,如果不使用分布式计算)

1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop-master</value>
</property>
</configuration>

5. mapred-site.xml (可选,如果不使用MapReduce)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<configuration>
<property>
<!--指定 mapreduce 作业运行在 yarn 上-->
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
</configuration>

6. wokers

在目录的wokers中添加所有的datanode节点的主机名

1
2
3
4
hadoop-master
hadoop-slave1
hadoop-slave2
...

修改配置文件后,在namenode节点(即master)使用hdfs namenode -format格式化namenode的文件系统(不需要在slave节点上执行该命令)

执行sbin/start-all.sh命令开启服务。

由于是集群,因此任务的执行会在所有的设备上触发。

Typically one machine in the cluster is designated as the NameNode and another machine as the ResourceManager, exclusively. These are the masters. Other services (such as Web App Proxy Server and MapReduce Job History server) are usually run either on dedicated hardware or on shared infrastructure, depending upon the load.

The rest of the machines in the cluster act as both DataNode and NodeManager. These are the workers.

[hadoop slaves_猎人在吃肉的博客-CSDN博客](https://blog.csdn.net/xiaojin21cen/article/details/42421781

slaves 文件 (在新版中是worker文件)

一般在集群中你唯一地选择一台机器作为 NameNode ,一台机器作为 ResourceManager,这是master (主)。

那么,集群中剩下的机器作为DataNode 和 NodeManager。这些是slaves(从)。

在你的hadoop目录/etc/hadoop/slaves文件上列出全部slave机器名或IP地址,一个一行。

Spark

RDD Programming Guide - Spark 3.2.1 Documentation (apache.org)

To run Spark applications in Python without pip installing PySpark, use the bin/spark-submit script located in the Spark directory. This script will load Spark’s Java/Scala libraries and allow you to submit applications to a cluster. You can also use bin/pyspark to launch an interactive Python shell.

使用spark-submit提交应用到集群,连接集群需要初始化

集群连接方法

  1. initializing-spark
  2. spark-submit

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.

1
2
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.

连接集群需要创建SparkContext对象,描述集群的连接方法,使用配置文件传入应用名与master(可理解集群的类型)。

1
2
3
4
5
6
7
8
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

Unlike other cluster managers supported by Spark in which the master’s address is specified in the --master parameter, in YARN mode the ResourceManager’s address is picked up from the Hadoop configuration. Thus, the --master parameter is yarn.

此处的master如果使用Hadoop Yarn可以传入yarn,而不是spark://...,具体的脚本参数可查看官网。

PySpark applications start with initializing SparkSession which is the entry point of PySpark as below. In case of running it in PySpark shell via pyspark executable, the shell automatically creates the session in the variable spark for users.

通过SparkSession开启Spark应用,交互式的bin/pyspark中已经创建了session。

在集群上运行(Cluster Manager Types)

Running Spark on YARN - Spark 3.2.1 Documentation (apache.org)

The system currently supports several cluster managers:

  • Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
  • Apache Mesos – a general cluster manager that can also run Hadoop MapReduce and service applications. (Deprecated)
  • Hadoop YARN – the resource manager in Hadoop 2.
  • Kubernetes – an open-source system for automating deployment, scaling, and management of containerized applications.

Spark支持四种集群模式。除了本地测试,此处选择在Hadoop Yarn上运行Spark

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# %%
import random

import matplotlib.pyplot as plt
import numpy as np
from pyspark.sql import SparkSession


def generate_noisy(cx, cy, N=50, std=1):
noise_x = std * np.random.randn(N)
noise_y = std * np.random.randn(N)

X = np.array([cx + noise for noise in noise_x])
Y = np.array([cy + noise for noise in noise_y])
return X, Y


def generate_dataset(centers):
X = np.zeros([0])
Y = np.zeros([0])
labels = np.zeros([0])

for i, center in enumerate(centers):
X_i, Y_i = generate_noisy(*center)
labels_i = i * np.ones_like(X_i)

X = np.concatenate([X, X_i])
Y = np.concatenate([Y, Y_i])
labels = np.concatenate([labels, labels_i])
return X.tolist(), Y.tolist(), labels.astype(np.int16).tolist()


def eu_distance(p1, p2):
return np.sqrt(np.sum(np.square(p1 - p2)))


def KMeans(data, k=2, seed=1, num_iter=3):
r"""
Args:
`data`: 2D points in format (x, y)
Return:
`centers`: [(cx1, cy1), (cx2, cy2), ... ]
"""
random.seed(seed)
centers = random.sample(data, k)
data = np.array(data)

for _ in range(num_iter):
clusters = [[] for _ in range(k)]
for item in data:
dist = [eu_distance(item, np.array(c)) for c in centers]
clusters[dist.index(min(dist))].append(item)
centers = [np.array(c).mean(axis=0).tolist() for c in clusters]
points = []
for label, cluster in enumerate(clusters):
for x, y in cluster:
points.append((x, y, label))
return centers, points


def plot_clustering(x, y, labels, centers):

colors = [['#FF0000', '#00FF00', '#0000FF'][i] for i in labels]
colors_center = ['#000000', '#66ffff']

f, ax = plt.subplots()
ax.scatter(x, y, color=colors)

for i, center in enumerate(centers):
for x, y in center:
ax.scatter(x, y, color=colors_center[i])

ax.set_xlabel('x')
ax.set_ylabel('y')
f.suptitle('K-Means')

plt.savefig('./result.jpg')
plt.show()


# %%
if __name__ == "__main__":

spark = SparkSession.builder.appName("k-means").getOrCreate()

file_path = "hdfs://hadoop-master:8020/user/hadoop/data.csv"

centers = [(random.randint(1, 8), random.randint(1, 8)) for _ in range(3)]
data = generate_dataset(centers)
spark.createDataFrame(
list(zip(*data))).write.mode('overwrite').csv(file_path)

df = spark.read.csv(file_path, inferSchema=True)

points = [(row[0], row[1]) for row in df.collect()]
centers_hat, points = KMeans(points, k=3, seed=2, num_iter=3)

print(f"GroundTruth: {centers}\nPredict: {centers_hat}")

x, y, labels = zip(*points)
plot_clustering(x, y, labels, (centers, centers_hat))

spark.stop()

# %%