飞哥教你使用异步编程提升服务性能

2018/5/13 posted in  Java

https://mp.weixin.qq.com/s/kchPiySDDu1FMF3tuEggiw

作者:肖飞,2011 年加入京东,目前在交易平台,主导交易平台核心系统的架构优化和技术攻关,以及公共技术组件和平台的建设。

庞大复杂的系统通常会采用服务化组件来实现。系统越复杂,组件之间的依赖和调用关系也会越复杂。对于处于底层的基础服务,直接和间接的调用所带来的流量压力非常大。处于中间层的聚合型服务,面对的挑战则是依赖的服务太多,后端个别服务的性能延迟就会影响其吞吐量。性能优化是我们系统稳定性中的重要一环,这其中,调用所依赖的 RPC 服务或后端数据是重点之一。

目前,除了传统 JDBC 这样从 API 到主流驱动实现就是阻塞式的类库之外,其他常用的 RPC/HTTP 服务、MQ、Redis、Mongodb、Kafka 等系统都提供了成熟的基于 NIO 的客户端库,也有相应的异步 API。

但是目前交易平台的大多数中台服务系统,还在习惯性使用着这些库的同步 API,并不能充分的利用 CPU,这也给我们带来了一定的优化空间。从 16 年开始我们在一些核心的但是服务逻辑相对简单的系统中使用异步方式来实现,虽然暂时还做不到完全的异步化,但是也取得了比较好的效果。这篇文章虽然更多是一个简介性质,但是也涵盖了我们在异步编程中需要关注的要点。希望大家能够习惯和拥抱异步编程。

一、相关概念介绍

同步 (Synchronous)/ 异步 (Asynchronous),通常是指函数调用中的消息通信的两种不同模式。

1、异步和同步的区别

函数调用发生时,消息 (参数) 从 caller 传递到 callee,控制权 (指令执行) 从 caller 转移到 callee。调用返回时,控制权从 callee 转移到 caller。两者的区别在于,callee 是否需要等待执行完成才将控制权转移给 caller。

在 RPC 这种更复杂的场景下,本质上并没有不同。

◆ 同步

1.callee 执行完成才返回
2. 返回值即结果

◆ 异步

1.callee 不需要执行完成就可返回
2.caller 要获取结果,需要通过轮询、回调等机制

◆ 同步 RPC

◆ 异步 RPC

可以看到,在异步 RPC 的场景下,客户端和服务端用于处理 IO 的 CPU 能得到充分利用,通常只需要远低于 caller 请求数量的线程就可以了,这就是多路复用。

2、callee 执行机制

上图中 callee 的 background execute 通常是采用池化线程来完成的,比如 ThreadPoolExecutor 或 EventLoop1。

3、caller 获取执行结果

caller 调用 callee 时,如果需要获取执行结果(消息双向传递),或者获知执行是否完成(消息单向传递无返回值),在异步模式下,主要依靠下面两种机制。

◆ 轮询 (Polling)

比如 Java 的 Future 就提供了 isDone() 这种询问机制。

//Caller.java
void call() {
     Future<Void> f = callee.asyncCall(param);
     // do some other things
     while (true) {
          if (f.isDone()) break;
        //do some other things or sleep or timeout
    }
}

或阻塞版本

//Caller.java
void call() {
    Future<Void> f = callee.asyncCall(param);
    // do some other things
    f.get(timeout, TimeUnit.SECONDS);
}

轮询的控制逻辑在 caller 端。

◆ 回调 (Callback)

caller 设置一个回调函数,供 callee 执行完成后调用这个函数。回调的控制是反转的,通常由 callee 端控制。

//Caller.java
void call() {
    callee.asyncCall(param, new AsyncHandler<Response<Message>>() {
        @Override public void handleResponse(Response<Message> response) {
            msg = response.get();
            // process the msg...
        }
    });
    // do some other things
}

4、异步模式的场景

◆ 阻塞

阻塞 (Blocking)/ 非阻塞(Non-Blocking) 是用来描述,在等待调用结果时 caller 线程的状态。阻塞,通常意味着 caller 线程不再使用 CPU 时间,处于可被 OS 调度的状态(注意与 Java 线程状态 2 的区别)。 磁盘 IO 和网络 IO 是常见的会引起线程阻塞的场景 3。受制于底层 OS 的同步阻塞式 IO 系统函数,调用 Java OIO(Old blocking IO) API 无疑是会阻塞的。对于 DiskIO,Java NIO2 提供了异步 API。对于 SocketIO,Java NIO2 以及 NIO 框架 Netty,都提供了异步 API。

◆ Linux 提供了异步 IO 系统函数,只能用于 DiskIO,还有一些限制 4,Java NIO2 AsynchronousFileChannel 内部仍然使用线程池 + 阻塞式 API 的实现。

◆ Linux 为 SocketIO 准备就绪阶段提供了非阻塞式 API(select/poll/epoll),但是 IO 执行阶段仍然是同步阻塞的,因此主流的 Java NIO 框架的 Reactor 模式内部实现使用了线程池。

◆ 并行

比如需要调用多个没有依赖关系的服务,或者访问分散在多个存储分片中的数据,如果服务接口或数据访问接口实现了异步 API,那么就很方便实现并行调用,减少总体调用耗时。

◆ 速度不匹配

使用中间队列解偶 caller 和 callee 的速度不匹配问题,削峰填谷。

◆ 批量

使用中间队列解偶 caller 和 callee 的速度不匹配问题,削峰填谷。

二、异步 API 的几种风格

1、Callback

这个比较传统,比如 zookeeper 客户端提供的基于回调的异步 API:

try {
    zookeeper.create(path, data, acl, createMode, new StringCallback() {
        public void processResult(int rc, String path, Object ctx, String name) {
            if (rc != KeeperException.Code.OK.intValue()) {
                // error handle
            } else {
                // success process
                // 如果需要在成功后再发起基于回调的异步调用,会形成callback hell
            }
        }
    }, ctx);
} catch( Throwable e) {
    // error handle
}

◆ Callback 通常是无状态的 

◆ 要获取 Callback 的计算结果,通常需要 closure 

◆ 异常处理比较分散 

◆ 在有多个异步调用链的时候,容易造成 Callback hell

2、Future/Promise

Promise 是 callee 给 caller 的凭证,代表未完成但承诺完成(成功或失败)的结果。Promise 本身是有状态的,通常由 callee 端维护。其状态转移如下(术语参考 Promise/A + 和 ES6):

Promise 的状态只能转移一次,因此如果有 callback,那么. then(callback) 或. catch(callback) 也只被执行一次。

JDK5 的 Future 只能用轮询或者阻塞的方式获取结果,caller 端处理比较繁琐。Guava 的 ListenableFuture,特别是 JDK8 的 CompletableFuture,则是完整实现了 Promise 风格的异步 API。 个人认为 Promise 是更好的 Callback,ListenableFuture 接口只是比 Future 多了一个 void addListener(Runnable, Executor) 方法。 Promise 提供了比 Callback 更易用更清晰的编程模式,尤其是涉及多个异步 API 的串行调用(chaining 或 pipelining )、组合调用(并行、合并)、异常处理等方面有很大的优势。

引申阅读 

◆ 这篇文章谈到了 Future 和 Promise 的细微区别、相关历史和技术。 

◆ 这里有一些讨论:Aren’t promises just callbacks?,Is there really a fundamental difference between callbacks and Promises?。 

◆ Promise 借鉴了函数式中的一些概念: 从函数式编程到 Promise。 

◆ 这篇文章简要对比了几种语言中的 Promise 框架。

3、ReactiveX

其官方网站的介绍

An API for asynchronous programming with observable streams

其关键的概念 Observable 比较 Promise 来说:

◆ Promise 代表一个异步计算值,而 Observable 代表着一系列值 (stream)。 

◆ Promise 的值只能产生一次,而 Observable 的事件可以不断产生。因此 Rx 首先流行在前端 UI 场景:事件来源多,数据变化影响多个 UI 组件的变更。

Rx 的学习曲线比 Promise 要高得多,而且目前 Promise 风格的异步编程能够满足我们大部分的服务端开发场景,因此我们这里主要关注 Promise。

三、Promise 在服务端的应用

下面穿插着以 JDK8 的 CompletableFuture 和 Guava 的 ListenableFuture(适用 JDK6)为例介绍 Promise 的用法。

1、符合 Promise 风格的方法签名

Promise 风格的方法签名,有个不成文的规则是不抛出异常,因为异常是 Promise 对象本身就能携带的两种状态之一。比如我们想把一个 Callback 风格的异步 API 包装成 Promise 风格的(通常在使用一个较老的类库时需要这样的包装),可以这样:

//caller.java
CompletableFuture<String> asyncCall(final String msg) {
    CompletableFuture<String> promise = new CompletableFuture<>();
    try {
        callee.asyncCall(msg, new Callback<String>() {
            public void onSuccess(String r) { promise.complete(r); }
            public void onFail(Throwable t) { promise.completeExceptionally(t); }
        });
    } catch (Throwable e) {
        promise.completeExceptionally(t);
    }
    return promise;
}

下面是使用 ListenableFuture 的实现异步发送消息的 API。

//LocalMessageEngine.java
static class WriteTask {
    // SettableFuture是Guava中一种可设置状态的Promise类型。
    final SettableFuture<Boolean> promise = SettableFuture.create();
    final byte[] message;
    WriteTask(byte[] message) {
        this.message = message;
    }
}
public Producer createProducer() {
    return new Producer() {
        public ListenableFuture<Boolean> asyncProduce(byte[] message) {
            if (!Engine.this.started) {
                // 返回前已完成
                return Futures.immediateFailedFuture(new IllegalStateException("Message engine was stopped or not started"));
            }

            WriteTask task = new WriteTask(message);
            boolean queued = writeTaskQueue.offer(task);
            if (!queued) {
                task.promise.set(Boolean.FALSE);
            }
            return task.promise;
        }
    };
}

上面两个例子,描述了如何创建一个 Promise 对象返回给 caller,以及如何在 callee 端 fulfill 或 reject 这个 Promise。你可能会发现,返回给 caller 之前 Promise 是可以处于完成状态的。在继续下面的使用介绍前,先简单的看下 ListenableFuture 和 CompletableFuture 的几个主要 API。

2、串行调用

ListenableFuture 没有提供 then 方法,而是通过 Futures 的一系列静态方法来实现 Promise 风格的 API。由于两者有大部分的 API 是可以相互转化的,限于篇幅下面就不全部演示了。Promse 的 callback 就是其 then 或 catch 方法的函数型参数,当 Promise 被 resolve 时执行这个 callback 函数。这个 callback 函数的输入,就是 Promise 的 resolved 值。根据 callback 函数的输出的不同,需要采取不同的 then 方法。

◆ callback 无输出

Futures.addCallback 和 CompletableFuture.thenAccept 接受无输出的 callback。

// Guava
ListenableFuture<QueryResult> promise1 = ...;
Futures.addCallback(promise1, new FutureCallback<QueryResult>() {
    public void onSuccess(QueryResult result) {
        storeInLocalCache(result);
    }
    public void onFailure(Throwable t) {
        reportError(t);
    }
});
// CompletableFuture
CompletableFuture<QueryResult> promise1 = ...;
CompletableFuture<Void> promise2 = promise1.thenAccept(result -> storeInLocalCache(result));
return promise2;
//return promise1.thenAccept(result -> storeInLocalCache(result));//thenAccept返回另一个Promise实例

先忽略异常处理。对比下这种场景下的 ListenableFuture 和 CompletableFuture:前者采取了更传统的 callback 风格,后者则返回一个新的 Promise 实例,callback 计算完毕则 promise2 被 fulfilled,很容易通过 promise2 来获取 callback 执行完毕与否,不需要 closure。

◆ callback 输出一个普通计算值

这种情况下 callback 就是一个转换函数,输入是前一个 Promise 的 fulfilled 值,输出则作为新 Promise 的 fulfilled 值。Futures.transform 和 CompletableFuture.thenApply 接收这样的 callback 函数。

// CompletableFuture
CompletableFuture<QueryResult> queryFuture = ...;
CompletableFuture<List<Row>> rowsFuture = queryFuture.thenApply(result -> result.getRows());
return rowsFuture;

◆ callback 输出一个异步计算值,即一个 Promise

乍一看,这种情况下的输出跟上一种好像没什么区别。但实际上,输出一个 Promise 值和输出一个普通的值有根本的区别。还记得吧,Promise 代表着一个未完成的并且承诺完成的值。通常这种情况下,意味着 callback 里调用了另外一个 Promise 风格的异步 API。比如下面的例子中 indexService.lookUp 和 dataService.read 方法,由于涉及到 IO,都设计为异步 API。

//Guava
ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction = new AsyncFunction<RowKey, QueryResult>() {
    public ListenableFuture<QueryResult> apply(RowKey rowKey) {
        return dataService.read(rowKey);
    }
};
ListenableFuture<QueryResult> queryFuture = Futures.transformAsync(rowKeyFuture, queryFunction);
return queryFuture;

Futures.transformAsync 和 CompletableFuture.thenCompose 接收这样的 callback 函数。

设想一下,如果某个逻辑中需要调用的多个 Promise 风格的异步方法(比如多个 RPC 调用),并且有先后依赖关系,即上一个方法的执行结果作为下一个方法的输入。就可以用 thenCompose 把他们串起来。

//CompletableFuture
CompletableFuture<RPC4Result> promise4 = rpc1.call(input) //promise1
    .thenCompose(rpc1Result -> rpc2.call(rpc1Result)) //promise2
    .thenCompose(rpc2Result -> rpc3.call(rpc2Result)) //promise3
    .thenCompose(rpc3Result -> rpc4.call(rpc3Result)) //promise4
return promise4;

不要被链式调用给忽悠了,你还是可以正常使用普通的风格。

单纯看来,上述的串行调用场景下使用 Promise 风格的 API 好像只是消除了 Callback hell。那么采用同步 API 就既没有 Callback hell 的问题,又符合数据依赖关系。可是,你会发现,上面的举例中结尾都返回了 Promise,就是说,包含这段代码的方法被设计为异步 API。而使用同步 API,则会强制这个方法的调用者只能使用同步方式调用。

3、并行调用

异步 API 很适合并行调用。caller 在调用多个没有依赖关系的异步 API 时,可以先依次发起调用而不用等待每个调用真正执行完成,从 callee 的角度来讲,执行是并行的。caller 可以对调用结果进行合并处理,关键是,合并也是异步风格的。

//Guava
List<ListenableFuture<QueryResult>> partialPromises = new ArrayList<ListenableFuture<QueryResult>>(nodes.size());
for (Node node : nodes) {
    partialPromises.add(lookupHandler(node).query());
}
ListenableFuture<List<Row>> mergedPromise = Futures.transform(Futures.allAsList(partialPromises), new Function<List<List<Row>>, List<Row>>() {
    @Override public Long apply(List<List<Row>> input) {
        return merge(input);
    }
})
return mergedPromise;

Futures.allAsList 是并行执行所有的 promises,若有一个 promise 异常完成则尝试 reject 尚未 resolved 的 promise。也可以使用 Futures.successfulAsList,区别在于后者并不会 reject 尚未 resolved 的 promise。CompletableFuture 的对应物是 allOf 和 anyOf。

4、调用编排

合并结果设计为异步风格的好处在于,很方便做合并、串行混合调用编排,比如某个逻辑中需要调用四个个 RPC 服务 A、B、C、D,其中:A 的输出作为 B、C 的输入,B、C 可并行,B、C 的输出合并后作为 D 的输入。

//CompletableFuture
CompletableFuture<AResult> promiseA = rpcA.call(input);
CompletableFuture<DResult> promiseD = promiseA.thenCompose(aResult -> {
    CompletableFuture<BResult> promiseB = rpcB.call(aResult);
    CompletableFuture<CResult> promiseC = rpcC.call(aResult);
    CompletableFuture<MergedResult> mergedPromise = promiseB.thenCombine(promiseC, (bResult, cResult) -> {
        return merge(bResult, cResult)
    });
    return mergedPromise;
}).thenCompose(mergedResult -> rpcD.call(mergedResult));
return promiseD;

5、异常处理

上面提到过 Callback 风格的异步 API,异常处理比较分散。而 Promise 风格的异常处理则优雅得多。我们需要记住,异常是 Promise 携带的两种状态之一。那么异常可以作为 callback 函数的输入。

◆ 通用异常处理

Futures.catching 和 CompletableFuture.exceptionally 接收异常值为参数的 callback 函数。

//Guava
ListenableFuture<Integer> fetchCounterPromise = ...;
// Falling back to a zero counter in case an exception happens when
// processing the RPC to fetch counters.
ListenableFuture<Integer> faultTolerantPromise = Futures.catching(
    fetchCounterPromise, FetchException.class,
    new Function<FetchException, Integer>() {
        public Integer apply(FetchException e) {
            return 0;
        }
    });

我们再看一个更复杂的例子。

//CompletableFuture
CompletableFuture<FaultTolerantResult> faultTolerantPromise = rpc1.call(input) //promise1
    .thenCompose(rpc1Result -> rpc2.call(rpc1Result)) //promise2
    .thenCompose(rpc2Result -> rpc3.call(rpc2Result)) //promise3
    .thenCompose(rpc3Result -> rpc4.call(rpc3Result)) //promise4
    .exceptionally(err -> {
        // process err
        return faultTolerantValue;
    });  //faultTolerantPromise
return faultTolerantPromise;

再提醒一遍,不要被链式调用迷惑了。这个例子里面,rpc1/rpc2/rpc3/rpc4 都有可能发生异常,但我们只需要在最后统一处理(返回异常值或转换为一个默认正常值)。比如 rpc2 发生异常,那么 rpc3/rpc4 的逻辑(接收正常值的 callback 函数)都不会执行,但是 rpc2 的异常会传递给 promise3/promise4。

◆ 恢复

假设在读取某个数据存储发生异常,我们需要某种恢复机制,比如读取另一个 backup 的数据存储(某种重试),那么可以使用 Futures.cachingAsync 和 CompletableFuture.handle。

//CompletableFuture
public <V> CompletableFuture<V> dispatch(final Command<V> command) {
    final CompletableFuture<V> dispatched = loadbalance.selectHandler().dispatch(command);
    if (maxTries > 0) {
        final AtomicInteger leftTries = new AtomicInteger(maxTries);
        final BiFunction<CompletableFuture<V>, Throwable, CompletableFuture<V>> fallback = new BiFunction<CompletableFuture<V>, Throwable, CompletableFuture<V>>() {
            @Override
            public CompletableFuture<V> apply(CompletableFuture<V> input, Throwable cause) {
                if (cause == null) return input;
                if (cause instanceof RecoverableException && leftTries.getAndDecrement() > 0) {
                    final CompletableFuture<V> next = loadbalance.selectHandler().dispatch(new Command<V>(command.type, command.args));
                    return next.handle((v, err) -> err).thenCompose(err -> apply(next, err));
                }
                CompletableFuture<V> errFuture = new CompletableFuture<>();
                errFuture.completeExceptionally(cause);
                return errFuture;
            }
        };
        return dispatched.handle((v, err) -> err).thenCompose(err -> fallback.apply(dispatched, err));
    }
    return dispatched;
}

◆ 超时

Promise 是一个承诺完成(成功或失败)的结果,但是并不承诺完成时间。所以,通常需要一种超时机制,幸运的是 ListenableFuture 和 CompletableFuture 都实现了 Future 接口。

CompletableFuture<FaultTolerantResult> faultTolerantPromise = ...;
try {
    FaultTolerantResult result = faultTolerantPromise.get(1000, TimeUnit.SECONDS);
    // process result
} catch (TimeoutException e) {
    //尝试取消执行尚未开始的callback函数
    faultTolerantPromise.cancel(false);` 

}

四、Promise 异步编程的注意点

异步编程比同步编程困难。异步编程通常主要解决一小部分问题,比如阻塞。Promise 借鉴了函数式编程的风格,大量的逻辑会分散在各种 callback 函数来实现。因此对于习惯了同步编程的 OO 式或命令式编程风格的开发人员,需要一定的习惯时间。

上面谈到 callee 执行机制的时候,谈到了线程池,那么 callee 计算完成时,callback 函数的执行通常是池中 resolve Promise 的线程执行。但是,如果 caller 在设置 callback 的时候,Promise 已经完成,那么 callback 的执行线程则是 caller 线程。因此,请特别关注 callback 函数的执行线程的差别。请遵循:

◆ callback 尽量轻量 

◆ callback 避免阻塞 

◆ 否则请指定执行线程

如果 callback 执行了大量的计算,甚至执行了阻塞式操作,那么就很有可能阻塞住 Promise 的 resolve 线程,通常这类线程都是极少的,比如执行 IO 的 EventLoop 线程,有可能造成其他 Promise 得不到执行。

摘自 ListenableFuture 的文档: 

Note: For fast, lightweight listeners that would be safe to execute in any thread, consider MoreExecutors.directExecutor. Otherwise, avoid it. Heavyweight directExecutor listeners can cause problems, and these problems can be difficult to reproduce because they depend on timing. For example: 

◆ The listener may be executed by the caller of addListener. That caller may be a UI thread or other latency-sensitive thread. This can harm UI responsiveness. 

◆ The listener may be executed by the thread that completes this Future. That thread may be an internal system thread such as an RPC network thread. Blocking that thread may stall progress of the whole system. It may even cause a deadlock. 

◆ The listener may delay other listeners, even listeners that are not themselves directExecutor listeners.

我们也的确碰到过使用 MoreExecutors.directExecutor 时,由于编写了太过复杂的 callback 链,导致线程死锁的问题。

CompletableFuture 和 ListenableFuture 都有指定 callback 执行线程的方法:

//使用内置的通用ForkJoin线程池
completableFuture.thenAcceptAsync(callback);
//使用指定的线程执行器
completableFuture.thenAcceptAsync(callback, executor);
//使用指定的线程执行器
Futures.transform(input, callback, executor);

正因为异步编程的复杂性,因此目前我们也尽量在业务逻辑相对简单的应用上进行异步化改造。后续,我们也会评估 Quasar 等协程框架。

完~