Data Streams with AWS Kinesis
- September 10, 2022
- Posted by: techjediadmin
- Category: AWS, Cloud Applications, Data Streams, Kinesis
When talking about messaging and asynchronous data processing, you will often hear the term “data stream”. Let us start with the basic question.
What is a data stream?
A data stream is simply data that is continuously generated by thousands of data sources, which typically send records simultaneously for processing. Data streams cover a wide variety of data, such as log files generated by mobile or web applications, e-commerce purchases, in-game player activity, information from social networks, financial trading data, and so on.
This data needs to be processed sequentially and incrementally, either record by record or over sliding time windows. It is used for a wide variety of analytics, including correlations, aggregations, filtering, and sampling. Let us discuss the related AWS offering, Kinesis.
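To make “sliding time windows” concrete, here is a minimal, illustrative sketch (plain Python, not Kinesis-specific) of processing records incrementally while keeping a rolling aggregate over the last few records:

```python
from collections import deque

def sliding_window_averages(records, window_size):
    """Process records one by one, yielding the average of the
    most recent `window_size` records after each new arrival."""
    window = deque(maxlen=window_size)  # old records fall out automatically
    for value in records:
        window.append(value)
        yield sum(window) / len(window)

# Example: rolling average over a stream of measurements, window of 3
stream = [10, 20, 30, 40, 50]
print(list(sliding_window_averages(stream, window_size=3)))
```

The same record-by-record pattern underlies stream analytics: each record updates the aggregate as it arrives, rather than waiting for a complete batch.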
What is AWS Kinesis?
Per AWS documentation, “Amazon Kinesis Data Streams is a scalable and durable real-time data streaming service that can continuously capture gigabytes of data per second from hundreds of thousands of sources.”
You can continuously add various types of data such as clickstreams, application logs, and social media to an Amazon Kinesis data stream from hundreds of thousands of sources. Within seconds, the data will be available for your Amazon Kinesis Applications to read and process from the stream.
How does it work?
Producers generate large volumes of data, and Kinesis acts as an intermediate layer that lets consumers read this data. Kinesis also has the concept of a shard. You can think of a shard as a “box for data” that works independently of the other shards. Shards exist mainly for scaling: to process more data, you configure more shards.
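Kinesis decides which shard a record lands on by MD5-hashing the record's partition key into a 128-bit integer and mapping it onto the shards' hash-key ranges. Assuming the hash-key space is split evenly across shards, the routing can be sketched like this (illustrative only; the real mapping uses each shard's actual hash-key range):

```python
import hashlib

def shard_for_key(partition_key, num_shards):
    """Sketch of Kinesis shard routing: MD5-hash the partition key
    to a 128-bit integer, then map it onto evenly split shards."""
    hash_value = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return hash_value * num_shards // 2**128

# Records with the same partition key always land on the same shard
print(shard_for_key("user-42", 4) == shard_for_key("user-42", 4))  # True
```

This is why the choice of partition key matters: too few distinct keys concentrates traffic on a few shards, while well-distributed keys spread load across all of them.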
Coding time:
Producer:
Apart from the standard data streams (clickstream logs, database events, etc.), we can have a custom producer component. A sample Python snippet:
import boto3

STREAM_NAME = "MyStream"
client = boto3.client("kinesis", region_name="eu-west-1")

def publish_message(partition_key, data):
    data = data.encode("utf-8")
    response = client.put_record(
        StreamName=STREAM_NAME, Data=data, PartitionKey=partition_key
    )
    return response

if __name__ == "__main__":
    partition_key = "TEST"  # Determines which shard the record goes to
    print(publish_message(partition_key, "Hello world"))
Consumer:
Next, we will see how to consume messages from the shards in Kinesis.
import time

import boto3

STREAM_NAME = "MyStream"
client = boto3.client("kinesis", region_name="eu-west-1")

def consume():
    # Get the shard_id of the default shard (0th index)
    stream = client.describe_stream(StreamName=STREAM_NAME)
    shard_id = stream["StreamDescription"]["Shards"][0]["ShardId"]
    iterator = client.get_shard_iterator(
        StreamName=STREAM_NAME, ShardId=shard_id, ShardIteratorType="LATEST"
    )["ShardIterator"]
    # Read data from the default shard
    response = client.get_records(ShardIterator=iterator, Limit=1)
    while "NextShardIterator" in response:
        time.sleep(1)
        data = response["Records"]
        if len(data) < 1:
            print("No data received")
        else:
            data = data[0]["Data"]
            print(f"Received {data=}")
        # Read the next set of records
        response = client.get_records(
            ShardIterator=response["NextShardIterator"], Limit=1
        )

if __name__ == "__main__":
    consume()