Dubbo 压测插件已开源,本文涉及代码详见 gatling-dubbo
Gatling 是一个开源的基于 Scala、Akka、Netty 实现的高性能压测框架,较之其他基于线程实现的压测框架,Gatling 基于 AKKA Actor 模型实现,请求由事件驱动,在系统资源消耗上低于其他压测框架(如内存、连接池等),使得单台施压机可以模拟更多的用户。此外,Gatling 提供了一套简单高效的 DSL(领域特定语言)方便我们编排业务场景,同时也具备流量控制、压力控制的能力并提供了良好的压测报告,所以有赞选择在 Gatling 基础上扩展分布式能力,开发了自己的全链路压测引擎 MAXIM。全链路压测中我们主要模拟用户实际使用场景,使用 HTTP 接口作为压测入口,但有赞目前后端服务中 Dubbo 应用比重越来越高,如果可以知道 Dubbo 应用单机水位将对我们把控系统后端服务能力大有裨益。基于 Gatling 的优势和在有赞的使用基础,我们扩展 Gatling 开发了 gatling-dubbo 压测插件。
实现 Dubbo 压测插件,需实现以下四部分内容:
Json Path
检查请求结果,这里我们实现了一样的检查逻辑。CheckBuild 则为 DSL 使用 Check 的辅助类
协议部分由 5 个属性组成,这些属性将在 Action 初始化 Dubbo 客户端时使用,分别是:
dubbo
true
,有赞优化了泛化调用的性能,为了使用该特性,引入了一个新值 result_no_change
(去掉优化前泛化调用的序列化开销以提升性能) dubbo://IP地址:端口
ETCD3
如果是测试 Dubbo 单机水位,则设置 url,注册中心设置为空;如果是测试 Dubbo 集群水位,则设置注册中心(目前支持 ETCD3),url 设置为空。由于目前注册中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏灵活性,所以我们又实现了客户端层面的负载均衡,如此便可抛开特定的注册中心来测试 Dubbo 集群水位。该特性目前正在内测中。
object DubboProtocol { val DubboProtocolKey = new ProtocolKey { type Protocol = DubboProtocol type Components = DubboComponents def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]] def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can't provide a default value for DubboProtocol") def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = { dubboProtocol => DubboComponents(dubboProtocol) } } } case class DubboProtocol( protocol: String, //dubbo generic: String, //泛化调用? url: String, //use url or registryProtocol: String, //use registry registryAddress: String //use registry ) extends Protocol { type Components = DubboComponents }
为了方便 Action 中使用上面这些属性,我们将其装进了 Gatling 的 ProtocolComponents:
case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents { def onStart: Option[Session => Session] = None def onExit: Option[Session => Unit] = None }
以上就是关于 Protocol 的定义。为了能在 DSL 中配置上述 Protocol,我们定义了 DubboProtocolBuilder,包含了 5 个方法分别设置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 个属性。
object DubboProtocolBuilderBase { def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol) } case class DubboProtocolBuilderGenericStep(protocol: String) { def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic) } case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) { def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url) } case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) { def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol) } case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) { def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress) } case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) { def build = DubboProtocol( protocol = protocol, generic = generic, url = url, registryProtocol = registryProtocol, registryAddress = registryAddress ) }
DubboAction 包含了 Duboo 请求逻辑、请求结果校验逻辑以及压力控制逻辑,需要扩展 ExitableAction 并实现 execute 方法。
DubboAction 类的域 argTypes、argValues 分别是泛化调用请求参数类型和请求参数值,需为 Expression[] 类型,这样当使用数据 Feeder 作为压测脚本参数输入时,可以使用类似 ${args_types}
、 ${args_values}
这样的表达式从数据 Feeder 中解析对应字段的值。
execute 方法必须以异步方式执行 Dubbo 请求,这样前一个 Dubbo 请求执行后但还未等响应返回时虚拟用户就可以通过 AKKA Message 立即发起下一个请求,如此一个虚拟用户可以在很短的时间内构造大量请求。请求方式方面,相比于泛化调用,原生 API 调用需要客户端载入 Dubbo 服务相应的 API 包,但有时候却拿不到,此外,当被测 Dubbo 应用多了,客户端需要载入多个 API 包,所以出于使用上的便利性,Dubbo 压测插件使用泛化调用发起请求。
异步请求响应后会执行 onComplete 方法,校验请求结果,并根据校验结果记录请求成功或失败日志,压测报告就是使用这些日志统计计算的。
为了控制压测时的 RPS,则需要实现 throttle 逻辑。实践中发现,高并发情况下,泛化调用性能远不如原生 API 调用性能,且响应时间成倍增长(如此不能表征 Dubbo 应用的真正性能),导致 Dubbo 压测插件压力控制不准,解决办法是优化泛化调用性能,使之与原生 API 调用的性能相近,请参考 dubbo 泛化调用性能优化 。
class DubboAction( interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], genericService: GenericService, checks: List[DubboCheck], coreComponents: CoreComponents, throttled: Boolean, val objectMapper: ObjectMapper, val next: Action ) extends ExitableAction with NameGen { override def statsEngine: StatsEngine = coreComponents.statsEngine override def name: String = genName("dubboRequest") override def execute(session: Session): Unit = recover(session) { argTypes(session) flatMap { argTypesArray => argValues(session) map { argValuesArray => val startTime = System.currentTimeMillis() val f = Future { try { genericService.$invoke(method, argTypes(session).get, argValues(session).get) } finally { } } f.onComplete { case Success(result) => val endTime = System.currentTimeMillis() val resultMap = result.asInstanceOf[JMap[String, Any]] val resultJson = objectMapper.writeValueAsString(resultMap) val (newSession, error) = Check.check(resultJson, session, checks) error match { case None => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None) throttle(newSession(session)) case Some(Failure(errorMessage)) => statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage)) throttle(newSession(session).markAsFailed) } case FuFailure(e) => val endTime = System.currentTimeMillis() statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage)) throttle(session.markAsFailed) } } } } private def throttle(s: Session): Unit = { if (throttled) { coreComponents.throttler.throttle(s.scenario, () => next ! s) } else { next ! s } } }
DubboActionBuilder 则是获取 Protocol 属性并初始化 Dubbo 客户端:
case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder { private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents = protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey) override def build(ctx: ScenarioContext, next: Action): Action = { import ctx._ val protocol = components(protocolComponentsRegistry).dubboProtocol //Dubbo客户端配置 val reference = new ReferenceConfig[GenericService] val application = new ApplicationConfig application.setName("gatling-dubbo") reference.setApplication(application) reference.setProtocol(protocol.protocol) reference.setGeneric(protocol.generic) if (protocol.url == "") { val registry = new RegistryConfig registry.setProtocol(protocol.registryProtocol) registry.setAddress(protocol.registryAddress) reference.setRegistry(registry) } else { reference.setUrl(protocol.url) } reference.setInterface(interface) val cache = ReferenceConfigCache.getCache val genericService = cache.get(reference) val objectMapper: ObjectMapper = new ObjectMapper() new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next) } }
LambdaProcessBuilder 则提供了设置 Dubbo 泛化调用入参的 DSL 以及接下来要介绍的 Check 部分的 DSL
case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport { def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes) def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues) def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList) def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks) }
全链路压测中,我们都使用 Json Path
校验 HTTP 请求结果,Dubbo 压测插件中,我们也实现了基于 Json Path
的校验。实现 Check,必须实现 Gatling check 中的 Extender 和 Preparer:
package object dubbo { type DubboCheck = Check[String] val DubboStringExtender: Extender[DubboCheck, String] = (check: DubboCheck) => check val DubboStringPreparer: Preparer[String, String] = (result: String) => Success(result) }
基于 Json Path
的校验逻辑:
trait DubboJsonPathOfType { self: DubboJsonPathCheckBuilder[String] => def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers) } object DubboJsonPathCheckBuilder { val CharsParsingThreshold = 200 * 1000 def preparer(jsonParsers: JsonParsers): Preparer[String, Any] = response => { if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson) jsonParsers.safeParseJackson(response) else jsonParsers.safeParseBoon(response) } def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType } class DubboJsonPathCheckBuilder[X: JsonFilter]( private[check] val path: Expression[String], private[check] val jsonParsers: JsonParsers )(implicit extractorFactory: JsonPathExtractorFactory) extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X]( DubboStringExtender, DubboJsonPathCheckBuilder.preparer(jsonParsers) ) { import extractorFactory._ def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence)) def findAllExtractor = path.map(newMultipleExtractor[X]) def countExtractor = path.map(newCountExtractor) }
DubboCheckSupport 则提供了设置 jsonPath 表达式的 DSL
trait DubboCheckSupport { def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) = DubboJsonPathCheckBuilder.jsonPath(path) }
trait AwsDsl
提供顶层 DSL。我们还定义了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 两个 Scala 隐式方法,以自动构造 DubboProtocol 和 ActionBuilder。
此外,泛化调用中使用的参数类型为 Java 类型,而我们的压测脚本使用 Scala 编写,所以这里需要做两种语言间的类型转换,所以我们定义了 transformJsonDubboData 方法
trait DubboDsl extends DubboCheckSupport { val Dubbo = DubboProtocolBuilderBase def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method) implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build() def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = { session.set(argTypeName, toArray(session(argTypeName).as[JList[String]])) .set(argValueName, toArray(session(argValueName).as[JList[Any]])) } private def toArray[T:ClassTag](value: JList[T]): Array[T] = { value.asScala.toArray } }
object Predef extends DubboDsl
import io.gatling.core.Predef._ import io.gatling.dubbo.Predef._ import scala.concurrent.duration._ class DubboTest extends Simulation { val dubboConfig = Dubbo .protocol("dubbo") .generic("true") //直连某台Dubbo机器,只单独压测一台机器的水位 .url("dubbo://IP地址:端口") //或设置注册中心,压测该Dubbo应用集群的水位,支持ETCD3注册中心 .registryProtocol("") .registryAddress("") val jsonFileFeeder = jsonFile("data.json").circular //数据Feeder val dubboScenario = scenario("load test dubbo") .forever("repeated") { feed(jsonFileFeeder) .exec(session => transformJsonDubboData("args_types1", "args_values1", session)) .exec(dubbo("com.xxx.xxxService", "methodName") .argTypes("${args_types1}") .argValues("${args_values1}") .check(jsonPath("$.code").is("200")) ) } setUp( dubboScenario.inject(atOnceUsers(10)) .throttle( reachRps(10) in (1 seconds), holdFor(30 seconds)) ).protocols(dubboConfig) }
[ { "args_types1": ["com.xxx.xxxDTO"], "args_values1": [{ "field1": "111", "field2": "222", "field3": "333" }] } ]
我的系列博客 混沌工程 - 软件系统高可用、弹性化的必由之路 异步系统的两种测试方法
我的其他测试相关开源项目 捉虫记:方便产品、开发、测试三方协同自测的管理工具
招聘有赞测试组在持续招人中,大量岗位空缺,只要你来,就能帮你点亮全栈开发技能树,有意向换工作的同学可以发简历到 sunjun【@】youzan.com
欢迎关注我们的公众号