thingsboard官网: https://thingsboard.io/
thingsboard GitHub: https://github.com/thingsboard/thingsboard
thingsboard提供的体验地址: http://demo.thingsboard.io/
BY Thingsboard team
以下内容是在原文基础上演绎的译文。除非另行注明,页面上所有内容采用知识共享-署名( CC BY 2.5 AU )协议共享。
原文地址: ThingsBoard API参考: HTTP设备API
HTTP 是可用于IoT应用程序的通用网络协议。您可以 在此处 找到有关HTTP的更多信息。HTTP协议是基于TCP的,并使用请求 - 响应模型。当然它的缺点也极为明显,HTTP对于嵌入式设备来说太重了,也不灵活。
curl
Postman
因为Thingsboard最新release,是基于微服务架构,不利用单独理解代码。
Thingsboard CoAP设备传输协议源代码: https://github.com/thingsboard/thingsboard/tree/release-2.0/transport/http
https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-HTTP
Thingsboard的HTTP设备传输协议是基于 Spring Boot 。
Spring Boot 是 Spring 的 子项目 ,正如其名字,提供 Spring 的引导( Boot )的功能。
通过 Spring Boot ,我们开发者可以快速配置 Spring 项目,引入各种 Spring MVC、Spring Transaction、Spring AOP、MyBatis 等等框架,而无需不断重复编写繁重的 Spring 配置,降低了 Spring 的使用成本。
犹记当年,Spring XML 为主的时代,大晚上各种搜索 Spring 的配置,苦不堪言。现在有了 Spring Boot 之后,生活真美好。
Spring Boot 提供了各种 Starter 启动器,提供标准化的默认配置。例如:
spring-boot-starter-web
启动器,可以快速配置 Spring MVC 。 mybatis-spring-boot-starter
启动器,可以快速配置 MyBatis 。 并且,Spring Boot 基本已经一统 Java 项目的开发,大量的开源项目都实现了其的 Starter 启动器。例如:
incubator-dubbo-spring-boot-project
启动器,可以快速配置 Dubbo 。 rocketmq-spring-boot-starter
启动器,可以快速配置 RocketMQ 。 ├── java │ └── com │ └── sanshengshui │ └── http │ ├── controller │ │ └── DeviceApiController.java // 设备传输API接口 │ ├── HttpApiServer.java //项目启动主类 │ └── quota //API限制类包 │ ├── AbstractQuotaService.java //抽象限制服务类 │ ├── Clock.java //时钟类 │ ├── host │ │ ├── HostIntervalRegistryCleaner.java //主机API清理器 │ │ ├── HostIntervalRegistryLogger.java //主机API记录器 │ │ ├── HostRequestIntervalRegistry.java //主机API请求注册表 │ │ ├── HostRequestLimitPolicy.java //主机API请求限制条件 │ │ └── HostRequestsQuotaService.java //主机请求限制开关 │ ├── inmemory │ │ ├── IntervalCount.java //间歇计数 │ │ ├── IntervalRegistryCleaner.java //时间间隔内注册表清理器 │ │ ├── IntervalRegistryLogger.java //时间间隔内注册表记录器 │ │ └── KeyBasedIntervalRegistry.java //基础API请求逻辑 │ ├── QuotaService.java //限制服务类 │ └── RequestLimitPolicy.java //请求限制策略 └── resources └── application.yml
<dependencies> <dependency> <groupId>com.sanshengshui</groupId> <artifactId>IOT-Guide-TSL</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> </dependencies>
Spring Boot提供的web框架基于Tomcat,可以通过引入 spring-boot-starter-web
来配置依赖关系。
commons-lang3
和 guava
用于API请求限制服务。
server: port: 8080 http: request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}" quota: host: limit: "${QUOTA_HOST_LIMIT:10}" intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}" ttlMs: "${QUOTA_HOST_TTL_MS:60000}" cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}" enabled: "${QUOTA_HOST_ENABLED:true}" whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}" blacklist: "${QUOTA_HOST_BLACKLIST:}" log: topSize: 10 intervalMin: 2
server.port: 8080
: 服务器启动绑定的端口,缺省情况下是:8080。
http.request_timeout
: 请求超时时间,此处设定为60000。
quota.host.limit
和 quota.host.intervalMs
: 分别为API请求限额数和单位时间。此处为了验证方便,设定为 10
次和60s,即60s内API请求限额数为10次。
quota.host.cleanPeriodMs
和 quota.host.ttlMs
: 分别为清理周期时间和TTL时间。
quota.host.enabled
、 quota.host.whitelist
和 quota.host.blacklist
分别表示API请求开关、白名单及黑名单。
quota.host.log.topSize
和 quota.host.log.intervalMin
: 指的是高速缓存中的(近似)最大 条目
数和间隔时间。
KeyBasedIntervalRegistry:基础API请求逻辑
package com.sanshengshui.http.quota.inmemory; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** * @author james mu * @date 2019/8/10 下午4:50 */ @Slf4j public class KeyBasedIntervalRegistry { private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>(); private final long intervalDurationMs; private final long ttlMs; private final Set<String> whiteList; private final Set<String> blackList; public KeyBasedIntervalRegistry(long intervalDurationMs, long ttlMs, String whiteList, String blackList, String name) { this.intervalDurationMs = intervalDurationMs; this.ttlMs = ttlMs; this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ',')); this.blackList = Sets.newHashSet(StringUtils.split(blackList, ',')); } private void validate(String name) { if (ttlMs < intervalDurationMs) { log.warn("TTL for {} IntervalRegistry [{}] smaller than interval duration [{}]", name, ttlMs, intervalDurationMs); } log.info("Start {} KeyBasedIntervalRegistry with whitelist {}", name, whiteList); log.info("Start {} KeyBasedIntervalRegistry with blacklist {}", name, blackList); } public long tick(String clientHostId) { IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs)); long currentCount = intervalCount.resetIfExpiredAndTick(); if (whiteList.contains(clientHostId)) { return 0; } else if (blackList.contains(clientHostId)) { return Long.MAX_VALUE; } return currentCount; } public void clean() { hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs); } public Map<String, Long> getContent() { return hostCounts.entrySet().stream() .collect( Collectors.toMap( Map.Entry:: getKey, interval -> interval.getValue().getCount() ) ); } }
validate(string name)
: 要求 ttlMs<intervalDurationMs
,并打印出API请求的黑名单和白名单。
第42行通过 computeIfAbsent
函数对map中不存在key时的处理,在这里通过新建 intervalCount(intervalDurationMs)
的方式来处理。
resetIfExpiredAndTick()
对时间间隔内进行计数。 Long.MAX_VALUE
。
clean()
为通过时间间隔内是否大于ttlMs来过滤集合中的元素。
getContent()
为遍历hostCounts中的客户端地址的IntervalCount。
IntervalCount: 间歇时间内计数
package com.sanshengshui.http.quota.inmemory; import com.sanshengshui.http.quota.Clock; import java.util.concurrent.atomic.LongAdder; /** * @author james mu * @date 19-8-9 下午16:50 */ public class IntervalCount { private final LongAdder addr = new LongAdder(); private final long intervalDurationMs; private volatile long startTime; private volatile long lastTickTime; public IntervalCount(long intervalDurationMs) { this.intervalDurationMs = intervalDurationMs; startTime = Clock.millis(); } //计数或时间过期后重置时间 public long resetIfExpiredAndTick(){ if (isExpired()){ reset(); } tick(); return addr.sum(); } //计算已过时间 public long silenceDuration() { return Clock.millis() - lastTickTime; } public long getCount() { return addr.sum(); } //计数操作,累加一 private void tick() { addr.add(1); lastTickTime = Clock.millis(); } //重置计数时间 private void reset() { addr.reset(); lastTickTime = Clock.millis(); } //判断间隔时间是否失效 private boolean isExpired() { return (Clock.millis() - startTime) > intervalDurationMs; } }
HostIntervalRegistryCleaner
注入 quota.host.cleanPeriodMs
并继承抽象类 IntervalRegistryCleaner
。 HostIntervalRegistryLogger
注入 quota.host.log.topSize
和 quota.host.log.intervalMin
并继承 IntervalRegistryLogger
。 HostRequestIntervalRegistry
注入 quota.host.intervalMs
、 quota.host.ttlMs
、 quota.host.whitelist
和 quota.host.blacklist
并继承 KeyBasedIntervalRegistry
。 HostRequestLimitPolicy
注入 quota.host.limit
并继承 RequestLimitPolicy
。 HostRequestsQuotaService
注入 quota.host.enabled
并继承 AbstractQuotaService
。 @RestController @RequestMapping("/api/v1") @Slf4j public class DeviceApiController { @Autowired(required = false) private HostRequestsQuotaService quotaService;//API限制服务类 @RequestMapping(value = "/attributes",method = RequestMethod.POST) public DeferredResult<ResponseEntity> postDeviceAttributes( @RequestBody String json, HttpServletRequest request) { DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); if (quotaExceeded(request, responseWriter)) { return responseWriter; } responseWriter.setResult(new ResponseEntity<>(HttpStatus.ACCEPTED)); Set<AttributeKvEntry> attributeKvEntrySet = JsonConverter.convertToAttributes(new JsonParser().parse(json)).getAttributes(); for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet){ System.out.println("属性名="+attributeKvEntry.getKey()+" 属性值="+attributeKvEntry.getValueAsString()); } return responseWriter; } @RequestMapping(value = "/telemetry",method = RequestMethod.POST) public DeferredResult<ResponseEntity> postTelemetry(@RequestBody String json, HttpServletRequest request){ DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>(); if (quotaExceeded(request, responseWriter)) { return responseWriter; } responseWriter.setResult(new ResponseEntity(HttpStatus.ACCEPTED)); Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(json)).getData(); for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) { System.out.println("key= " + entry.getKey()); for (KvEntry kvEntry: entry.getValue()) { System.out.println("属性名="+kvEntry.getKey()+ " 属性值="+kvEntry.getValueAsString()); } } return responseWriter; } }
要将遥测数据发布到服务器节点,请将POST请求发送到以下URL:
http://localhost:8080/api/v1/telemetry
最简单的支持数据格式是:
{"key1":"value1", "key2":"value2"}
要么
[{"key1":"value1"}, {"key2":"value2"}]
请注意,在这种情况下,服务器端时间戳将分配给上传的数据!
如果您的设备能够获取客户端时间戳,您可以使用以下格式:
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
在上面的示例中,我们假设“1451649600512”是具有毫秒精度的 unix时间戳 。例如,值’1451649600512’对应于’Fri,2016年1月1日12:00:00.512 GMT’
curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
C:/Users/james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json" Note: Unnecessary use of -X or --request, POST is already inferred. * Trying ::1... * TCP_NODELAY set * Connected to localhost (::1) port 8080 (#0) > POST /api/v1/telemetry HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.55.1 > Accept: */* > Content-Type:application/json > Content-Length: 63 > * upload completely sent off: 63 out of 63 bytes < HTTP/1.1 202 < Content-Length: 0 < Date: Sun, 18 Aug 2019 16:16:07 GMT < * Connection #0 to host localhost left intact
key= 1566144967139 属性名=stringKey 属性值=value1 属性名=booleanKey 属性值=true 属性名=doubleKey 属性值=42.0 属性名=longKey 属性值=73
属性API允许设备
将属性更新发布到服务器
要将客户端设备属性发布到ThingsBoard服务器节点,请将POST请求发送到以下URL:
http://localhost:8080/api/v1/attributes
curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
C:/Users/james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json" Note: Unnecessary use of -X or --request, POST is already inferred. * Trying ::1... * TCP_NODELAY set * Connected to localhost (::1) port 8080 (#0) > POST /api/v1/attributes HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.55.1 > Accept: */* > Content-Type:application/json > Content-Length: 63 > * upload completely sent off: 63 out of 63 bytes < HTTP/1.1 202 < Content-Length: 0 < Date: Sun, 18 Aug 2019 16:21:00 GMT < * Connection #0 to host localhost left intact
属性名=stringKey 属性值=value1 属性名=booleanKey 属性值=true 属性名=doubleKey 属性值=42.0 属性名=longKey 属性值=73
为了演示方便,我们设置60s内最多API请求测试为10次,现在我们使用遥测上传API连续发起接口调用,我们会发现如下的情况出现:
属性名=longKey 属性值=73 属性名=stringKey 属性值=value1 属性名=booleanKey 属性值=true 属性名=doubleKey 属性值=42.0 属性名=longKey 属性值=73 2019-08-19 00:26:25.696 WARN 16332 --- [nio-8080-exec-1] c.s.http.controller.DeviceApiController : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect 2019-08-19 00:26:26.402 WARN 16332 --- [nio-8080-exec-2] c.s.http.controller.DeviceApiController : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect
这说明了我们的API限额服务起了作用,当然你也可以测试黑白名单等功能。
当在真实情况下,通常的API限额会很大,我这里提供了一个 gatling 自动化测试来提供接口测试。地址为: https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-HTTP-Test
关于gatling的其他信息,大家可以参考:
负载,性能测试工具-Gatling
Gatling简单测试SpringBoot工程
到此,物联网时代,相信大家对IOT架构下的HTTP协议和API相关限制有所了解了,感谢大家的阅读!