Setting up AWS MSK (Managed Streaming for Apache Kafka) with Terraform

Learn how to provision and manage AWS MSK clusters using Terraform, including configuration, monitoring, and security best practices

Amazon MSK is a fully managed Apache Kafka service that handles broker provisioning, patching, and recovery so you can focus on your applications. This guide demonstrates how to provision and manage an MSK cluster with Terraform.

Prerequisites

  • AWS CLI configured with appropriate permissions
  • Terraform installed (version 1.0.0 or later)
  • Basic understanding of Apache Kafka
  • VPC with private subnets

Project Structure

terraform-msk/
├── main.tf
├── variables.tf
├── outputs.tf
├── modules/
│   └── msk/
│       ├── main.tf
│       ├── variables.tf
│       └── outputs.tf
└── config/
    └── server.properties
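
With this layout, the root main.tf wires the module into your network. A minimal sketch (the input values are illustrative; the variable names match the module inputs used later in this guide):

module "msk" {
  source = "./modules/msk"

  project_name           = "demo"
  kafka_version          = "3.6.0"
  number_of_broker_nodes = 3
  instance_type          = "kafka.m5.large"
  volume_size            = 100
  vpc_id                 = var.vpc_id
  subnet_ids             = var.private_subnet_ids
  allowed_cidr_blocks    = ["10.0.0.0/16"]
  tags                   = { Environment = "dev" }
}

Three broker nodes spread across three private subnets give one broker per AZ, which matches the replication factor used in the cluster configuration below.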

MSK Configuration

Create modules/msk/main.tf:

# MSK Cluster
resource "aws_msk_cluster" "main" {
  cluster_name           = "${var.project_name}-cluster"
  kafka_version          = var.kafka_version
  number_of_broker_nodes = var.number_of_broker_nodes

  broker_node_group_info {
    instance_type   = var.instance_type
    client_subnets  = var.subnet_ids
    security_groups = [aws_security_group.msk.id]

    storage_info {
      ebs_storage_info {
        volume_size = var.volume_size
      }
    }
  }

  encryption_info {
    encryption_at_rest_kms_key_arn = aws_kms_key.msk.arn
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.main.arn
    revision = aws_msk_configuration.main.latest_revision
  }

  client_authentication {
    sasl {
      iam   = true
      scram = true # required for the SCRAM secret association configured later
    }
    tls {
      certificate_authority_arns = [aws_acmpca_certificate_authority.msk.arn]
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk.name
      }
      firehose {
        enabled         = true
        delivery_stream = aws_kinesis_firehose_delivery_stream.msk.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.logs.id
        prefix  = "msk-logs/"
      }
    }
  }

  open_monitoring {
    prometheus {
      jmx_exporter {
        enabled_in_broker = true
      }
      node_exporter {
        enabled_in_broker = true
      }
    }
  }

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-cluster"
    }
  )
}
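
The tls block above references a private CA that must also be defined. A minimal sketch (note that the CA must be activated by installing its certificate before MSK can use it, and ACM Private CA carries a significant monthly cost):

# Private CA used for mutual-TLS client authentication
resource "aws_acmpca_certificate_authority" "msk" {
  type = "ROOT"

  certificate_authority_configuration {
    key_algorithm     = "RSA_4096"
    signing_algorithm = "SHA512WITHRSA"

    subject {
      common_name = "${var.project_name}-msk-ca"
    }
  }
}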

# MSK Configuration
resource "aws_msk_configuration" "main" {
  name              = "${var.project_name}-config"
  kafka_versions    = [var.kafka_version]
  description       = "MSK cluster configuration"

  server_properties = <<PROPERTIES
auto.create.topics.enable=true
default.replication.factor=3
min.insync.replicas=2
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=false
zookeeper.session.timeout.ms=18000
PROPERTIES
}

# Security Group
resource "aws_security_group" "msk" {
  name        = "${var.project_name}-msk"
  description = "Security group for MSK cluster"
  vpc_id      = var.vpc_id

  ingress {
    description = "Kafka plaintext"
    from_port   = 9092
    to_port     = 9092
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  ingress {
    description = "Kafka TLS"
    from_port   = 9094
    to_port     = 9094
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  ingress {
    description = "Kafka SASL"
    from_port   = 9096
    to_port     = 9096
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  ingress {
    description = "Kafka SASL/IAM"
    from_port   = 9098
    to_port     = 9098
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  ingress {
    description = "ZooKeeper"
    from_port   = 2181
    to_port     = 2181
    protocol    = "tcp"
    cidr_blocks = var.allowed_cidr_blocks
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-msk"
    }
  )
}

# KMS Key
resource "aws_kms_key" "msk" {
  description             = "KMS key for MSK cluster encryption"
  deletion_window_in_days = 7
  enable_key_rotation     = true

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
        }
        Action   = "kms:*"
        Resource = "*"
      }
    ]
  })

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-msk"
    }
  )
}
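
The key policy above uses data.aws_caller_identity, which must be declared in the module (assuming it is not defined elsewhere):

# Identity of the account running Terraform, used in the key policy above
data "aws_caller_identity" "current" {}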

# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "msk" {
  name              = "/aws/msk/${var.project_name}"
  retention_in_days = 30

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-logs"
    }
  )
}

# S3 Bucket for Logs
resource "aws_s3_bucket" "logs" {
  # Bucket names are global; include the account ID to avoid collisions
  bucket = "${var.project_name}-msk-logs-${data.aws_caller_identity.current.account_id}"

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-logs"
    }
  )
}

resource "aws_s3_bucket_versioning" "logs" {
  bucket = aws_s3_bucket.logs.id
  versioning_configuration {
    status = "Enabled"
  }
}
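
Broker logs accumulate quickly; a lifecycle rule keeps storage costs in check. A sketch, assuming a 90-day retention is acceptable:

# Expire MSK log objects after 90 days to control storage cost
resource "aws_s3_bucket_lifecycle_configuration" "logs" {
  bucket = aws_s3_bucket.logs.id

  rule {
    id     = "expire-msk-logs"
    status = "Enabled"

    filter {
      prefix = "msk-logs/"
    }

    expiration {
      days = 90
    }
  }
}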

# Kinesis Firehose
resource "aws_kinesis_firehose_delivery_stream" "msk" {
  name        = "${var.project_name}-msk-logs"
  destination = "extended_s3" # the plain "s3" destination is no longer supported by the provider

  extended_s3_configuration {
    role_arn            = aws_iam_role.firehose.arn
    bucket_arn          = aws_s3_bucket.logs.arn
    prefix              = "msk-logs/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    error_output_prefix = "msk-logs-errors/" # required when the prefix contains expressions
  }

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-msk-logs"
    }
  )
}
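
The delivery stream above references an IAM role that is not defined in this module. A minimal sketch (the policy scope is an assumption; tighten it as needed):

# Role Firehose assumes to write log objects into the bucket
resource "aws_iam_role" "firehose" {
  name = "${var.project_name}-firehose"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "firehose.amazonaws.com" }
      Action    = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "firehose" {
  name = "${var.project_name}-firehose"
  role = aws_iam_role.firehose.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect = "Allow"
      Action = [
        "s3:PutObject",
        "s3:GetBucketLocation",
        "s3:ListBucket",
        "s3:AbortMultipartUpload"
      ]
      Resource = [
        aws_s3_bucket.logs.arn,
        "${aws_s3_bucket.logs.arn}/*"
      ]
    }]
  })
}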

Monitoring and Alerts

  1. CloudWatch Alarms
resource "aws_cloudwatch_metric_alarm" "cpu_utilization" {
  alarm_name          = "${var.project_name}-cpu-utilization"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "CpuUser"
  namespace           = "AWS/Kafka"
  period              = "300"
  statistic           = "Average"
  threshold           = "75"
  alarm_description   = "Broker CPU utilization is too high"
  alarm_actions       = [aws_sns_topic.alerts.arn]

  dimensions = {
    # CpuUser is reported per broker; create one alarm per broker ID
    "Cluster Name" = aws_msk_cluster.main.cluster_name
    "Broker ID"    = "1"
  }
}

resource "aws_cloudwatch_metric_alarm" "disk_space" {
  alarm_name          = "${var.project_name}-disk-space"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = "2"
  metric_name         = "KafkaDataLogsDiskUsed"
  namespace           = "AWS/Kafka"
  period              = "300"
  statistic           = "Average"
  threshold           = "85"
  alarm_description   = "Broker data volume usage is too high"
  alarm_actions       = [aws_sns_topic.alerts.arn]

  dimensions = {
    # KafkaDataLogsDiskUsed is reported per broker; create one alarm per broker ID
    "Cluster Name" = aws_msk_cluster.main.cluster_name
    "Broker ID"    = "1"
  }
}
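
Both alarms publish to an SNS topic that is not declared above. A minimal sketch (the alert_email variable is an assumption; the subscription must be confirmed from the mailbox):

resource "aws_sns_topic" "alerts" {
  name = "${var.project_name}-msk-alerts"
}

resource "aws_sns_topic_subscription" "alerts_email" {
  topic_arn = aws_sns_topic.alerts.arn
  protocol  = "email"
  endpoint  = var.alert_email
}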

Authentication and Authorization

  1. SASL/SCRAM Authentication
resource "aws_msk_scram_secret_association" "main" {
  cluster_arn     = aws_msk_cluster.main.arn
  secret_arn_list = [aws_secretsmanager_secret.msk.arn]
}

resource "aws_secretsmanager_secret" "msk" {
  # MSK requires SCRAM secrets to be named with the "AmazonMSK_" prefix
  # and encrypted with a customer-managed KMS key
  name       = "AmazonMSK_${var.project_name}"
  kms_key_id = aws_kms_key.msk.arn

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-secret"
    }
  )
}

resource "aws_secretsmanager_secret_version" "msk" {
  secret_id     = aws_secretsmanager_secret.msk.id
  secret_string = jsonencode({
    username = "kafka"
    password = var.kafka_password
  })
}
  2. ACL Configuration
resource "aws_msk_configuration" "acls" {
  name           = "${var.project_name}-acls"
  kafka_versions = [var.kafka_version]

  # Note: MSK manages the authorizer itself; only mutable broker properties
  # such as allow.everyone.if.no.acl.found can be set here. ACLs are managed
  # with Kafka tooling (e.g. kafka-acls.sh) against the cluster.
  server_properties = <<PROPERTIES
allow.everyone.if.no.acl.found=false
PROPERTIES
}

Schema Registry

  1. Glue Schema Registry
resource "aws_glue_registry" "main" {
  registry_name = "${var.project_name}-registry"

  tags = merge(
    var.tags,
    {
      Name = "${var.project_name}-registry"
    }
  )
}

resource "aws_glue_schema" "main" {
  schema_name       = "example-schema"
  registry_arn      = aws_glue_registry.main.arn
  data_format       = "AVRO"
  compatibility     = "BACKWARD"
  schema_definition = <<EOF
{
  "type": "record",
  "name": "ExampleRecord",
  "namespace": "com.example",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "timestamp",
      "type": "long"
    },
    {
      "name": "data",
      "type": "string"
    }
  ]
}
EOF
}
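
The client examples in the next section need the broker endpoints. The module can expose them from modules/msk/outputs.tf (a sketch; the output names are assumptions):

output "bootstrap_brokers_tls" {
  description = "TLS connection endpoints (port 9094)"
  value       = aws_msk_cluster.main.bootstrap_brokers_tls
}

output "bootstrap_brokers_sasl_scram" {
  description = "SASL/SCRAM connection endpoints (port 9096)"
  value       = aws_msk_cluster.main.bootstrap_brokers_sasl_scram
}

output "zookeeper_connect_string" {
  description = "ZooKeeper connection string"
  value       = aws_msk_cluster.main.zookeeper_connect_string
}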

Integration Examples

  1. Producer Application
from kafka import KafkaProducer
import json
import os

# Broker endpoints and credentials come from the environment, e.g. the
# bootstrap_brokers_sasl_scram output of the Terraform module and the
# password stored in Secrets Manager
producer = KafkaProducer(
    bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='kafka',
    sasl_plain_password=os.environ['KAFKA_PASSWORD'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('example-topic', {'key': 'value'})
producer.flush()
  2. Consumer Application
from kafka import KafkaConsumer
import json
import os

consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers=os.environ['KAFKA_BOOTSTRAP_SERVERS'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='kafka',
    sasl_plain_password=os.environ['KAFKA_PASSWORD'],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    group_id='example-group'
)

for message in consumer:
    print(message.value)

Best Practices

  1. Performance Optimization

    • Choose appropriate instance type
    • Configure proper partitioning
    • Monitor throughput
    • Tune broker settings
  2. High Availability

    • Use multi-AZ deployment
    • Configure replication factor
    • Enable auto recovery
    • Regular monitoring
  3. Security

    • Enable encryption
    • Implement authentication
    • Configure ACLs
    • Regular updates
  4. Cost Optimization

    • Use appropriate broker types
    • Monitor cluster usage

Conclusion

You’ve learned how to set up and manage AWS MSK using Terraform. This setup provides:

  • Kafka cluster management
  • Security and encryption
  • Monitoring and logging
  • Schema management

Remember to:

  • Monitor cluster health
  • Implement security best practices
  • Optimize performance
  • Maintain backups