Writing Data with Firehose II

This section uses Python to write data to Firehose, which then automatically inserts the data into an OpenSearch Serverless collection.

img

Add a Data Access Policy

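The previous section listed two requirements:

  1. The Firehose IAM Role must have permission to write to OpenSearch Serverless (already added).
  2. The OpenSearch Serverless data access policy must allow the Firehose IAM Role to access the collection.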

Let's complete the second step. On the OpenSearch Serverless collection, click Manage data access:

image-20230818092324599

Open the policy used by the collection:

image-20230818092348201

Click Edit:

image-20230818092407353

Add another principal: select the Firehose IAM Role as the principal, then click Grant:

image-20230818094615495

Grant the Firehose IAM Role full access to the collection:

image-20230818092845974

Click Save:

image-20230818092906818
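
For reference, the policy edited in the console above is stored as a JSON document. The following is a minimal sketch of what such a document might look like when it grants full access; the collection name, account ID, and role name are placeholders, and `build_data_access_policy` is a hypothetical helper, not part of any AWS SDK.

```python
import json


def build_data_access_policy(collection_name, principal_arns):
    """Build a data access policy document granting full access.

    collection_name and principal_arns are placeholders supplied by the
    caller; they are assumptions, not values from this walkthrough.
    """
    rules = [
        {
            # Permissions on the collection itself
            "ResourceType": "collection",
            "Resource": [f"collection/{collection_name}"],
            "Permission": ["aoss:*"],
        },
        {
            # Permissions on every index inside the collection
            "ResourceType": "index",
            "Resource": [f"index/{collection_name}/*"],
            "Permission": ["aoss:*"],
        },
    ]
    return json.dumps([{"Rules": rules, "Principal": list(principal_arns)}])


if __name__ == "__main__":
    # Placeholder ARN: replace with the Firehose IAM Role of your own account.
    policy = build_data_access_policy(
        "my-collection",
        ["arn:aws:iam::123456789012:role/my-firehose-role"],
    )
    print(json.dumps(json.loads(policy), indent=2))
```

A document like this could presumably be passed as the `policy` argument of `create_access_policy` or `update_access_policy` on the boto3 `opensearchserverless` client. When editing an existing policy, keep the principals it already lists and append the Firehose role to them.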

Write Data to Firehose with a Python Script

Create a file named firehose-generator.py with the following content:

import datetime
import getopt
import json
import random
import sys

import boto3


def get_data():
    """Build one simulated stock quote."""
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2),
    }


def generate(stream_name, firehose_client):
    """Send simulated quotes to the delivery stream until interrupted (Ctrl+C)."""
    while True:
        data = get_data()
        print(data)
        firehose_client.put_record(
            DeliveryStreamName=stream_name,
            # Firehose expects the payload as a blob, so encode the JSON explicitly
            Record={'Data': json.dumps(data).encode('utf-8')},
        )


def main(argv):
    stream_name = ''
    region_name = 'us-east-1'

    # -s <delivery stream name> is required, -r <region> is optional
    opts, args = getopt.getopt(argv, "s:r:")
    for opt, arg in opts:
        if opt == '-s':
            stream_name = arg
        elif opt == '-r':
            region_name = arg
    if stream_name == '':
        print("required argument -s <firehose delivery stream name>")
        sys.exit(1)
    print(f"sending to Kinesis Firehose Delivery Stream {stream_name} in Region {region_name} \n")
    generate(stream_name, boto3.client('firehose', region_name=region_name))


if __name__ == '__main__':
    main(sys.argv[1:])

Run the script:

python firehose-generator.py -s firehose-serverless-delivery -r us-east-1

The script continuously generates random stock tickers and prices and writes them to Firehose:

image-20230818093119514
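
The script above sends one record per API call. As an optional variation, records can be grouped and sent with `put_record_batch`, which accepts up to 500 records per call. The sketch below is only an illustration under that assumption; `to_batches`, `to_firehose_records`, and `send_events` are hypothetical helper names, and failed records are only counted, not retried.

```python
import json

MAX_BATCH_SIZE = 500  # PutRecordBatch accepts at most 500 records per call


def to_batches(items, batch_size=MAX_BATCH_SIZE):
    """Yield consecutive slices of items, each at most batch_size long."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def to_firehose_records(events):
    """Encode each event dict as a Firehose record with a JSON payload."""
    return [{"Data": json.dumps(event).encode("utf-8")} for event in events]


def send_events(firehose_client, stream_name, events):
    """Send events in batches and return how many records Firehose rejected."""
    failed = 0
    for batch in to_batches(to_firehose_records(events)):
        response = firehose_client.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=batch,
        )
        # FailedPutCount reports records in this batch that were not accepted
        failed += response["FailedPutCount"]
    return failed
```

It could be called as `send_events(boto3.client('firehose', region_name='us-east-1'), 'firehose-serverless-delivery', events)`, where `events` is a list of dicts such as those returned by `get_data()`.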

Wait 1 to 2 minutes.

Open OpenSearch Dashboards and go to Dev Tools:

image-20230818093234334

Query the documents:

GET firehose-ingest/_search
{
    "query": {
        "match_all": {}
    }
}
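
The match_all query above returns every document. To narrow the results, a query body can also be built in Python and pasted into Dev Tools. This is a minimal sketch that assumes the index uses the default dynamic mapping for the `ticker` and `price` fields written by the generator script; `build_ticker_query` is a hypothetical helper.

```python
import json


def build_ticker_query(ticker, min_price=0.0, size=5):
    """Build a search body for one ticker with a lower bound on price."""
    return {
        "size": size,
        "query": {
            "bool": {
                # Full-text match on the ticker symbol
                "must": [{"match": {"ticker": ticker}}],
                # Keep only quotes at or above min_price
                "filter": [{"range": {"price": {"gte": min_price}}}],
            }
        },
    }


if __name__ == "__main__":
    # Paste the printed JSON under "GET firehose-ingest/_search" in Dev Tools
    print(json.dumps(build_ticker_query("AMZN", min_price=50.0), indent=4))
```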

The results show that the Firehose data has been successfully written to OpenSearch Serverless:

image-20230818110057189