大家好,我是yangyang.接下来带来mqtt基础第五篇:身份认证
概述
身份认证是物联网应用的重要组成部分,可以帮助有效阻止非法客户端的连接。为了有更好的安全保障,了解认识多种认证机制是必要的,接下来要主要以emqx官方提供的认证机制作为学习
X.509 证书认证
什么是X.509
X.509 是密码学里公钥证书的格式标准。X.509 标准规定了证书可以包含什么信息,并说明了记录信息的方法(数据格式)。X.509 证书里含有公钥、身份信息(比如网络主机名,组织的名称或个体名称等)和签名信息(可以是证书签发机构CA的签名,也可以是自签名)。对于一份经由可信的证书签发机构签名或者可以通过其它方式验证的证书,证书的拥有者就可以用证书及相应的私钥来创建安全的通信,对文档进行数字签名。另外除了证书本身功能,X.509还附带了证书吊销列表和用于从最终对证书进行签名的证书签发机构直到最终可信点为止的证书合法性验证算法。X.509是ITU-T标准化部门基于他们之前的ASN.1定义的一套证书标准。|参考:ntent="mp" data-source="outerlink" href="https://www.cnblogs.com/w-pound/p/11344852.html" rel="noopener noreferrer noopener noreferrer" target="_blank">X.509数字证书的结构与解析 - 骑牛射雕 - 博客园
工作原理
EMQX 允许客户端使用 X.509 证书进行 TLS/SSL 连接,并支持将证书信息与客户端进行绑定,以实现 X.509 证书认证。其使用与工作流程如下:
- 签发服务端证书,为 EMQX 启用 TLS/SSL,并设置其为双向认证。
- 签发客户端证书,将证书与私钥文件烧录到设备中,并使用其进行 TLS/SSL 连接。
- 客户端将在 TLS 握手阶段发送证书给服务器,以证明其身份的合法性。
- EMQX 收到客户端的证书后,会对证书进行验证,以确认客户端的身份。
- 如果验证通过,服务器会继续完成 TLS 握手,建立安全连接。
连接成功后,EMQX 支持将证书信息映射到客户端属性,实现证书与客户端的绑定。除此之外,还可以搭配其他应用层的认证方式,如 JWT、密码认证,实现多种认证方式的组合。
特性与优势
- 安全性:X.509 提供了一种安全可靠的认证机制,通过使用数字证书和公钥加密技术,确保了通信的机密性、完整性和身份验证。它可以防止未经授权的设备接入网络或进行恶意操作。
- 互操作性:X.509 是一种通用的标准,被广泛支持和采用。许多物联网设备都支持 X.509 证书的使用,这使得设备之间的认证和安全通信更加简单和可靠。
- 可扩展性:X.509 可以支持大规模的物联网部署。它提供了灵活的证书链和证书管理机制,可以适应复杂的物联网环境,并支持大量设备和实体的身份验证。
- 可信任的第三方验证:X.509 证书通常由可信任的证书颁发机构(Certificate Authority)签发,这些 CA 经过严格的安全审查和验证。设备可以使用由受信任的 CA 签发的证书,确保其身份和证书的合法性。
- 强大的加密算法支持:X.509 支持广泛的加密算法和密钥长度,包括常用的对称加密算法和非对称加密算法。这使得物联网设备可以使用强大的密码学算法来保护通信的安全性。
- 灵活的证书配置和管理:X.509 具有灵活的证书配置和管理选项。设备可以根据需求选择适当的证书属性和扩展字段,以满足特定的物联网应用需求。此外,证书的吊销和更新也可以通过证书管理机制进行有效管理。
这些特性使得 X.509 成为物联网安全的理想选择,EMQX 提供了完整的 X.509 证书认证支持,可以帮助您轻松实现物联网设备的安全接入和通信。
JWT 认证
JSON Web Token (JWT) 是一种基于 Token 的认证机制。它不需要服务器来保留客户端的认证信息或会话信息。EMQX 支持基于 JWT 进行用户认证,满足用户个性化安全设置的需求。
认证原理
客户端在连接请求中携带 JWT,将使用预先配置的密钥或公钥对 JWT 签名进行验证。如果用户配置了 JWKS 端点,EMQX 将通过从 JWKS 端点查询到的公钥列表对 JWT 签名进行验证。
如果签名验证成功,EMQX 会继续检查 Claims。如果存在 iat、nbf 或 exp 等 Claims,EMQX 会主动根据这些 Claims 检查 JWT 的合法性。之外,EMQX 也支持用户自定义的 Claims 检查。签名验证和 Claims 检查均通过后,EMQX 才会接受客户端的连接请求。
推荐用法
由于 EMQX JWT 认证器只会检查 JWT 的签名,无法对客户端身份的合法性提供担保,因此推荐用户部署一个独立的认证服务器用来为客户端颁发 JWT。
此时,客户端将首先访问该认证服务器,由该认证服务器验证客户端的身份,并为合法的客户端签发 JWT,之后客户端将使用签发的 JWT 来连接 EMQX。
由于 JWT 中的 Payload 仅仅进行了 base64 编码,因此不建议用户在 JWT 的 Payload 中存放敏感数据。为了减少 JWT 泄漏和被盗的可能,除设置合理的有效期外,还建议结合 TLS 加密来保证客户端连接的安全性。
示例demo(伪代码为主,提供思路)
下图整个用意是用于模拟jwt通过账户登录(说到这里,建议前端加密登录密码)成功后得到token,而如果验证失败,就按业务退出到上级页面就行了.那对于emqx和自己搭建的mqtt服务端应该如何对接呢,接下来,我给大家分别说明.
demo
{ "code": 0, "data": { "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJieXQiLCJpYXQiOjE3MDg5MzE0NTQsImV4cCI6MTcwODkzNTA1NH0.snGICI0eDP0CViXvb_BpOkOmZwnmFiEj-TDNx_Bw8Qg", "expire": 1708935054 }, "message": "success"}一、 emqx JWT
假定你本地已经搭建好了emqx,那么登录后台进入客户端认证,然后添加jwt认证方式,下图展示了主要的设置页面
其中关键字段有:
- jwt来自于: 意思是基于mqtt connect,使用username 还是password字段作为比对校验token,如果是passoword,那么业务系统生成的token,就需要赋值给password
- 加密方式:jwt常用加密算法,要求:emqx与业务服务 都必须要设置一样的算法
- Secret: 密钥,要求:emqx与业务服务 都必须要设置一样的密钥
- Payload: 如果emqx设置了自定义的payload,同样的业务系统也需要配置
二、自建mqtt服务器
咱们用nodejs举例,在我之前的文章《ntent="mp" data-source="innerlink" href="https://www.toutiao.com/article/7330936324520116786/" rel="noopener noreferrer noopener noreferrer" target="_blank">nodejs搭建mqtt服务端》已经介绍了几种方式,大家可以参考下.我们可以基于express或者其它api框架搭建业务服务.按照emqx的经验,我们只需要在express中实现jwt认证,然后在mqtt服务端调用express的接口完成认证.
当然,对于自建服务对接业务api,我们可以高度与自身业务结合,比如一个token只能用于一次或多次校验等.
密码认证
通过密码进行身份验证,这种最简单,也是使用最多的认证方式。此时,客户端需要提供能够表明身份的凭据,例如用户名、客户端 ID 以及对应的密码并将其存储在特定数据源(数据库)中。除简单便捷的内置数据库外,EMQX 还支持通过与多类后端数据库的集成提供密码认证,包括 MySQL、PostgreSQL、MongoDB 和 Redis。
对接 HTTP API 认证
通过构建业务api,对接业务平台api来实现认证,这种方式与jwt方式类似.
更多认证方式
提供一个基于workerman/mqtt的客户端demo
<?phpnamespace process;use Biz\Constants;use Biz\Iot\Service\IotService;use support\utils\ArrayToolkit;use Workerman\Worker;use Workerman\Mqtt\Client;class MqttClient extends AbstractProcess{ private $subscribes = [ '/bike/status/update', '/bike/info/update', // 单车定时发布信息数据:位置和剩余电量,美团是通过手机app GPS定位作为位置;青桔单车是通过单车定位作为位置 '/bike/lock/result', '/bike/unlock/result' ]; private $topicActionMap = [ '/bike/status/update' => 'handleStatusUpdateMessage', '/bike/info/update' => 'handleInfoUpdateMessage', '/bike/lock/result' => 'handleLockResultMessage', '/bike/unlock/result' => 'handleUnLockResultMessage', ]; private $client; private $connectTimestamp = 0; private $retryConnectCount = 0; private $maxRetryConnectCount = 10; private $msgMap = []; public function onWorkerStart(Worker $worker) { $mqttConfig = config('mqtt.default'); if (empty($mqttConfig['listen'])) { return; } $this->client = new Client($mqttConfig['listen']); $this->client->onConnect = [$this, 'onMqttConnect'];// $this->client->onReconnect = [$this, 'onMqttReConnect']; $this->client->onMessage = [$this, 'onMqttMessage']; $this->client->onError = [$this, 'onMqttError']; $this->client->connect(); // Channel客户端连接到Channel服务端 \Channel\Client::connect('127.0.0.1', 2206); // 订阅broadcast事件,并注册事件回调 \Channel\Client::on('mqtt', function ($event_data) use ($worker, $mqttConfig) { // 向当前worker进程的所有客户端广播消息 if (!empty($event_data['token']) && $event_data['authType'] === 'jwt') { $this->client->close(); $this->client = new Client($mqttConfig['jwt']['listen'], [ 'username' => $mqttConfig['jwt']['username'], 'password' => $event_data['token'] ]); $this->client->onConnect = [$this, 'onMqttConnect']; $this->client->onMessage = [$this, 'onMqttMessage']; $this->client->onError = [$this, 'onMqttError']; $this->client->connect(); } }); } public function onMqttConnect(Client $client) { $this->connectTimestamp = time(); foreach ($this->subscribes as $subscribe) { $client->subscribe($subscribe); } } public function onMqttReConnect() { } public function onMqttError(\Throwable $error) { $this->retryConnectCount += 1; if ($this->retryConnectCount > $this->maxRetryConnectCount) { $this->client->close(); $this->retryConnectCount = 0; echo '[MQ]连接失败,超出重连次数', PHP_EOL; } } public function onMqttMessage($topic, $content) { $json = json_decode($content, true); if (empty($json) || !ArrayToolkit::requireds($json, ['did', 'timestamp'])) { echo '[MQ]数据非法,格式必须是标准json', PHP_EOL; return false; } $motorcycle = $this->getIotService()->getIotDeviceByDid($json['did']); if (empty($motorcycle)) { echo '[MQ]数据非法,设备不存在', PHP_EOL; return false; } $now = new \DateTime(); $currentTimestamp = $now->getTimestamp() * 1000; if ($currentTimestamp - $json['timestamp'] >= 1000 * 60 * 5) { echo '[MQ]收到过期数据', PHP_EOL; return false; } $uniqueKey = sprintf("%s_%s_%s", $topic, $json['did'], $json['status']); if (isset($this->msgMap[$uniqueKey]) && $json['timestamp'] <= $this->msgMap[$uniqueKey]['timestamp']) { echo '[MQ]收到重复数据', PHP_EOL; return false; } else { $this->msgMap[$uniqueKey] = $json; } echo '[MQ]接收到主题:', $topic, ',消息:', $content, PHP_EOL; if (isset($this->topicActionMap[$topic]) && method_exists($this, $this->topicActionMap[$topic])) { return call_user_func([$this, $this->topicActionMap[$topic]], $motorcycle, $topic, $json); } } protected function handleStatusUpdateMessage($motorcycle, string $topic, array $json) { try { $currentTotalPower = $json['totalPower'] ?? 100; $updRows = [ 'id' => $motorcycle['id'], 'totalPower' => $currentTotalPower, 'usedPower' => $currentTotalPower - $motorcycle['totalPower'], 'lastedConnTime' => $motorcycle['lastedConnTime'], 'connStatus' => $json['status'] === 'online' ? 1 : -1, 'usedStatus' => 0 ]; if ($json['status'] === 'online') { $updRows['lastedConnTime'] = intval(substr($json['timestamp'], 0, -3)); } $this->getIotService()->updateIotDevice($motorcycle['id'], $updRows); return true; } catch (\Throwable $e) { echo '[MQ]handleStatusUpdateMessage出错,', $e->getMessage(), PHP_EOL; return false; } } protected function handleLockResultMessage($motorcycle, string $topic, array $json) { try { $currentTotalPower = $json['totalPower'] ?? $motorcycle['totalPower']; $updRows = [ 'id' => $motorcycle['id'], 'totalPower' => $currentTotalPower, 'usedPower' => $currentTotalPower - $motorcycle['totalPower'], 'lastedConnTime' => $motorcycle['lastedConnTime'], 'usedStatus' => $json['status'] === 'fail' ? 0 : 1 ]; $this->getIotService()->updateIotDevice($motorcycle['id'], $updRows); echo '[MQ]handleLockResultMessage, ', $motorcycle['did'], '更新开锁信息', PHP_EOL; return true; } catch (\Throwable $e) { echo '[MQ]handleLockResultMessage, ', $motorcycle['did'], $e->getMessage(), PHP_EOL; return false; } } protected function handleUnLockResultMessage($motorcycle, string $topic, array $json) { try { if ($json['status'] === 'ok') { $currentTotalPower = $json['totalPower'] ?? $motorcycle['totalPower']; $updRows = [ 'id' => $motorcycle['id'], 'totalPower' => $currentTotalPower, 'usedPower' => $currentTotalPower - $motorcycle['totalPower'], 'lastedConnTime' => $motorcycle['lastedConnTime'], 'usedStatus' => 0 ]; $this->getIotService()->updateIotDevice($motorcycle['id'], $updRows); } return true; } catch (\Throwable $e) { echo '[MQ]handleUnLockResultMessage,', $e->getMessage(), PHP_EOL; return false; } } protected function handleInfoUpdateMessage($motorcycle, string $topic, array $json) { try { $currentTotalPower = $json['totalPower'] ?? $motorcycle['totalPower']; $updRows = [ 'id' => $motorcycle['id'], 'totalPower' => $currentTotalPower, 'usedPower' => $currentTotalPower - $motorcycle['totalPower'], 'lastedConnTime' => $motorcycle['lastedConnTime'], 'position' => $json['position'] ?? $motorcycle['position'] ]; $this->getIotService()->updateIotDevice($motorcycle['id'], $updRows); return true; } catch (\Throwable $e) { echo '[MQ]handleInfoUpdateMessage,', $e->getMessage(), PHP_EOL; return false; } } protected function getMotorcycles() { $items = $this->getIotService()->searchDevices([ 'type' => Constants::IOT_DEVICE_TYPE_MOTORCYCLE, 'noStatus' => Constants::IOT_DEVICE_STATUS_DISABLED ], ['id' => 'DESC'], 0, PHP_INT_MAX, ['id', 'did', 'sim', 'lastedConnTime', 'totalPower', 'usedStatus', 'connStatus', 'position']); return $items; } protected function getIotService() { return $this->getBiz()->service('Iot:IotService'); }}
