Spring Cloud in Depth: Circuit Breaking and How HystrixCommand Executes

2024/9/18 3:22:43 Tags: java

Contents

    • How HystrixCommand executes
      • 1. Creating the MetaHolder
      • 2. Creating the HystrixInvokable
      • 3. Execution
        • 3.1
          • 3.1.1


How HystrixCommand executes

HystrixCommandAspect is the class dedicated to handling the @HystrixCommand annotation.

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}

This should look familiar: it is an ordinary AOP aspect, and the annotated method is handled by an @Around advice.

@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    // Resolve the target method being invoked
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }
    // With @HystrixCommand the type is HystrixPointcutType.COMMAND, so this yields the CommandMetaHolderFactory
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    // 1. Create the MetaHolder
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    // 2. Create the HystrixInvokable
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        // This walkthrough follows the ordinary (non-Observable) path, so !metaHolder.isObservable() is true
        // 3. Execute
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}

private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
    return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
            .onErrorResumeNext(new Func1<Throwable, Observable>() {
                @Override
                public Observable call(Throwable throwable) {
                    if (throwable instanceof HystrixBadRequestException) {
                        return Observable.error(throwable.getCause());
                    } else if (throwable instanceof HystrixRuntimeException) {
                        HystrixRuntimeException hystrixRuntimeException = (HystrixRuntimeException) throwable;
                        return Observable.error(hystrixRuntimeExceptionToThrowable(metaHolder, hystrixRuntimeException));
                    }
                    return Observable.error(throwable);
                }
            });
}
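
To make the "around" idea concrete without pulling in AspectJ or Hystrix, here is a minimal sketch built on a plain JDK dynamic proxy. Everything in it is hypothetical and only illustrates the pattern: the `@Guarded` annotation stands in for @HystrixCommand, `GreetingService` is a made-up interface, and the fallback is just a fixed string rather than a fallback method. It is not how HystrixCommandAspect is implemented.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for @HystrixCommand; not part of Hystrix.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String fallback();
}

// Hypothetical service interface used only for this sketch.
interface GreetingService {
    @Guarded(fallback = "offline")
    String greet(String name);

    String ping();
}

class AroundSketch {
    // Wraps the target so every call passes through the handler, much like an @Around advice.
    static GreetingService wrap(GreetingService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            Guarded guarded = method.getAnnotation(Guarded.class);
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                if (guarded == null) {
                    throw e.getCause(); // unannotated methods keep their original exception
                }
                return guarded.fallback(); // annotated methods degrade to the fallback value
            }
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        GreetingService failing = new GreetingService() {
            @Override public String greet(String name) { throw new IllegalStateException("backend down"); }
            @Override public String ping() { return "pong"; }
        };
        GreetingService guardedService = wrap(failing);
        System.out.println(guardedService.greet("world"));
        System.out.println(guardedService.ping());
    }
}
```

The real aspect does considerably more (metadata collection, command construction, exception translation), but the control flow is the same: intercept, delegate, and decide what the caller sees when the delegate fails.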

1. Creating the MetaHolder

private static class CommandMetaHolderFactory extends MetaHolderFactory {
    @Override
    public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
        // Read the @HystrixCommand annotation from the method
        HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
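        // Derive the execution type from the method's return type:
        // Future -> ExecutionType.ASYNCHRONOUS (asynchronous)
        // Observable -> ExecutionType.OBSERVABLE (reactive, asynchronous callbacks)
        // anything else -> ExecutionType.SYNCHRONOUS (synchronous)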
        ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
        // Apply the configured @DefaultProperties and the fallback method
        MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
        if (isCompileWeaving()) {
            builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
        }
        return builder.defaultCommandKey(method.getName())
                        .hystrixCommand(hystrixCommand)
                        .observableExecutionMode(hystrixCommand.observableExecutionMode())
                        .executionType(executionType)
                        .observable(ExecutionType.OBSERVABLE == executionType)
                        .build();
    }
}
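
The return-type mapping described in the comments above can be sketched in a few lines. This is an illustration under assumptions, not the javanica source: `ReactiveStream` is a hypothetical marker interface standing in for rx.Observable so the example needs no RxJava dependency, and `Kind` is a local enum rather than the real ExecutionType.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class ExecutionTypeSketch {
    enum Kind { SYNCHRONOUS, ASYNCHRONOUS, OBSERVABLE }

    // Hypothetical marker standing in for rx.Observable.
    interface ReactiveStream {}

    // Classifies a method by its declared return type.
    static Kind of(Class<?> returnType) {
        if (Future.class.isAssignableFrom(returnType)) {
            return Kind.ASYNCHRONOUS;
        }
        if (ReactiveStream.class.isAssignableFrom(returnType)) {
            return Kind.OBSERVABLE;
        }
        return Kind.SYNCHRONOUS;
    }

    public static void main(String[] args) {
        System.out.println(of(String.class));
        System.out.println(of(CompletableFuture.class));
        System.out.println(of(ReactiveStream.class));
    }
}
```

Because the check uses isAssignableFrom, any subtype of Future (for example CompletableFuture) is classified as asynchronous, and every other return type falls through to the synchronous default.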

2. Creating the HystrixInvokable

public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
    if (metaHolder.isCollapserAnnotationPresent()) {
        executable = new CommandCollapser(metaHolder);
    } else if (metaHolder.isObservable()) {
        executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    } else {
        executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
    }
    return executable;
}

3. Execution

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        case SYNCHRONOUS: {
            // Run the command synchronously
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

When executionType is SYNCHRONOUS

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
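
The key point of execute() is that the synchronous path is simply the asynchronous path followed by a blocking get(). A minimal sketch of that shape using only the JDK is shown here. The class name, the `Supplier` standing in for the command's user code, and the use of CompletableFuture are all assumptions made for illustration; Hystrix builds its Future from an RxJava Observable instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

class BlockingExecuteSketch<R> {
    private final Supplier<R> work; // hypothetical stand-in for the command's user code

    BlockingExecuteSketch(Supplier<R> work) {
        this.work = work;
    }

    // Asynchronous entry point: start the work and hand back a Future.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Synchronous entry point: same path as queue(), then block for the result.
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure, not the Future's wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BlockingExecuteSketch<Integer>(() -> 21 * 2).execute());
    }
}
```

Keeping a single execution path means that timeouts, fallbacks, and metrics only need to be implemented once, behind queue().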

3.1

 public Future<R> queue() {
     /*
      * The Future returned by Observable.toBlocking().toFuture() does not implement the
      * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
      * thus, to comply with the contract of Future, we must wrap around it.
      */
     final Future<R> delegate = toObservable().toBlocking().toFuture();
   
     final Future<R> f = new Future<R>() {

         @Override
         public boolean cancel(boolean mayInterruptIfRunning) {
             if (delegate.isCancelled()) {
                 return false;
             }

             if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                 /*
                  * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                  * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                  * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                  * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                  * than that interruption request cannot be taken back.
                  */
                 interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
             }

             final boolean res = delegate.cancel(interruptOnFutureCancel.get());

             if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                 final Thread t = executionThread.get();
                 if (t != null && !t.equals(Thread.currentThread())) {
                     t.interrupt();
                 }
             }

             return res;
         }

         @Override
         public boolean isCancelled() {
             return delegate.isCancelled();
         }

         @Override
         public boolean isDone() {
             return delegate.isDone();
         }

         @Override
         public R get() throws InterruptedException, ExecutionException {
             return delegate.get();
         }

         @Override
         public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
             return delegate.get(timeout, unit);
         }
       
     };

     /* special handling of error states that throw immediately */
     if (f.isDone()) {
         try {
             f.get();
             return f;
         } catch (Exception e) {
             Throwable t = decomposeException(e);
             if (t instanceof HystrixBadRequestException) {
                 return f;
             } else if (t instanceof HystrixRuntimeException) {
                 HystrixRuntimeException hre = (HystrixRuntimeException) t;
                 switch (hre.getFailureType()) {
                     case COMMAND_EXCEPTION:
                     case TIMEOUT:
                         // we don't throw these types from queue() only from queue().get() as they are execution errors
                         return f;
                     default:
                         // these are errors we throw from queue() as they as rejection type errors
                         throw hre;
                 }
             } else {
                 throw Exceptions.sneakyThrow(t);
             }
         }
     }

     return f;
 }
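
The comment inside cancel() says the only valid transition of the interrupt flag is false -> true. That behavior comes entirely from calling compareAndSet(false, mayInterruptIfRunning), which can be seen in isolation in the sketch here. The class and method names are hypothetical; only the compareAndSet call mirrors the code above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class StickyInterruptFlag {
    private final AtomicBoolean interruptOnCancel = new AtomicBoolean(false);

    // Records a cancel request and returns the flag's value afterwards.
    // compareAndSet only succeeds while the flag is still false, so once any
    // caller has asked for interruption, a later cancel(false) cannot undo it.
    boolean onCancel(boolean mayInterruptIfRunning) {
        interruptOnCancel.compareAndSet(false, mayInterruptIfRunning);
        return interruptOnCancel.get();
    }

    public static void main(String[] args) {
        StickyInterruptFlag flag = new StickyInterruptFlag();
        System.out.println(flag.onCancel(false));
        System.out.println(flag.onCancel(true));
        System.out.println(flag.onCancel(false));
    }
}
```

In queue() the resulting value is then passed to delegate.cancel(...) and also decides whether the execution thread is interrupted.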

3.1.1

  • This part is built on RxJava, which deserves a closer look later.

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    //doOnCompleted handler already did all of the SUCCESS work
    //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
    final Action0 terminateCommandCleanup = new Action0() {

        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
    final Action0 unsubscribeCommandCleanup = new Action0() {
        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    try {
                        executionHook.onUnsubscribe(_cmd);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                    }
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    try {
                        executionHook.onUnsubscribe(_cmd);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                    }
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(true); //user code did run
            }
        }
    };

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };

    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        @Override
        public R call(R r) {
            R afterFirstApplication = r;

            try {
                afterFirstApplication = executionHook.onComplete(_cmd, r);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
            }

            try {
                return executionHook.onEmit(_cmd, afterFirstApplication);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                return afterFirstApplication;
            }
        }
    };

    final Action0 fireOnCompletedHook = new Action0() {
        @Override
        public void call() {
            try {
                executionHook.onSuccess(_cmd);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
            }
        }
    };

    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             /* this is a stateful object so can only be used once */
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }

            commandStartTimestamp = System.currentTimeMillis();

            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }

            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
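
Leaving RxJava aside, one idea in toObservable() is easy to isolate: a command instance is a stateful, single-use object, and compareAndSet on its state enforces that. A minimal sketch of that guard follows. The class, the reduced three-value state enum, and the `Supplier` standing in for user code are assumptions for illustration; the real command tracks more states and wires cleanup through Observable callbacks rather than a finally block.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class OneShotCommand<R> {
    enum State { NOT_STARTED, CHAIN_CREATED, TERMINAL }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
    private final Supplier<R> work; // hypothetical stand-in for the user code

    OneShotCommand(Supplier<R> work) {
        this.work = work;
    }

    R run() {
        // Only the first caller wins the NOT_STARTED -> CHAIN_CREATED transition.
        if (!state.compareAndSet(State.NOT_STARTED, State.CHAIN_CREATED)) {
            throw new IllegalStateException("This instance can only be executed once.");
        }
        try {
            return work.get();
        } finally {
            // Cleanup happens once, whether the work succeeded or failed.
            state.compareAndSet(State.CHAIN_CREATED, State.TERMINAL);
        }
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        OneShotCommand<String> cmd = new OneShotCommand<>(() -> "ok");
        System.out.println(cmd.run());
        System.out.println(cmd.state());
    }
}
```

This is why the aspect creates a fresh HystrixInvokable for every intercepted call instead of caching one per method.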
