HA here is short for High Availability. HA comes up in many distributed systems, mainly to solve the SPOF (single point of failure) problem. For example, the NameNode of HDFS, the distributed file system in the big data ecosystem (you can roughly think of it as the master in a master-slave architecture), always runs with HA enabled in production. Likewise, most HA mechanisms target the master of a master-slave style distributed architecture.

More concretely, how does HA solve the single-point problem? In a non-HA system, once the master goes down, everything the master is responsible for stops. Take Flink: the JobManager is responsible for dispatching tasks, so without HA we would have to restart the job and recompute from scratch whenever it dies. With HA, once one JobManager goes down, a new JobManager takes its place and the job keeps running.

HA requires several identical service processes to run at the same time, yet only one of them should actually serve. That one process is the Leader, and deciding which of the candidates gets to serve is exactly the leader election problem.
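To make this concrete, here is a minimal sketch of leader election on top of ZooKeeper using Apache Curator's LeaderLatch recipe (the same recipe Flink's ZooKeeper HA services build on). The quorum address, latch path, and participant id are placeholders.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElectionSketch {
    public static void main(String[] args) throws Exception {
        // Every candidate process runs this same code; ZooKeeper picks one leader.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181",                      // placeholder ZooKeeper quorum
                new ExponentialBackoffRetry(1000, 3));
        client.start();

        // All candidates contend on the same znode path; the id identifies this process.
        LeaderLatch latch = new LeaderLatch(client, "/demo/leader", "process-1");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("Became leader, start serving.");
            }

            @Override
            public void notLeader() {
                System.out.println("Lost leadership, stop serving.");
            }
        });
        latch.start();

        // Block until this process becomes the leader (until then it is a standby).
        latch.await();
        // ... do leader work; latch.close() releases leadership.
    }
}

Flink hides all of this behind its HighAvailabilityServices interface. The parts of that interface relevant to leader election and HA state look like this: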
/**
 * Gets the leader retriever for the job JobMaster which is responsible for the given job.
 *
 * @param jobID The identifier of the job.
 * @param defaultJobManagerAddress JobManager address which will be returned by
 *     a static leader retrieval service.
 * @return Leader retrieval service to retrieve the job manager for the given job
 */
LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

/**
 * Gets the leader election service for the cluster's resource manager.
 *
 * @return Leader election service for the resource manager leader election
 */
LeaderElectionService getResourceManagerLeaderElectionService();

/**
 * Gets the leader election service for the cluster's dispatcher.
 *
 * @return Leader election service for the dispatcher leader election
 */
LeaderElectionService getDispatcherLeaderElectionService();

/**
 * Gets the leader election service for the given job.
 *
 * @param jobID The identifier of the job running the election.
 * @return Leader election service for the job manager leader election
 */
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

/**
 * Gets the checkpoint recovery factory for the job manager.
 *
 * @return Checkpoint recovery factory
 */
CheckpointRecoveryFactory getCheckpointRecoveryFactory();

/**
 * Gets the submitted job graph store for the job manager.
 *
 * @return Submitted job graph store
 * @throws Exception if the submitted job graph store could not be created
 */
JobGraphStore getJobGraphStore() throws Exception;

/**
 * Gets the registry that holds information about whether jobs are currently running.
 *
 * @return Running job registry to retrieve running jobs
 */
RunningJobsRegistry getRunningJobsRegistry() throws Exception;

/**
 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
 *
 * @return Blob store
 * @throws IOException if the blob store could not be created
 */
BlobStore createBlobStore() throws IOException;

/**
 * Gets the leader election service for the cluster's rest endpoint.
 *
 * @return the leader election service used by the cluster's rest endpoint
 */
default LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    // for backwards compatibility we delegate to getWebMonitorLeaderElectionService
    // all implementations of this interface should override getClusterRestEndpointLeaderElectionService, though
    return getWebMonitorLeaderElectionService();
}

@Override
default LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
    // for backwards compatibility we delegate to getWebMonitorLeaderRetriever
    // all implementations of this interface should override getClusterRestEndpointLeaderRetriever, though
    return getWebMonitorLeaderRetriever();
}
// ------------------------------------------------------------------------
//  Shutdown and Cleanup
// ------------------------------------------------------------------------

/**
 * Closes the high availability services, releasing all resources.
 *
 * <p>This method <b>does not delete or clean up</b> any data stored in external stores
 * (file systems, ZooKeeper, etc). Another instance of the high availability
 * services will be able to recover the job.
 *
 * <p>If an exception occurs during closing services, this method will attempt to
 * continue closing other services and report exceptions only after all services
 * have been attempted to be closed.
 *
 * @throws Exception Thrown, if an exception occurred while closing these services.
 */
@Override
void close() throws Exception;

/**
 * Closes the high availability services (releasing all resources) and deletes
 * all data stored by these services in external stores.
 *
 * <p>After this method was called, any job or session that was managed by
 * these high availability services will be unrecoverable.
 *
 * <p>If an exception occurs during cleanup, this method will attempt to
 * continue the cleanup and report exceptions only after all cleanup steps have
 * been attempted.
 *
 * @throws Exception Thrown, if an exception occurred while closing these services
 *     or cleaning up data stored by them.
 */
void closeAndCleanupAllData() throws Exception;
}
/**
 * Gets the leader election service for the cluster's resource manager.
 */
LeaderElectionService getResourceManagerLeaderElectionService();

/**
 * Gets the leader election service for the cluster's dispatcher.
 */
LeaderElectionService getDispatcherLeaderElectionService();

/**
 * Gets the leader election service for the given job.
 */
LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

/**
 * Gets the leader election service for the cluster's rest endpoint.
 */
default LeaderElectionService getClusterRestEndpointLeaderElectionService() { ... }
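To see how these services are consumed, here is a rough sketch of a component joining the election. It assumes Flink's LeaderContender callback interface from the same code base; the contender class is made up for illustration, and depending on the Flink version LeaderContender declares one additional method (getAddress() or getDescription()) that a real implementation must add.

import java.util.UUID;

import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;

// Hypothetical contender; Flink's ResourceManager, Dispatcher and JobManager
// components implement LeaderContender in essentially this shape.
public class MyContender implements LeaderContender {

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        // Elected: start serving under this session id (a real contender also
        // confirms the session id back to the election service).
    }

    @Override
    public void revokeLeadership() {
        // Leadership lost: stop serving and fall back to standby.
    }

    @Override
    public void handleError(Exception exception) {
        // Fatal error reported by the underlying election service.
    }

    // Usage: obtain a service from HighAvailabilityServices and register.
    public static void register(HighAvailabilityServices haServices) throws Exception {
        LeaderElectionService electionService =
                haServices.getResourceManagerLeaderElectionService();
        electionService.start(new MyContender());
    }
}

On ZooKeeper-based setups, the read side of all this (finding out who the current leader is) is implemented by ZooKeeperLeaderRetrievalService: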
public class ZooKeeperLeaderRetrievalService
        implements LeaderRetrievalService, NodeCacheListener, UnhandledErrorListener {

    /** Connection to the used ZooKeeper quorum. */
    private final CuratorFramework client;

    /** Curator recipe to watch changes of a specific ZooKeeper node. */
    private final NodeCache cache;

    @Override
    public void start(LeaderRetrievalListener listener) throws Exception {
        synchronized (lock) {
            leaderListener = listener;
            // ... (register this service as cache/error listener and start the NodeCache)
        }
    }
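Under the hood, leader retrieval is just watching one znode: whenever the leader writes its address there, the NodeCache fires nodeChanged() and the service pushes the new leader to the listener. A minimal standalone sketch of that watch mechanism (quorum address and znode path are placeholders):

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class WatchLeaderNode {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3)); // placeholder quorum
        client.start();

        // NodeCache mirrors a single znode and invokes the listener on every change;
        // this is exactly how a retrieval service learns about a new leader.
        NodeCache cache = new NodeCache(client, "/demo/leader-info"); // placeholder path
        cache.getListenable().addListener(() -> {
            ChildData data = cache.getCurrentData();
            if (data != null && data.getData() != null) {
                System.out.println("leader changed: "
                        + new String(data.getData(), StandardCharsets.UTF_8));
            } else {
                System.out.println("leader node gone, currently no leader");
            }
        });
        cache.start();

        Thread.sleep(Long.MAX_VALUE); // keep the watch alive
    }
}

Besides leader election and retrieval, the HA services also track job scheduling state. The ZooKeeper-backed RunningJobsRegistry stores each job's JobSchedulingStatus under a per-job znode: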
@Override
public void setJobRunning(JobID jobID) throws IOException {
    checkNotNull(jobID);

    try {
        writeEnumToZooKeeper(jobID, JobSchedulingStatus.RUNNING);
    } catch (Exception e) {
        throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
    }
}

@Override
public void setJobFinished(JobID jobID) throws IOException {
    checkNotNull(jobID);

    try {
        writeEnumToZooKeeper(jobID, JobSchedulingStatus.DONE);
    } catch (Exception e) {
        throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
    }
}
@Override
public JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
    checkNotNull(jobID);

    try {
        final String zkPath = createZkPath(jobID);
        final Stat stat = client.checkExists().forPath(zkPath);
        if (stat != null) {
            // found some data, try to parse it
            final byte[] data = client.getData().forPath(zkPath);
            if (data != null) {
                try {
                    final String name = new String(data, ENCODING);
                    return JobSchedulingStatus.valueOf(name);
                } catch (IllegalArgumentException e) {
                    throw new IOException("Found corrupt data in ZooKeeper: "
                            + Arrays.toString(data) + " is no valid job status");
                }
            }
        }

        // nothing found, yet, must be in status 'PENDING'
        return JobSchedulingStatus.PENDING;
    } catch (Exception e) {
        throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e);
    }
}
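The writeEnumToZooKeeper helper called by setJobRunning/setJobFinished is not shown above; the following is only my reconstruction of its likely shape (reusing the class's client, ENCODING, and createZkPath members), not Flink's verbatim code:

// Sketch only: a guess at the helper's shape, not the actual Flink implementation.
private void writeEnumToZooKeeper(JobID jobID, JobSchedulingStatus status) throws Exception {
    final String zkPath = createZkPath(jobID);
    final byte[] data = status.name().getBytes(ENCODING);
    try {
        // Most writes update an existing node, so try setData first.
        client.setData().forPath(zkPath, data);
    } catch (KeeperException.NoNodeException e) { // org.apache.zookeeper.KeeperException
        // First write for this job: create the node (and any missing parents).
        client.create().creatingParentsIfNeeded().forPath(zkPath, data);
    }
}

Kubernetes solves the same problem in client-go: leader election there is built on a pluggable resource lock, described by the resourcelock Interface: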
// Interface offers a common interface for locking on arbitrary
// resources used in leader election. The Interface is used
// to hide the details on specific implementations in order to allow
// them to change over time. This interface is strictly for use
// by the leaderelection code.
type Interface interface {
	// Get returns the LeaderElectionRecord
	Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

	// Create attempts to create a LeaderElectionRecord
	Create(ctx context.Context, ler LeaderElectionRecord) error

	// Update will update an existing LeaderElectionRecord
	Update(ctx context.Context, ler LeaderElectionRecord) error

	// RecordEvent is used to record events
	RecordEvent(string)

	// Identity will return the lock's Identity
	Identity() string

	// Describe is used to convert details on current resource lock
	// into a string
	Describe() string
}
// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
	LeaseMeta: metav1.ObjectMeta{
		Name:      leaseLockName,
		Namespace: leaseLockNamespace,
	},
	Client: client.CoordinationV1(),
	LockConfig: resourcelock.ResourceLockConfig{
		Identity: id,
	},
}
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock: lock,
	// IMPORTANT: you MUST ensure that any code you have that
	// is protected by the lease must terminate **before**
	// you call cancel. Otherwise, you could have a background
	// loop still running and another process could
	// get elected before your background loop finished, violating
	// the stated goal of the lease.
	ReleaseOnCancel: true,
	LeaseDuration:   60 * time.Second,
	RenewDeadline:   15 * time.Second,
	RetryPeriod:     5 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: func(ctx context.Context) {
			// we're notified when we start - this is where you would
			// usually put your code
			run(ctx)
		},
		OnStoppedLeading: func() {
			// we can do cleanup here
			klog.Infof("leader lost: %s", id)
			os.Exit(0)
		},
		OnNewLeader: func(identity string) {
			// we're notified when a new leader is elected
			if identity == id {
				// I just got the lock
				return
			}
			klog.Infof("new leader elected: %s", identity)
		},
	},
})
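For reference, the three timing knobs work together as follows: LeaseDuration is how long non-leader candidates wait after the last observed lease renewal before trying to take over, RenewDeadline is how long the acting leader keeps retrying to refresh the lease before giving up leadership, and RetryPeriod is the wait between individual retries. With the values above, a crashed leader is therefore replaced after roughly a minute at most.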