Kinesis Data Streams
Lambda reads records from the data stream and invokes your function synchronously with an event that contains stream records.
Amazon Kinesis Data Streams is a fully managed streaming data service. You can continuously add various types of data, such as clickstreams, application logs, and social media feeds, to a Kinesis stream from hundreds of thousands of sources.
TIP: Kinesis Data Streams vs Kinesis Data Firehose
Read "AWS Kinesis Data Streams vs Kinesis Data Firehose" to decide when to use Kinesis Data Streams and when to use Kinesis Data Firehose.
Request
Request fields
Records
- An array of records.
awsRegion
(String)- AWS region where the event originated eg: us-east-1
eventID
(String)- A globally unique identifier for the event that was recorded in this stream record.
eventName
(String)- Event type eg: aws:kinesis:record
eventSource
(String)- The AWS service from which the Kinesis event originated. For Kinesis, this is aws:kinesis
eventSourceARN
(String)- The Amazon Resource Name (ARN) of the event source
eventVersion
(String)- The eventVersion key value contains a major and minor version in the form <major>.<minor>.
invokeIdentityArn
(String)- The ARN for the identity used to invoke the Lambda function
kinesis
(Object)- Kinesis payload
approximateArrivalTimestamp
(Number) - The approximate time that the record was inserted into the stream, in Unix epoch seconds
data
(String) - The base64-encoded data contained in the record
kinesisSchemaVersion
(String) - The version of the Kinesis data record format
partitionKey
(String) - The partition key of the record
sequenceNumber
(String) - The sequence number of the record
Request Example
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
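As a minimal sketch of how the fields in the request example above can be read, the following decodes the base64 `data` payload, converts `approximateArrivalTimestamp` to a UTC datetime, and splits the shard ID out of `eventID`. The function name `summarize_records` and the shape of the returned dictionaries are illustrative assumptions, not part of any AWS library, and the payload is assumed to be UTF-8 text as in the example.

```python
import base64
from datetime import datetime, timezone


def summarize_records(event: dict) -> list:
    """Decode each Kinesis record in a Lambda event into a plain dict.

    Hypothetical helper for illustration; assumes UTF-8 text payloads.
    """
    summaries = []
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        # Lambda delivers the record payload base64-encoded.
        text = base64.b64decode(kinesis["data"]).decode("utf-8")
        # The arrival timestamp is epoch seconds with a fractional part.
        arrived = datetime.fromtimestamp(
            kinesis["approximateArrivalTimestamp"], tz=timezone.utc
        )
        # In the request example, eventID is "<shard ID>:<sequence number>".
        shard_id = record["eventID"].split(":", 1)[0]
        summaries.append(
            {
                "shard_id": shard_id,
                "partition_key": kinesis["partitionKey"],
                "sequence_number": kinesis["sequenceNumber"],
                "arrived_at": arrived,
                "text": text,
            }
        )
    return summaries


# Trimmed copy of the first record from the request example.
sample_event = {
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987,
            },
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
        }
    ]
}

for summary in summarize_records(sample_event):
    print(summary["shard_id"], summary["text"])
# prints: shardId-000000000006 Hello, this is a test.
```

Keeping `sequenceNumber` as a string, as the event does, avoids precision issues with very large integers in downstream systems.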
Response
Resources
Typing by language
- PHP - KinesisEvent - Composer bref/bref
- Python - KinesisEvent - Pip aws-lambda-powertools
- Rust - KinesisEvent - Cargo aws-lambda-events
- Java - KinesisEvent - Maven aws-lambda-java-events
- TypeScript - KinesisEvent - NPM @types/aws-lambda
- Go - KinesisEvent - github.com/aws/aws-lambda-go/events
Handlers by language
- Python - BatchProcessor - Pip aws-lambda-powertools
- Ruby - kinesis_event - Gem jets
- Python - on_kinesis_record - Pip chalice
Code Examples
Batch Processing via AWS Lambda Powertools
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

# One processor instance, configured for Kinesis Data Streams batches
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    # Called once per record; raising an exception marks the record as failed
    logger.info(record.kinesis.data_as_text())
    payload: dict = record.kinesis.data_as_json()
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # Reports the failed records back to Lambda as a partial batch response
    return processor.response()
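For comparison, here is a dependency-free sketch of the same idea: each record is processed on its own, and failed records are reported in a `batchItemFailures` list keyed by sequence number. It assumes the event source mapping has `ReportBatchItemFailures` enabled in its function response types. The `process_record` function and its JSON-only rule are hypothetical stand-ins for real business logic, and this is not a description of how Powertools is implemented.

```python
import base64
import json


def process_record(record: dict) -> None:
    """Hypothetical per-record logic: require a JSON payload."""
    raw = base64.b64decode(record["kinesis"]["data"])
    json.loads(raw)  # raises ValueError if the payload is not valid JSON


def lambda_handler(event: dict, context) -> dict:
    failures = []
    for record in event.get("Records", []):
        try:
            process_record(record)
        except Exception:
            # Identify the failed record by its Kinesis sequence number.
            failures.append(
                {"itemIdentifier": record["kinesis"]["sequenceNumber"]}
            )
    # An empty list tells Lambda that the whole batch succeeded.
    return {"batchItemFailures": failures}
```

With the request example shown earlier, both payloads are plain text rather than JSON, so this sketch would report both sequence numbers as failures.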