Kinesis Data Streams
Lambda reads records from the data stream and invokes your function synchronously with an event that contains stream records.
Amazon Kinesis Data Streams is a fully managed streaming data service. You can continuously add various types of data such as clickstreams, application logs, and social media to a Kinesis stream from hundreds of thousands of sources.
TIP: Kinesis Data Streams vs Kinesis Data Firehose
Read "AWS Kinesis Data Streams vs Kinesis Data Firehose" to decide when to use Kinesis Data Streams and when to use Kinesis Data Firehose.
Request
Request fields
Records(Array) - An array of stream records. Each record has the following fields:
- awsRegion(String) - The AWS Region where the event originated, e.g. us-east-1
- eventID(String) - A globally unique identifier for the event that was recorded in this stream record
- eventName(String) - The event type, e.g. aws:kinesis:record
- eventSource(String) - The AWS service from which the event originated. For Kinesis, this is aws:kinesis
- eventSourceARN(String) - The Amazon Resource Name (ARN) of the event source
- eventVersion(String) - The event version, made up of a major and minor version in the form major.minor, e.g. 1.0
- invokeIdentityArn(String) - The ARN of the identity used to invoke the Lambda function
- kinesis(Object) - The Kinesis payload
  - approximateArrivalTimestamp(Number) - The approximate time that the record was inserted into the stream, as Unix epoch seconds with a fractional part
  - data(String) - The data contained in the record, base64-encoded
  - kinesisSchemaVersion(String) - The version of the Kinesis data record format
  - partitionKey(String) - The partition key of the record
  - sequenceNumber(String) - The sequence number of the record
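The fields above can be mapped onto a small typed object before any business logic runs. The following is a minimal sketch using only the Python standard library; `ParsedKinesisRecord` and `parse_record` are hypothetical names chosen for illustration and are not part of any AWS library. It assumes `approximateArrivalTimestamp` is in epoch seconds, as in the example event.

```python
import base64
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class ParsedKinesisRecord:
    """Hypothetical helper type holding the fields most handlers need."""
    event_id: str
    partition_key: str
    sequence_number: str
    arrived_at: datetime
    data: bytes


def parse_record(record: dict) -> ParsedKinesisRecord:
    """Convert one entry of event["Records"] into a ParsedKinesisRecord."""
    kinesis = record["kinesis"]
    return ParsedKinesisRecord(
        event_id=record["eventID"],
        partition_key=kinesis["partitionKey"],
        sequence_number=kinesis["sequenceNumber"],
        # Assumption: the timestamp is epoch seconds with a fractional part
        arrived_at=datetime.fromtimestamp(
            kinesis["approximateArrivalTimestamp"], tz=timezone.utc
        ),
        # The data field arrives base64-encoded; keep the raw bytes
        data=base64.b64decode(kinesis["data"]),
    )
```

Keeping `data` as bytes leaves the choice of decoding (text, JSON, or a binary format) to the caller, since the event itself does not say how producers serialized the payload.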
Request Example
{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
        "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
        "approximateArrivalTimestamp": 1545084711.166
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}
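The `data` values in the example above are base64-encoded. As a minimal sketch, a handler with no third-party dependencies could decode them as follows. It assumes the producers wrote UTF-8 text, which holds for this sample event but is not guaranteed for real streams. `example_event` is a trimmed copy of the event above that keeps only the fields the handler reads.

```python
import base64


def lambda_handler(event, context):
    """Decode each record's base64 payload and return the decoded strings."""
    messages = []
    for record in event["Records"]:
        raw = base64.b64decode(record["kinesis"]["data"])
        # Assumption: payloads are UTF-8 text
        messages.append(raw.decode("utf-8"))
    return messages


# Trimmed copy of the example event, keeping only the fields used here
example_event = {
    "Records": [
        {"kinesis": {"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg=="}},
        {"kinesis": {"data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4="}},
    ]
}

print(lambda_handler(example_event, None))
# ['Hello, this is a test.', 'This is only a test.']
```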
Response
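Lambda does not require a return value for Kinesis invocations. When the event source mapping has `ReportBatchItemFailures` enabled in its function response types, the function may return a `batchItemFailures` list whose `itemIdentifier` values are the sequence numbers of the records that failed; an empty list reports the whole batch as successful. The sketch below illustrates that shape under the assumption that payloads are JSON; `process` is a hypothetical placeholder for business logic.

```python
import base64
import json


def process(payload: dict) -> None:
    """Hypothetical business logic; raises on payloads it cannot handle."""
    if "id" not in payload:
        raise ValueError("payload has no id")


def lambda_handler(event, context):
    failures = []
    for record in event["Records"]:
        try:
            # Assumption: each payload is a base64-encoded JSON document
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
            process(payload)
        except Exception:
            # Report the failed record by its sequence number
            failures.append(
                {"itemIdentifier": record["kinesis"]["sequenceNumber"]}
            )
    return {"batchItemFailures": failures}
```

This version collects every failure in the batch. Stopping at the first failure is another reasonable design; which one fits depends on whether later records can be processed safely out of order.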
Resources
Typing by language
- PHP - KinesisEvent - Composer bref/bref
- Python - KinesisEvent - Pip aws-lambda-powertools
- Rust - KinesisEvent - Cargo aws-lambda-events
- Java - KinesisEvent - Maven aws-lambda-java-events
- TypeScript - KinesisEvent - NPM @types/aws-lambda
- Go - KinesisEvent - github.com/aws/aws-lambda-go/events
Handlers by language
- Python - BatchProcessor - Pip aws-lambda-powertools
- Ruby - kinesis_event - Gem jets
- Python - on_kinesis_record - Pip chalice
Code Examples
Batch Processing via AWS Lambda Powertools
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

# Tracks per-record successes and failures for a Kinesis Data Streams batch
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    # data_as_text() and data_as_json() are methods that decode the base64 payload
    logger.info(record.kinesis.data_as_text())
    payload: dict = record.kinesis.data_as_json()
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # Returns the partial batch response listing the records that failed
    return processor.response()