博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊flink的AbstractNonHaServices
阅读量:6885 次
发布时间:2019-06-27

本文共 17359 字,大约阅读时间需要 57 分钟。

本文主要研究一下flink的AbstractNonHaServices

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

public interface HighAvailabilityServices extends AutoCloseable {    // ------------------------------------------------------------------------    //  Constants    // ------------------------------------------------------------------------    /**     * This UUID should be used when no proper leader election happens, but a simple     * pre-configured leader is used. That is for example the case in non-highly-available     * standalone setups.     */    UUID DEFAULT_LEADER_ID = new UUID(0, 0);    /**     * This JobID should be used to identify the old JobManager when using the     * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a     * distinct JobID assigned.     */    JobID DEFAULT_JOB_ID = new JobID(0L, 0L);    // ------------------------------------------------------------------------    //  Services    // ------------------------------------------------------------------------    /**     * Gets the leader retriever for the cluster's resource manager.     */    LeaderRetrievalService getResourceManagerLeaderRetriever();    /**     * Gets the leader retriever for the dispatcher. This leader retrieval service     * is not always accessible.     */    LeaderRetrievalService getDispatcherLeaderRetriever();    /**     * Gets the leader retriever for the job JobMaster which is responsible for the given job     *     * @param jobID The identifier of the job.     * @return Leader retrieval service to retrieve the job manager for the given job     * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.     */    @Deprecated    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);    /**     * Gets the leader retriever for the job JobMaster which is responsible for the given job     *     * @param jobID The identifier of the job.     * @param defaultJobManagerAddress JobManager address which will be returned by     *                              a static leader retrieval service.     * @return Leader retrieval service to retrieve the job manager for the given job     */    LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);    LeaderRetrievalService getWebMonitorLeaderRetriever();    /**     * Gets the leader election service for the cluster's resource manager.     *     * @return Leader election service for the resource manager leader election     */    LeaderElectionService getResourceManagerLeaderElectionService();    /**     * Gets the leader election service for the cluster's dispatcher.     *     * @return Leader election service for the dispatcher leader election     */    LeaderElectionService getDispatcherLeaderElectionService();    /**     * Gets the leader election service for the given job.     *     * @param jobID The identifier of the job running the election.     * @return Leader election service for the job manager leader election     */    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);    LeaderElectionService getWebMonitorLeaderElectionService();    /**     * Gets the checkpoint recovery factory for the job manager     *     * @return Checkpoint recovery factory     */    CheckpointRecoveryFactory getCheckpointRecoveryFactory();    /**     * Gets the submitted job graph store for the job manager     *     * @return Submitted job graph store     * @throws Exception if the submitted job graph store could not be created     */    SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;    /**     * Gets the registry that holds information about whether jobs are currently running.     *     * @return Running job registry to retrieve running jobs     */    RunningJobsRegistry getRunningJobsRegistry() throws Exception;    /**     * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.     *     * @return Blob store     * @throws IOException if the blob store could not be created     */    BlobStore createBlobStore() throws IOException;    // ------------------------------------------------------------------------    //  Shutdown and Cleanup    // ------------------------------------------------------------------------    /**     * Closes the high availability services, releasing all resources.     *      * 

This method does not delete or clean up any data stored in external stores * (file systems, ZooKeeper, etc). Another instance of the high availability * services will be able to recover the job. * *

If an exception occurs during closing services, this method will attempt to * continue closing other services and report exceptions only after all services * have been attempted to be closed. * * @throws Exception Thrown, if an exception occurred while closing these services. */ @Override void close() throws Exception; /** * Closes the high availability services (releasing all resources) and deletes * all data stored by these services in external stores. * *

After this method was called, the any job or session that was managed by * these high availability services will be unrecoverable. * *

If an exception occurs during cleanup, this method will attempt to * continue the cleanup and report exceptions only after all cleanup steps have * been attempted. * * @throws Exception Thrown, if an exception occurred while closing these services * or cleaning up data stored by them. */ void closeAndCleanupAllData() throws Exception;}

  • HighAvailabilityServices定义了highly-available所需的各种services的get方法,它有两个直接子类,一个是ZooKeeperHaServices,一个是AbstractNonHaServices

AbstractNonHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/AbstractNonHaServices.java

public abstract class AbstractNonHaServices implements HighAvailabilityServices {    protected final Object lock = new Object();    private final RunningJobsRegistry runningJobsRegistry;    private final VoidBlobStore voidBlobStore;    private boolean shutdown;    public AbstractNonHaServices() {        this.runningJobsRegistry = new StandaloneRunningJobsRegistry();        this.voidBlobStore = new VoidBlobStore();        shutdown = false;    }    // ----------------------------------------------------------------------    // HighAvailabilityServices method implementations    // ----------------------------------------------------------------------    @Override    public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {        synchronized (lock) {            checkNotShutdown();            return new StandaloneCheckpointRecoveryFactory();        }    }    @Override    public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {        synchronized (lock) {            checkNotShutdown();            return new StandaloneSubmittedJobGraphStore();        }    }    @Override    public RunningJobsRegistry getRunningJobsRegistry() throws Exception {        synchronized (lock) {            checkNotShutdown();            return runningJobsRegistry;        }    }    @Override    public BlobStore createBlobStore() throws IOException {        synchronized (lock) {            checkNotShutdown();            return voidBlobStore;        }    }    @Override    public void close() throws Exception {        synchronized (lock) {            if (!shutdown) {                shutdown = true;            }        }    }    @Override    public void closeAndCleanupAllData() throws Exception {        // this stores no data, so this method is the same as 'close()'        close();    }    // ----------------------------------------------------------------------    // Helper methods    // ----------------------------------------------------------------------    @GuardedBy("lock")    protected void checkNotShutdown() {        checkState(!shutdown, "high availability services are shut down");    }    protected boolean isShutDown() {        return shutdown;    }}
  • AbstractNonHaServices实现了HighAvailabilityServices的getCheckpointRecoveryFactory、getSubmittedJobGraphStore、getRunningJobsRegistry、createBlobStore、close、closeAndCleanupAllData方法;它有两个子类,分别是EmbeddedHaServices及StandaloneHaServices

EmbeddedHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServices.java

public class EmbeddedHaServices extends AbstractNonHaServices {    private final Executor executor;    private final EmbeddedLeaderService resourceManagerLeaderService;    private final EmbeddedLeaderService dispatcherLeaderService;    private final HashMap
jobManagerLeaderServices; private final EmbeddedLeaderService webMonitorLeaderService; public EmbeddedHaServices(Executor executor) { this.executor = Preconditions.checkNotNull(executor); this.resourceManagerLeaderService = new EmbeddedLeaderService(executor); this.dispatcherLeaderService = new EmbeddedLeaderService(executor); this.jobManagerLeaderServices = new HashMap<>(); this.webMonitorLeaderService = new EmbeddedLeaderService(executor); } // ------------------------------------------------------------------------ // services // ------------------------------------------------------------------------ @Override public LeaderRetrievalService getResourceManagerLeaderRetriever() { return resourceManagerLeaderService.createLeaderRetrievalService(); } @Override public LeaderRetrievalService getDispatcherLeaderRetriever() { return dispatcherLeaderService.createLeaderRetrievalService(); } @Override public LeaderElectionService getResourceManagerLeaderElectionService() { return resourceManagerLeaderService.createLeaderElectionService(); } @Override public LeaderElectionService getDispatcherLeaderElectionService() { return dispatcherLeaderService.createLeaderElectionService(); } @Override public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) { checkNotNull(jobID); synchronized (lock) { checkNotShutdown(); EmbeddedLeaderService service = getOrCreateJobManagerService(jobID); return service.createLeaderRetrievalService(); } } @Override public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) { return getJobManagerLeaderRetriever(jobID); } @Override public LeaderRetrievalService getWebMonitorLeaderRetriever() { return webMonitorLeaderService.createLeaderRetrievalService(); } @Override public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) { checkNotNull(jobID); synchronized (lock) { checkNotShutdown(); EmbeddedLeaderService service = getOrCreateJobManagerService(jobID); return service.createLeaderElectionService(); } } @Override public LeaderElectionService getWebMonitorLeaderElectionService() { return webMonitorLeaderService.createLeaderElectionService(); } // ------------------------------------------------------------------------ // internal // ------------------------------------------------------------------------ @GuardedBy("lock") private EmbeddedLeaderService getOrCreateJobManagerService(JobID jobID) { EmbeddedLeaderService service = jobManagerLeaderServices.get(jobID); if (service == null) { service = new EmbeddedLeaderService(executor); jobManagerLeaderServices.put(jobID, service); } return service; } // ------------------------------------------------------------------------ // shutdown // ------------------------------------------------------------------------ @Override public void close() throws Exception { synchronized (lock) { if (!isShutDown()) { // stop all job manager leader services for (EmbeddedLeaderService service : jobManagerLeaderServices.values()) { service.shutdown(); } jobManagerLeaderServices.clear(); resourceManagerLeaderService.shutdown(); webMonitorLeaderService.shutdown(); } super.close(); } }}
  • EmbeddedHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在ResourceManager, JobManagers, TaskManagers运行在同一个进程的non-high-availability场景下的实现,FlinkMiniCluster使用的就是EmbeddedHaServices

StandaloneHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/nonha/standalone/StandaloneHaServices.java

public class StandaloneHaServices extends AbstractNonHaServices {    /** The constant name of the ResourceManager RPC endpoint */    private static final String RESOURCE_MANAGER_RPC_ENDPOINT_NAME = "resource_manager";    /** The fix address of the ResourceManager */    private final String resourceManagerAddress;    /** The fix address of the Dispatcher */    private final String dispatcherAddress;    /** The fix address of the JobManager */    private final String jobManagerAddress;    private final String webMonitorAddress;    /**     * Creates a new services class for the fix pre-defined leaders.     *     * @param resourceManagerAddress    The fix address of the ResourceManager     * @param webMonitorAddress     */    public StandaloneHaServices(            String resourceManagerAddress,            String dispatcherAddress,            String jobManagerAddress,            String webMonitorAddress) {        this.resourceManagerAddress = checkNotNull(resourceManagerAddress, "resourceManagerAddress");        this.dispatcherAddress = checkNotNull(dispatcherAddress, "dispatcherAddress");        this.jobManagerAddress = checkNotNull(jobManagerAddress, "jobManagerAddress");        this.webMonitorAddress = checkNotNull(webMonitorAddress, webMonitorAddress);    }    // ------------------------------------------------------------------------    //  Services    // ------------------------------------------------------------------------    @Override    public LeaderRetrievalService getResourceManagerLeaderRetriever() {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderRetrievalService(resourceManagerAddress, DEFAULT_LEADER_ID);        }    }    @Override    public LeaderRetrievalService getDispatcherLeaderRetriever() {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderRetrievalService(dispatcherAddress, DEFAULT_LEADER_ID);        }    }    @Override    public LeaderElectionService getResourceManagerLeaderElectionService() {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderElectionService();        }    }    @Override    public LeaderElectionService getDispatcherLeaderElectionService() {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderElectionService();        }    }    @Override    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderRetrievalService(jobManagerAddress, DEFAULT_LEADER_ID);        }    }    @Override    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderRetrievalService(defaultJobManagerAddress, DEFAULT_LEADER_ID);        }    }    @Override    public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderElectionService();        }    }    @Override    public LeaderRetrievalService getWebMonitorLeaderRetriever() {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderRetrievalService(webMonitorAddress, DEFAULT_LEADER_ID);        }    }    @Override    public LeaderElectionService getWebMonitorLeaderElectionService() {        synchronized (lock) {            checkNotShutdown();            return new StandaloneLeaderElectionService();        }    }}
  • StandaloneHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在non-high-availability场景下的实现,ClusterEntrypoint在highAvailabilityMode为NONE的时候使用的是StandaloneHaServices

小结

  • HighAvailabilityServices定义了highly-available所需的各种services的get方法,它有两个直接子类,一个是ZooKeeperHaServices,一个是AbstractNonHaServices
  • AbstractNonHaServices实现了HighAvailabilityServices的getCheckpointRecoveryFactory、getSubmittedJobGraphStore、getRunningJobsRegistry、createBlobStore、close、closeAndCleanupAllData方法;它有两个子类,分别是EmbeddedHaServices及StandaloneHaServices
  • EmbeddedHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在ResourceManager, JobManagers, TaskManagers运行在同一个进程的non-high-availability场景下的实现,FlinkMiniCluster使用的就是EmbeddedHaServices;StandaloneHaServices继承了AbstractNonHaServices,它是对HighAvailabilityServices接口在non-high-availability场景下的实现,ClusterEntrypoint在highAvailabilityMode为NONE的时候使用的是StandaloneHaServices

doc

转载地址:http://xhxbl.baihongyu.com/

你可能感兴趣的文章
原生JS替代jQuery的各种方法汇总
查看>>
彻底删除Cygwin
查看>>
一次单核CPU占用过高问题的处理
查看>>
js常用加解密函数汇总
查看>>
排查nginx、tomcat内存和服务器负载之后
查看>>
[转]比特币测试链——Testnet介绍
查看>>
如何让自己的电脑发布ASP http://jingyan.baidu.com/article/19192ad853224ce53f570748.html
查看>>
SQL基础
查看>>
String是基本的数据类型吗?
查看>>
程序员的八个级别
查看>>
为什么互联网产品的成功率这么低
查看>>
Android动画学习笔记-Android Animation
查看>>
sql2003安装sql2005企业版
查看>>
HDU 4394 Digital Square
查看>>
ASP.NET 1.1 中 QueryString 的安全获取写法
查看>>
.Net Remoting(分离服务程序实现) - Part.3
查看>>
Repeater控件结合UpdatePanel实现Ajax分页和删除功能
查看>>
js问题总结
查看>>
资源分享来源
查看>>
TServerSocket组件
查看>>