DynamoDb Streams

DynamoDb Streams

In the example below, we fetch pending items from the stream. We keep reading from the stream and stop once 20 consecutive reads return no new items.

import boto3
# Imported explicitly: `import boto3` alone is not guaranteed to load this
# submodule, and the deserializer below is looked up through it.
import boto3.dynamodb.types

# Low-level client for the DynamoDB Streams API, shared by the functions below.
client = boto3.client('dynamodbstreams')
# Converts DynamoDB-typed attribute values (e.g. {"S": "text"}) into plain
# Python values.
deserializer = boto3.dynamodb.types.TypeDeserializer()


def process_item(item):
    """Deserialize one DynamoDB-typed item image and print it.

    Args:
        item: Mapping of attribute name to DynamoDB-typed attribute value,
            as passed by the caller (the record's "NewImage").
    """
    obj = {}
    for attr_name, typed_value in item.items():
        # Each value is converted with the module-level TypeDeserializer.
        obj[attr_name] = deserializer.deserialize(typed_value)
    print("python data:", obj)


def main():
    """Print every pending record from each shard of the stream.

    Each shard is read from its oldest available record (TRIM_HORIZON).
    Reading of a shard stops when the shard has no further iterator, or
    after 20 consecutive reads that returned no records. The shared client
    is always closed on exit, including when a request raises.
    """
    # NOTE(review): the wildcards make this look like a placeholder; replace
    # it with the concrete ARN of the stream to read.
    stream = 'arn:aws:dynamodb:*:*:table/posts/stream/*'
    max_empty_reads = 20

    try:
        # NOTE(review): only the first describe_stream response is used;
        # streams with many shards may need pagination - confirm.
        response = client.describe_stream(
            StreamArn=stream,
        )

        for shard in response["StreamDescription"]["Shards"]:
            iterator = client.get_shard_iterator(
                StreamArn=stream,
                ShardId=shard['ShardId'],
                ShardIteratorType='TRIM_HORIZON',
            )

            shard_iterator = iterator["ShardIterator"]
            # Reset per shard: otherwise a shard that hit the limit would
            # make every later shard stop after a single empty read.
            empty_reads = 0
            while shard_iterator:
                items = client.get_records(
                    ShardIterator=shard_iterator,
                )
                records = items["Records"]
                for item in records:
                    change = item["dynamodb"]
                    print("seq num:", change["SequenceNumber"])
                    # Deletions and some stream view types carry no new
                    # image; skip those instead of raising KeyError.
                    new_image = change.get("NewImage")
                    if new_image is not None:
                        process_item(new_image)

                # Absent once a closed shard has been fully read, which ends
                # the loop via the `while` condition.
                shard_iterator = items.get("NextShardIterator")

                if records:
                    empty_reads = 0
                else:
                    empty_reads += 1
                    if empty_reads >= max_empty_reads:
                        break
    finally:
        client.close()


# Run only when executed as a script, so importing this module does not
# trigger any AWS requests.
if __name__ == "__main__":
    main()

Related Posts