Writing an Akka-Based RPC Communication Framework by Hand

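Spark is a parallel, distributed computing framework, so its design has to account for how nodes exchange data. How does Spark do that? Spark is written in Scala, so today we will look at how to design an RPC communication service in Scala.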

Design

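The akka.actor.Actor component is somewhat like a servlet, and you can think of it as one. It also has its own lifecycle: preStart is called after the constructor has run, and receive handles each message that arrives.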

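We split the Actors into two kinds: a Master and a Worker.

The Master is the boss and the Workers are its subordinates, just like the ResourceManager and NodeManager in YARN, or the NameNode and DataNode in HDFS. Nothing works without rules, and code is no different.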

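1. After a Worker starts, it connects to the Master in its preStart method and sends a registration message (the Worker's information is wrapped in a case class and sent to the Master).

2. When the Master receives the registration message, it stores the Worker's information and replies that registration succeeded.

3. The Worker periodically sends heartbeats to the Master, so that the Master can tell when a Worker has died.

4. If a Worker sends no heartbeat for a long time, the Master removes that Worker's information from its memory, so that no tasks are assigned to a dead Worker.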
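
The four steps above can be sketched without Akka as a small in-memory registry. This is only an illustration under stated assumptions: WorkerRegistry, register, heartbeat, prune and alive are hypothetical names that do not appear in the project, and timestamps are passed in explicitly so the behaviour is easy to follow.

```scala
import scala.collection.mutable

// Hypothetical, Akka-free model of the Master's bookkeeping.
class WorkerRegistry(timeoutMillis: Long) {
  // workerId -> time of the last registration or heartbeat
  private val lastSeen = mutable.HashMap.empty[String, Long]

  // Steps 1 and 2: remember the worker; returning true stands in for the "registered" reply
  def register(workerId: String, now: Long): Boolean = {
    lastSeen(workerId) = now
    true
  }

  // Step 3: refresh the timestamp, ignoring workers that are not registered
  def heartbeat(workerId: String, now: Long): Unit =
    if (lastSeen.contains(workerId)) lastSeen(workerId) = now

  // Step 4: drop every worker whose last heartbeat is older than the timeout
  def prune(now: Long): Set[String] = {
    val dead = lastSeen.collect { case (id, seen) if now - seen > timeoutMillis => id }.toSet
    lastSeen --= dead
    dead
  }

  def alive: Set[String] = lastSeen.keySet.toSet
}

object WorkerRegistryDemo {
  def main(args: Array[String]): Unit = {
    val registry = new WorkerRegistry(timeoutMillis = 15000L)
    registry.register("w1", now = 0L)
    registry.register("w2", now = 0L)
    registry.heartbeat("w1", now = 10000L) // w2 never sends a heartbeat
    println(registry.prune(now = 20000L))  // w2 is removed
    println(registry.alive)                // only w1 remains
  }
}
```

In the real project the actors play these roles: messages replace the method calls, and a scheduler replaces the explicit now argument.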

Create the Maven project

Create an empty Maven project.

pom.xml

<properties>
    ...
    <scala.version>2.10.6</scala.version>
    <scala.compat.version>2.10</scala.compat.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.10</artifactId>
        <version>2.3.14</version>
    </dependency>

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-remote_2.10</artifactId>
        <version>2.3.14</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-make:transitive</arg>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>reference.conf</resource>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.zonegood.Master</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

If src/main/scala does not exist, create it manually.

Writing the Master

package com.zonegood

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import com.zonegood.MessageBox._

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps

class Master extends Actor {

  // workerId -> information about each registered worker
  val hashMap = new mutable.HashMap[String, WorkerInfo]()
  // Interval between timeout checks, also used as the heartbeat timeout (milliseconds)
  val CHECK_INTERVAL = 15000

  override def preStart(): Unit = {
    println("master run...")

    import context.dispatcher
    // Schedule a periodic check. A worker that has not sent a heartbeat for too long
    // is removed from memory so that no tasks are assigned to a dead worker.
    context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis,
      self, CheckTimeOutWorker)
  }

  override def receive: Receive = {
    case WorkerStartedMessage(workerId, workerInfo) =>
      println("worker connect")
      // Stamp the entry with the master's own clock, then store the worker's information
      workerInfo.flashTime = System.currentTimeMillis()
      hashMap.put(workerId, workerInfo)
      // Tell the worker that registration succeeded
      sender() ! RegisterSuccessMessage

    case HeartBeatSendMessage(workerId) =>
      // Refresh the worker's last-seen time; heartbeats from unknown
      // (for example already removed) workers are ignored instead of throwing
      hashMap.get(workerId).foreach { info =>
        info.flashTime = System.currentTimeMillis()
      }

    case CheckTimeOutWorker =>
      // Periodic tick: remove every worker whose last heartbeat is too old
      val currentTime = System.currentTimeMillis()
      val timedOut = hashMap.filter { case (_, info) =>
        currentTime - info.flashTime > CHECK_INTERVAL
      }.keys
      hashMap --= timedOut
      // Print the number of workers that are still registered
      println(hashMap.size)
  }
}

object Master {
  // Arguments: <host> <port> on which the master listens
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(configStr)
    val masterSystem = ActorSystem("masterSystem", conf)
    masterSystem.actorOf(Props[Master], "masterActor")
    masterSystem.awaitTermination()
  }
}
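
The three settings built as a string in Master.main can also be written as a nested configuration block. The fragment below is only a sketch: it assumes a file named application.conf on the classpath (the file that Typesafe Config's ConfigFactory.load() reads by default), and the host and port are placeholder values.

```hocon
akka {
  actor {
    # Enables remoting, same value as in the inline string
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"   # placeholder host
      port = 8888              # placeholder port
    }
  }
}
```

Building the string in code, as this article does, has the advantage that host and port can come straight from the command-line arguments, so the same jar can be started on different addresses.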

Writing the Worker

package com.zonegood

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import com.zonegood.MessageBox.{HeartBeatMessage, HeartBeatSendMessage, RegisterSuccessMessage, WorkerStartedMessage}

import scala.concurrent.duration._
import scala.language.postfixOps

class Worker(id: String, masterHost: String, masterPort: Int) extends Actor {
  // Reference to the remote master, resolved in preStart
  var master: ActorSelection = _
  // Interval between heartbeats (milliseconds)
  private[this] val HEART_BEAT_INTERVAL = 10000

  val workerInfo = new WorkerInfo()

  override def preStart(): Unit = {
    // Connect to the master
    master = context.actorSelection(
      s"akka.tcp://masterSystem@$masterHost:$masterPort/user/masterActor")
    // Register this worker
    master ! WorkerStartedMessage(id, workerInfo)
  }

  override def receive: Receive = {
    case RegisterSuccessMessage =>
      // Registration succeeded: start a timer that sends a tick to this actor
      import context.dispatcher
      context.system.scheduler.schedule(0 millis, HEART_BEAT_INTERVAL millis,
        self, HeartBeatMessage(id))

    case HeartBeatMessage(_) =>
      // On every tick, send a heartbeat to the master
      master ! HeartBeatSendMessage(id)
  }
}

object Worker {
  // Arguments: <host> <port> of this worker, then <masterHost> <masterPort>
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(configStr)
    val workerSystem = ActorSystem("workerSystem", conf)
    workerSystem.actorOf(Props(new Worker(UUID.randomUUID().toString, masterHost, masterPort)), "workerActor")
    workerSystem.awaitTermination()
  }
}
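
The string passed to context.actorSelection in the Worker follows a fixed layout: the akka.tcp protocol, the name given to the master's ActorSystem, the host and port from the master's configuration, then /user/ and the name passed to actorOf. A small sketch that spells this out; ActorPaths and remotePath are hypothetical names used only for illustration.

```scala
// Hypothetical helper; the layout matches the path string used in Worker.preStart.
object ActorPaths {
  def remotePath(system: String, host: String, port: Int, actorName: String): String =
    s"akka.tcp://$system@$host:$port/user/$actorName"

  def main(args: Array[String]): Unit = {
    // Same values as the master started with "127.0.0.1 8888"
    println(remotePath("masterSystem", "127.0.0.1", 8888, "masterActor"))
  }
}
```

If the system name or actor name here differs from what the Master actually used, the selection resolves to nothing and the registration message is never delivered, so these two strings have to be kept in sync by hand.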

Message case classes

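package com.zonegood

object MessageBox {
  // Worker -> Master: register a newly started worker
  case class WorkerStartedMessage(workerId: String, workerInfo: WorkerInfo)
  // Master -> Worker: registration succeeded (no payload, so a case object)
  case object RegisterSuccessMessage
  // Master -> itself: periodic tick to check for timed-out workers
  case object CheckTimeOutWorker
  // Worker -> itself: timer tick that triggers a heartbeat
  case class HeartBeatMessage(workerId: String)
  // Worker -> Master: the heartbeat itself
  case class HeartBeatSendMessage(workerId: String)
}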
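
The actors match these messages by pattern in receive, and one Scala detail matters for messages without fields. A bare name such as case RegisterSuccessMessage => is compared by equality against that single object, so it matches a case object (or the companion object of a case class), but never an instance created from a zero-argument case class. A minimal sketch of the difference; MatchDemo, Tick and Tock are hypothetical names.

```scala
object MatchDemo {
  case class Tick()   // zero-argument case class: Tick() creates a new instance
  case object Tock    // singleton message: Tock is the value itself

  def describe(msg: Any): String = msg match {
    case Tick   => "companion object Tick" // matches only the companion object
    case Tick() => "an instance of Tick"   // matches instances created with Tick()
    case Tock   => "case object Tock"
    case _      => "unknown"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Tick()))
    println(describe(Tick))
    println(describe(Tock))
  }
}
```

For messages that carry no data, a case object keeps the sending side and the matching side in agreement without relying on the companion object.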

Data class

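package com.zonegood

// Per-worker state kept by the Master. It is Serializable because it travels
// inside WorkerStartedMessage from the Worker to the Master.
class WorkerInfo extends Serializable {
  // Time in milliseconds at which the worker was last heard from
  var flashTime = System.currentTimeMillis()
}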

Packaging with maven-shade-plugin

1. Build the runnable Master jar

In pom.xml, set the main class to Master:

<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.zonegood.Master</mainClass>
</transformer>

Run mvn clean package, then move the resulting jar:

mv target/my-scala-rpc-1.0-SNAPSHOT.jar ~/workspace/master.jar

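2. Build the runnable Worker jar

In the same way, change mainClass to com.zonegood.Worker, run mvn clean package again, and move the result to ~/workspace/worker.jar.

~/workspace should now contain the two new jar files.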

Running

$ cd ~/workspace

$ java -jar master.jar 127.0.0.1 8888

$ java -jar worker.jar 127.0.0.1 7001 127.0.0.1 8888

$ java -jar worker.jar 127.0.0.1 7002 127.0.0.1 8888

$ java -jar worker.jar 127.0.0.1 7003 127.0.0.1 8888

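Check the output. The Master prints the number of registered workers at every check (every 15 seconds), so with three Workers running the count should settle at 3.

Stop one of the Workers and watch the Master console. Once that Worker's last heartbeat is more than 15 seconds old, the next check removes it and the count should drop to 2.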