转载

Sirix.io是如何基于Vert.x和Kotlin协程构建异步RESTful API

Sirix是一个存储系统,它的核心是日志结构,读取可以是随机的,并且在事务提交期间将写入批处理并同步到磁盘。数据永远不会写回到同一个地方,因此不会就地修改,相反,Sirix在记录级别使用写时复制(COW)(因此,它创建页面片段并且通常不复制整个页面),每次必须修改页面时,已更改的记录都会写入新位置,确切复制哪些记录取决于所使用的版本控制算法。

对数据库/资源​​的更改发生在资源绑定事务中。因此,必须打开ResourceManager才能创建写入事务。在任何时候,只允许与N读取事务同时进行一次写入事务。每个事务都绑定到一个修订版,而它们可以在任何修订版上打开,无论哪个修订版都无关紧要。

Vert.x在Node.js和JVM之后进行了严格的建模。Vert.x中的所有内容都应该是非阻塞的。因此,称为事件循环的单个线程可以处理大量请求。阻止调用必须在特殊的线程池上处理。默认值是每个CPU两个事件循环(多反应器模式)。

我们正在使用Kotlin,因为它简单而简洁。其中一个非常有趣的功能是协同程序。从概念上讲,它们就像非常轻量级的线程 另一方面,创建线程非常昂贵。关于协同程序的一个很酷的事情是,它们允许编写几乎像顺序的异步代码。每当一个协程将被挂起时,底层线程不会被阻塞并且可以被重用。在引擎盖下,每个挂起函数通过Kotlin编译器获得另一个参数,这是一个延续,它存储恢复函数的位置(正常恢复,恢复异常)。

Keycloak用作OAuth2授权服务器(密码凭据流量),因为我们决定不 自己 实现授权。

为了获得访问令牌,首先必须针对POST / login进行请求- 使用身份中作为JSON对象发送的用户名/密码凭证进行路由。实现代码:

post(<font>"/login"</font><font>).produces(</font><font>"application/json"</font><font>).coroutineHandler { rc ->
    val userJson = rc.bodyAsJson
    val user = keycloak.authenticateAwait(userJson)
    rc.response().end(user.principal().toString())
}
</font>

coroutine-handler是一个简单的扩展函数:

<font><i>/* An extension method for simplifying coroutines usage with Vert.x Web routers. */</i></font><font>
<b>private</b> fun Route.coroutineHandler(fn: suspend (RoutingContext) -> Unit) {
  handler { ctx ->
    launch(ctx.vertx().dispatcher()) {
      <b>try</b> {
        fn(ctx)
      } <b>catch</b> (e: Exception) {
        ctx.fail(e)
      }
    }
  }
}
</font>

协程序在Vert.x事件循环(调度程序)上启动。

这是为了执行更长的运行处理程序:

vertxContext.executeBlockingAwait(Handler < Future < Nothing >> {
  <font><i>//更长时间运行任务</i></font><font>
})
</font>

Vert.x为这类任务使用不同的线程池。因此,该任务在另一个线程中执行。请注意当协程被暂停,事件循环不会被阻止。

现在我们再次将焦点转移到我们的API,并展示它是如何设计的。我们首先需要设置我们的服务器和Keycloak。

一旦两个服务器都启动并运行,我们就能够编写一个简单的HTTP客户端。我们首先必须让/login使用指定的“用户名/密码”JSON-Object 从端点获取令牌。在Kotlin中使用异步HTTP客户端(来自Vert.x),它看起来像这样:

val server = <font>"https://localhost:9443"</font><font>

val credentials = json {
  obj(</font><font>"username"</font><font> to </font><font>"testUser"</font><font>,
      </font><font>"password"</font><font> to </font><font>"testPass"</font><font>)
}

val response = client.postAbs(</font><font>"$server/login"</font><font>).sendJsonAwait(credentials)

<b>if</b> (200 == response.statusCode()) {
  val user = response.bodyAsJsonObject()
  val accessToken = user.getString(</font><font>"access_token"</font><font>)
}
</font>

然后,必须在Authorization HTTP-Header中为每个后续请求发送此访问令牌。存储第一个资源看起来像这样(简单的HTTP PUT-Request):

val xml = <font>""</font><font></font><font>"
    <xml>
      foo
      <bar/>
    </xml>
</font><font>""</font><font></font><font>".trimIndent()

<b>var</b> httpResponse = client.putAbs(</font><font>"$server/database/resource1"</font><font>).putHeader(HttpHeaders.AUTHORIZATION.toString(), </font><font>"Bearer $accessToken"</font><font>).sendBufferAwait(Buffer.buffer(xml))

<b>if</b> (200 == response.statusCode()) {
  println(</font><font>"Stored document."</font><font>)
} <b>else</b> {
  println(</font><font>"Something went wrong ${response.message}"</font><font>)
}
</font>

首先,创建一个名称database带有一些元数据的空数据库,然后使用名称存储XML片段resource1。PUT HTTP-Request是幂等的。具有相同URL端点的另一个PUT-Request将删除以前的数据库和资源并重新创建数据库/资源​

HTTP-Response应为200,产生HTTP-body:

<<b>rest</b>:sequence xmlns:<b>rest</b>=<font>"https://sirix.io/rest"</font><font>>
  <<b>rest</b>:item>
    <xml <b>rest</b>:id=</font><font>"1"</font><font>>
      foo
      <bar <b>rest</b>:id=</font><font>"3"</font><font>/>
    </xml>
  </<b>rest</b>:item>
</<b>rest</b>:sequence>
</font>

以上是从存储系统为元素节点序列化生成ID。

然后通过GET HTTP-Request,https://localhost:9443/database/resource1我们还可以再次检索存储的资源。

然而,到目前为止,这并不是很有趣。我们可以更新资源POST-Request。假设我们像以前一样检索了访问令牌,我们可以简单地执行POST-Request并使用我们之前收集的有关node-ID的信息:

val xml = <font>""</font><font></font><font>"
    <test>
      yikes
      <bar/>
    </test>
</font><font>""</font><font></font><font>".trimIndent()

val url = </font><font>"$server/database/resource1?nodeId=3&insert=asFirstChild"</font><font>

val httpResponse = client.postAbs(url).putHeader(HttpHeaders.AUTHORIZATION
                         .toString(), </font><font>"Bearer $accessToken"</font><font>).sendBufferAwait(Buffer.buffer(xml))
</font>

有趣的部分是URL,我们用作端点。我们简单地说,选择ID为3的节点,然后将给定的XML片段作为第一个子片段插入。这将生成以下序列化XML文档:

<<b>rest</b>:sequence xmlns:<b>rest</b>=<font>"https://sirix.io/rest"</font><font>>
  <<b>rest</b>:item>
    <xml <b>rest</b>:id=</font><font>"1"</font><font>>
      foo
      <bar <b>rest</b>:id=</font><font>"3"</font><font>>
        <test <b>rest</b>:id=</font><font>"4"</font><font>>
          yikes
          <bar <b>rest</b>:id=</font><font>"6"</font><font>/>
        </test>
      </bar>
    </xml>
  </<b>rest</b>:item>
</<b>rest</b>:sequence>
</font>

每个PUT-以及POST请求都隐含commits了底层事务。因此,我们现在能够再次发送第一个GET请求来检索整个资源的内容,例如通过指定一个简单的XPath查询,在所有版本中选择根节点GET https://localhost:9443/database/resource1?query=/xml/all-time::*并获得以下XPath结果:

<<b>rest</b>:sequence xmlns:<b>rest</b>=<font>"https://sirix.io/rest"</font><font>>
  <<b>rest</b>:item <b>rest</b>:revision=</font><font>"1"</font><font> <b>rest</b>:revisionTimestamp=</font><font>"2018-12-20T18:44:39.464Z"</font><font>>
    <xml <b>rest</b>:id=</font><font>"1"</font><font>>
      foo
      <bar <b>rest</b>:id=</font><font>"3"</font><font>/>
    </xml>
  </<b>rest</b>:item>
  <<b>rest</b>:item <b>rest</b>:revision=</font><font>"2"</font><font> <b>rest</b>:revisionTimestamp=</font><font>"2018-12-20T18:44:39.518Z"</font><font>>
    <xml <b>rest</b>:id=</font><font>"1"</font><font>>
      foo
      <bar <b>rest</b>:id=</font><font>"3"</font><font>>
        <xml <b>rest</b>:id=</font><font>"4"</font><font>>
          foo
          <bar <b>rest</b>:id=</font><font>"6"</font><font>/>
        </xml>
      </bar>
    </xml>
  </<b>rest</b>:item>
</<b>rest</b>:sequence>
</font>

一般来说,我们支持几个额外的时间XPath轴:future ::,future-or-self ::,past ::,past-or-self ::,previous ::,previous-or-self ::,next ::, next-or-self ::,first ::,last ::,all-time ::

通过在GET请求中指定序列化(开始和结束修订参数)的一系列修订,可以实现相同的目的:

GET https://localhost:9443/database/resource1?start-revision=1&end-revision=2

或通过时间戳:

GET https://localhost:9443/database/resource1?start-revision-timestamp=2018-12-20T18:00:00&end-revision-timestamp=2018-12-20T19:00:00

我们肯定也能够通过更新XQuery表达式(不是非常RESTful)或使用简单的DELETEHTTP请求来删除资源或其任何子树:

val url = <font>"$server/database/resource1?nodeId=3"</font><font>

val httpResponse = client.deleteAbs(url).putHeader(HttpHeaders.AUTHORIZATION
                         .toString(), </font><font>"Bearer $accessToken"</font><font>).sendAwait()

<b>if</b> (200 == httpResponse.statusCode()) {
  ...
}
</font>

这将删除ID为3的节点,在我们的例子中,因为它是整个子树的元素节点。肯定它已作为修订版3提交,因此所有旧版本仍然可以查询整个子树(或者在第一个修订版中,它只是名称为“bar”而没有任何子树的元素)。

如果我们想得到一个差异,目前以XQuery Update语句的形式(但我们可以以任何格式序列化它们),只需调用XQuery函数sdb:diff,该函数定义为:

sdb:diff($coll as xs:string, $res as xs:string, $rev1 as xs:int, $rev2 as xs:int) as xs:string

例如,通过我们上面创建的数据库/ resource1这样的GET请求,我们可以发出以下请求:

GET https://localhost:9443/?query= sdb%3Adiff%28%27database%27%2C%27resource1%27%2C1%2C2%29

请注意,query-String必须进行URL编码,然后对其进行解码

sdb:diff('database','resource1',1,2)

在我们的示例中,diff的输出是包含在封闭sequence-element中的XQuery-Update语句:

<<b>rest</b>:sequence xmlns:<b>rest</b>=<font>"https://sirix.io/rest"</font><font>>
  let $doc := sdb:doc('database','resource1', 1)
  <b>return</b> (
    insert nodes <xml>foo<bar/></xml> as first into sdb:select-node($doc, 3)
  )
</<b>rest</b>:sequence>
</font>

这意味着resource1从database第一次修订中打开,然后将子树<xml>foo<bar/></xml>附加到具有稳定节点ID 3作为第一子节点的节点。

原文  https://www.jdon.com/51226
正文到此结束
Loading...