飞鱼加速器官方网下载
  • 37

使用 AWS Step Functions 和 MQTT 互动调用本地资源 计算博客

使用 AWS Step Functions 和 MQTT 互动调用本地资源

作者:James Beswick,发布日期:2024年1月30日,分类:AWS IoT核心AWS StepFunctions无服务器架构永久链接 分享

关键要点

使用 AWS Step Functions 和 MQTT,可以安全高效地访问存储在本地的资源,而无需更改现有网络配置。本文展示了如何通过连接AWS IoT Core的 MQTT 协议,管理本地工作程序来访问本地数据。采用无服务器架构,可以避免在本地资源上开放入站端口或需要公共端点。

本地工作负载在 AWS 上有时需要访问存储在本地数据库和存储位置的数据。传统的解决方案涉及防火墙入站规则、VPN 隧道或公共端点。

在这篇博客文章中,我们将演示如何通过使用 MQTT 协议 (AWS IoT核心) 与 AWS StepFunctions 来将任务派发给本地工作者,以访问或检索存储在本地的数据。状态机可以在不开放入站端口或公共端点的情况下与本地工作者进行通信。工作者可以在 网络地址转换(NAT) 路由器后运行,同时与 AWS 云保持双向连接。这种方式提供了更安全且具有成本效益的方法来访问存储在本地的数据。

概述

通过将 Step Functions 与 AWSLambda 及 AWS IoTCore 结合使用,你可以在不改变现有网络配置的情况下安全地访问存储在本地的数据。

AWS IoT Core 允许你连接物联网设备并将消息路由到 AWS 服务,而无需管理基础设施。通过使用在本地运行的 Docker 容器作为代理 IoTThing,你可以利用 AWS IoT Core 的完全托管的 MQTT 消息代理服务,适用于非物联网用例。

MQTT 订阅者通过 MQTT 主题接收信息。MQTT 主题作为发布者和订阅者之间的匹配机制。概念上,MQTT 主题表现得像一个临时通知通道。你几乎可以无限制地大规模创建主题。在 SaaS 应用程序中,例如,你可以每个 租户 创建主题。了解更多关于 MQTT 主题设计的信息,请访问 这里。

飞鱼加速器官方网下载

以下参考架构展示了如何使用 AWS 无服务器应用程序模型 (AWS SAM) 进行部署,使用 Step Functions 来协调工作流,使用 AWSLambda 来发送和接收本地消息,并使用 AWS IoT Core 提供 MQTT 消息代理、证书和平政策管理,以及发布/订阅主题。

启动状态机,可以选择“按需”或按计划。状态:“Lambda 调用分派作业到本地”将消息发布到 AWS IoT Core 中的 MQTT 消息代理。消息代理将消息发送到与在本地容器中运行作业的工作者租户相对应的主题。本地容器接收到消息并开始执行工作。使用客户端证书进行身份验证,并且附加的策略限制了工作者对仅租户主题的访问。本地容器中的工作者可以访问本地资源,如数据库或存储位置。本地容器将结果和作业状态发送回另一个 MQTT 主题。AWS IoT Core 规则调用“TaskToken Done”Lambda 函数。Lambda 函数通过 SendTaskSuccess 或 SendTaskFailure API 将结果提交给 Step Functions。

部署和测试示例

确保你可以通过终端管理 AWS 资源,并且:

已安装最新版本的 AWS CLI 和 AWS SAM CLI。你有一个 AWS 账户。如果没有,请访问 此 页面。你的用户具有足够的权限来管理 AWS 资源。Git 已安装。Python 版本 311 或更高版本已安装。Docker 已安装。

你可以在 GitHub 仓库 获取访问,并遵循 这些步骤 部署示例。

使用 AWS Step Functions 和 MQTT 互动调用本地资源 计算博客

awsresources 目录包含所需的 AWS 资源,包括状态机、Lambda 函数、主题和策略。onpremworker 目录包含 Docker 容器镜像工件。你可以使用它在本地运行本地工作者。

在这个示例中,工作者容器添加两个数字,作为以下格式的输入:

json{ a 15 b 42}

在实际场景中,你可以用业务逻辑替代这个操作。例如,从本地数据库检索数据,生成汇总,然后将结果提交回你的状态机。

按照 这些步骤 进行端到端测试示例。

在没有物联网设备下使用 AWS IoT Core

在示例用例中没有物联网设备。然而,完全托管的 MQTT 消息代理 在 AWS IoT Core 中允许你将消息路由到 AWS 服务,而无需管理基础设施。

AWS IoT Core 使用 X509 客户端证书 进行客户端身份验证。你可以 附加一个策略 到客户端证书,允许客户端仅发布和订阅特定主题。此方法不需要在本地工作者容器中使用 IAM 凭证。

AWS IoT Core 的安全性、成本效率、托管基础设施和可扩展性,使其非常适合许多超出典型物联网用例的混合应用程序。

从 Step Functions 派发作业并等待响应

当状态机到达将作业派发到本地工作者的状态时,执行会暂停并等待该作业完成。Step Functions 支持 三种集成模式: 请求响应、同步运行作业 和 等待回调任务令牌。示例使用 “等待回调任务令牌”集成。这允许状态机暂停并等待长达 1 年 的回调。

当本地工作者完成作业时,它会发布一条消息到 AWS IoT Core 中的主题。AWS IoT Core 中的规则随后调用一个 Lambda 函数,该函数通过调用 SendTaskSuccess 或 SendTaskFailure API 将结果发送回状态机。

你可以通过在 Amazon States Language (ASL) 的任务中添加 HeartbeatSeconds 来防止状态机超时。当作业冻结并且没有调用 SendTaskFailure API 时,将发生超时。HeartbeatSeconds 通过 SendTaskHeartbeat API 调用从工作者发送心跳,并且应少于指定的 TimeoutSeconds。

要为你的状态机中的任务创建一个等待回调令牌的 ASL,你可以使用以下代码:

json{ Type Task Resource arnawsstateslambdainvokewaitForTaskToken Parameters { FunctionName {LambdaNotifierToWorkerArn} Payload { Input TaskToken TaskToken } }}

“waitForTaskToken” 后缀表示任务必须等待回调。状态机生成一个唯一的回调令牌,可通过 TaskToken 内置变量访问,并将其作为输入传递给在 FunctionName 中定义的 Lambda 函数。

然后,Lambda 函数通过 AWS IoT Core 的主题将该令牌发送到本地工作者。

Lambda 并不是唯一一个支持 等待回调 集成的服务 完整列表可以在 这里 查看。

除了派发任务并获取结果之外,你还可以实现进度跟踪和关机机制。要跟踪进度,工作者通过单独的主题发送指标。

根据你当前的实现,你有几种选择:

将工作者的进度数据存储在 Amazon DynamoDB 中,并通过 Lambda 函数的 REST API 调用进行可视化,Lambda 函数从 DynamoDB 表中读取数据。有关如何直接从主题存储数据到 DynamoDB 的更多信息,请参考 本教程。

为了获得反应式用户体验,当新的进度数据到达时创建一个规则来调用 Lambda 函数。打开一个 WebSocket 连接到后端。Lambda 函数通过 WebSocket 将进度数据直接发送到前端。

为实现关机机制,可以在工作者上使用单独线程运行作业,并订阅状态机发布关机消息的主题。如果接收到关机消息,则结束工作者线程,并发送返回状态,包括任务的回调令牌。

使用 AWS IoT Core 规则和 Lambda 函数

工作者的作业结果消息并不会直接到达 Step Functions API。相反,AWS IoT Core 规则和一个专用的 Lambda 函数将状态消息转发到 Step Functions。这使得 AWS IoT Core 策略中可以更细粒度的权限,进而提高了安全性,因为工作者容器只能发布和订阅特定主题,并且不在本地存在 IAM 凭证。

Lambda 函数的 执行角色 包含对 SendTaskSuccess、SendTaskHeartbeat 和 SendTaskFailure API 调用的权限。

另一种方式是,工作者可以直接在 Step Functions 工作流中运行 API 调用,从而替代 AWS IoT Core 中的主题、规则和 Lambda 函数调用 Step Functions API 的需要。这种方法需要在工作者容器中使用 IAM 凭证。你可以使用 AWS Identity and Access Management Roles Anywhere 来获取临时安全凭证。随着工作者功能的逐步演变,你可以增加更多的 AWS API 调用,同时在 IAM 执行角色中添加权限。

清理

在此解决方案中使用的服务符合 AWS 免费套餐。要清除存储库中 awsresources/ 目录的资源,请运行:

bashsam delete

这将删除所有通过 templateyml 文件配置的资源。

要从 AWS 中删除客户端证书,请导航到 AWS IoT Core 证书 并删除在手动部署步骤中添加的证书。

最后,停止本地的 Docker 容器并将其删除:

bashdocker rm force mqttlocalclient

最后,删除容器镜像:

bashdocker rmi mqttclientwaitfortoken

结论

通过使用 MQTT 和 AWS IoT Core 控制的工作者访问本地资源,并使用 Step Functions 进行管理,是运行本地作业的一种安全、反应迅速且具成本效益的方法。考虑将你的混合工作负载从低效的轮询或调度器更新为本文所述的反应式方法。这将提供更好的用户体验,快速派发和跟踪云外的作业。

欲获取更多无服务器学习资源,请访问 无服务器网站

标签 众筹无服务器