In the example below we fetch pending items from the stream. We keep reading from each shard, and if more than 20 consecutive reads return no new items, we stop reading.
# Low-level DynamoDB Streams client used by main() for every stream call.
client = boto3.client('dynamodbstreams')
# Deserializer used by process_item() to turn DynamoDB-typed attribute
# values into plain Python values.
# NOTE(review): attribute access on boto3.dynamodb.types presumably relies on
# that submodule having been imported — confirm against the file's imports.
deserializer = boto3.dynamodb.types.TypeDeserializer()
def process_item(item):
    """Deserialize one DynamoDB-typed attribute map and print the result.

    Args:
        item: Mapping of attribute name to a DynamoDB-typed value, as passed
            by ``main`` from a stream record's ``NewImage``.
    """
    converted = {}
    for attr_name, typed_value in item.items():
        converted[attr_name] = deserializer.deserialize(typed_value)
    print("python data:", converted)
def main(stream='arn:aws:dynamodb:*:*:table/posts/stream/*', max_empty_reads=20):
    """Read each shard of a DynamoDB stream from its oldest record and print the items.

    For every shard returned by ``describe_stream`` a ``TRIM_HORIZON``
    iterator is opened and ``get_records`` is called repeatedly. Reading of a
    shard stops when the shard is closed (no next iterator is returned) or
    when more than ``max_empty_reads`` consecutive calls return no records.
    The module-level ``client`` is closed when the function exits, including
    when a call raises.

    Args:
        stream: ARN of the stream to read. The default is the placeholder
            ARN from the original example.
        max_empty_reads: Number of consecutive empty ``get_records``
            responses tolerated per shard before giving up on that shard.

    Note:
        Only the shards in the first ``describe_stream`` response are read;
        further pages (``LastEvaluatedShardId``) are not followed.
    """
    try:
        response = client.describe_stream(StreamArn=stream)
        for shard in response["StreamDescription"]["Shards"]:
            iterator = client.get_shard_iterator(
                StreamArn=stream,
                ShardId=shard['ShardId'],
                ShardIteratorType='TRIM_HORIZON',
            )
            shard_iterator = iterator["ShardIterator"]
            # Reset per shard: otherwise one idle shard leaves the counter
            # above the limit and every later shard is abandoned on its
            # first empty read.
            empty_reads = 0
            while shard_iterator:
                items = client.get_records(ShardIterator=shard_iterator)
                records = items["Records"]
                for record in records:
                    change = record["dynamodb"]
                    print("seq num:", change["SequenceNumber"])
                    # REMOVE events, and streams whose view type excludes new
                    # images, carry no "NewImage"; skip instead of KeyError.
                    new_image = change.get("NewImage")
                    if new_image is not None:
                        process_item(new_image)
                # A closed shard returns no next iterator; .get() yields None
                # and ends the while loop instead of raising KeyError.
                shard_iterator = items.get("NextShardIterator")
                if records:
                    empty_reads = 0
                else:
                    empty_reads += 1
                if empty_reads > max_empty_reads:
                    break
    finally:
        client.close()
# Run the example. The call is unguarded, so it also executes when this file
# is imported as a module.
main()