之前了解到的 Flink 的心跳服务都比较浅显,只知道 在 Flink 中心跳服务是由 ReourceManager 发送给 JobMaster 和 TaskExecutor 以及 JobMaster 发送给 TaskExecutor。 然后 TaskExecutor 返回相关的Slot等数据给 ResouceManager。所以一直以为 心跳服务是 Akka 的 ask 进行传递的。 但是查看相关源码发现和我的理解有些出入。并且在最开始查看源码的时候发现,Flink 对心跳服务封装的比较好,定义的接口在很多地方都是匿名的实现,所以一开始看的时候很容易混淆,搞不清楚整个心跳的流程,下面用ResourceManager和TaskManager的心跳服务 来简单聊一聊 Flink 中心跳服务的流程。
下面是心跳服务类的继承关系
最核心的类就是HeaderbeatManager
接口的实现类HeartbeatManagerImpl
类。其中实现了接收到心跳请求和接受到心跳的代码。它的子类HeartbeatManagerSendImpl
继承了Runnable接口,用于定期触发心跳请求。
在HeartbeatManagerImpl
中有一个存放HeartbeatMonitor
对象的 Map 集合。HeartbeatMonitor
类主要是记录心跳的时间,判断心跳是否超时。在构造HeartbeatMonitor
的时候需要传入一个HeartbeatTarget
接口的实现对象。HeartbeatTarget
接口定义的是接受到心跳请求后的操作和接收到心跳的操作。 该接口的实现类主要在两个地方,一个是在添加 Motitor 时的匿名对象,比如在RM添加对 TaskManager 监听时会传入一个实现了HeartbeatTarget 接口的匿名对象。一个是在HeartbeatManagerSendImpl
中的实现。这个地方我最开始看源码时特别容易混淆。HeartbeatManagerSendImpl
中的requestHeartbeat()
方法是接收到心跳请求后的处理,receiveHeartbeat()
是接收到心跳后的处理。 在匿名对象中的requestHeartbeat()
是发送心跳请求的动作(e.g. RM向TM发送心跳请求)而receiveHeartbeat()
则是实现了 接收到心跳请求后发送心跳的动作 (e.g. TM 就收到RM的心跳请求,向RM发送心跳及需要汇报的信息)
下面是ResourceManager 和 TaskExecutor 的心跳服务的流程
在最开始,ResourceManager 服务启动的时会创建两个 心跳服务管理对象, RM用来管理TaskManager的心跳服务的对象名叫taskManagerHeartbeatManager
private void startHeartbeatServices() {taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
一个用来管理 TaskManager 的心跳通信,一个用来管理 JobManager 的心跳通信。这两个对象都是HeartbeatManagerSenderImpl
对象。在HeartbeatManagerSenderImpl
的构造方法中就会启动定时任务。
public class HeartbeatManagerSenderImplextends HeartbeatManagerImplimplements Runnable {
HeartbeatManagerSenderImpl(ScheduledExecutor mainThreadExecutor, ...) {super(heartbeatTimeout,...mainThreadExecutor);
this.heartbeatPeriod = heartbeatPeriod;
// 开始任务调度
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
@Override
public void run() {if (!stopped) {log.debug("Trigger heartbeat request.");
for (HeartbeatMonitorheartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);
}
// 设置新的任务调度
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
}
mainThreadExecutor
是一个RpcEndpoint
的静态内部类,这里使用它的schedule()
方法来实现定时任务调度。schedule()
接受一个Runnable
接口的对象,而HeartbeatManagerSenderImpl
就实现了Runnable
接口,所以,在定时任务被触发时就会执行HeartbeatManagerSenderImpl#run()
方法。 在run()
方法中,会继续设置一个新的定时任务,这样不断地循环。这里默认的延迟时间为 10000 毫秒。
schedule()
方法实现任务的延迟执行主要是通过给 Actor 发送一条异步任务的消息,该消息会带上延迟执行的时间。 在这里就是 ResourceManager 给自己的 Acotr 发送了一条延迟消息。
@Override
public void scheduleRunAsync(Runnable runnable, long delayMillis) {if (isLocal) {// 计算任务调度的时间
long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
// 向自己发送一条 异步任务处理 的消息
tell(new RunAsync(runnable, atTimeNanos));
}
}
在 ResourceManager 的 Actor 接收到这条消息的时候,会判断任任务是否需要立即执行,如果是延迟执行,则会使用 Akka 的 ActorSystem.scheduler() 来定时执行该任务。
private void handleRunAsync(RunAsync runAsync) {final long timeToRun = runAsync.getTimeNanos();
final long delayNanos;
// 如果接收到的任务已经到达任务的执行时间则立即执行
if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime())<= 0) {// run immediately
try {runAsync.getRunnable().run();
} catch (Throwable t) {log.error("Caught exception while executing runnable in main thread.", t);
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
}
// 如果没有到达任务的执行时间,则发送一条新的延迟消息给自己
} else {// schedule for later. send a new message after the delay, which will then be
// immediately executed
FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
final Object envelopedSelfMessage = envelopeSelfMessage(message);
getContext()
.system()
.scheduler()
.scheduleOnce(
delay,
getSelf(),
envelopedSelfMessage,
getContext().dispatcher(),
ActorRef.noSender());
}
}
心跳监听对象的添加与触发heartbeatMonitor 对象的添加是在 TaskManger 启动后,向 ResourceManager 注册时调用HeartbeatManagerSenderImpl#monitorTarget()
方法添加的。 添加的时候会传入一个HeartbeatTarget 接口的匿名实现类。 该实现类就定义了触发心跳请求时的操作。下面代码中就定义了RM向TaskManager发送心跳时需要怎么做,但是接收心跳请求的方法
private RegistrationResponse registerTaskExecutorInternal(
TaskExecutorGateway taskExecutorGateway,
TaskExecutorRegistration taskExecutorRegistration) {// 向 RM的TaskManager心跳管理服务 添加心跳监听对象
taskManagerHeartbeatManager.monitorTarget(
taskExecutorResourceId,
new HeartbeatTarget() {@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the
// TaskManager
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {taskExecutorGateway.heartbeatFromResourceManager(resourceID);
}
});
}
在HeartbeatManagerSenderImpl
的run()
方法中,会遍历所有的正在监视的 heartbeatMonitor 对象,并调用 在添加监视时传入的heartbeatTarget
匿名对象的requestHeartbeat()
方法,就像上面代码一样。所以在RM向TaskManager 发送心跳请求的时候 是通过 调用taskExecutorGateway
的heartbeatFromResourceManager() 发送了 RPC 请求
在 TM 服务启动的时候同样也会创建一个心跳服务来管理与RM之间的心跳
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
// ============
publicHeartbeatManagercreateHeartbeatManager(
ResourceID resourceId,
HeartbeatListenerheartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {return new HeartbeatManagerImpl<>(
heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
}
在TM中创建的就是HeartbeatManagerImpl
对象,因为TM并不需要发送心跳请求,所以不是创建HeartbeatManagerSenderImpl
对象。
TM 向 RM 注册成功后,会添加一个对 RM 的监听对象
// monitor the resource manager as heartbeat target
resourceManagerHeartbeatManager.monitorTarget(
resourceManagerResourceId,
new HeartbeatTarget() {@Override
public void receiveHeartbeat(
ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {resourceManagerGateway.heartbeatFromTaskManager(
resourceID, heartbeatPayload);
}
@Override
public void requestHeartbeat(
ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {// the TaskManager won't send heartbeat requests to the ResourceManager
}
});
在这里,HeartbeatTarget 匿名对象中,receiveHeartbeat() 就是向RM 发送心跳并附带上汇报信息,而requestHeartbeat 是空的,因为 TM 不会向 RM 发送心跳请求。
TaskManager 接受心跳请求并发送心跳回到之前,RM 调用taskExecutorGateway的heartbeatFromResourceManager方法,通过RPC方式发送了心跳请求。 在TaskExecutor类中的heartbeatFromResourceManager方法就会被调用。并传入了RM 的 resourceID。
@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
resourceManagerHeartbeatManager 就是 TM 时创建的HeartbeatManagerImpl
对象,所以这里调用的requestHeartbeat() 方法是HeartbeatManagerImpl
中的方法。
public class HeartbeatManagerImplimplements HeartbeatManager{// 接受到RM的心跳请求
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat request from {}.", requestOrigin);
// 汇报心跳,清除HeartbeatMonitor中的超时Future
final HeartbeatTargetheartbeatTarget = reportHeartbeat(requestOrigin);
if (heartbeatTarget != null) {if (heartbeatPayload != null) {heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}
heartbeatTarget.receiveHeartbeat(
getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
}
}
}
}
在这个方法中 首先会调用reportHeartbeat方法.
HeartbeatTargetreportHeartbeat(ResourceID resourceID) {if (heartbeatTargets.containsKey(resourceID)) {// 通过 RM 的reosurceID 找到 TM对RM的监听器
HeartbeatMonitorheartbeatMonitor = heartbeatTargets.get(resourceID);
// 重新设置 监听器的超时时间
heartbeatMonitor.reportHeartbeat();
return heartbeatMonitor.getHeartbeatTarget();
} else {return null;
}
}
之后就会调用之前创建监听器时的匿名对象的方法来通过RPC调用向RM发送心跳数据。
resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
之后又回到了RM
RM 接受TM的心跳数据在 TM 发送 RPC 请求后,ResourceManager 类中的heartbeatFromTaskManager()
方法会被调用。该方法只有一行代码
@Override
public void heartbeatFromTaskManager(
final ResourceID resourceID, final TaskExecutorHeartbeatPayload heartbeatPayload) {taskManagerHeartbeatManager.receiveHeartbeat(resourceID, heartbeatPayload);
}
所以在这里,会调用 RM 管理 TM 的心跳服务对象(HeartbeatManagerSenderImpl) 的receiveHeartbeat()
方法。
@Override
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat from {}.", heartbeatOrigin);
reportHeartbeat(heartbeatOrigin);
if (heartbeatPayload != null) {heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
}
}
}
这里首先会调用reportHeartbeat()
来重新设置 在 RM 中对 TM 的监听器的超时时间。 然后调用heartbeatListener来处理TM 传过来的数据。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧