楼下小黑哥 · 2020年03月02日

架构设计|异步请求如何同步处理?

本文创意来自一次业务需求,这次需要接入一个第三方外部服务。由于这个服务只提供异步 API,为了不影响现有系统同步处理的方式,接入该外部服务时,应用对外屏蔽这种差异,内部实现异步请求同步。

全文摘要

  • 异步给现有架构带来的问题
  • Dubbo 异步转同步解决方法
  • 异步转同步架构设计方案

0x00. 前言

现有一个系统,整体架构如下所示:

这是一个很常见的同步设计方案,上游系统需要等待下游系统接口返回调用结果。

现在需要接入另外一个第三方服务 B,该服务与服务 A 最大区别在于,这是一个异步 API。调用之后,仅仅返回受理成功,处理结果后续通过异步通知返回。

接入之后,整体架构如下所示:

由于网络隔离策略,通知接收程序与通信服务需要单独分开部署。若没此要求,可以将通信服务 B 与通知接收程序合并成一个应用。

另外图中所有应用采用双节点部署。

为了不影响 OpenAPI 上游系统同步处理逻辑,通信服务 B 调用第三方服务之后,不能立刻返回,需要等待结果通知,拿到具体返回结果。这就需要通信服务 B 内部将异步转为同步。

这就是一个典型的异步转同步问题,整个过程涉及两个问题。

  1. 通信服务 B 业务线程如何进入等待状态?又如何唤醒正确等待线程?
  2. 由于通信服务 B 双节点部署,通知接收程序如何将结果转发到正在等待处理的节点?

问题 1 的解决方案参考了 Dubbo 设计思路。

我们在使用 Dubbo 调用远程服务时,默认情况下,这是一种阻塞式调用方式,即 Consumer 端代码一直阻塞等待,直到 Provider 端返回为止。

由于 Dubbo 底层基于 Netty 发送网络请求,这其是一个异步的过程。为了让业务线程能同步等待,这个过程就需要将异步转为同步。

0x01. Dubbo 异步转同步解决办法

1.1 业务线程同步阻塞

Dubbo 发起远程调用代码位于 DubboInvoker#doInvoke

Dubbo 版本为:2.6.X 版本。2,7.X 重构 DefaultFuture ,但是本质原理还是一样。

0082zybply1gc7t3m2louj31650u0u0x

默认情况下,Dubbo 支持同步调用方式,这里将会创建 DefaultFuture 对象。

这里有个非常重要逻辑,每个请求生成一个唯一 ID,然后将 IDDefaultFuture 映射关系,存入 Map 中。

这个请求 ID 在之所以这么重要,是因为消费者并发调用服务发送请求,同时将会有多个业务线程进入阻塞。当收到响应之后,我们需要唤醒正确的等待线程,并将处理结果返回。

通过 ID 这个唯一映射的关系,很自然可以找到其对应 DefaultFuture,唤醒其对应的业务线程。

来源:Dubbo 官网

业务线程调用 DefaultFuture#get方法进入阻塞。这段代码比较简单,通过调用 Condition#await 阻塞线层。

1.2 唤醒业务线程

当消费者接收到服务提供者的返回结果,将会调用 DefaultFuture#received 方法。

通过响应对象中的唯一 ID,找到其对应 DefaultFuture 对象,从而将结果设置 DefaultFuture 对象中,然后唤醒的相应的业务线程。

这里实际有个优化点,使用 done#signalAll 代替 done#signal。使用 condition 等待通知机制的时候需要注意这一点。

详情参考:https://github.com/apache/dub...

1.3 设计注意点

正常情况下,当消费者接收到响应之后,将会从 FUTURES 这个 Map 移除 DefaultFuture

但是在异常情况下,服务提供者若处理缓慢,不能及时返回响应结果,消费者业务线程将会因为超时苏醒。这种情况下 FUTURES 积压了无效 DefaultFuture 对象。如果不及时清理,极端情况下,将会发生 OOM

DefaultFuture 内部将会开启一个异步线程,定时轮询 FUTURES 判断 DefaultFuture 超时时间,及时清理已经无效(超时)的 DefaultFuture

0x02. 转发方案设计

根据 Dubbo 解决思路,问题 1 解决办法就比较简单了。具体流程如下:

  1. 通信服务 B 内部生成一个唯一请求 ID ,发给第三方服务
  2. 若请求成功,内部版使用 Map 存储对应关系,并使业务线程阻塞等待
  3. 通信服务 B 收到异步通知结果,通过 ID 查找对应业务线程,唤醒的相应的线程

这个设计过程需要注意设置合理的超时时间,这个超时时间需要考虑远程服务调用耗时,可以参考如下公式:

业务线程等待时间=通信服务 B 接口的超时时间 - 调用第三方服务 B 接口消耗时间

这里就不贴出具体的代码,详细代码参考 Dubbo DefaultFuture

接下来重点看下通知服务如何将结果转发给正确的通信服务 B 的节点。这里想到两种方案:

  1. SocketServer 方案
  2. MQ 方案

2.1 SocketServer

通信服务 B 使用 SocketServer 构建一个服务接收程序,当通知接收程序收到第三方服务 B 通知时,通过 Socket 将结果转发给通信服务 B。

整个系统架构如下所示:

由于生产服务双节点部署,通知接收程序就不能写死转发地址。这里我们将请求 ID 与通信服务 B socket 服务地址关系存入 Redis 中,然后通知接收程序通过 ID 找到正确的地址。

这个方案说实话有点复杂。

第一 SocketServer 编码难度较大,编写一个高效 SocketServer 就比较难,一不小心可能产生各种 Bug

第二通信服务 B 服务地址配置在配置文件中,由于两个节点地址不同,这就导致同一应用存在不同配置。这对于后面维护就很不友好。

第三额外引入 Redis 依赖,系统复杂度变高。

2.2 MQ 方案

相对 SocketServer 方案,MQ 方案相对简单,这里采用 MQ 广播消费的方式,架构如图所示:

通知接收程序收到异步通知之后,直接将结果发送到 MQ

通信服务 B 开启广播消费模式,拉取 MQ 消息。

通信服务 B_1 拉取消息,通过请求 ID 映射关系,没找到内部等待的线程,知道这不是自己的等待消息,于是 B_1 直接丢弃即可。

通信服务 B_2 拉取消息,通过请求 ID 映射关系,顺利找到正在等待的线程,然后可以唤醒等待线程,返回最后的结果。

对比 SocketServer 方案,MQ 方案整体流程比较简单,编程难度低,也没用存在特殊的配置。

不过这个方案十分依赖 MQ 消息实时性,若 MQ 消息投递延迟很高,这就会导致通信服务 B 业务线程超时苏醒,业务异常返回。

这里我们选择使用 RocketMQ,长轮询 Pull 方式,可保证消息非常实时,

综上,这里采用 MQ 的方案。

0x03. 总结

异步转同步我们需要解决同步阻塞,以及如何唤醒的问题。

阻塞/唤醒可以分别使用 Condition#await/signalAll。不过这个过程我们需要生成一个唯一请求 ID,并且保存这个 ID 与业务线程映射关系。后续等到结果返回我们才能通过唯一 ID 唤醒正确等待线程。

只要了解上面几点,异步转同步的问题就就可以迎刃而解。

另外,如果你也有碰到异步转同步问题,本文的方案希望对你有帮助。如果你有其他设计方案,欢迎留言,一起讨论~

参考资料

  1. http://dubbo.apache.org/zh-cn...
  2. http://dubbo.apache.org/zh-cn...

最会说一句 (求关注)

这篇文章其实写了挺久的,写的挺难得。之前很早想到写这篇文章,但是没想好到底咋写,艰难产出。

看到这里,点个关注呀,点个赞呗。别下次一定啊,大哥。写文章很辛苦的,需要来点正反馈。

才疏学浅,难免会有纰漏,如果你发现了错误的地方,还请你留言给我指出来,我对其加以修改。

感谢您的阅读,我坚持原创,十分欢迎并感谢您的关注

欢迎关注我的公众号:程序通事,获得日常干货推送。如果您对我的专题内容感兴趣,也可以关注我的博客:studyidea.cn

推荐阅读
关注数
1
文章数
24
Java 开发者,公众号@程序通事,定期分享干货文章。博客地址:[链接],抢先查看最新文章
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息