Cross-Account Kinesis Consumer Using Spring Cloud 3x Stream, Function & AWS KCL

KCL helps you consume and process data from a Kinesis data stream by taking care of many of the complex tasks associated with distributed computing. These include load balancing across multiple consumer application instances, responding to consumer application instance failures, checkpointing processed records, and reacting to resharding. The KCL takes care of all of these subtasks so that you can focus your efforts on writing your custom record-processing logic.

The KCL is different from the Kinesis Data Streams APIs that are available in the AWS SDKs. The Kinesis Data Streams APIs help you manage many aspects of Kinesis Data Streams, including creating streams, resharding, and putting and getting records. The KCL provides a layer of abstraction around all these subtasks, specifically so that you can focus on your consumer application’s custom data processing logic.

Reference: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-overview
#kinesis-consumer-role
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KinesisConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:Get*",
                "kinesis:List*",
                "kinesis:Describe*"
            ],
            "Resource": "arn:aws:kinesis::@ProducerAccountID:stream/event-notify"
        }
    ]
}
#kinesis-consumer-role trust policy (lets the consumer account assume the role)
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::@ConsumerAccountID:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
#Note: you can limit access to specific users or roles of the consumer account rather than granting blanket access. Ex:
"Principal": {
    "AWS": [
        "arn:aws:iam::@ConsumerAccountID:user/user-name-1",
        "arn:aws:iam::@ConsumerAccountID:role/roleName2"
    ]
}
#local-dev-consumer
#cloud-run-consumer
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "KinesisConsumerAdditionalAccess",
            "Effect": "Allow",
            "Action": [
                "dynamodb:*",
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        },
        {
            "Sid": "CrossAccountRoleAssumeForStreamConsumerAccess",
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::@ProducerAccountID:role/kinesis-consumer-role"
        }
    ]
}
  • Spring configuration for Spring Cloud 3x Stream with function:
#application.yml
app:
  aws-assume-role-arn: arn:aws:iam::@ProducerAccountID:role/kinesis-consumer-role
spring:
  main.allow-bean-definition-overriding: true
  cloud:
    stream:
      kinesis:
        binder:
          auto-create-stream: false
          kpl-kcl-enabled: true #enables Spring to bring in KCL
      bindings:
        # events inbound
        processEvent-in-0:
          destination: event-notify
          group: Event-Notify-Consumer-Group-1
          content-type: application/json
          consumer:
            concurrency: 5 #defaults to 1, this will process up to 5 messages concurrently
    function:
      definition: processEvent;
cloud.aws:
  stack.auto: false
  region:
    static: ${AWS_REGION}
    auto: false #Region autodetection doesn't work with FARGATE deployments, https://github.com/spring-cloud/spring-cloud-aws/issues/734
#to suppress warnings if deployed using Fargate
logging.level:
  com.amazonaws.util.EC2MetadataUtils: error
  com.amazonaws.internal.InstanceMetadataServiceResourceFetcher: error

#application-LOCAL.yml
#for local dev/test, set SPRING_PROFILES_ACTIVE to LOCAL
#set aws credentials & config for the local-dev-consumer created before
cloud.aws.credentials.profile-name: local-dev-consumer
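The binding key `processEvent-in-0` in the configuration above is not arbitrary: in the functional programming model, Spring Cloud Stream derives binding names from the function bean name as `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The small helper below only illustrates that naming rule; the class `BindingNames` and its methods are hypothetical and are not part of Spring.

```java
public class BindingNames {

    // Name of the input binding for a function bean, e.g. the key
    // expected under spring.cloud.stream.bindings in application.yml.
    public static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding for a function bean (unused by a pure Consumer).
    public static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // The consumer bean in this article is called processEvent and has one input.
        System.out.println(input("processEvent", 0));
    }
}
```

If the bean is renamed, both the `definition` value and the key under `bindings` need to change with it, otherwise the binding properties are likely to be ignored.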
  • Declare dependencies:
#springCloudVersion=2020.0.1
#springCloudKinesisVersion=2.1.0
dependencies {
    //spring-cloud
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation "org.springframework.cloud:spring-cloud-stream-binder-kinesis:${springCloudKinesisVersion}"

    //aws sts sdk for role assume
    implementation "com.amazonaws:aws-java-sdk-sts:${awsSTSVersion}"
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
  • Override default Spring kinesis beans with proxy AWSCredentialsProvider to support cross-account role assume.
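One possible shape for this override is sketched below, using AWS SDK for Java 1.x classes that come with the dependencies declared above. It is an illustration under stated assumptions, not necessarily the author's exact code: it assumes the binder picks up a user-defined `AmazonKinesisAsync` bean in place of its own, that Spring Cloud AWS exposes the consumer account's credentials as an `AWSCredentialsProvider` bean, and that the stream lives in the region configured under `cloud.aws.region.static`. The class name and the session name are hypothetical. Only the Kinesis client uses the assumed role, so DynamoDB (KCL lease table) and CloudWatch calls stay in the consumer account.

```java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesisAsync;
import com.amazonaws.services.kinesis.AmazonKinesisAsyncClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CrossAccountKinesisConfiguration {

    // Hypothetical session name; it only labels the assumed-role session.
    private static final String ROLE_SESSION_NAME = "kinesis-consumer-session";

    // Assumption: a bean named amazonKinesis replaces the binder's default Kinesis client.
    @Bean
    public AmazonKinesisAsync amazonKinesis(
            AWSCredentialsProvider consumerAccountCredentials,
            @Value("${app.aws-assume-role-arn}") String roleArn,
            @Value("${cloud.aws.region.static}") String region) {

        // The STS client authenticates with the consumer account's own credentials.
        AWSSecurityTokenService sts = AWSSecurityTokenServiceClientBuilder.standard()
                .withCredentials(consumerAccountCredentials)
                .withRegion(region)
                .build();

        // Temporary credentials for kinesis-consumer-role in the producer account,
        // refreshed by the provider when the session expires.
        AWSCredentialsProvider assumedRole =
                new STSAssumeRoleSessionCredentialsProvider.Builder(roleArn, ROLE_SESSION_NAME)
                        .withStsClient(sts)
                        .build();

        // Kinesis calls go to the producer account's stream with the assumed role.
        return AmazonKinesisAsyncClientBuilder.standard()
                .withCredentials(assumedRole)
                .withRegion(region)
                .build();
    }
}
```

Bean names and types differ between binder versions, so it is worth checking the binder's auto-configuration for the version in use before relying on this override.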
  • Create a java functional consumer for cloud stream:
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;
import java.util.function.Consumer;

@Slf4j
@Configuration
public class StreamConfiguration {

    //for brevity, I am binding the raw JSON payload to a Map; you can use the respective POJO class (payload type) instead
    //the bean name must match the function definition (processEvent) in application.yml
    @Bean
    public Consumer<Map<String, Object>> processEvent() {
        return eventRawData -> log.info("Event received {}", eventRawData);
    }
}
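Because the handler is a plain `java.util.function.Consumer`, its logic can be exercised without Kinesis or a Spring context. The sketch below shows the idea with a stand-in consumer that collects payloads into a list instead of logging them; the class `ProcessEventDemo`, the sink parameter, and the sample payload fields are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ProcessEventDemo {

    // Stand-in for StreamConfiguration.processEvent(): stores each payload
    // in the given list so the caller can inspect what was received.
    public static Consumer<Map<String, Object>> processEvent(List<Map<String, Object>> sink) {
        return sink::add;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> received = new ArrayList<>();
        Consumer<Map<String, Object>> consumer = processEvent(received);

        // Simulates one deserialized JSON record arriving from the stream.
        consumer.accept(Map.of("eventId", "42", "type", "order-created"));

        System.out.println("Events received: " + received.size());
    }
}
```

The same approach works in a unit test: call `accept` on the consumer returned by the bean method and assert on its side effects.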

Amith Kumar

Lead Full Stack & DevOps Engineer