物联网时代-跟着Thingsboard学IOT架构-HTTP设备协议及API相关限制

thingsboard官网: https://thingsboard.io/

thingsboard GitHub: https://github.com/thingsboard/thingsboard

thingsboard提供的体验地址: http://demo.thingsboard.io/

BY Thingsboard team

以下内容是在原文基础上演绎的译文。除非另行注明,页面上所有内容采用知识共享-署名(CC BY 2.5 AU)协议共享。

原文地址: ThingsBoard API参考:HTTP设备API


HTTP

协议介绍

HTTP是可用于IoT应用程序的通用网络协议。您可以在此处找到有关HTTP的更多信息。HTTP协议是基于TCP的,并使用请求 - 响应模型。当然它的缺点也极为明显,HTTP对于嵌入式设备来说太重了,也不灵活。

协议特点

http

  1. 支持客户/服务器模式。
  2. 简单快速: 客户向服务器请求服务时,只需传送请求方法和路径。请求方法常用的有GET、PUT、POST。每种方法规定了客户与服务器联系的类型不同。由于HTTP协议简单,使得HTTP服务器的程序规模小,因此通信速度很快。
  3. 灵活: HTTP允许传输任意类型的数据对象。正在传输的类型由Content-Type加以标记。
  4. 无连接:无连接的含义是限制每次连接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开连接。采用这种方式可以节省传输时间。
  5. 无状态:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。

客户端设置

Thingsboard的HTTP传输协议架构

因为Thingsboard最新release,是基于微服务架构,不利用单独理解代码。

Thingsboard CoAP设备传输协议源代码:https://github.com/thingsboard/thingsboard/tree/release-2.0/transport/http

本文基于上面源代码后,剔除相关的安全验证和处理之后搭建简易的讲解项目:

https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-HTTP


Spring Boot框架

Thingsboard的HTTP设备传输协议是基于Spring Boot

Spring Boot 是 Spring 的子项目,正如其名字,提供 Spring 的引导( Boot )的功能。

通过 Spring Boot ,我们开发者可以快速配置 Spring 项目,引入各种 Spring MVC、Spring Transaction、Spring AOP、MyBatis 等等框架,而无需不断重复编写繁重的 Spring 配置,降低了 Spring 的使用成本。

犹记当年,Spring XML 为主的时代,大晚上各种搜索 Spring 的配置,苦不堪言。现在有了 Spring Boot 之后,生活真美好。

Spring Boot 提供了各种 Starter 启动器,提供标准化的默认配置。例如:

并且,Spring Boot 基本已经一统 Java 项目的开发,大量的开源项目都实现了其的 Starter 启动器。例如:

项目解读

项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
├── java
│   └── com
│   └── sanshengshui
│   └── http
│   ├── controller
│   │   └── DeviceApiController.java // 设备传输API接口
│   ├── HttpApiServer.java //项目启动主类
│   └── quota //API限制类包
│   ├── AbstractQuotaService.java //抽象限制服务类
│   ├── Clock.java //时钟类
│   ├── host
│   │   ├── HostIntervalRegistryCleaner.java //主机API清理器
│   │   ├── HostIntervalRegistryLogger.java  //主机API记录器
│   │   ├── HostRequestIntervalRegistry.java //主机API请求注册表
│   │   ├── HostRequestLimitPolicy.java  //主机API请求限制条件
│   │   └── HostRequestsQuotaService.java  //主机请求限制开关
│   ├── inmemory
│   │   ├── IntervalCount.java  //间歇计数
│   │   ├── IntervalRegistryCleaner.java //时间间隔内注册表清理器
│   │   ├── IntervalRegistryLogger.java  //时间间隔内注册表记录器
│   │   └── KeyBasedIntervalRegistry.java  //基础API请求逻辑
│   ├── QuotaService.java //限制服务类
│   └── RequestLimitPolicy.java //请求限制策略
└── resources
└── application.yml

项目代码

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependencies>
<dependency>
<groupId>com.sanshengshui</groupId>
<artifactId>IOT-Guide-TSL</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
  • Spring Boot提供的web框架基于Tomcat,可以通过引入spring-boot-starter-web来配置依赖关系。

  • commons-lang3guava用于API请求限制服务。

参数配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server:
port: 8080


http:
request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"


quota:
host:
limit: "${QUOTA_HOST_LIMIT:10}"
intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
enabled: "${QUOTA_HOST_ENABLED:true}"
whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
blacklist: "${QUOTA_HOST_BLACKLIST:}"
log:
topSize: 10
intervalMin: 2
  • server.port: 8080: 服务器启动绑定的端口,缺省情况下是:8080。

  • http.request_timeout : 请求超时时间,此处设定为60000。

  • quota.host.limitquota.host.intervalMs: 分别为API请求限额数和单位时间。此处为了验证方便,设定为10次和60s,即60s内API请求限额数为10次。

  • quota.host.cleanPeriodMsquota.host.ttlMs : 分别为清理周期时间和TTL时间。

  • quota.host.enabledquota.host.whitelistquota.host.blacklist分别表示API请求开关、白名单及黑名单。

  • quota.host.log.topSizequota.host.log.intervalMin: 指的是高速缓存中的(近似)最大条目数和间隔时间。

API限制服务类

KeyBasedIntervalRegistry:基础API请求逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.sanshengshui.http.quota.inmemory;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* @author james mu
* @date 2019/8/10 下午4:50
*/
@Slf4j
public class KeyBasedIntervalRegistry {

private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>();
private final long intervalDurationMs;
private final long ttlMs;
private final Set<String> whiteList;
private final Set<String> blackList;

public KeyBasedIntervalRegistry(long intervalDurationMs, long ttlMs, String whiteList, String blackList, String name) {
this.intervalDurationMs = intervalDurationMs;
this.ttlMs = ttlMs;
this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ','));
this.blackList = Sets.newHashSet(StringUtils.split(blackList, ','));

}

private void validate(String name) {
if (ttlMs < intervalDurationMs) {
log.warn("TTL for {} IntervalRegistry [{}] smaller than interval duration [{}]", name, ttlMs, intervalDurationMs);
}
log.info("Start {} KeyBasedIntervalRegistry with whitelist {}", name, whiteList);
log.info("Start {} KeyBasedIntervalRegistry with blacklist {}", name, blackList);
}

public long tick(String clientHostId) {
IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
long currentCount = intervalCount.resetIfExpiredAndTick();
if (whiteList.contains(clientHostId)) {
return 0;
} else if (blackList.contains(clientHostId)) {
return Long.MAX_VALUE;
}
return currentCount;
}

public void clean() {
hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs);
}

public Map<String, Long> getContent() {
return hostCounts.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry:: getKey,
interval -> interval.getValue().getCount()
)
);
}
}
  • validate(string name): 要求ttlMs<intervalDurationMs,并打印出API请求的黑名单和白名单。

  • 第42行通过computeIfAbsent函数对map中不存在key时的处理,在这里通过新建intervalCount(intervalDurationMs)的方式来处理。

  • 第43行通过intervalCount的resetIfExpiredAndTick()对时间间隔内进行计数。
  • 第44-48行通过判断API请求客户端地址是否在黑白名单中,如果在白名单,返回0,如果在黑名单中,返回Long.MAX_VALUE
  • clean()为通过时间间隔内是否大于ttlMs来过滤集合中的元素。

  • getContent()为遍历hostCounts中的客户端地址的IntervalCount。

IntervalCount: 间歇时间内计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.sanshengshui.http.quota.inmemory;

import com.sanshengshui.http.quota.Clock;

import java.util.concurrent.atomic.LongAdder;

/**
* @author james mu
* @date 19-8-9 下午16:50
*/
public class IntervalCount {

private final LongAdder addr = new LongAdder();
private final long intervalDurationMs;
private volatile long startTime;
private volatile long lastTickTime;

public IntervalCount(long intervalDurationMs) {
this.intervalDurationMs = intervalDurationMs;
startTime = Clock.millis();
}

//计数或时间过期后重置时间
public long resetIfExpiredAndTick(){
if (isExpired()){
reset();
}
tick();
return addr.sum();
}

//计算已过时间
public long silenceDuration() {
return Clock.millis() - lastTickTime;
}

public long getCount() {
return addr.sum();
}

//计数操作,累加一
private void tick() {
addr.add(1);
lastTickTime = Clock.millis();
}

//重置计数时间
private void reset() {
addr.reset();
lastTickTime = Clock.millis();
}

//判断间隔时间是否失效
private boolean isExpired() {
return (Clock.millis() - startTime) > intervalDurationMs;
}

}

剩下的处理类,留给读者去自己研究了!

  1. 主机API清理器: HostIntervalRegistryCleaner注入quota.host.cleanPeriodMs并继承抽象类IntervalRegistryCleaner
  2. 主机API记录器: HostIntervalRegistryLogger注入quota.host.log.topSizequota.host.log.intervalMin并继承IntervalRegistryLogger
  3. 主机API请求注册表: HostRequestIntervalRegistry注入quota.host.intervalMsquota.host.ttlMsquota.host.whitelistquota.host.blacklist并继承KeyBasedIntervalRegistry
  4. 主机API请求限制条件: HostRequestLimitPolicy注入quota.host.limit并继承RequestLimitPolicy
  5. 主机请求限制开关: HostRequestsQuotaService注入quota.host.enabled并继承AbstractQuotaService

属性API和遥测数据上传API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@RestController
@RequestMapping("/api/v1")
@Slf4j
public class DeviceApiController {

@Autowired(required = false)
private HostRequestsQuotaService quotaService;//API限制服务类

@RequestMapping(value = "/attributes",method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postDeviceAttributes(
@RequestBody String json, HttpServletRequest request) {
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
responseWriter.setResult(new ResponseEntity<>(HttpStatus.ACCEPTED));
Set<AttributeKvEntry> attributeKvEntrySet = JsonConverter.convertToAttributes(new JsonParser().parse(json)).getAttributes();
for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet){
System.out.println("属性名="+attributeKvEntry.getKey()+" 属性值="+attributeKvEntry.getValueAsString());
}
return responseWriter;
}

@RequestMapping(value = "/telemetry",method = RequestMethod.POST)
public DeferredResult<ResponseEntity> postTelemetry(@RequestBody String json, HttpServletRequest request){
DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
if (quotaExceeded(request, responseWriter)) {
return responseWriter;
}
responseWriter.setResult(new ResponseEntity(HttpStatus.ACCEPTED));
Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(json)).getData();
for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
System.out.println("key= " + entry.getKey());
for (KvEntry kvEntry: entry.getValue()) {
System.out.println("属性名="+kvEntry.getKey()+ " 属性值="+kvEntry.getValueAsString());
}
}
return responseWriter;
}
}

项目演示

遥测上传API

要将遥测数据发布到服务器节点,请将POST请求发送到以下URL:

1
http://localhost:8080/api/v1/telemetry

最简单的支持数据格式是:

1
{"key1":"value1", "key2":"value2"}

要么

1
[{"key1":"value1"}, {"key2":"value2"}]

请注意,在这种情况下,服务器端时间戳将分配给上传的数据!

如果您的设备能够获取客户端时间戳,您可以使用以下格式:

1
{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}

在上面的示例中,我们假设“1451649600512”是具有毫秒精度的unix时间戳。例如,值’1451649600512’对应于’Fri,2016年1月1日12:00:00.512 GMT’

例子:

1
curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
Note: Unnecessary use of -X or --request, POST is already inferred.
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /api/v1/telemetry HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 63
>
* upload completely sent off: 63 out of 63 bytes
< HTTP/1.1 202
< Content-Length: 0
< Date: Sun, 18 Aug 2019 16:16:07 GMT
<
* Connection #0 to host localhost left intact

结果:

1
2
3
4
5
key= 1566144967139
属性名=stringKey 属性值=value1
属性名=booleanKey 属性值=true
属性名=doubleKey 属性值=42.0
属性名=longKey 属性值=73

属性API

属性API允许设备

  • 将客户端设备属性上载到服务器。
  • 从服务器请求客户端和共享设备属性。

将属性更新发布到服务器

要将客户端设备属性发布到ThingsBoard服务器节点,请将POST请求发送到以下URL:

1
http://localhost:8080/api/v1/attributes

例子:

1
curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
Note: Unnecessary use of -X or --request, POST is already inferred.
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /api/v1/attributes HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 63
>
* upload completely sent off: 63 out of 63 bytes
< HTTP/1.1 202
< Content-Length: 0
< Date: Sun, 18 Aug 2019 16:21:00 GMT
<
* Connection #0 to host localhost left intact

结果:

1
2
3
4
属性名=stringKey 属性值=value1
属性名=booleanKey 属性值=true
属性名=doubleKey 属性值=42.0
属性名=longKey 属性值=73

API限额服务

为了演示方便,我们设置60s内最多API请求测试为10次,现在我们使用遥测上传API连续发起接口调用,我们会发现如下的情况出现:

1
2
3
4
5
6
7
属性名=longKey 属性值=73
属性名=stringKey 属性值=value1
属性名=booleanKey 属性值=true
属性名=doubleKey 属性值=42.0
属性名=longKey 属性值=73
2019-08-19 00:26:25.696 WARN 16332 --- [nio-8080-exec-1] c.s.http.controller.DeviceApiController : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect
2019-08-19 00:26:26.402 WARN 16332 --- [nio-8080-exec-2] c.s.http.controller.DeviceApiController : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect

这说明了我们的API限额服务起了作用,当然你也可以测试黑白名单等功能。

当在真实情况下,通常的API限额会很大,我这里提供了一个gatling自动化测试来提供接口测试。地址为:https://github.com/sanshengshui/IOT-Technical-Guide/tree/master/IOT-Guide-HTTP-Test

关于gatling的其他信息,大家可以参考:

到此,物联网时代,相信大家对IOT架构下的HTTP协议和API相关限制有所了解了,感谢大家的阅读!

------ 本文结束感谢您的阅读-------------
给我加点油吧!