Setting up AWS MSK (Managed Streaming for Kafka) with Terraform
Learn how to provision and manage AWS MSK clusters using Terraform, including configuration, monitoring, and security best practices
Setting up AWS MSK (Managed Streaming for Kafka) with Terraform
AWS MSK is a fully managed Apache Kafka service that makes it easy to build and run applications using Apache Kafka. This guide demonstrates how to set up and manage MSK using Terraform.
Video Tutorial
Learn more about managing AWS MSK with Terraform in this comprehensive video tutorial:
Prerequisites
- AWS CLI configured with appropriate permissions
- Terraform installed (version 1.0.0 or later)
- Basic understanding of Apache Kafka
- VPC with private subnets
Project Structure
terraform-msk/
├── main.tf
├── variables.tf
├── outputs.tf
├── modules/
│ └── msk/
│ ├── main.tf
│ ├── variables.tf
│ └── outputs.tf
└── config/
└── server.properties
MSK Configuration
Create `modules/msk/main.tf`:
# MSK Cluster
# MSK Cluster
resource "aws_msk_cluster" "main" {
  cluster_name           = "${var.project_name}-cluster"
  kafka_version          = var.kafka_version
  number_of_broker_nodes = var.number_of_broker_nodes

  broker_node_group_info {
    instance_type   = var.instance_type
    client_subnets  = var.subnet_ids
    security_groups = [aws_security_group.msk.id]

    storage_info {
      ebs_storage_info {
        volume_size = var.volume_size
      }
    }
  }

  encryption_info {
    # Encrypt data at rest with the customer-managed key defined in this module.
    encryption_at_rest_kms_key_arn = aws_kms_key.msk.arn

    encryption_in_transit {
      client_broker = "TLS" # brokers accept TLS client connections only
      in_cluster    = true  # also encrypt broker-to-broker traffic
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.main.arn
    revision = aws_msk_configuration.main.latest_revision
  }

  client_authentication {
    sasl {
      iam = true
      # SCRAM must be enabled on the cluster, otherwise the
      # aws_msk_scram_secret_association declared in this module is rejected.
      scram = true
    }
    tls {
      certificate_authority_arns = [aws_acmpca_certificate_authority.msk.arn]
    }
  }

  # Ship broker logs to all three supported destinations.
  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk.name
      }
      firehose {
        enabled         = true
        delivery_stream = aws_kinesis_firehose_delivery_stream.msk.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.logs.id
        prefix  = "msk-logs/"
      }
    }
  }

  # Expose JMX and node metrics for Prometheus scraping.
  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
      node_exporter {
        enabled_in_broker = true
      }
    }
  }

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-cluster"
    }
  )
}
# MSK Configuration
# MSK Configuration
#
# Note: unclean.leader.election.enable is set to false. Allowing unclean
# leader election can silently lose acknowledged messages, which defeats
# the durability guarantees implied by min.insync.replicas=2 and
# default.replication.factor=3.
resource "aws_msk_configuration" "main" {
  name           = "${var.project_name}-config"
  kafka_versions = [var.kafka_version]
  description    = "MSK cluster configuration"

  server_properties = <<PROPERTIES
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=false
zookeeper.session.timeout.ms=18000
PROPERTIES
}
# Security Group
# Security Group
#
# The cluster enforces client_broker = "TLS", so the plaintext listener
# (9092) can never be reached and is intentionally not opened. Port 9098
# is added because the cluster enables SASL/IAM authentication.
resource "aws_security_group" "msk" {
  name        = "${var.project_name}-msk"
  description = "Security group for MSK cluster"
  vpc_id      = var.vpc_id

  ingress {
    description = "Kafka TLS"
    from_port   = 9094
    to_port     = 9094
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  ingress {
    description = "Kafka SASL/SCRAM"
    from_port   = 9096
    to_port     = 9096
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  ingress {
    description = "Kafka SASL/IAM"
    from_port   = 9098
    to_port     = 9098
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  # NOTE(review): clients rarely need direct ZooKeeper access; consider
  # restricting this to admin tooling CIDRs only.
  ingress {
    description = "ZooKeeper"
    from_port   = 2181
    to_port     = 2181
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-msk"
    }
  )
}
# KMS Key
# Customer-managed KMS key used for MSK at-rest encryption.
# Annual key rotation is enabled; the 7-day deletion window gives a short
# grace period if the key is scheduled for deletion by mistake.
resource "aws_kms_key" "msk" {
  description             = "KMS key for MSK cluster encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true

  # Root-account statement delegates key administration to IAM policies.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
        }
        Action   = "kms:*"
        Resource = "*"
      }
    ]
  })

  tags = merge(
    var.tags,
    { Name = "${var.project_name}-msk" }
  )
}
# CloudWatch Log Group
# CloudWatch log group receiving MSK broker logs; entries expire after 30 days.
resource "aws_cloudwatch_log_group" "msk" {
  name              = "/aws/msk/${var.project_name}"
  retention_in_days = 30

  tags = merge(
    var.tags,
    { Name = "${var.project_name}-logs" }
  )
}
# S3 Bucket for Logs
# S3 Bucket for Logs
#
# Bucket names are globally unique across all AWS accounts, so the account
# id is appended to avoid collisions when the module is reused.
resource "aws_s3_bucket" "logs" {
  bucket = "${var.project_name}-msk-logs-${data.aws_caller_identity.current.account_id}"

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-logs"
    }
  )
}
# Enable object versioning on the log bucket so overwritten or deleted
# log objects remain recoverable.
resource "aws_s3_bucket_versioning" "logs" {
bucket = aws_s3_bucket.logs.id
versioning_configuration {
status = "Enabled"
}
}
# Kinesis Firehose
# Kinesis Firehose
#
# The plain "s3" destination (with s3_configuration) was removed in AWS
# provider v4; "extended_s3" with extended_s3_configuration is the
# supported form and is required for the timestamp-partitioned prefix.
resource "aws_kinesis_firehose_delivery_stream" "msk" {
  name        = "${var.project_name}-msk-logs"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose.arn
    bucket_arn = aws_s3_bucket.logs.arn
    prefix     = "msk-logs/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
  }

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-msk-logs"
    }
  )
}
Monitoring and Alerts
- CloudWatch Alarms
# Alarm on sustained high user-space CPU across the cluster.
# The AWS/Kafka namespace names its cluster dimension "Cluster Name"
# (with a space); using "Cluster" silently matches no data.
resource "aws_cloudwatch_metric_alarm" "cpu_utilization" {
  alarm_name          = "${var.project_name}-cpu-utilization"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "CpuUser"
  namespace           = "AWS/Kafka"
  period              = "300"
  statistic           = "Average"
  threshold           = "75"
  alarm_description   = "CPU utilization is too high"
  alarm_actions       = [aws_sns_topic.alerts.arn]

  dimensions = {
    "Cluster Name" = aws_msk_cluster.main.cluster_name
  }
}
# Alarm when free space for Kafka data logs drops below 20%.
# The AWS/Kafka dimension key is "Cluster Name" (with a space).
# NOTE(review): KafkaDataLogsDiskFree is reported per broker, so a
# "Broker ID" dimension may also be required for data to match — confirm
# against the cluster's emitted metrics.
resource "aws_cloudwatch_metric_alarm" "disk_space" {
  alarm_name          = "${var.project_name}-disk-space"
  comparison_operator = "LessThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "KafkaDataLogsDiskFree"
  namespace           = "AWS/Kafka"
  period              = "300"
  statistic           = "Average"
  threshold           = "20"
  alarm_description   = "Disk space is running low"
  alarm_actions       = [aws_sns_topic.alerts.arn]

  dimensions = {
    "Cluster Name" = aws_msk_cluster.main.cluster_name
  }
}
Authentication and Authorization
- SASL/SCRAM Authentication
# Attach the Secrets Manager secret holding SASL/SCRAM credentials to the
# cluster. Requires SCRAM to be enabled in the cluster's
# client_authentication block.
resource "aws_msk_scram_secret_association" "main" {
cluster_arn = aws_msk_cluster.main.arn
secret_arn_list = [aws_secretsmanager_secret.msk.arn]
}
# Secret holding the SASL/SCRAM username and password.
#
# Amazon MSK requires SCRAM secrets to (a) have a name beginning with
# "AmazonMSK_" and (b) be encrypted with a customer-managed KMS key —
# secrets using the default aws/secretsmanager key are rejected by the
# secret association.
resource "aws_secretsmanager_secret" "msk" {
  name       = "AmazonMSK_${var.project_name}"
  kms_key_id = aws_kms_key.msk.arn

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-secret"
    }
  )
}
# Store the SCRAM credentials as a JSON document with "username" and
# "password" keys — the structure MSK expects for SCRAM secrets.
# NOTE(review): var.kafka_password should be declared with
# sensitive = true in variables.tf — confirm.
resource "aws_secretsmanager_secret_version" "msk" {
secret_id = aws_secretsmanager_secret.msk.id
secret_string = jsonencode({
username = "kafka"
password = var.kafka_password
})
}
- ACL Configuration
# ACL-related broker settings: deny access when no ACL matches, trust the
# "admin" TLS identity as a super user, and enable the AclAuthorizer.
# NOTE(review): this configuration is never referenced by the cluster's
# configuration_info (which points at aws_msk_configuration.main), so these
# properties are not applied as written — they would need to be merged into
# the main configuration. Confirm intent.
resource "aws_msk_configuration" "acls" {
name = "${var.project_name}-acls"
kafka_versions = [var.kafka_version]
server_properties = <<PROPERTIES
allow.everyone.if.no.acl.found=false
super.users=User:CN=admin
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
PROPERTIES
}
Schema Registry
- Glue Schema Registry
# Glue Schema Registry that will hold the message schemas for this project.
resource "aws_glue_registry" "main" {
  registry_name = "${var.project_name}-registry"

  tags = merge(
    var.tags,
    { Name = "${var.project_name}-registry" }
  )
}
# Example Avro schema registered in the Glue registry above.
# BACKWARD compatibility lets consumers using the new schema read data
# written with the previous version.
resource "aws_glue_schema" "main" {
schema_name = "example-schema"
registry_arn = aws_glue_registry.main.arn
data_format = "AVRO"
compatibility = "BACKWARD"
schema_definition = <<EOF
{
"type": "record",
"name": "ExampleRecord",
"namespace": "com.example",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "timestamp",
"type": "long"
},
{
"name": "data",
"type": "string"
}
]
}
EOF
}
Integration Examples
- Producer Application
import os
import json
from kafka import KafkaProducer

# Example producer using SASL/SCRAM over TLS.
# Broker endpoints and credentials are read from the environment (populate
# them from Terraform outputs) — the original snippet embedded Terraform
# references (aws_msk_cluster.main..., var.kafka_password), which are not
# valid Python runtime values.
producer = KafkaProducer(
    # Comma-separated list from the cluster's bootstrap_brokers_sasl_scram output.
    bootstrap_servers=os.environ["MSK_BOOTSTRAP_BROKERS"].split(","),
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username=os.environ.get("KAFKA_USERNAME", "kafka"),
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('example-topic', {'key': 'value'})
producer.flush()
- Consumer Application
import os
import json
from kafka import KafkaConsumer

# Example consumer using SASL/SCRAM over TLS.
# Broker endpoints and credentials are read from the environment (populate
# them from Terraform outputs) — the original snippet embedded Terraform
# references, which are not valid Python runtime values.
consumer = KafkaConsumer(
    'example-topic',
    # Comma-separated list from the cluster's bootstrap_brokers_sasl_scram output.
    bootstrap_servers=os.environ["MSK_BOOTSTRAP_BROKERS"].split(","),
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username=os.environ.get("KAFKA_USERNAME", "kafka"),
    sasl_plain_password=os.environ["KAFKA_PASSWORD"],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    group_id='example-group'
)

for message in consumer:
    print(message.value)
Best Practices
-
Performance Optimization
- Choose appropriate instance type
- Configure proper partitioning
- Monitor throughput
- Tune broker settings
-
High Availability
- Use multi-AZ deployment
- Configure replication factor
- Enable auto recovery
- Regular monitoring
-
Security
- Enable encryption
- Implement authentication
- Configure ACLs
- Regular updates
-
Cost Optimization
- Use appropriate broker types
- Monitor cluster usage
Conclusion
You’ve learned how to set up and manage AWS MSK using Terraform. This setup provides:
- Kafka cluster management
- Security and encryption
- Monitoring and logging
- Schema management
Remember to:
- Monitor cluster health
- Implement security best practices
- Optimize performance
- Maintain backups