案例中心

  • 首页 i(name 使用基于CDC的UPSERT通过开放表格式同步数据湖,AWS Glue和Amazon MSK 大数据

使用基于CDC的UPSERT通过开放表格式同步数据湖,AWS Glue和Amazon MSK 大数据

2026-01-27 13:07:06

实现数据湖的CDC驱动UPSERT与AWS Glue和Amazon MSK的协调

关键要点

数据湖是现代数据架构的重要组成部分。变更数据捕捉CDC用于识别并捕捉数据库中数据的变化。使用Amazon MSK和AWS Glue可以高效同步和管理数据湖中的数据。通过集成Debezium和Delta Lake,实现CDC数据和UPSERT操作。

在当今的行业环境中,数据湖已经成为现代数据架构的基石,为大量结构化和非结构化数据提供了存储解决方案。变更数据捕捉CDC是识别和捕捉数据库中数据变化的过程,并将这些变化传递到下游系统。通过捕获源数据库中每次事务的变化并将其转移到目标系统,可以保持系统的同步,并帮助实现分析用例及零停机时间的数据库迁移。

然而,高效地管理和同步这些数据湖中的数据仍然面临巨大挑战。确保跨分布式数据湖的数据一致性和完整性对于决策和分析至关重要。不准确或过时的数据可能导致错误的洞察和商业决策。企业需要同步的数据以获得可行的洞察,并迅速响应不断变化的市场情况。可扩展性是数据湖的关键考虑因素,因为它们需要能够处理不断增长的数据量,而不影响性能或产生过高的成本。

为有效解决这些问题,我们建议使用Amazon Managed Streaming for Apache Kafka (Amazon MSK),这是一个完全托管的Apache Kafka服务,提供了一种无缝的方法来获取和处理流数据。我们使用AWS管理的MSK Connect服务来部署和运行Kafka Connect,构建一个端到端的CDC应用程序,使用Debezium MySQL连接器处理、插入、更新和删除MySQL记录,并通过Confluent的Amazon Simple Storage Service (Amazon S3)接收连接器将数据作为原始数据写入Amazon S3,以便其他下游应用程序进一步使用。为了有效处理批数据,我们使用AWS Glue,这是一个无服务器的数据集成服务,利用Spark框架处理来自S3的数据并将其复制到开放表格式层。开放表格式可管理大量文件作为表,并支持现代分析数据湖操作,如逐行插入、更新、删除和时间旅行查询。我们选择使用Delta Lake作为开放表格式的示例,但您也可以使用Apache Iceberg或Apache Hudi获得相同的结果。

使用基于CDC的UPSERT通过开放表格式同步数据湖,AWS Glue和Amazon MSK 大数据

本文展示了一个综合CDC系统的构建,使得能够处理来源于Amazon关系数据库服务(Amazon RDS)的MySQL的CDC数据。最初,我们使用Amazon MSK创建了所有修改的记录的原始数据湖,实时写入Amazon S3。然后,我们使用AWS Glue进行CDC原始数据湖的批量处理。这种设置的一个关键优势是,您可以完全控制整个过程,从捕获数据库中的变化到根据您的特定需求转换数据。这种灵活性使您能够将系统调整到不同的用例中。

这个过程通过与MSK Connect的集成实现,使用Debezium MySQL连接器捕获数据,然后通过Confluent S3接收连接器将数据写入Amazon S3。随后,使用AWS Glue ETL作业对来自S3的数据进行处理并存储在数据湖层中。最后,使用Amazon Athena查询Delta Lake表。

注意:如果您需要对CDC数据进行实时处理,可以跳过批处理方法,直接使用AWS Glue流处理作业。该作业将直接连接到MSK中的Kafka主题,在变化发生时立即抓取数据。然后可以按照需要对数据进行处理和转换,在Amazon S3上创建反映您业务最新更新的Delta Lake。这种方法确保您拥有最新的数据以进行实时分析。

解决方案概述

以下图示说明了您在本博客文章中实现的架构。每个数字表示解决方案的一个主要组件。

工作流程包括以下步骤:

来自MySQL的近实时数据捕获及流入Amazon S3

过程从Amazon RDS中的数据开始。使用Debezium连接器捕获RDS实例中数据的变化,几乎实时处理。Debezium是一个分布式平台,将现有数据库中的信息转换为事件流,使应用程序能够检测并立即响应数据库中的逐行变化。Debezium构建在Apache Kafka之上,提供了一组兼容Kafka Connect的连接器。捕获的数据变化流被推送到Amazon MSK主题。MSK是一个托管服务,使在AWS上运行Apache Kafka变得简单。经过处理的数据流主题在JSON格式下从MSK流向Amazon S3。Confluent S3接收连接器允许从MSK集群到S3存储桶的近实时数据传输。

对CDC原始数据进行批处理并写入数据湖

设置AWS Glue ETL作业处理原始CDC数据。此作业读取来自S3原始存储桶的已标记数据,并将数据按开放文件格式Delta写入数据湖。该作业也在AWS Glue数据目录中创建Delta Lake表。Delta Lake是建立在现有数据湖之上的开源存储层。它增加了ACID事务和版本控制等功能,以提高数据的可靠性和可管理性。

利用无服务器交互查询服务分析数据

Athena是一种无服务器的交互式查询服务,可以用于查询在Glue数据目录中创建的Delta Lake表。这允许没有管理基础设施的情况下一次性分析数据。

本文通过使用AWS CloudFormation模板在useast1 AWS区域生成解决方案资源。在接下来的部分,我们将向您展示如何配置资源并实现解决方案。

使用AWS CloudFormation配置资源

在这篇文章中,您将使用以下两个CloudFormation模板。使用两个不同模板的好处是,您可以根据用例拆分CDC管道和AWS Glue处理的资源创建,如果您希望仅创建特定的处理资源。

vpcmskmskconnectrdsclientyaml 此模板设置CDC管道资源,如虚拟私有云VPC、子网、安全组、AWS身份与访问管理IAM角色、NAT、互联网网关、Amazon弹性计算云Amazon EC2客户端、Amazon MSK、MSKConnect、RDS和S3。

gluejobsetupyaml 此模板设置数据处理资源,如AWS Glue表、数据库和ETL。

西部世界vqn

配置MSK和MSK Connect

首先,您将使用Debezium连接器配置MSK和MSK Connect,以捕获表中的增量变化,并使用S3接收连接器写入Amazon S3。vpcmskmskconnectrdsclientyaml堆栈创建了VPC、公共和私有子网、安全组、S3存储桶、Amazon MSK集群、具有Kafka客户端的EC2实例、RDS数据库和MSK连接器及其工作配置。

启动vpcmskmskconnectrdsclient堆栈,使用CloudFormation模板:

提供如下参数值:

ABC参数描述示例值EnvironmentName为资源名称添加前缀的环境名。mskdeltacdcpipelineDatabasePassword数据库管理员帐户密码。S3cretPwd99InstanceTypeMSK客户端的EC2实例类型。t2microLatestAmiIdEC2实例的Amazon Linux 2023的最新AMI ID。您可以使用默认值。/aws/service/amiamazonlinuxlatest/al2023amikernel61x8664VpcCIDR该VPC的IP范围CIDR表示法。1019200/16PublicSubnet1CIDR第一个可用区的公共子网的IP范围CIDR表示法。10192100/24PublicSubnet2CIDR第二个可用区的公共子网的IP范围CIDR表示法。10192110/24PrivateSubnet1CIDR第一个可用区的私有子网的IP范围CIDR表示法。10192200/24PrivateSubnet2CIDR第二个可用区的私有子网的IP范围CIDR表示法。10192210/24PrivateSubnet3CIDR第三个可用区的私有子网的IP范围CIDR表示法。10192220/24堆栈创建过程大约需要一个小时。创建堆栈后,检查输出标签。

接下来,您需要设置AWS Glue数据处理资源,如AWS Glue数据库、表和ETL作业。

使用AWS Glue在S3数据湖中实现UPSERT与Delta Lake

gluejobsetupyaml CloudFormation模板创建数据库、IAM角色和AWS Glue ETL作业。从vpcmskmskconnectrdsclient堆栈的输出标签中提取S3BucketNameForOutput和S3BucketNameForScript的值,以便在该模板中使用。完成以下步骤:

启动gluejobsetup堆栈。

提供如下参数值:

ABC参数描述示例值EnvironmentName为资源名称添加前缀的环境名。gluejobsetupGlueDataBaseName数据目录数据库的名称。gluecdcblogdbGlueTableName数据目录表的名称。blogcdctblS3BucketForGlueScript用于AWS Glue ETL脚本的存储桶名称。使用上一个堆栈中的S3存储桶名称。例如,awsgluescript{AWSAccountId}{AWSRegion}{EnvironmentName}GlueWorkerTypeAWS Glue作业的工作类型。例如,G1XG1XNumberOfWorkersAWS Glue作业中的工人数。3S3BucketForOutput从AWS Glue作业写入数据的存储桶名称。awsglueoutput{AWSAccountId}{AWSRegion}{EnvironmentName}S3ConnectorTargetBucketnameAmazon MSK S3接收连接器从Kafka主题写入数据的存储桶名称。msklab{AWSAccountId}targetbucket堆栈创建过程大约需要2分钟。创建堆栈后,检查输出标签。

在gluejobsetup堆栈中,我们创建了AWS Glue数据库和AWS Glue作业。为了更清晰,您可以检查使用CloudFormation模板生成的AWS Glue数据库和作业。

成功创建CloudFormation堆栈后,您可以继续使用AWS Glue ETL作业处理数据。

运行AWS Glue ETL作业

为了处理从Amazon MSK创建的S3存储桶中的数据,使用您在前一部分设置的AWS Glue ETL作业,完成以下步骤:

在CloudFormation控制台中,选择堆栈gluejobsetup。

在输出标签下,从GlueJobName中获取AWS Glue ETL作业的名称。在以下截图中,名称为GlueCDCJobgluedeltacdc。

在AWS Glue控制台中,选择导航窗格中的ETL作业。

搜索AWS Glue ETL作业名称GlueCDCJobgluedeltacdc。选择作业名称以打开详细信息页面。

选择运行以启动作业。在运行标签页下确认作业是否成功运行。

从gluejobsetup模板输出中获取OutputBucketName。

在Amazon S3控制台中,导航到S3存储桶以验证数据。

注意:我们启用了AWS Glue 作业书签,这将确保作业在每次运行中处理新数据。

使用Athena查询Delta Lake表

在AWS Glue ETL作业成功为数据目录中的处理数据创建Delta Lake表后,按照以下步骤使用Athena验证数据:

在Athena控制台中,导航到查询编辑器。选择数据源为数据目录。选择数据库gluecdcblogdb,该数据库由gluejobsetup堆栈创建。运行以下查询以验证数据,预览数据并查找总计。

sqlSELECT FROM gluecdcblogdbblogcdctbl ORDER BY custid DESC LIMIT 40SELECT COUNT() FROM gluecdcblogdbblogcdctbl

以下截图显示了示例查询的输出。

上传增量CDC数据进行进一步处理

在我们处理初始完整加载后,让我们在MySQL中执行插入、更新和删除记录,这些记录将被Debezium MySQL连接器处理,并通过Confluent S3接收连接器写入Amazon S3。

在Amazon EC2控制台中,转到使用CloudFormation模板创建的名为KafkaClientInstance的EC2实例。

使用SSM登录EC2实例。选择KafkaClientInstance,然后选择连接。

运行以下命令将数据插入到RDS表中。使用CloudFormation堆栈参数标签中的数据库密码。

shellsudo su ec2userRDSAURORAENDPOINT=aws rds describedbinstances region useast1 jq r DBInstances[] select(DBName == salesdb) EndpointAddressmysql f u master h RDSAURORAENDPOINT password

现在在CUSTOMER表中执行插入。

sqluse salesdbINSERT into CUSTOMER values(8887Customer Name 8887Market segment 8887)INSERT into CUSTOMER values(8888Customer Name 8888Market segment 8888)INSERT into CUSTOMER values(8889Customer Name 8889Market segment 8889)

重新运行AWS Glue作业以更新Delta Lake表中的新记录。

使用Athena控制台验证数据。

在CUSTOMER表中执行插入、更新和删除。

sqlUPDATE CUSTOMER SET NAME=Customer Name update 8888MKTSEGMENT=Market segment update 8888 where CUSTID = 8888UPDATE CUSTOMER SET NAME=Customer Name update 8889MKTSEGMENT=Market segment update 8889 where CUSTID = 8889DELETE FROM CUSTOMER where CUSTID = 8887INSERT into CUSTOMER values(9000Customer Name 9000Market segment 9000)

重新运行AWS Glue作业以更新Delta Lake表中的插入、更新和删除记录。

使用Athena控制台验证数据,以验证Delta Lake表中更新和删除的记录。

清理

要清