使用 Amazon MSK 和 Amazon Redshift 简化数据流摄取以进行分析 大数据博客
简化大数据流的注入过程:使用 Amazon MSK 和 Amazon Redshift
关键要点
在此文中,我们介绍了如何利用 Amazon MSK管理 Apache Kafka 流服务 和 Amazon Redshift 进行实时数据处理和分析,简化数据流的注入过程。此集成允许以每秒数百 MB 的速度将流数据低延迟注入 Amazon Redshift,消除了将数据暂存于 S3 的需求。该解决方案特别适合希望简化数据管道并降低运营成本的数据工程师。
在 2022 年底,AWS 宣布了实时流数据注入到 Amazon Redshift 的全面可用性,这使得将 Amazon Kinesis 数据流 和 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 的数据直接注入 Redshift 成为可能,免去了将流数据先暂存到 Amazon S3 的步骤。
从 Amazon MSK 到 Amazon Redshift 的数据流注入代表了当前实时数据处理和分析的一种前沿方式。Amazon MSK 是一个高度可伸缩和完全管理的 Apache Kafka 服务,能够无缝收集和处理大量的数据流。将流数据集成到 Amazon Redshift 中为组织提供了巨大的价值,使其能够利用实时分析和数据驱动的决策来实现更高效的运营。
这一集成使您能够在数秒内实现低延迟地将每秒数百 MB 的流数据注入到 Amazon Redshift,同时确保最新的信息可供分析使用。由于不需要在 Amazon S3 中暂存数据,因此 Amazon Redshift 能以更低的延迟和没有中间存储成本的方式进行数据注入。
您可以使用 SQL 语句在 Redshift 集群上配置 Amazon Redshift 的流数据注入,以便与 MSK 主题进行认证和连接。这个解决方案是希望简化数据管道和降低运营成本的数据工程师的理想选择。
在本文中,我们将详细介绍如何从 Amazon MSK 配置 Amazon Redshift 的流数据注入。
解决方案概述
下图展示了您将使用的 AWS 服务和功能架构。
工作流程包括以下步骤:
首先配置 Amazon MSK Connect 源连接器,创建一个 MSK 主题,生成模拟数据,并将其写入 MSK 主题。在本文中,我们使用的是模拟客户数据。接下来使用 Query Editor v2 连接到 Redshift 集群。最后,您需要在 Amazon Redshift 中配置外部架构并创建一个物化视图,以从 MSK 主题中消费数据。这个解决方案不依赖于 MSK Connect 的接收连接器来将数据导出到 Amazon Redshift。详细的解决方案架构图如下所示。
工作流程的步骤包括:
您在 VPC 的私有子网中部署 MSK Connect 源连接器、MSK 集群和 Redshift 集群。MSK Connect 源连接器使用定义在 AWS 身份和访问管理 (IAM) 中的细粒度权限 内联策略,以允许源连接器在 MSK 集群上执行操作。MSK Connect 源连接器的日志将被捕获并发送到 Amazon CloudWatch 日志组。MSK 集群使用 自定义 MSK 集群配置,以允许 MSK Connect 连接器在 MSK 集群上创建主题。MSK 集群的日志被捕获并发送到 Amazon CloudWatch 日志组。Redshift 集群使用 IAM 内联策略中定义的细粒度权限,允许该集群在 MSK 集群上执行操作。您可以使用 Query Editor v2 连接到 Redshift 集群。前提条件
为了简化前提资源的配置,您可以使用以下 AWS CloudFormation 模板:

在启动堆栈时完成以下步骤:
在 堆栈名称 中,输入有意义的堆栈名称,例如 prerequisites。选择 下一步。选择 下一步。选中 我确认 AWS CloudFormation 可能会创建带有自定义名称的 IAM 资源。选择 提交。CloudFormation 堆栈会创建以下资源:
资源描述自定义 VPC customvpc在三个可用区创建,包含三个 公共子网 和三个 私有子网。互联网网关附加到 Amazon VPC。NAT 网关与 弹性 IP 关联,并在公共子网中部署。三个 安全组mskconnectsg、redshiftsg、mskclustersg。两个 CloudWatch 日志组mskconnectlogs 和 mskclusterlogs。两个 IAM 角色mskconnectrole 和 redshiftrole,分别包含针对 MSK Connect 和 Amazon Redshift 的 IAM 权限。自定义 MSK 集群配置允许 MSK Connect 连接器在 MSK 集群上创建主题。MSK 集群在 customvpc 的三个私有子网中部署了三个 broker。Redshift 集群子网组使用 customvpc 的三个私有子网。Redshift 集群在 Redshift 集群子网组中的私有子网中部署了单个节点。创建 MSK Connect 自定义插件
在本文中,我们将使用 Amazon MSK 数据生成器 部署在 MSK Connect 中,以生成模拟客户数据,并将其写入 MSK 主题。
完成以下步骤:
从 GitHub 下载 Amazon MSK 数据生成器 JAR 文件及其依赖项。
将 JAR 文件上传到您 AWS 账户中的 S3 存储桶。
在 Amazon MSK 控制台,选择导航窗格中的 自定义插件。
选择 创建自定义插件。选择 浏览 S3,搜索并选择您上传到 Amazon S3 的 Amazon MSK 数据生成器 JAR 文件,然后选择 选择。在 自定义插件名称 中,输入 mskdatagenplugin。选择 创建自定义插件。自定义插件创建后,其状态将显示为 活动,然后您可以进行到下一步。
创建 MSK Connect 连接器
完成以下步骤以创建您的连接器:
在 Amazon MSK 控制台,选择导航窗格中的 连接器。选择 创建连接器。对于 自定义插件类型,选择 使用现有插件。选择 mskdatagenplugin,然后选择 下一步。在 连接器名称 中,输入 mskdatagenconnector。对于 集群类型,选择 自管理的 Apache Kafka 集群。对于 VPC,选择 customvpc。对于 子网 1,选择您第一个可用区内的私有子网。为 customvpc 创建的 CloudFormation 模板,我们使用奇数 CIDR 范围作为公共子网,偶数 CIDR 范围作为私有子网:
飞鱼加速器官方网下载公共子网 CIDRs:101010/24,101030/24 和 101050/24私有子网 CIDRs:101020/24,101040/24 和 101060/24
对于 子网 2,选取您第二个可用区内的私有子网。
对于 子网 3,选择您第三个可用区内的私有子网。对于 引导服务器,输入您的 MSK 集群的 TLS 验证引导服务器列表。要检索您的 MSK 集群引导服务器,请导航到 Amazon MSK 控制台,选择 集群,选择 mskcluster,然后选择 查看客户端信息,复制 TLS 的引导服务器值。
对于 安全组,选择 使用访问该集群的特定安全组,并选择 mskconnectsg。在 连接器配置 中,将默认设置替换为:
connectorclass=comamazonawsmskdatagenGeneratorSourceConnectortasksmax=2genkpcustomerwith=#{Codeisbn10}genvcustomernamewith=#{Namefullname}genvcustomergenderwith=#{Demographicsex}genvcustomerfavoritebeerwith=#{Beername}genvcustomerstatewith=#{Addressstate}genkporderwith=#{Codeisbn10}genvorderproductidwith=#{numbernumberbetween 101109}genvorderquantitywith=#{numbernumberbetween 15}genvordercustomeridmatching=customerkeyglobalthrottlems=2000globalhistoryrecordsmax=1000valueconverter=orgapachekafkaconnectjsonJsonConvertervalueconverterschemasenable=false
对于连接器容量,选择 配置。
对于 每个工作者的 MCU 数量,选择 1。对于 工作者数量,选择 1。对于 工作者配置,选择 使用 MSK 默认配置。对于 访问权限,选择 mskconnectrole。选择 下一步。对于加密,选择 TLS 加密流量。选择 下一步。对于 日志传输,选择 传送到 Amazon CloudWatch Logs。选择 浏览,选择 mskconnectlogs,然后选择 选择。选择 下一步。检查并选择 创建连接器。自定义连接器创建后,其状态将显示为 运行中,您可以进行下一步。
配置 Amazon Redshift 流数据注入
完成以下步骤以设置流数据注入:
使用 Query Editor v2 连接到您的 Redshift 集群,使用数据库用户名 awsuser 和密码 Awsuser123 进行身份验证。使用以下 SQL 语句从 Amazon MSK 创建外部架构。在下面的代码中,填入 redshiftrole IAM 角色和值,和 mskcluster 的 集群 ARN。
sqlCREATE EXTERNAL SCHEMA mskexternalschemaFROM MSKIAMROLE lt插入您的 redshiftrole arngtAUTHENTICATION iamCLUSTERARN lt插入您的 mskcluster arngt
选择 运行 执行 SQL 语句。使用以下 SQL 语句创建 物化视图:sqlCREATE MATERIALIZED VIEW mskmview AUTO REFRESH YES ASSELECT kafkapartition kafkaoffset kafkatimestamptype kafkatimestamp kafkakey JSONPARSE(kafkavalue) as Data kafkaheadersFROM devmskexternalschemacustomer
选择 运行 执行 SQL 语句。您现在可以使用以下 SQL 语句查询物化视图:sqlselect from mskmview LIMIT 100
选择 运行 执行 SQL 语句。要监控通过流数据注入加载的记录进度,您可以使用 SYSSTREAMSCANSTATES 监控视图,执行以下 SQL 语句:sqlselect from SYSSTREAMSCANSTATES
选择 运行 执行 SQL 语句。要监控通过流数据注入遇到的记录错误,您可以使用 SYSSTREAMSCANERRORS 监