亚马逊AWS官方博客

基于 Amazon ECS Fargate 实现终端设备通过 REST API 与 Amazon MSK 完成海量消息流式交互

一、 业务背景

当客户端或 IoT 设备的应用程序需要传递日志等消息时,传统方式一般要通过 HTTP 的方式请求到服务端应用程序进行接受,处理,转发或存储,再进行分析处理(架构如下图)。此架构方案,对实时性、扩展性以及安全性上都有很大的挑战。

原始架构图

二、改造后架构图

改造后架构图

三、架构优势

  1. 客户端及 IoT 设备的应用程序,无需改造,可保持使用 HTTP 协议的方式。
  2. 不支持 REST API 调用,将 Kafka API 转换为 REST API。
  3. 从原始的离线处理方式处理转换为实时分析处理。
  4. 使用 Kafka REST Proxy 镜像部署于 Amazon ECS Fargate 的方式,使用 ELB 提供对外的 HTTP REST 服务解决上述问题,并且可以根据流量负载的情况实现自动伸缩,承载终端设备的海量请求。
  5. 通过使用托管服务提升安全性。

四、环境准备

1. 在 cn-northwest-1 控制台创建对应的 VPC(kafka-ecs-proxy)。

2.在 VPC(kafka-ecs-proxy)中准备一台 EC2 实例,并使用 aws configure 配置 Access Key 和 Secret Access Key。

五、MSK Cluster

1.在 VPC(kafka-ecs-proxy)中按需创建 MSK 集群。

2.创建完成后点击查看客户端信息获取集群 bootstrap-server。

3.配置 MSK 集群安全组以使 EC2 可访问。

4.在 EC2 中,安装并使用命令行工具创建对应 Topic。

$ sudo yum install java
$ wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
$ tar -zxf kafka_2.12-2.7.2.tgz   
$ kafka_2.12-2.7.2/bin/kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num}  

六、ECR

1.在 ECR 控制台创建存储库。

2.在 EC2 中安装 docker,拉取 confluentinc/cp-kafka-rest 镜像并推送至 ECR(官方镜像地址:https://hub.docker.com/r/confluentinc/cp-kafka-rest)。

$ sudo yum update 
$ sudo yum install docker   
$ sudo systemctl start docker  
$ sudo docker pull confluentinc/cp-kafka-rest 
$ sudo docker tag confluentinc/cp-kafka-rest:latest <YOUR_ACCOUNT_ID>.dkr.ecr.cn-northwest-1.amazonaws.com.cn/kafka-rest:latest
$ sudo docker push <YOUR_ACCOUNT_ID>.dkr.ecr.cn-northwest-1.amazonaws.com.cn/kafka-rest:latest

七、ALB

1.在控制台中创建 ALB

2.创建目标组

3.返回 ALB 创建页面刷新目标组后完成创建

4.ALB 安全组需添加入站 8082 端口

八、ECS

1.在 ECS 控制台中创建 ECS 集群

2.创建任务定义

3.添加容器后,完成创建任务定义

4.创建 ECS 服务

a)创建基于 Fargate 的服务。

b)选择私有子网,并配置安全组。

c)配置步骤 5 中创建的 ALB 及注册目标组。

d)配置 AutoScaling,并选择基于哪项指标扩展(可配置多个策略)。

e)完成创建。

f)配置 MSK 安全组,添加 ECS Service 的安全组 Searvi-4828 入站规则。

九、测试

1.在公网环境下进行公网测试访问 ALB 地址:

# Get a list of topics
$ curl http://<YOUR_ALB_URL>:8082/topics
# Produce a message with JSON data
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  	--data '{"records":[{"value":{"name": "testUser"}}]}' \
      "http://<YOUR_ALB_URL>:8082/topics/http-logs"

2.使用压力测试工具 Siege 进行压测,并观测 AutoScaling 情况(默认为 2,最小任务数 2,最大任务数 20)。

#安装siege
$ sudo amazon-linux-extras install epel
$ sudo yum install siege
#并发200持60秒
$ siege -c200 -t60S --content-type "application/vnd.kafka.json.v2+json" 'http:// <YOUR_ALB_URL>:8082/topics/http-logs POST {"records":[{"value":{"name": "testUser"}}]}' 

十、监控

1.打开 CloudWatch 控制台,导航 Container Insights,即可对 ECS 集群、服务以及任务进行监控。

总结

这篇文章呈现了使用 ALB 配合 Amazon ECS Fargate为Amazon MSK 设置 REST API 的快速构建方式。 该解决方案可以帮助您从任何 IoT 设备、客户端或任意编程语言向 Amazon MSK 生成或使用消息,并且可以使用 Amazon ECS Fargate 自动伸缩的能力灵活扩展,以满足海量实时消息的流失处理。

本篇作者

胡靖麟

西云数据解决方案架构师,10 年以上计算广告、微服务、大数据等领域的互联网研发、架构设计实践经验。