0%

Flink JM HA 在 Kubernetes 上的实现

这篇文章介绍一下 Flink 的 JobManager HA 在 Kubernetes 上面的实现思路。Flink 1.12 还没有 release,但是在开发计划中已经看到了这块内容。但是这篇文章主要介绍我们内部的实现。下一篇在 Flink 1.12 正式 release 之后再进行介绍官方的实现。

这里说的 HA 是 HighAvailability 的缩写,HA 在很多分布式系统中都有涉及,主要是为了解决 SPOF (single point of failure,也就是单点)问题。举个例子,大数据生态的分布式文件系统 HDFS 的 NameNode (可以简单理解为 master-slave 结构中的 master)在生产环境都是要开启 HA 的。类似的,很多 HA 都是针对分布式架构(master-slave)中的 master。

那么具体一点,HA 如何来解决单点问题呢?在非 HA 的系统中,master 一旦挂掉,很多 master 角色承担的工作将会停止。比如 Flink 的 JobManager 负责 task 的转发。在非 HA 情况下我们不得不重启作业从头开始计算。但是有了 HA 之后,一台 JobManager 挂掉之后,会有新的 JobManager 来取代他从而使得作业可以继续运行。

需要 HA 就需要多个同样的服务进程的同时存在,但是真正服务的应该只有一个进程,这个就是 Leader,而如何在这些服务中选出一个进行服务就是 Leader 选举。

1. Leader 选举

在一个典型的 Leader 选举过程中,多个候选者(candidates)会去竞争 leader (正常只有一个)的位置,这个过程称为 Leader Election。Leader 选出来之后,其他的候选者一般称为 Follower,为了向其他的 follower 说明自己的 leader 角色,leader 需要不断发送心跳。但是心跳有一个弊端,在网络不稳定的时候可能会导致多主。为了解决这个问题,租约(lease)的概念被引入进来。租约机制简单来说是由一个特定的角色来向所有的系统中的节点颁发租约,获得租约的节点即为 leader。租约对应一个有效时长,在时长耗尽(timeout)之前,leader 需要去 renew 租约,否则就需要新的一轮的 leader 选举。

要实现 Leader 选举除了自己实现一套一致性算法,还可以使用一些分布式协调系统(coordinator),比如 ZooKeeper,Etcd 等。

目前 Flink 的 HA 服务被抽象成了 interface HighAvailabilityServices,我们可以以插件化的方式实现我们自己的 HA 服务。HighAvailabilityServices 负责所有需要保证高可用的服务,主要工作包括:

  • ResourceManager leader 选举和 leader 获取。(ResourceManager 是 JobManager 的一个组件)
  • JobManager leader 选举和 leader 获取。
  • checkpoint 的元数据存储。checkpoint 是分布式计算系统的一个分布式快照,新的 JobManager 可以通过读取 checkpoint 来恢复之前的作业状态。这里有一点需要注意的是,存储的只是元数据,比如 checkpoint 本身。checkpoint 本身一般是存储在分布式文件系统中,比如 HDFS,HA 服务只需要存储 checkpoint 的文件路径。
  • 注册最新的 checkpoint。checkpoint 每隔固定的时间间隔就会做一次,recover 的时候一般从最新的恢复即可。
  • Registry 负责管理作业的状态。

下面是 interface 的实现,其中部分 Deprecated 的方法已经去掉了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public interface HighAvailabilityServices extends ClientHighAvailabilityServices {
// 获取 resource manager 的 leader retriever
LeaderRetrievalService getResourceManagerLeaderRetriever();

// 获取 dispatcher 的 leader retriever
LeaderRetrievalService getDispatcherLeaderRetriever();

/**
* Gets the leader retriever for the job JobMaster which is responsible for the given job.
*
* @param jobID The identifier of the job.
* @param defaultJobManagerAddress JobManager address which will be returned by
* a static leader retrieval service.
* @return Leader retrieval service to retrieve the job manager for the given job
*/
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

/**
* Gets the leader election service for the cluster's resource manager.
*
* @return Leader election service for the resource manager leader election
*/
LeaderElectionService getResourceManagerLeaderElectionService();

/**
* Gets the leader election service for the cluster's dispatcher.
*
* @return Leader election service for the dispatcher leader election
*/
LeaderElectionService getDispatcherLeaderElectionService();

/**
* Gets the leader election service for the given job.
*
* @param jobID The identifier of the job running the election.
* @return Leader election service for the job manager leader election
*/
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

/**
* Gets the checkpoint recovery factory for the job manager.
*
* @return Checkpoint recovery factory
*/
CheckpointRecoveryFactory getCheckpointRecoveryFactory();

/**
* Gets the submitted job graph store for the job manager.
*
* @return Submitted job graph store
* @throws Exception if the submitted job graph store could not be created
*/
JobGraphStore getJobGraphStore() throws Exception;

/**
* Gets the registry that holds information about whether jobs are currently running.
*
* @return Running job registry to retrieve running jobs
*/
RunningJobsRegistry getRunningJobsRegistry() throws Exception;

/**
* Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
*
* @return Blob store
* @throws IOException if the blob store could not be created
*/
BlobStore createBlobStore() throws IOException;

/**
* Gets the leader election service for the cluster's rest endpoint.
*
* @return the leader election service used by the cluster's rest endpoint
*/
default LeaderElectionService getClusterRestEndpointLeaderElectionService() {
// for backwards compatibility we delegate to getWebMonitorLeaderElectionService
// all implementations of this interface should override getClusterRestEndpointLeaderElectionService, though
return getWebMonitorLeaderElectionService();
}

@Override
default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
// for backwards compatibility we delegate to getWebMonitorLeaderRetriever
// all implementations of this interface should override getClusterRestEndpointLeaderRetriever, though
return getWebMonitorLeaderRetriever();
}

// ------------------------------------------------------------------------
// Shutdown and Cleanup
// ------------------------------------------------------------------------

/**
* Closes the high availability services, releasing all resources.
*
* <p>This method <b>does not delete or clean up</b> any data stored in external stores
* (file systems, ZooKeeper, etc). Another instance of the high availability
* services will be able to recover the job.
*
* <p>If an exception occurs during closing services, this method will attempt to
* continue closing other services and report exceptions only after all services
* have been attempted to be closed.
*
* @throws Exception Thrown, if an exception occurred while closing these services.
*/
@Override
void close() throws Exception;

/**
* Closes the high availability services (releasing all resources) and deletes
* all data stored by these services in external stores.
*
* <p>After this method was called, the any job or session that was managed by
* these high availability services will be unrecoverable.
*
* <p>If an exception occurs during cleanup, this method will attempt to
* continue the cleanup and report exceptions only after all cleanup steps have
* been attempted.
*
* @throws Exception Thrown, if an exception occurred while closing these services
* or cleaning up data stored by them.
*/
void closeAndCleanupAllData() throws Exception;
}

HighAvailabilityServices 还涉及到其他几个相关 Interface。

LedaerElectionService 帮助竞选者 LeaderContender 去参与 leader 选举。所有参加 leader 选举的服务都需要实现接口 LeaderContender,在 Flink 中有四个服务实现了该接口:

  • Dispatcher
  • JobManager
  • ResourceManager
  • WebMonitorEndpoint
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface LeaderElectionService {
// 启动 leader 选举服务,内部在选举结束之后会根据成功或者失败调用 LeaderContender 的指定的回调函数
// 除此之外还会将 contender 加入到 listener 中,以便在之后的选举流程中使用
void start(LeaderContender contender) throws Exception;

// 停止 leader 选举服务
void stop() throws Exception;

// 确认竞争者接受了 leader 委派并发布其 leader 地址
void confirmLeadership(UUID leaderSessionID, String leaderAddress);

// 返回在 leaderSessionId 内,与该 service 绑定的竞争者是否是 leader
boolean hasLeadership(@Nonnull UUID leaderSessionId);
}

// 需要参与 leader 选举的服务都需要实现该接口
public interface LeaderContender {
// 被选为 leader 时的回调函数
void grantLeadership(UUID leaderSessionID);

// 之前的 leader 被剥夺 leader 角色时的调用函数
void revokeLeadership();

// 错误处理
void handleError(Exception exception);

default String getDescription() {
return "LeaderContender: " + getClass().getSimpleName();
}
}

LeaderRetrievalService 用来获取支持 HA 服务的当前的 leader 和在 leader 变化的时候通知 listener 新的 leader。其中 listener 对应是 LeaderRetrievalListener,所有想要接收新 leader 通知消息的服务都需要实现该接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface LeaderRetrievalService {
// 启动 service,并且将 listener 加到回调队列中,当出现新的 leader 时进行回调。
void start(LeaderRetrievalListener listener) throws Exception;

void stop() throws Exception;
}

public interface LeaderRetrievalListener {
// 当新的 leader 被选出来时,会通过回调该方法进行通知
void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID);

void handleError(Exception exception);
}

CheckpointRecoveryFactory 是 checkpoint 相关的接口。HA Service 只会存储 checkpoint 的元数据比如 checkpoint 的存储路径(比如 HDFS)等,并不会存储 checkpoint 数据。这部分大部分是关于 checkpoint 的相关逻辑,这里不再赘述。

RunningJobsRegistry 用于追踪作业的状态。比如在发生 leader 切换的时候,新的 jobmanager 需要借助于 RunningJobsRegistry 去 recover。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface RunningJobsRegistry {
enum JobSchedulingStatus {
PENDING,
RUNNING,
DONE;
}

// 将作业状态设置为 Running
void setJobRunning(JobID jobID) throws IOException;

// 将作业状态设置为完成
void setJobFinished(JobID jobID) throws IOException;

// 获得作业的 schedule 状态
JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException;

void clearJob(JobID jobID) throws IOException;
}

1. 基于 ZooKeeper 的 HA 方案实现

1. ZooKeeper 选主

总所周知 zk 可以用来选主,原理很简单:所有参与选主的 candidate 都尝试去创建一个指定的 znode(创建路径具有原子性),其实就是一个路径,创建成功的就是 leader。那么我们如何感知到 leader 挂掉然后重新开始新的一轮选主呢?zk 的 znode 分为两种:persist 和 ephemeral。persist 的 znode 和其存储的数据会一直存在,除非手动删除;ephemeral 的 znode 生命周期和创建它的 client 保持一致,严格来说是和 client 和 server 的连接(也就是 session)的生命周期保持一致。client 和 server 一旦断开,ephemeral 的 znode 将会被删除。另外 zk 提供了一个 api 叫 watch,其他 follower 只要通过 watch leader 的 znode 就能监听到 leader 挂掉了。

简而言之,通过创建一个 ephemeral 的 znode 我们就可以实现 leader 选举和 leader 感知了。

Curator leaderLatch

尽管通过 zk 来实现选主比较简单,但是我们一般也不会去裸写相关代码逻辑。由于 zk 的常见用途比较固定(选主、分布式锁等),Netflix 公司开源了一套 zk 客户端操作框架:curator,现在大家对 zk 的操作基本都是通过 curator 来操作的,比如 Spark、Flink。

Curator 提供了一个 Class 来实现选主:LeaderLatch。要使用 LeaderLatch 来选主非常简单,基本按如下流程即可。

1
2
3
leaderLatch = new LeaderLatch(client, latchPath);	// 创建 LeaderLatch
leaderLatch.addListener(this); // 添加回调对象
leaderLatch.start(); // 参与选主

其中第二个添加回调对象,其实是如下的 interface。

1
2
3
4
5
public interface LeaderLatchListener {
void isLeader(); // leader 回调

void notLeader(); // 非 leader 回调
}

成功被选为 leader 的进程会回调方法 isLeader(),否则会回方法 notLeader()

Flink 的 HighAvailabilityServices 中有四个服务需要进行选主:ResourceManager,Dispatcher,JobManager,ClusterRestEndpoint。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 /**
* Gets the leader election service for the cluster's resource manager.
*/
LeaderElectionService getResourceManagerLeaderElectionService();
/**
* Gets the leader election service for the cluster's dispatcher.
*/
LeaderElectionService getDispatcherLeaderElectionService();
/**
* Gets the leader election service for the given job.
*/
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
/**
* Gets the leader election service for the cluster's rest endpoint.
*/
default LeaderElectionService getClusterRestEndpointLeaderElectionService()

通过上面的 zk 选主的分析,我们可以想到这四个方法的返回应该是一个实现,只是对应的 zk 的 znode 的 path 不同。实际上也确实是这样,返回的都是 ZooKeeperLeaderElectionService

1
2
3
4
5
6
7
8
9
10
11
public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener, UnhandledErrorListener {
// zk 相关
/** Client to the ZooKeeper quorum. */
private final CuratorFramework client;

/** Curator recipe for leader election. */
private final LeaderLatch leaderLatch;

/** Curator recipe to watch a given ZooKeeper node for changes. */
private final NodeCache cache;
}

ZooKeeperLeaderElectionService 成员中包含了 curator 的 LeaderLatch 用来选主和 NodeCache 用来监听 leader 节点变化。ZooKeeperLeaderElectionService 除了实现 Flink 的 interface LeaderElectionService,还实现了 curator 的 interface LeaderLatchListenerNodeCacheListener,这样我们就可以将当前对象以回调的方式添加到 zk 的回调对象里面了。LeaderLatchListener 用于选主完成(成功或者失败)进行回调,NodeCacheListener 用于节点发生变化进行回调。

Leader Retrieval 其实说的就是 Flink 的 HaService 中的 interface LeaderRetrievalService,前面也说到 LeaderRetrievalService 的最大作用就是在 Leader 发生变化的时候通知到 client,其实就是 Flink 的 TaskManager 等。联想到 zk 的 leader 监听我们不难想到 zk 的 LeaderRetrievalService 一定是扩展了 curator 框架的 NodeCacheListener,我们来看一下是不是。

interface LeaderRetrievalService 基于 zk 的实现是 ZooKeeperLeaderRetrievalService。我们下面列出了三个方法:

  • start:interface LeaderRetrievalService 中对于 start 方法的描述是 Starts the leader retrieval service with the given listener to listen for new leaders. 其核心也就是将 listener 加入回调对象中,当 leader 发生变更的时候进行回调通知。但是基于 zk 的实现,回调是 zk 来做的,但是 start 的入参 LeaderRetrievalListener 并不能作为 zk 的回调对象(没有实现 NodeCacheListener)。所以这里回调其实是 zk 回调 ZooKeeperLeaderRetrievalService,然后 ZooKeeperLeaderRetrievalService 再去通知真正的 listener。
  • stop:没啥可说的,比较简单,主要就是停止并清理。
  • nodeChanged:就是当 leader 发生变更的时候 zk 的回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class ZooKeeperLeaderRetrievalService implements LeaderRetrievalService, NodeCacheListener, UnhandledErrorListener {
/** Connection to the used ZooKeeper quorum. */
private final CuratorFramework client;

/** Curator recipe to watch changes of a specific ZooKeeper node. */
private final NodeCache cache;

@Override
public void start(LeaderRetrievalListener listener) throws Exception {
synchronized (lock) {
leaderListener = listener;

client.getUnhandledErrorListenable().addListener(this);
cache.getListenable().addListener(this);
cache.start();

client.getConnectionStateListenable().addListener(connectionStateListener);

running = true;
}
}

@Override
public void stop() throws Exception {
synchronized (lock) {
if (!running) {
return;
}

running = false;
}

client.getUnhandledErrorListenable().removeListener(this);
client.getConnectionStateListenable().removeListener(connectionStateListener);

try {
cache.close();
} catch (IOException e) {
throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
}
}

@Override
public void nodeChanged() throws Exception {
synchronized (lock) {
if (running) {
try {
LOG.debug("Leader node has changed.");

ChildData childData = cache.getCurrentData();

String leaderAddress;
UUID leaderSessionID;

if (childData == null) {
leaderAddress = null;
leaderSessionID = null;
} else {
byte[] data = childData.getData();

if (data == null || data.length == 0) {
leaderAddress = null;
leaderSessionID = null;
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais);

leaderAddress = ois.readUTF();
leaderSessionID = (UUID) ois.readObject();
}
}

if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
Objects.equals(leaderSessionID, lastLeaderSessionID))) {
LOG.debug(
"New leader information: Leader={}, session ID={}.",
leaderAddress,
leaderSessionID);

lastLeaderAddress = leaderAddress;
lastLeaderSessionID = leaderSessionID;
leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
}
} catch (Exception e) {
leaderListener.handleError(new Exception("Could not handle node changed event.", e));
throw e;
}
} else {
LOG.debug("Ignoring node change notification since the service has already been stopped.");
}
}
}
}

4. RunningJobsRegistry

HA Service 中还有一块内容是 RunningJobsRegistry,用来记录运行的作业。基于 zk 的实现想一想也能想到只要基于一个固定的路径进行读写就可以了,比较简单,不再赘述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* A zookeeper based registry for running jobs, highly available.
*/
public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {

private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperRunningJobsRegistry.class);

private static final Charset ENCODING = Charset.forName("utf-8");

/** The ZooKeeper client to use. */
private final CuratorFramework client;

private final String runningJobPath;

public ZooKeeperRunningJobsRegistry(final CuratorFramework client, final Configuration configuration) {
this.client = checkNotNull(client, "client");
this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
}

@Override
public void setJobRunning(JobID jobID) throws IOException {
checkNotNull(jobID);

try {
writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
}
catch (Exception e) {
throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
}
}

@Override
public void setJobFinished(JobID jobID) throws IOException {
checkNotNull(jobID);

try {
writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
}
catch (Exception e) {
throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
}
}

@Override
public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
checkNotNull(jobID);

try {
final String zkPath = createZkPath(jobID);
final Stat stat = client.checkExists().forPath(zkPath);
if (stat != null) {
// found some data, try to parse it
final byte[] data = client.getData().forPath(zkPath);
if (data != null) {
try {
final String name = new String(data, ENCODING);
return JobSchedulingStatus.valueOf(name);
}
catch (IllegalArgumentException e) {
throw new IOException("Found corrupt data in ZooKeeper: " +
Arrays.toString(data) + " is no valid job status");
}
}
}

// nothing found, yet, must be in status 'PENDING'
return JobSchedulingStatus.PENDING;
}
catch (Exception e) {
throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e);
}
}

@Override
public void clearJob(JobID jobID) throws IOException {
checkNotNull(jobID);

try {
final String zkPath = createZkPath(jobID);
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
this.client.delete().forPath(zkPath);
}
catch (Exception e) {
throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e);
}
}

private String createZkPath(JobID jobID) {
return runningJobPath + jobID.toString();
}

private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
LOG.debug("Setting scheduling state for job {} to {}.", jobID, status);
final String zkPath = createZkPath(jobID);
this.client.newNamespaceAwareEnsurePath(zkPath).ensure(client.getZookeeperClient());
this.client.setData().forPath(zkPath, status.name().getBytes(ENCODING));
}
}

2. 为什么需要基于 Kubernetes 的 HA 方案

一定程度上来说,Kubernetes 已经成为云原生的事实标准。(所谓事实标准,就是谁用的多谁就是标准)。现在很多大数据生态也开始逐步拥抱 Kubernetes 生态,比如分布式计算框架 Flink、Spark 等。但是 Kubernetes 集群在分布式协调中使用的 etcd,并没有使用 zk,所以上面说的基于 zk 的 HA 方案就行不通了。那么我们能不能自己搭建一个 zk 集群来做 Flink 的 HA 呢?当然可以,但是对于之前使用 Kubernetes 的用户为了 Flink 的 HA 需要另外管理一个 zk 集群肯定是不妥的。

针对这种情况,基于 Kubernetes 的 Flink HA 方案就显的比较重要了。有些人可能会问,为什么不是基于 etcd 呢?很简单,Kubernetes 虽然使用的是 etcd,但是并没有将 etcd 服务暴露给用户,而是将 etcd 作为 Kubernetes 自身的一些元数据后端存储。但是我们使用 k8s 也是相当于间接的使用 etcd 了。

3. 方案具体实现

1. 基于 k8s 的 leader 选举

对于 zk 来说,我们通过创建 znode 的 api 的原子性来实现 Leader 选举,对于 k8s 也是类似的。在 k8s 的官方文档中有一篇 2016 年 blog 就详细介绍了如何借助 k8s 来做 leader 选举,原文链接:https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/ 。简单来说就是利用 k8s 的 API 的原子性封装成一个分布式锁,所有参与选举的 candidate 都去竞争该锁,获得锁的就是 leader。

在 k8s 的 client-go 的 api 中,上面的锁叫做 resourcelock,现在 client-go 中支持四种 resourceLock:

  • configMapLock:基于 configMap 资源的操作扩展的分布式锁

  • endpointLock: 基于 endPoint 资源的操作扩展的分布式锁

  • leaseLock:基于 lease 资源,相对来说 leader 资源比较轻量

  • multiLock:多种锁混合使用

    resourceLock 的本质是用锁锁住 k8s 的资源,然后提供特定的 method 供选举使用。这里说的特定的 method 就是 Lock 的 interfact 定义的方法,主要包括如下几个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Interface offers a common interface for locking on arbitrary
// resources used in leader election. The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time. This interface is strictly for use
// by the leaderelection code.
type Interface interface {
// Get returns the LeaderElectionRecord
Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

// Create attempts to create a LeaderElectionRecord
Create(ctx context.Context, ler LeaderElectionRecord) error

// Update will update and existing LeaderElectionRecord
Update(ctx context.Context, ler LeaderElectionRecord) error

// RecordEvent is used to record events
RecordEvent(string)

// Identity will return the locks Identity
Identity() string

// Describe is used to convert details on current resource lock
// into a string
Describe() string
}

对于特定的 resourceLock 的 实现实际上就是通过调用 k8s 的 api 对资源(configMap,endpoint,release 等)进行操作,比如 configMapLock 的 Create 的实现如下:尝试在特定的 namespace 下创建一个特定名称的 configMap,并且将 leader 信息写入到 annotation 中。(k8s 的 API 对象 annotation 基本可以写入任意信息)当多个 candidate 同时尝试去在同一个 namespace 下创建同名的 configMap 的时候,只有一个会成功,这个就是 leader。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Create attempts to create a LeaderElectionRecord annotation
func (cml *ConfigMapLock) Create(ctx context.Context, ler LeaderElectionRecord) error {
recordBytes, err := json.Marshal(ler)
if err != nil {
return err
}
cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(ctx, &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: cml.ConfigMapMeta.Name,
Namespace: cml.ConfigMapMeta.Namespace,
Annotations: map[string]string{
LeaderElectionRecordAnnotationKey: string(recordBytes),
},
},
}, metav1.CreateOptions{})
return err
}

借助于 client-go 提供的 resourceLock package 和 leaderelection,我们可以很容易实现我们自己的 leader 选举逻辑。下面是一个使用 leaseLock 的例子,完整代码在:https://github.com/kubernetes/client-go/blob/master/examples/leader-election/main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
 // we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}

// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// we're notified when we start - this is where you would
// usually put your code
run(ctx)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", id)
os.Exit(0)
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
if identity == id {
// I just got the lock
return
}
klog.Infof("new leader elected: %s", identity)
},
},
})

上面j介绍了 k8s 中的 leader 选举的部分是在 client-go 中,对于 Java 调用来说我们可以使用 java 对应 client SDK:io.kubernetes:client-java-extend。我们可以看到代码中提供了三种 resourceLock: configMapLock、endpointLock 和 leaseLock 。链接:https://github.com/kubernetes-client/java/tree/master/extended/src/main/java/io/kubernetes/client/extended/leaderelection/resourcelock

我们内部使用的 configMapLock,也就是说使用 configMap 的 annotation 来存储 leader 信息。

1. LeaderElectionService

要实现 Java 版本的基于 k8s 的 LedaerElectionService,我们可以参考 sdk 中提供的 class: LeaderElectorLeaderElector 提供了一个 run 方法用来运行 leader 选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Runs the leader election in foreground.
*/
public void run(Runnable startLeadingHook, Runnable stopLeadingHook) {
run(startLeadingHook, stopLeadingHook, null);
}

/**
* Runs the leader election in foreground.
*/
public void run(
Runnable startLeadingHook, Runnable stopLeadingHook, Consumer<String> onNewLeaderHook) {
this.onNewLeaderHook = onNewLeaderHook;
log.info("Start leader election with lock {}", config.getLock().describe());
try {
if (!acquire()) {
// Fail to acquire leadership
return;
}
log.info("Successfully acquired lease, became leader");
// Hook on start leading
hookWorkers.submit(startLeadingHook);
renewLoop();
log.info("Failed to renew lease, lose leadership");
// Hook on stop leading
stopLeadingHook.run();
} catch (Throwable t) {
stopLeadingHook.run();
}
}

但是这个 run 方法是一次性的,为了实现 Flink 的 LedaerElectionService,我们需要写一个 class 来做一些辅助工作,比如:

  • 封装 LeaderElector 来运行 leader 选举
  • 定义一个 daemon 进程来循环运行 leader 选举
  • 定义一个 Listener 接口用来回调

2. LeaderRetrievalService

基于 k8s 来实现 LeaderRetrievalService 相对来说就比较简单了,因为我们把 leader 信息存储到 k8s 的特定的资源(ConfigMap、Endpoint、Lease)的 annotation 中,所以我们直接读取对应的资源的 annotation 就可以了。但是同样的,我们也需要扩展出一个常驻进程和 Listener 接口用来当 Leader 信息发生变化的时候通知 Listener。

剩下两个 CheckpointRecoveryFactoryRunningJobsRegistry 也是类似。