In this section, we use Python to write data to Firehose, which then automatically inserts the data into an OpenSearch Serverless collection.
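As mentioned in the previous section, two things are required:
1. Permissions for OpenSearch Serverless (already added).
2. The OpenSearch Serverless data access policy must allow the Firehose IAM Role to access the collection.
Now let's complete the second step. On the OpenSearch Serverless collection, click Manage data access.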
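Open the data access policy used by this collection.
Click Edit.
Add another rule, select the Firehose IAM Role as its principal, then click Grant.
Grant the Firehose IAM Role all access permissions on the collection.
Click Save.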
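The console steps above add a rule to the collection's data access policy. As a rough sketch, the same change could be scripted in Python. The rule below grants the Firehose role every collection-level and index-level permission, which is roughly what granting all access in the console amounts to. The collection name, role ARN, and policy name are placeholders, and `apply_rule` assumes a `boto3.client('opensearchserverless')` is passed in; it reads the current policy with `get_access_policy` and writes it back with `update_access_policy`, passing along the `policyVersion` it read.

```python
import json

# Collection- and index-level permissions for OpenSearch Serverless data access policies
COLLECTION_PERMISSIONS = [
    "aoss:CreateCollectionItems",
    "aoss:DeleteCollectionItems",
    "aoss:UpdateCollectionItems",
    "aoss:DescribeCollectionItems",
]
INDEX_PERMISSIONS = [
    "aoss:CreateIndex",
    "aoss:DeleteIndex",
    "aoss:UpdateIndex",
    "aoss:DescribeIndex",
    "aoss:ReadDocument",
    "aoss:WriteDocument",
]


def build_firehose_rule(collection_name, role_arn):
    """Build one policy rule granting role_arn full access to the collection
    and every index in it. collection_name and role_arn are placeholders."""
    return {
        "Description": "Allow Firehose to write to the collection",
        "Rules": [
            {
                "ResourceType": "collection",
                "Resource": [f"collection/{collection_name}"],
                "Permission": list(COLLECTION_PERMISSIONS),
            },
            {
                "ResourceType": "index",
                "Resource": [f"index/{collection_name}/*"],
                "Permission": list(INDEX_PERMISSIONS),
            },
        ],
        "Principal": [role_arn],
    }


def add_rule(policy, rule):
    """Return the policy document (a list of rules) with rule appended.
    Accepts the policy as a JSON string or as an already-parsed list."""
    rules = json.loads(policy) if isinstance(policy, str) else list(policy)
    rules.append(rule)
    return rules


def apply_rule(client, policy_name, rule):
    """Append rule to an existing data access policy.
    client is assumed to be boto3.client('opensearchserverless')."""
    detail = client.get_access_policy(name=policy_name, type="data")["accessPolicyDetail"]
    new_policy = add_rule(detail["policy"], rule)
    client.update_access_policy(
        name=policy_name,
        type="data",
        policyVersion=detail["policyVersion"],  # version we read, for the update
        policy=json.dumps(new_policy),
    )
    return new_policy
```

Appending a separate rule, rather than editing an existing one, mirrors the console's "add another rule" step and leaves the existing principals untouched.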
Create a file named firehose-generator.py with the following content:
import sys
import getopt
import datetime
import json
import random

import boto3


def get_data():
    # Build one random stock-quote record
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}


def generate(stream_name, firehose_client):
    # Send one record per call, forever (stop with Ctrl+C)
    while True:
        data = get_data()
        print(data)
        firehose_client.put_record(
            DeliveryStreamName=stream_name,
            Record={
                # Each Firehose record is indexed as one document
                "Data": json.dumps(data)
            }
        )


def main(argv):
    stream_name = ''
    region_name = 'us-east-1'
    # -s <delivery stream name> is required; -r <region> is optional
    opts, args = getopt.getopt(argv, "s:r:")
    for opt, arg in opts:
        if opt == '-s':
            stream_name = arg
        elif opt == '-r':
            region_name = arg
    if stream_name == '':
        print("required argument -s <firehose delivery stream name>")
        sys.exit(1)
    print(f"sending to Kinesis Firehose Delivery Stream {stream_name} in Region {region_name} \n")
    generate(stream_name, boto3.client('firehose', region_name=region_name))


if __name__ == '__main__':
    main(sys.argv[1:])
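The script above sends one record per `put_record` call. If throughput matters, a batched variant is one option. This sketch uses `put_record_batch`, which accepts up to 500 records per call, and collects the records Firehose reports as failed (response entries that carry an `ErrorCode`) so the caller can retry them. `send_batch` is a hypothetical helper, not part of the original script, and it does not check the 4 MiB per-call size limit.

```python
import datetime
import json
import random


def get_data():
    # Same record shape as get_data() in firehose-generator.py
    return {
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}


def send_batch(firehose_client, stream_name, records, batch_size=500):
    """Send records in groups of batch_size (500 is the put_record_batch maximum).

    Returns the records Firehose reported as failed, so the caller can retry them.
    Hypothetical helper; it does not enforce the 4 MiB per-call size limit.
    """
    failed = []
    for start in range(0, len(records), batch_size):
        group = records[start:start + batch_size]
        response = firehose_client.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=[{"Data": json.dumps(record)} for record in group],
        )
        if response["FailedPutCount"] > 0:
            # RequestResponses is in the same order as Records;
            # failed entries carry an ErrorCode instead of a RecordId
            failed.extend(
                record
                for record, result in zip(group, response["RequestResponses"])
                if "ErrorCode" in result
            )
    return failed
```

For example, `send_batch(boto3.client('firehose', region_name='us-east-1'), 'firehose-serverless-delivery', [get_data() for _ in range(1000)])` would make two API calls of 500 records each.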
Run the script:
python firehose-generator.py -s firehose-serverless-delivery -r us-east-1
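The command assumes the delivery stream firehose-serverless-delivery already exists in us-east-1 and is ACTIVE. As an optional pre-check, a small sketch like the one below could poll `describe_delivery_stream` until the stream reports `ACTIVE`. `wait_until_active`, its timeout, and its polling interval are illustrative choices, not part of the original tutorial.

```python
import time


def delivery_stream_status(firehose_client, stream_name):
    """Return the stream's DeliveryStreamStatus, e.g. 'CREATING' or 'ACTIVE'."""
    response = firehose_client.describe_delivery_stream(DeliveryStreamName=stream_name)
    return response["DeliveryStreamDescription"]["DeliveryStreamStatus"]


def wait_until_active(firehose_client, stream_name, timeout=300, interval=10,
                      sleep=time.sleep):
    """Poll until the stream is ACTIVE (hypothetical helper).

    Raises RuntimeError for a *_FAILED status and TimeoutError once
    `timeout` seconds of polling have passed. `sleep` is injectable for tests.
    """
    waited = 0
    while True:
        status = delivery_stream_status(firehose_client, stream_name)
        if status == "ACTIVE":
            return
        if status.endswith("_FAILED"):
            raise RuntimeError(f"delivery stream {stream_name} is {status}")
        if waited >= timeout:
            raise TimeoutError(f"{stream_name} still {status} after {waited}s")
        sleep(interval)
        waited += interval
```

For example, calling `wait_until_active(boto3.client('firehose', region_name='us-east-1'), 'firehose-serverless-delivery')` before starting the generator.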
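The script continuously generates random stock tickers and prices and writes them to Firehose.
Wait 1 to 2 minutes while Firehose buffers and delivers the records.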
Open OpenSearch Dashboards and go to Dev Tools.
Query the documents:
GET firehose-ingest/_search
{
  "query": {
    "match_all": {}
  }
}
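The same match_all query can also be sent from Python. This is a sketch assuming the `opensearch-py` and `boto3` packages are installed, that `host` is the collection endpoint without the `https://` prefix, and that the caller's own IAM identity is also allowed by the data access policy; requests are SigV4-signed for the service name `aoss`. `search_firehose_index` and `ticker_counts` are hypothetical helpers, the latter tallying the returned hits per ticker.

```python
from collections import Counter

# Same request body as the Dev Tools query
MATCH_ALL = {"query": {"match_all": {}}}


def ticker_counts(search_response):
    """Tally the returned hits per ticker (hypothetical helper)."""
    return Counter(hit["_source"]["ticker"] for hit in search_response["hits"]["hits"])


def search_firehose_index(host, region, index="firehose-ingest", size=10):
    """Run the match_all query against an OpenSearch Serverless collection.

    Assumes opensearch-py and boto3 are installed and that `host` is the
    collection endpoint without the https:// prefix (placeholder).
    """
    # Imported lazily so ticker_counts works without these packages
    import boto3
    from opensearchpy import AWSV4SignerAuth, OpenSearch, RequestsHttpConnection

    credentials = boto3.Session().get_credentials()
    # "aoss" is the SigV4 service name for OpenSearch Serverless
    auth = AWSV4SignerAuth(credentials, region, "aoss")
    client = OpenSearch(
        hosts=[{"host": host, "port": 443}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
    )
    return client.search(index=index, body={"size": size, **MATCH_ALL})
```

A call such as `ticker_counts(search_firehose_index(host, 'us-east-1', size=100))` would then summarize which tickers appear in the first 100 hits.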
The results show that the data from Firehose has been successfully written to OpenSearch Serverless.