Kinesis Data Streams

Lambda reads records from the data stream and invokes your function synchronously with an event that contains stream records.

Amazon Kinesis Data Streams is a fully managed streaming data service. You can continuously add various types of data, such as clickstreams, application logs, and social media feeds, to a Kinesis stream from hundreds of thousands of sources.

TIP: Kinesis streams vs firehose

See "AWS Kinesis Data Streams vs Kinesis Data Firehose" for guidance on when to use Kinesis Data Streams and when to use Kinesis Data Firehose.

Request

Request fields

Records - An array of records.

awsRegion (String)
AWS Region where the event originated, e.g. us-east-1
eventID (String)
A globally unique identifier for the event that was recorded in this stream record.
eventName (String)
Event type, e.g. aws:kinesis:record
eventSource (String)
The AWS service from which the event originated. For Kinesis, this is aws:kinesis
eventSourceARN (String)
The Amazon Resource Name (ARN) of the event source
eventVersion (String)
The eventVersion key value contains a major and minor version in the form <major>.<minor>, e.g. 1.0
invokeIdentityArn (String)
The ARN for the identity used to invoke the Lambda function
kinesis (Object)
Kinesis payload
  • approximateArrivalTimestamp (Number) - The approximate time that the record was inserted into the stream, in Unix epoch seconds
  • data (String) - The data contained in the record, Base64-encoded
  • kinesisSchemaVersion (String) - The version of the Kinesis data record format
  • partitionKey (String) - The partition key of the record
  • sequenceNumber (String) - The sequence number of the record
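
The field list above can be written down as type hints, which helps editors and type checkers catch misspelled keys. This is a minimal sketch using only the standard library. The class names KinesisPayload, KinesisRecord, and KinesisStreamEvent are hypothetical names chosen for this example, not types shipped by AWS.

```python
from typing import List, TypedDict


class KinesisPayload(TypedDict):
    """Contents of the "kinesis" object in each record."""
    approximateArrivalTimestamp: float  # Unix epoch seconds
    data: str                           # Base64-encoded record data
    kinesisSchemaVersion: str
    partitionKey: str
    sequenceNumber: str


class KinesisRecord(TypedDict):
    """One entry of the "Records" array."""
    awsRegion: str
    eventID: str
    eventName: str
    eventSource: str
    eventSourceARN: str
    eventVersion: str
    invokeIdentityArn: str
    kinesis: KinesisPayload


class KinesisStreamEvent(TypedDict):
    """Top-level event passed to the Lambda handler."""
    Records: List[KinesisRecord]


def partition_keys(event: KinesisStreamEvent) -> List[str]:
    # Collect the partition key of every record in the batch.
    return [record["kinesis"]["partitionKey"] for record in event["Records"]]
```

TypedDict values are plain dictionaries at runtime, so the event that Lambda passes to the handler can be annotated with these types without any conversion.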

Request Example

{
  "Records": [
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "approximateArrivalTimestamp": 1545084650.987
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    },
    {
      "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
        "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
        "approximateArrivalTimestamp": 1545084711.166
      },
      "eventSource": "aws:kinesis",
      "eventVersion": "1.0",
      "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
      "eventName": "aws:kinesis:record",
      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
      "awsRegion": "us-east-2",
      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
    }
  ]
}
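
As an illustration of reading the example above, the following sketch decodes the Base64 data field and converts approximateArrivalTimestamp to a timezone-aware datetime. It uses only the standard library. The helper name decode_record and the keys of the dictionary it returns are assumptions made for this example, and the sketch assumes the payload is UTF-8 text, which holds for the sample records but not necessarily for your stream.

```python
import base64
from datetime import datetime, timezone


def decode_record(record: dict) -> dict:
    """Return the decoded text and arrival time of one Kinesis record."""
    kinesis = record["kinesis"]
    # The data field is Base64-encoded; assume it wraps UTF-8 text.
    text = base64.b64decode(kinesis["data"]).decode("utf-8")
    # The timestamp is expressed in Unix epoch seconds.
    arrived_at = datetime.fromtimestamp(
        kinesis["approximateArrivalTimestamp"], tz=timezone.utc
    )
    return {
        "partition_key": kinesis["partitionKey"],
        "sequence_number": kinesis["sequenceNumber"],
        "text": text,
        "arrived_at": arrived_at,
    }


# First record of the request example, reduced to the "kinesis" object.
sample_record = {
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "approximateArrivalTimestamp": 1545084650.987,
    }
}

decoded = decode_record(sample_record)
print(decoded["text"])  # Hello, this is a test.
```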

Response

Reporting batch item failures
{
   "batchItemFailures":[
      {
         "itemIdentifier":"<id>"
      }
   ]
}
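
For Kinesis, the itemIdentifier is the sequence number of the failed record. The sketch below shows one way to build this response by hand, without any helper library. It assumes the event source mapping has ReportBatchItemFailures enabled, and process_record is a hypothetical placeholder for your own logic. The handler returns on the first failure because Lambda retries from the lowest reported sequence number, so processing later records in the same invocation would be repeated work.

```python
import base64


def process_record(payload: bytes) -> None:
    """Hypothetical business logic; raises on payloads it cannot handle."""
    if not payload:
        raise ValueError("empty payload")


def lambda_handler(event: dict, context: object) -> dict:
    for record in event.get("Records", []):
        sequence_number = record["kinesis"]["sequenceNumber"]
        try:
            process_record(base64.b64decode(record["kinesis"]["data"]))
        except Exception:
            # Report the first failed record; Lambda retries from this point.
            return {"batchItemFailures": [{"itemIdentifier": sequence_number}]}
    # An empty list tells Lambda the whole batch succeeded.
    return {"batchItemFailures": []}
```

Returning an empty batchItemFailures list marks the whole batch as successful, while an unhandled exception causes the entire batch to be retried.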

Resources

Typing by language

Handlers by language

Code Examples

Batch Processing via AWS Lambda Powertools
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
from aws_lambda_powertools.utilities.typing import LambdaContext


# Tracks per-record results and builds the batchItemFailures response
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: KinesisStreamRecord):
    # Called once per record; raising an exception marks the record as failed
    logger.info(record.kinesis.data_as_text())
    payload: dict = record.kinesis.data_as_json()
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    # Returns {"batchItemFailures": [...]} listing the records that failed
    return processor.response()

Documentation


Last update: 2022-09-13