A look at Flink's RestartStrategies
Published: 2019-06-29

This article takes a look at Flink's RestartStrategies.

RestartStrategies

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

@PublicEvolving
public class RestartStrategies {

    /**
     * Generates NoRestartStrategyConfiguration.
     *
     * @return NoRestartStrategyConfiguration
     */
    public static RestartStrategyConfiguration noRestart() {
        return new NoRestartStrategyConfiguration();
    }

    public static RestartStrategyConfiguration fallBackRestart() {
        return new FallbackRestartStrategyConfiguration();
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
        return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
        return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
    }

    /**
     * Generates a FailureRateRestartStrategyConfiguration.
     *
     * @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
     * @param failureInterval Time interval for failures
     * @param delayInterval Delay in-between restart attempts
     */
    public static FailureRateRestartStrategyConfiguration failureRateRestart(
            int failureRate, Time failureInterval, Time delayInterval) {
        return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
    }

    //......
}
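  • RestartStrategies provides four static methods for building a RestartStrategyConfiguration: noRestart, fallBackRestart, fixedDelayRestart and failureRateRestart. The fixedDelayRestart(int, long) overload treats its long argument as milliseconds and delegates to fixedDelayRestart(int, Time).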

RestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public abstract static class RestartStrategyConfiguration implements Serializable {
    private static final long serialVersionUID = 6285853591578313960L;

    private RestartStrategyConfiguration() {}

    /**
     * Returns a description which is shown in the web interface.
     *
     * @return Description of the restart strategy
     */
    public abstract String getDescription();
}
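  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. Its constructor is private, so only the classes nested inside RestartStrategies can extend it. There are four subclasses: NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration and FallbackRestartStrategyConfiguration.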
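The closed hierarchy described above combines a private base constructor with static factory methods. A small stdlib-only sketch makes that pattern concrete. RestartConfigs and its nested classes are hypothetical stand-ins, not Flink classes. Durations are plain millisecond longs instead of Flink's Time, so the description strings are only modelled on Flink's, not identical to them.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of RestartStrategies (not the Flink class).
// Durations are millisecond longs instead of Flink's Time type.
final class RestartConfigs {

    private RestartConfigs() {}

    // Like RestartStrategyConfiguration: the private constructor means only
    // the nested subclasses below can extend this base class.
    abstract static class Config {
        private Config() {}

        // Human-readable text, analogous to what Flink shows in the web UI.
        abstract String getDescription();
    }

    static final class NoRestart extends Config {
        @Override
        String getDescription() {
            return "Restart deactivated.";
        }
    }

    static final class Fallback extends Config {
        @Override
        String getDescription() {
            return "Cluster level default restart strategy";
        }
    }

    static final class FixedDelay extends Config {
        final int restartAttempts;
        final long delayMillis;

        FixedDelay(int restartAttempts, long delayMillis) {
            this.restartAttempts = restartAttempts;
            this.delayMillis = delayMillis;
        }

        @Override
        String getDescription() {
            return "Restart with fixed delay (" + delayMillis + " ms). #"
                + restartAttempts + " restart attempts.";
        }
    }

    static final class FailureRate extends Config {
        final int maxFailureRate;
        final long failureIntervalMillis;
        final long delayMillis;

        FailureRate(int maxFailureRate, long failureIntervalMillis, long delayMillis) {
            this.maxFailureRate = maxFailureRate;
            this.failureIntervalMillis = failureIntervalMillis;
            this.delayMillis = delayMillis;
        }

        @Override
        String getDescription() {
            return "Failure rate restart with maximum of " + maxFailureRate
                + " failures within interval " + failureIntervalMillis
                + " ms and fixed delay " + delayMillis + " ms";
        }
    }

    // Static factories: callers never name the concrete subclasses.
    static Config noRestart() { return new NoRestart(); }

    static Config fallBackRestart() { return new Fallback(); }

    static Config fixedDelayRestart(int restartAttempts, long delay, TimeUnit unit) {
        return new FixedDelay(restartAttempts, unit.toMillis(delay));
    }

    static Config failureRateRestart(int failureRate, long interval, long delay, TimeUnit unit) {
        return new FailureRate(failureRate, unit.toMillis(interval), unit.toMillis(delay));
    }

    public static void main(String[] args) {
        // Prints: Restart with fixed delay (10000 ms). #3 restart attempts.
        System.out.println(fixedDelayRestart(3, 10, TimeUnit.SECONDS).getDescription());
    }
}
```

Callers only ever see the abstract type. As a result, new configuration kinds can be added inside the class without breaking existing job code.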

NoRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration {
    private static final long serialVersionUID = -5894362702943349962L;

    @Override
    public String getDescription() {
        return "Restart deactivated.";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        return o instanceof NoRestartStrategyConfiguration;
    }

    @Override
    public int hashCode() {
        return Objects.hash();
    }
}
  • NoRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the no-restart strategy.

FixedDelayRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration {
    private static final long serialVersionUID = 4149870149673363190L;

    private final int restartAttempts;
    private final Time delayBetweenAttemptsInterval;

    FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
        this.restartAttempts = restartAttempts;
        this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
    }

    public int getRestartAttempts() {
        return restartAttempts;
    }

    public Time getDelayBetweenAttemptsInterval() {
        return delayBetweenAttemptsInterval;
    }

    @Override
    public int hashCode() {
        int result = restartAttempts;
        result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof FixedDelayRestartStrategyConfiguration) {
            FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;
            return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
        } else {
            return false;
        }
    }

    @Override
    public String getDescription() {
        return "Restart with fixed delay (" + delayBetweenAttemptsInterval + "). #"
            + restartAttempts + " restart attempts.";
    }
}
  • FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the fixed-delay restart strategy. It has two fields: restartAttempts and delayBetweenAttemptsInterval.

FailureRateRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
    private static final long serialVersionUID = 1195028697539661739L;

    private final int maxFailureRate;
    private final Time failureInterval;
    private final Time delayBetweenAttemptsInterval;

    public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
        this.maxFailureRate = maxFailureRate;
        this.failureInterval = failureInterval;
        this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
    }

    public int getMaxFailureRate() {
        return maxFailureRate;
    }

    public Time getFailureInterval() {
        return failureInterval;
    }

    public Time getDelayBetweenAttemptsInterval() {
        return delayBetweenAttemptsInterval;
    }

    @Override
    public String getDescription() {
        return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
                + " and fixed delay " + delayBetweenAttemptsInterval.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
        return maxFailureRate == that.maxFailureRate &&
            Objects.equals(failureInterval, that.failureInterval) &&
            Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
    }

    @Override
    public int hashCode() {
        return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
    }
}
  • FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the failure-rate restart strategy. It has three fields: maxFailureRate, failureInterval and delayBetweenAttemptsInterval.

FallbackRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
    private static final long serialVersionUID = -4441787204284085544L;

    @Override
    public String getDescription() {
        return "Cluster level default restart strategy";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        return o instanceof FallbackRestartStrategyConfiguration;
    }

    @Override
    public int hashCode() {
        return Objects.hash();
    }
}
  • FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration and represents falling back to the cluster-level default restart strategy.

RestartStrategyResolving

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java

public final class RestartStrategyResolving {

    /**
     * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
     * The resolving strategy is as follows:
     * <ol>
     * <li>Strategy set within job graph.</li>
     * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
     * is enabled.</li>
     * <li>If no strategy was set on client and server side and checkpointing was enabled then
     * {@link FixedDelayRestartStrategy} is used</li>
     * </ol>
     *
     * @param clientConfiguration restart configuration given within the job graph
     * @param serverStrategyFactory default server side strategy factory
     * @param isCheckpointingEnabled if checkpointing was enabled for the job
     * @return resolved strategy
     */
    public static RestartStrategy resolve(
            RestartStrategies.RestartStrategyConfiguration clientConfiguration,
            RestartStrategyFactory serverStrategyFactory,
            boolean isCheckpointingEnabled) {

        final RestartStrategy clientSideRestartStrategy =
            RestartStrategyFactory.createRestartStrategy(clientConfiguration);

        if (clientSideRestartStrategy != null) {
            return clientSideRestartStrategy;
        } else {
            if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
                return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
                    .createRestartStrategy(isCheckpointingEnabled);
            } else {
                return serverStrategyFactory.createRestartStrategy();
            }
        }
    }

    private RestartStrategyResolving() {
    }
}
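  • RestartStrategyResolving provides a static method, resolve. It first uses RestartStrategyFactory.createRestartStrategy to turn the client-side RestartStrategies.RestartStrategyConfiguration into a RestartStrategy. If that returns null, it falls back to the server-side RestartStrategyFactory instead. When the server factory is a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory, whether checkpointing is enabled also determines the result.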
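The resolution order can be modelled with a small stdlib-only sketch. Several assumptions apply:

- ResolveSketch, ServerFactory and NoOrFixedIfCheckpointing are hypothetical names.
- Strategies are represented by their class names as strings.
- A fallback (or unset) client configuration is assumed to yield no client-side strategy, which corresponds to the null branch in resolve.
- Following the Javadoc, the checkpointing-aware factory is modelled as choosing a fixed-delay strategy when checkpointing is enabled and no restart otherwise.

```java
// Hypothetical model of the decision order in RestartStrategyResolving.resolve.
// Strategies are plain strings so the sketch needs no Flink classes.
final class ResolveSketch {

    // Stand-in for RestartStrategyFactory: builds the server-side default.
    interface ServerFactory {
        String create();
    }

    // Stand-in for NoOrFixedIfCheckpointingEnabledRestartStrategyFactory:
    // the result depends on whether checkpointing is enabled.
    static final class NoOrFixedIfCheckpointing implements ServerFactory {
        @Override
        public String create() {
            return create(false);
        }

        String create(boolean checkpointingEnabled) {
            return checkpointingEnabled ? "FixedDelayRestartStrategy" : "NoRestartStrategy";
        }
    }

    // clientStrategy is null when the job did not pick a concrete strategy.
    static String resolve(String clientStrategy, ServerFactory server, boolean checkpointingEnabled) {
        if (clientStrategy != null) {
            return clientStrategy; // the job's own setting wins
        }
        if (server instanceof NoOrFixedIfCheckpointing) {
            return ((NoOrFixedIfCheckpointing) server).create(checkpointingEnabled);
        }
        return server.create(); // otherwise the cluster-wide default
    }

    public static void main(String[] args) {
        ServerFactory server = new NoOrFixedIfCheckpointing();
        System.out.println(resolve(null, server, true));   // FixedDelayRestartStrategy
        System.out.println(resolve(null, server, false));  // NoRestartStrategy
        System.out.println(resolve("FailureRateRestartStrategy", server, true)); // FailureRateRestartStrategy
    }
}
```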

RestartStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java

public interface RestartStrategy {

    /**
     * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
     *
     * @return true if restart is possible, otherwise false
     */
    boolean canRestart();

    /**
     * Called by the ExecutionGraph to eventually trigger a full recovery.
     * The recovery must be triggered on the given callback object, and may be delayed
     * with the help of the given scheduled executor.
     *
     * <p>The thread that calls this method is not supposed to block/sleep.
     *
     * @param restarter The hook to restart the ExecutionGraph
     * @param executor An scheduled executor to delay the restart
     */
    void restart(RestartCallback restarter, ScheduledExecutor executor);
}

  • RestartStrategy declares two methods, canRestart and restart. Its implementations are NoRestartStrategy, FixedDelayRestartStrategy and FailureRateRestartStrategy.
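Judging from this interface, a caller presumably checks canRestart first and only then calls restart. Per the Javadoc, restart must schedule the recovery rather than block. The sketch below illustrates that contract. SimpleRestartStrategy, SimpleNoRestart and FailureHandler are hypothetical names. A Runnable stands in for RestartCallback, and the JDK ScheduledExecutorService stands in for Flink's ScheduledExecutor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical stand-in for Flink's RestartStrategy interface.
interface SimpleRestartStrategy {
    boolean canRestart();

    // Must not block: schedule the recovery on the executor and return.
    void restart(Runnable triggerFullRecovery, ScheduledExecutorService executor);
}

// Mirrors NoRestartStrategy: never allows a restart.
final class SimpleNoRestart implements SimpleRestartStrategy {
    @Override
    public boolean canRestart() {
        return false;
    }

    @Override
    public void restart(Runnable triggerFullRecovery, ScheduledExecutorService executor) {
        throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
    }
}

// Hypothetical caller: asks canRestart() first, only then calls restart().
final class FailureHandler {
    // Returns true if a restart was scheduled, false if the job should fail.
    static boolean onFailure(SimpleRestartStrategy strategy, Runnable recovery,
                             ScheduledExecutorService executor) {
        if (strategy.canRestart()) {
            strategy.restart(recovery, executor);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        boolean scheduled = onFailure(new SimpleNoRestart(),
            () -> System.out.println("recovering"), executor);
        System.out.println(scheduled ? "restart scheduled" : "job fails"); // job fails
        executor.shutdown();
    }
}
```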

NoRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java

public class NoRestartStrategy implements RestartStrategy {

    @Override
    public boolean canRestart() {
        return false;
    }

    @Override
    public void restart(RestartCallback restarter, ScheduledExecutor executor) {
        throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
    }

    /**
     * Creates a NoRestartStrategyFactory instance.
     *
     * @param configuration Configuration object which is ignored
     * @return NoRestartStrategyFactory instance
     */
    public static NoRestartStrategyFactory createFactory(Configuration configuration) {
        return new NoRestartStrategyFactory();
    }

    @Override
    public String toString() {
        return "NoRestartStrategy";
    }

    public static class NoRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = -1809462525812787862L;

        @Override
        public RestartStrategy createRestartStrategy() {
            return new NoRestartStrategy();
        }
    }
}
  • NoRestartStrategy implements RestartStrategy. Its canRestart method always returns false, and its restart method throws UnsupportedOperationException.

FixedDelayRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java

public class FixedDelayRestartStrategy implements RestartStrategy {

    private final int maxNumberRestartAttempts;
    private final long delayBetweenRestartAttempts;
    private int currentRestartAttempt;

    public FixedDelayRestartStrategy(
        int maxNumberRestartAttempts,
        long delayBetweenRestartAttempts) {

        Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
        Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");

        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
        this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
        currentRestartAttempt = 0;
    }

    public int getCurrentRestartAttempt() {
        return currentRestartAttempt;
    }

    @Override
    public boolean canRestart() {
        return currentRestartAttempt < maxNumberRestartAttempts;
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        currentRestartAttempt++;
        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a FixedDelayRestartStrategy from the given Configuration.
     *
     * @param configuration Configuration containing the parameter values for the restart strategy
     * @return Initialized instance of FixedDelayRestartStrategy
     * @throws Exception
     */
    public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

        long delay;

        try {
            delay = Duration.apply(delayString).toMillis();
        } catch (NumberFormatException nfe) {
            throw new Exception("Invalid config value for " +
                    ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
                    ". Value must be a valid duration (such as '100 milli' or '10 s')");
        }

        return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
    }

    @Override
    public String toString() {
        return "FixedDelayRestartStrategy(" +
                "maxNumberRestartAttempts=" + maxNumberRestartAttempts +
                ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
                ')';
    }

    public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = 6642934067762271950L;

        private final int maxAttempts;
        private final long delay;

        public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
            this.maxAttempts = maxAttempts;
            this.delay = delay;
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FixedDelayRestartStrategy(maxAttempts, delay);
        }
    }
}
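  • FixedDelayRestartStrategy implements RestartStrategy. Its canRestart method returns true while currentRestartAttempt is less than maxNumberRestartAttempts. Its restart method increments currentRestartAttempt and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayBetweenRestartAttempts milliseconds.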
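The fixed-delay logic is small enough to reproduce with JDK types only. FixedDelaySketch is a hypothetical simplification, not Flink's class. A Runnable replaces RestartCallback, and a ScheduledExecutorService replaces Flink's ScheduledExecutor. The main method compresses three consecutive job failures into a loop, which is only a simulation of the real failure/recovery cycle.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical re-implementation of FixedDelayRestartStrategy's counting logic.
final class FixedDelaySketch {
    private final int maxNumberRestartAttempts;
    private final long delayBetweenRestartAttempts; // milliseconds
    private int currentRestartAttempt = 0;

    FixedDelaySketch(int maxNumberRestartAttempts, long delayBetweenRestartAttempts) {
        if (maxNumberRestartAttempts < 0 || delayBetweenRestartAttempts < 0) {
            throw new IllegalArgumentException("attempts and delay must be >= 0");
        }
        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
        this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
    }

    boolean canRestart() {
        return currentRestartAttempt < maxNumberRestartAttempts;
    }

    void restart(Runnable triggerFullRecovery, ScheduledExecutorService executor) {
        // The attempt is counted immediately; recovery runs after the fixed delay.
        currentRestartAttempt++;
        executor.schedule(triggerFullRecovery, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
    }

    int getCurrentRestartAttempt() {
        return currentRestartAttempt;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger recoveries = new AtomicInteger();
        FixedDelaySketch strategy = new FixedDelaySketch(3, 100);

        // Simulate a job that keeps failing: restart for as long as it is allowed.
        while (strategy.canRestart()) {
            strategy.restart(recoveries::incrementAndGet, executor);
        }

        // Already-scheduled delayed tasks still run after shutdown() by default.
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(recoveries.get()); // 3
    }
}
```

The counter is never reset in this strategy, so maxNumberRestartAttempts caps restarts over the whole lifetime of the strategy instance rather than per time window. That limit is what the failure-rate strategy addresses.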

FailureRateRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java

public class FailureRateRestartStrategy implements RestartStrategy {

    private final Time failuresInterval;
    private final Time delayInterval;
    private final int maxFailuresPerInterval;
    private final ArrayDeque<Long> restartTimestampsDeque;

    public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
        Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
        Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
        Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
        Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
        Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");

        this.failuresInterval = failuresInterval;
        this.delayInterval = delayInterval;
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
    }

    @Override
    public boolean canRestart() {
        if (isRestartTimestampsQueueFull()) {
            Long now = System.currentTimeMillis();
            Long earliestFailure = restartTimestampsDeque.peek();

            return (now - earliestFailure) > failuresInterval.toMilliseconds();
        } else {
            return true;
        }
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        if (isRestartTimestampsQueueFull()) {
            restartTimestampsDeque.remove();
        }
        restartTimestampsDeque.add(System.currentTimeMillis());

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayInterval.getSize(), delayInterval.getUnit());
    }

    private boolean isRestartTimestampsQueueFull() {
        return restartTimestampsDeque.size() >= maxFailuresPerInterval;
    }

    @Override
    public String toString() {
        return "FailureRateRestartStrategy(" +
            "failuresInterval=" + failuresInterval +
            "delayInterval=" + delayInterval +
            "maxFailuresPerInterval=" + maxFailuresPerInterval +
            ")";
    }

    public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
        String failuresIntervalString = configuration.getString(
                ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
        );
        String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);

        Duration failuresInterval = Duration.apply(failuresIntervalString);
        Duration delay = Duration.apply(delayString);

        return new FailureRateRestartStrategyFactory(maxFailuresPerInterval, Time.milliseconds(failuresInterval.toMillis()), Time.milliseconds(delay.toMillis()));
    }

    public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
        private static final long serialVersionUID = -373724639430960480L;

        private final int maxFailuresPerInterval;
        private final Time failuresInterval;
        private final Time delayInterval;

        public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
            this.delayInterval = Preconditions.checkNotNull(delayInterval);
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
        }
    }
}
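  • FailureRateRestartStrategy implements RestartStrategy.
    - canRestart returns true while restartTimestampsDeque holds fewer than maxFailuresPerInterval timestamps. Once the queue holds maxFailuresPerInterval or more, it returns true only if more than failuresInterval has elapsed since earliestFailure, the oldest timestamp in the queue.
    - restart first removes the oldest timestamp if the queue is full and then appends the current time to restartTimestampsDeque. Finally, it calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayInterval.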
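The failure-rate check is a sliding window over the timestamps of the most recent maxFailuresPerInterval restarts. In the sketch below, FailureRateSketch is a hypothetical simplification. The clock is injected as a LongSupplier, an assumption made for testability, whereas Flink's version calls System.currentTimeMillis() directly. The delayed scheduling of triggerFullRecovery() is also left out.

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

// Hypothetical re-implementation of FailureRateRestartStrategy's window logic.
final class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long failuresIntervalMillis;
    private final ArrayDeque<Long> restartTimestampsDeque;
    private final LongSupplier clock; // injected for testability (assumption)

    FailureRateSketch(int maxFailuresPerInterval, long failuresIntervalMillis, LongSupplier clock) {
        if (maxFailuresPerInterval <= 0 || failuresIntervalMillis <= 0) {
            throw new IllegalArgumentException("max failures and interval must be > 0");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMillis = failuresIntervalMillis;
        this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
        this.clock = clock;
    }

    private boolean isRestartTimestampsQueueFull() {
        return restartTimestampsDeque.size() >= maxFailuresPerInterval;
    }

    boolean canRestart() {
        if (isRestartTimestampsQueueFull()) {
            long earliestFailure = restartTimestampsDeque.peek();
            // Strictly greater: the oldest restart must have left the window.
            return (clock.getAsLong() - earliestFailure) > failuresIntervalMillis;
        }
        return true;
    }

    // Records a restart; the real strategy then schedules the recovery after delayInterval.
    void restart() {
        if (isRestartTimestampsQueueFull()) {
            restartTimestampsDeque.remove(); // evict the oldest timestamp
        }
        restartTimestampsDeque.add(clock.getAsLong());
    }

    public static void main(String[] args) {
        long[] now = {0L};
        FailureRateSketch s = new FailureRateSketch(3, 60_000, () -> now[0]);
        for (int i = 0; i < 3; i++) { // restarts at t = 0 s, 10 s, 20 s
            now[0] = i * 10_000L;
            s.restart();
        }
        now[0] = 30_000;
        System.out.println(s.canRestart()); // false: 30 s - 0 s is not > 60 s
        now[0] = 60_001;
        System.out.println(s.canRestart()); // true: the t = 0 restart left the window
    }
}
```

The strategy allows at most 3 restarts per 60 s. A fourth failure at t = 30 s is rejected because the oldest recorded restart (t = 0) is still inside the window. Once more than 60 000 ms have passed since that restart, restarting is allowed again.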

Summary

  • RestartStrategies provides four static methods for building a RestartStrategyConfiguration: noRestart, fallBackRestart, fixedDelayRestart and failureRateRestart.
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. Its subclasses are NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration and FallbackRestartStrategyConfiguration.
  • RestartStrategyResolving provides a static method, resolve. It turns a RestartStrategies.RestartStrategyConfiguration into a RestartStrategy via RestartStrategyFactory, falling back to the server-side factory when the client side yields none. RestartStrategy declares two methods, canRestart and restart. Its implementations are NoRestartStrategy, FixedDelayRestartStrategy and FailureRateRestartStrategy.


Reposted from: http://xohbm.baihongyu.com/
