Run Prefect on ECS: Part 1

Complexity: 4 stars
Time: 1-2 hours
Tags: guide, orchestration

Introduction

Orchestrating workloads is a common challenge in data engineering. Prefect simplifies this task with its orchestration capabilities. In this guide, I'll walk you through running Prefect flows on AWS ECS, with Prefect Cloud as the front end for managing and monitoring them. We'll also use GitHub Actions for CI/CD and Terraform for infrastructure as code.

Architecture

At a high level, the architecture lets us deploy new Prefect flows and run them as tasks on ECS. Workflows are defined as Prefect flows, committed to GitHub, and deployed by GitHub Actions. Prefect Cloud serves as the user interface for monitoring and management and hosts the work pool. AWS ECS provides the compute: a Prefect worker (referred to as the agent in this guide) runs as an ECS service, polls the work pool for scheduled flow runs, and launches each run as a Fargate task.

Project Structure

Our project comprises three main components:

  1. CI/CD Pipeline: Implemented with GitHub Actions.
  2. Infrastructure: Managed on AWS via Terraform.
  3. Prefect Flows: The actual data processing workflows.

The project directory looks like this:

├── .github
│   └── workflows
│       └── deploy.yml
├── Dockerfile
├── Dockerfile_agent
├── infra
│   ├── main.tf
│   ├── provider.tf
│   └── variables.tf
├── poetry.lock
├── pyproject.toml
└── src
    ├── create_deployment.py
    └── flows
        └── prefect_flow.py

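CI/CD with GitHub Actions

Let's start with the workflow file, .github/workflows/deploy.yml (GitHub Actions only picks up workflows from the .github/workflows directory):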


name: Deploy to AWS

on:
  push:
    branches:
      - main

# Run deployments one at a time and never cancel a Terraform apply mid-run,
# which could leave the state locked or partially applied
concurrency:
  group: deploy-${{ github.ref }}
  cancel-in-progress: false

jobs:
  terraform-apply:
    name: Apply Terraform
    runs-on: ubuntu-latest

    defaults:
      run:
        working-directory: infra

    env:
      AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
      AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
      TF_VAR_prefect_account_id: ${{ vars.PREFECT_ACCOUNT_ID }}
      TF_VAR_prefect_api_key: ${{ secrets.PREFECT_API_KEY }}
      TF_VAR_prefect_workspace_id: ${{ vars.PREFECT_WORKSPACE_ID }}
      TF_VAR_prefect_work_pool_name: ${{ vars.PREFECT_WORK_POOL_NAME }}
      TF_VAR_ecr_repository_name: ${{ vars.AWS_ECR_REPOSITORY_NAME }}
      TF_VAR_account_number: ${{ vars.AWS_ACCOUNT_NUMBER }}
      TF_VAR_region: ${{ vars.AWS_REGION }}
      TF_VAR_docker_hub_username: ${{ vars.DOCKER_HUB_USERNAME }}
      TF_VAR_docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: 1.9.0

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      # The Terraform local-exec provisioner runs `prefect work-pool create`,
      # so the Prefect CLI must be installed and logged in on this runner
      - name: Install Prefect CLI
        run: pip install prefect

      - name: Login to Prefect Cloud
        run: prefect cloud login --key ${{ secrets.PREFECT_API_KEY }} --workspace ${{ vars.PREFECT_WORKSPACE_NAME }}

      - name: Terraform Init
        run: terraform init

      - name: Terraform Apply
        run: terraform apply -auto-approve

  build-prefect-docker:
    name: Build and Push Docker Image
    runs-on: ubuntu-latest
    needs: terraform-apply

    env:
      AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
      AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to Amazon ECR
        run: |
          aws ecr get-login-password --region ${{ vars.AWS_REGION }} | docker login --username AWS --password-stdin ${{ vars.AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ vars.AWS_REGION }}.amazonaws.com

      - name: Build Docker Image
        run: |
          docker build -t prefect-task-image:latest .

      - name: Tag Docker Image
        run: |
          docker tag prefect-task-image:latest ${{ vars.AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ vars.AWS_REGION }}.amazonaws.com/${{ vars.AWS_ECR_REPOSITORY_NAME }}:latest

      - name: Push Docker Image to ECR
        run: |
          docker push ${{ vars.AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ vars.AWS_REGION }}.amazonaws.com/${{ vars.AWS_ECR_REPOSITORY_NAME }}:latest

  deploy-prefect-flow:
    name: Deploy Prefect Flow
    runs-on: ubuntu-latest
    needs: build-prefect-docker

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install Prefect CLI
        run: pip install prefect

      - name: Login to Prefect Cloud
        run: prefect cloud login --key ${{ secrets.PREFECT_API_KEY }} --workspace ${{ vars.PREFECT_WORKSPACE_NAME }}

      - name: Deploy Prefect Flow
        working-directory: src
        run: python create_deployment.py --image=${{ vars.AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ vars.AWS_REGION }}.amazonaws.com/${{ vars.AWS_ECR_REPOSITORY_NAME }}:latest --name="ai-consultant-ecs-flow-deployment" --work_pool_name=${{ vars.PREFECT_WORK_POOL_NAME }}

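The workflow has three jobs: "terraform-apply" provisions the AWS resources and creates the Prefect work pool, "build-prefect-docker" builds the flow image and pushes it to ECR, and "deploy-prefect-flow" registers the deployment in Prefect Cloud. In the GitHub repository, we need to create every secret (secrets.*) and variable (vars.*) that the workflow references. Because each run starts on a fresh runner, provider.tf should also configure a remote Terraform backend (for example, an S3 bucket) so that the state persists between runs.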
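As a quick pre-flight check, the sketch below lists the repository secrets and variables referenced by deploy.yml and reports which ones are still missing from a given set of names. The names are copied from the workflow; missing_settings is a hypothetical helper, and the actual values still have to be created in the repository settings.

```python
# Names referenced by .github/workflows/deploy.yml (secrets.* and vars.*)
REQUIRED_SECRETS = {
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "PREFECT_API_KEY",
    "DOCKER_HUB_PASSWORD",
}
REQUIRED_VARIABLES = {
    "PREFECT_ACCOUNT_ID",
    "PREFECT_WORKSPACE_ID",
    "PREFECT_WORKSPACE_NAME",
    "PREFECT_WORK_POOL_NAME",
    "AWS_ECR_REPOSITORY_NAME",
    "AWS_ACCOUNT_NUMBER",
    "AWS_REGION",
    "DOCKER_HUB_USERNAME",
}


def missing_settings(secrets: set[str], variables: set[str]) -> dict[str, list[str]]:
    """Hypothetical helper: return the required names that are not configured yet."""
    return {
        "secrets": sorted(REQUIRED_SECRETS - secrets),
        "variables": sorted(REQUIRED_VARIABLES - variables),
    }


if __name__ == "__main__":
    # Example: names already created in the repository settings
    configured_secrets = {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "PREFECT_API_KEY"}
    configured_variables = REQUIRED_VARIABLES - {"PREFECT_WORKSPACE_NAME"}
    print(missing_settings(configured_secrets, configured_variables))
```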

Infrastructure on AWS

In this section, we will use Terraform to provision the necessary AWS infrastructure. The following resources will be created:

  • A VPC with both public and private subnets.
  • An ECS cluster to run Prefect workloads.
  • ECR repositories to store Docker images for the Prefect flows and agents.
  • IAM roles and policies to provide secure access to ECS, ECR, and Secrets Manager.
  • VPC endpoints for securely accessing AWS services like Secrets Manager and ECR.
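The subnet plan in the Terraform code below (a 10.0.0.0/16 VPC split into a private 10.0.1.0/24 and a public 10.0.2.0/24 subnet) can be sanity-checked with Python's standard ipaddress module. This is a sketch for reasoning about the address plan, not part of the project; check_subnets is a hypothetical helper.

```python
import ipaddress


def check_subnets(vpc_cidr: str, subnets: dict[str, str]) -> list[str]:
    """Hypothetical helper: report subnets outside the VPC or overlapping each other."""
    vpc = ipaddress.ip_network(vpc_cidr)
    nets = {name: ipaddress.ip_network(cidr) for name, cidr in subnets.items()}
    problems = []
    # Every subnet must fall inside the VPC range
    for name, net in nets.items():
        if not net.subnet_of(vpc):
            problems.append(f"{name} ({net}) is outside {vpc}")
    # No two subnets may share addresses
    names = sorted(nets)
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            if nets[a].overlaps(nets[b]):
                problems.append(f"{a} overlaps {b}")
    return problems


if __name__ == "__main__":
    layout = {"private": "10.0.1.0/24", "public": "10.0.2.0/24"}
    print(check_subnets("10.0.0.0/16", layout))  # an empty list means the plan is consistent
    # A /24 holds 256 addresses; AWS reserves 5 in every subnet
    print(ipaddress.ip_network("10.0.1.0/24").num_addresses - 5)
```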

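Below is the Terraform code that provisions these resources, together with the Secrets Manager secret for the Prefect API key, the CloudWatch log group, the Prefect work pool, and the ECS service that runs the worker: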


##############################################################################
# VPC
##############################################################################

resource "aws_vpc" "cds" {
  cidr_block = "10.0.0.0/16"

  enable_dns_support   = true
  enable_dns_hostnames = true
}

resource "aws_subnet" "private" {
  vpc_id                  = aws_vpc.cds.id
  cidr_block              = "10.0.1.0/24"
  availability_zone       = "${var.region}a"
  map_public_ip_on_launch = false
}

resource "aws_subnet" "public" {
  vpc_id                  = aws_vpc.cds.id
  cidr_block              = "10.0.2.0/24"
  availability_zone       = "${var.region}a"
  map_public_ip_on_launch = true
}

resource "aws_internet_gateway" "cds" {
  vpc_id = aws_vpc.cds.id
}

resource "aws_eip" "cds" {
}

resource "aws_route_table" "public" {
  vpc_id = aws_vpc.cds.id

  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = aws_internet_gateway.cds.id
  }
}

resource "aws_route_table_association" "prefect" {
  subnet_id      = aws_subnet.public.id
  route_table_id = aws_route_table.public.id
}

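resource "aws_nat_gateway" "cds" {
  allocation_id = aws_eip.cds.id
  subnet_id     = aws_subnet.public.id

  # The NAT gateway routes through the internet gateway, so create that first
  depends_on = [aws_internet_gateway.cds]
}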

resource "aws_route_table" "private" {
  vpc_id = aws_vpc.cds.id

  # The VPC-local route is created implicitly; only the default route is declared
  route {
    cidr_block     = "0.0.0.0/0"
    nat_gateway_id = aws_nat_gateway.cds.id
  }
}

resource "aws_route_table_association" "private" {
  subnet_id      = aws_subnet.private.id
  route_table_id = aws_route_table.private.id
}

resource "aws_security_group" "cds" {
  name        = "ecs-security-group"
  description = "Allow traffic from ECS workers"
  vpc_id      = aws_vpc.cds.id

  ingress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["10.0.0.0/16"]
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

resource "aws_vpc_endpoint" "secrets_manager" {
  vpc_id              = aws_vpc.cds.id
  service_name        = "com.amazonaws.${var.region}.secretsmanager"
  vpc_endpoint_type   = "Interface"
  subnet_ids          = [aws_subnet.private.id]
  security_group_ids  = [aws_security_group.cds.id]
  # Resolve the service's public hostname to this endpoint inside the VPC
  private_dns_enabled = true
}

resource "aws_vpc_endpoint" "ecr_dkr" {
  vpc_id              = aws_vpc.cds.id
  service_name        = "com.amazonaws.${var.region}.ecr.dkr"
  vpc_endpoint_type   = "Interface"
  subnet_ids          = [aws_subnet.private.id]
  security_group_ids  = [aws_security_group.cds.id]
  private_dns_enabled = true
}

resource "aws_vpc_endpoint" "ecr_api" {
  vpc_id              = aws_vpc.cds.id
  service_name        = "com.amazonaws.${var.region}.ecr.api"
  vpc_endpoint_type   = "Interface"
  subnet_ids          = [aws_subnet.private.id]
  security_group_ids  = [aws_security_group.cds.id]
  private_dns_enabled = true
}

##############################################################################
# ECR and ECS
##############################################################################

resource "aws_ecr_repository" "prefect_agent" {
  name = "${var.ecr_repository_name}-agent"
}

resource "aws_ecr_lifecycle_policy" "prefect_agent" {
  repository = aws_ecr_repository.prefect_agent.name

  policy = jsonencode({
    rules = [
      {
        rulePriority = 1
        description  = "Limit number of images to 2"
        selection = {
          tagStatus   = "any"
          countType   = "imageCountMoreThan"
          countNumber = 2
        }
        action = {
          type = "expire"
        }
      }
    ]
  })
}

resource "aws_ecr_repository" "prefect" {
  name = var.ecr_repository_name
}

resource "aws_ecr_lifecycle_policy" "prefect" {
  repository = aws_ecr_repository.prefect.name

  policy = jsonencode({
    rules = [
      {
        rulePriority = 1
        description  = "Limit number of images to 2"
        selection = {
          tagStatus   = "any"
          countType   = "imageCountMoreThan"
          countNumber = 2
        }
        action = {
          type = "expire"
        }
      }
    ]
  })
}

resource "aws_ecs_cluster" "prefect_cluster" {
  name = "prefect-cluster"
}

resource "aws_ecs_task_definition" "prefect_work_pool_task" {
  # Task definition for flow runs; it gets its own family so it does not become
  # a revision of the worker task definition ("prefect-work-pool-agent-task")
  family = "prefect-flow-task"
  container_definitions = jsonencode([
    {
      name      = "prefect-container",
      image     = "${var.account_number}.dkr.ecr.${var.region}.amazonaws.com/${var.ecr_repository_name}:latest",
      cpu       = 256,
      memory    = 512,
      essential = true,
    }
  ])
  cpu                      = "256"
  memory                   = "512"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  # Reuse the execution role defined in this configuration instead of a pre-existing role
  execution_role_arn       = aws_iam_role.prefect_work_pool_execution.arn
  # TODO: set task_role_arn to a role with the AWS permissions the flows need
}

resource "aws_iam_role" "prefect_work_pool_task" {
  name = "prefect-work-pool-task"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}

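resource "aws_iam_policy" "prefect_work_pool_task" {
  name        = "prefect-work-pool-task"
  description = "Policy for the Prefect ECS worker to pull images and run flow tasks"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "ecr:GetAuthorizationToken",
          "ecr:BatchCheckLayerAvailability",
          "ecr:GetDownloadUrlForLayer",
          "ecr:BatchGetImage",
          "ecs:RunTask",
          "ecs:DescribeTasks",
          "ecs:StopTask",
          # The worker may register a new task definition revision, e.g. when the
          # image variable differs from the image in the task definition
          "ecs:RegisterTaskDefinition",
          "ecs:DescribeTaskDefinition",
          "ecs:DeregisterTaskDefinition",
          "ecs:TagResource",
          "ec2:DescribeSubnets",
          "ec2:DescribeVpcs",
          "logs:GetLogEvents",
          # RunTask must pass the flow task's execution role to ECS
          "iam:PassRole"
        ]
        Resource = "*"
      }
    ]
  })
}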

resource "aws_iam_role_policy_attachment" "prefect_work_pool_task" {
  policy_arn = aws_iam_policy.prefect_work_pool_task.arn
  role       = aws_iam_role.prefect_work_pool_task.name
}


resource "aws_iam_role" "prefect_work_pool_execution" {
  name = "prefect-work-pool-execution"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "ecs-tasks.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}

resource "aws_iam_policy" "prefect_work_pool_execution_secret_policy" {
  name        = "prefect-work-pool-execution-secret-policy"
  description = "Policy to allow ECS tasks to access the secret"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue",
          "secretsmanager:DescribeSecret"
        ]
        Resource = [
          aws_secretsmanager_secret.prefect_api_key.arn
        ]
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "attach_prefect_task_secret_policy" {
  policy_arn = aws_iam_policy.prefect_work_pool_execution_secret_policy.arn
  role       = aws_iam_role.prefect_work_pool_execution.name
}

resource "aws_iam_role_policy_attachment" "attach_ecs_task_execution_policy" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
  role       = aws_iam_role.prefect_work_pool_execution.name
}

resource "aws_cloudwatch_log_group" "prefect_work_pool_agent_logs" {
  name              = "/ecs/prefect-work-pool-agent"
  retention_in_days = 7
}

resource "local_file" "work_pool_template" {
  content = jsonencode({
    "job_configuration" : {
      "cluster" : "${aws_ecs_cluster.prefect_cluster.id}",
      "task_definition" : "${aws_ecs_task_definition.prefect_work_pool_task.arn}",
      "subnets" : ["${aws_subnet.private.id}"],
      "security_groups" : ["${aws_security_group.cds.id}"],
      "launch_type" : "FARGATE"
    },
    "variables" : {
      "properties" : {
        "image" : {
          "anyOf" : [
            {
              "type" : "string"
            },
            {
              "type" : "null"
            }
          ],
          "default" : "${var.account_number}.dkr.ecr.${var.region}.amazonaws.com/${var.ecr_repository_name}:latest",
          "description" : "The image to use for the Prefect container in the task. If this value is not null, it will override the value in the task definition. This value defaults to a Prefect base image matching your local versions.",
          "title" : "Image"
        }
      }
    }
  })
  filename = abspath("${path.module}/work_pool_template.json")
}

resource "null_resource" "create_prefect_work_pool" {
  provisioner "local-exec" {
    # Use the same pool name that the worker and the deployment refer to
    command = "prefect work-pool create ${var.prefect_work_pool_name} --type ecs --base-job-template ${local_file.work_pool_template.filename}"
  }
  depends_on = [local_file.work_pool_template]
}

resource "aws_ecs_task_definition" "prefect_work_pool_agent" {
  family = "prefect-work-pool-agent-task"
  container_definitions = jsonencode([
    {
      name      = "prefect-container",
      image     = "${var.account_number}.dkr.ecr.${var.region}.amazonaws.com/${var.ecr_repository_name}-agent:latest",
      cpu       = 256,
      memory    = 512,
      essential = true,
      command = [
        "sh", "-c", "prefect worker start --pool ${var.prefect_work_pool_name}"
      ]
      environment = [
        {
          name  = "PREFECT_API_URL",
          value = "https://api.prefect.cloud/api/accounts/${var.prefect_account_id}/workspaces/${var.prefect_workspace_id}"
        }
      ],
      # Injected from Secrets Manager at startup via the execution role,
      # so the API key is not stored in plain text in the task definition
      secrets = [
        {
          name      = "PREFECT_API_KEY",
          valueFrom = aws_secretsmanager_secret.prefect_api_key.arn
        }
      ],
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = aws_cloudwatch_log_group.prefect_work_pool_agent_logs.name
          "awslogs-region"        = var.region
          "awslogs-stream-prefix" = "ecs"
        }
      }
    }
  ])
  cpu                      = "256"
  memory                   = "512"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  execution_role_arn       = aws_iam_role.prefect_work_pool_execution.arn
  task_role_arn            = aws_iam_role.prefect_work_pool_task.arn

  depends_on = [
    aws_cloudwatch_log_group.prefect_work_pool_agent_logs,
    aws_secretsmanager_secret_version.prefect_api_key,
    null_resource.create_prefect_work_pool,
  ]
}

resource "aws_secretsmanager_secret" "prefect_api_key" {
  name                    = "prefect-api-key"
  recovery_window_in_days = 0 # TODO not for prod
}

resource "aws_secretsmanager_secret_version" "prefect_api_key" {
  secret_id     = aws_secretsmanager_secret.prefect_api_key.id
  secret_string = var.prefect_api_key
}

resource "aws_ecs_service" "prefect_service" {
  name            = "prefect-service"
  cluster         = aws_ecs_cluster.prefect_cluster.id
  task_definition = aws_ecs_task_definition.prefect_work_pool_agent.arn
  desired_count   = 1 # one worker polling the work pool; with 0, no flow runs are picked up
  launch_type     = "FARGATE"
  network_configuration {
    subnets          = [aws_subnet.private.id]
    security_groups  = [aws_security_group.cds.id]
    assign_public_ip = false
  }
}
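The job_configuration in the work pool template above passes the cluster, task definition ARN, and network settings as literal keys. The ECS worker's own default base job template instead routes such values through template variables; to our knowledge it can be exported with `prefect work-pool get-default-base-job-template --type ecs --file default.json`. If the worker rejects or ignores the literal keys, a more robust starting point is to fill in the defaults of the exported variables and pass the result to --base-job-template. The sketch below assumes variable names such as cluster, launch_type, task_definition_arn, and network_configuration (verify them against the file you export); set_variable_defaults and write_work_pool_template are hypothetical helpers.

```python
import json


def set_variable_defaults(template: dict, defaults: dict) -> list[str]:
    """Hypothetical helper: set "default" on variables that exist in the template.

    Returns the names missing from template["variables"]["properties"] so that
    typos or template changes surface instead of being silently ignored.
    """
    properties = template["variables"]["properties"]
    missing = []
    for name, value in defaults.items():
        if name in properties:
            properties[name]["default"] = value
        else:
            missing.append(name)
    return missing


def write_work_pool_template(src: str, dst: str, defaults: dict) -> None:
    """Read an exported template, fill in variable defaults, and write the result."""
    with open(src) as f:
        template = json.load(f)
    missing = set_variable_defaults(template, defaults)
    if missing:
        raise ValueError(f"Variables not found in template: {missing}")
    with open(dst, "w") as f:
        json.dump(template, f, indent=2)


if __name__ == "__main__":
    # Tiny stand-in for the exported file; in practice call
    # write_work_pool_template("default.json", "work_pool_template.json", {...})
    template = {
        "job_configuration": {"cluster": "{{ cluster }}"},
        "variables": {"properties": {"cluster": {"type": "string"}}},
    }
    print(set_variable_defaults(template, {"cluster": "prefect-cluster", "vpc": "x"}))
    print(template["variables"]["properties"]["cluster"])
```

Reporting unknown names, rather than adding them, keeps the template consistent with what the worker version you installed actually understands.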

Building and Pushing the Prefect Agent Docker Image

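To run the Prefect worker (the agent) on ECS (Elastic Container Service), we need a Docker image containing Prefect and the prefect-aws integration, pushed to AWS ECR (Elastic Container Registry). The CI/CD pipeline only builds the flow image, so this image is built and pushed by hand once Terraform has created the -agent repository; until it exists, the ECS service cannot start the worker.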

Dockerfile for the Prefect Agent

First, create a file named Dockerfile_agent with the following content:


FROM prefecthq/prefect:3-latest

# prefect-aws provides the ECS worker type used by `prefect worker start`
RUN pip install prefect-aws

Build and Push the Docker Image to ECR

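Once Dockerfile_agent is created, log in to ECR, then build, tag, and push the image. Set AWS_ACCOUNT_NUMBER and AWS_REGION (for example, eu-central-1) to your own values; prefect-repository-agent is the agent repository for an ECR repository name of prefect-repository:

aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $AWS_ACCOUNT_NUMBER.dkr.ecr.$AWS_REGION.amazonaws.com
# Fargate tasks run on linux/amd64 by default, so build for that platform
docker build -f Dockerfile_agent --platform linux/amd64 -t prefect-agent .
docker tag prefect-agent $AWS_ACCOUNT_NUMBER.dkr.ecr.$AWS_REGION.amazonaws.com/prefect-repository-agent:latest
docker push $AWS_ACCOUNT_NUMBER.dkr.ecr.$AWS_REGION.amazonaws.com/prefect-repository-agent:latest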
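The registry URI appears in the workflow, in the Terraform task definitions, and in the commands above, and it always follows the pattern <account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>. A small helper such as the hypothetical ecr_image_uri below keeps scripts consistent; the agent variant appends -agent to the repository name, mirroring "${var.ecr_repository_name}-agent" in the Terraform code. The account ID used in the example is a dummy value.

```python
def ecr_image_uri(account: str, region: str, repository: str, tag: str = "latest") -> str:
    """Hypothetical helper: build a private ECR image URI."""
    if not (account.isdigit() and len(account) == 12):
        raise ValueError("AWS account IDs are 12 digits")
    return f"{account}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


def agent_image_uri(account: str, region: str, repository: str, tag: str = "latest") -> str:
    # Mirrors "${var.ecr_repository_name}-agent" in the Terraform code
    return ecr_image_uri(account, region, f"{repository}-agent", tag)


if __name__ == "__main__":
    print(agent_image_uri("123456789012", "eu-central-1", "prefect-repository"))
```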

Prefect: Building the Flow Image

To run Prefect flows on ECS, we build a Docker image in the second job of our CI/CD pipeline (build-prefect-docker). This image contains the flow's dependencies and the flow code.

The Dockerfile that will be used to build the image is as follows:


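# Use a Python base image
FROM python:3.9-slim

# Set the working directory
WORKDIR /app

# Install Poetry; disable its virtualenv so the dependencies (including the
# prefect CLI) are installed into the system interpreter and are on PATH
ENV POETRY_VIRTUALENVS_CREATE=false
RUN pip install --no-cache-dir poetry

# Copy the Poetry files and install the main (non-dev) dependencies
COPY pyproject.toml poetry.lock ./
RUN poetry install --no-root --only main

# Copy the flow code, keeping the flows/ directory so the deployment
# entrypoint (flows/prefect_flow.py:ecs_flow) resolves from /app
COPY src/flows ./flows

# Used only when running the container by hand; for flow runs, the ECS
# worker overrides the command with `prefect flow-run execute`
CMD ["python", "flows/prefect_flow.py"]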
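Why the copy path inside the image matters: when create_deployment.py calls ecs_flow.deploy() from the src directory (as the workflow does), Prefect, as far as we understand its behaviour, records the deployment entrypoint as the flow file's path relative to the current working directory plus the flow function name. The ECS worker then runs `prefect flow-run execute` in the container, which loads that entrypoint relative to the container's working directory. The hypothetical helpers below reproduce that path arithmetic with pathlib; they illustrate the idea and are not Prefect's code.

```python
from pathlib import PurePosixPath


def entrypoint_for(flow_file: str, cwd: str, flow_func: str) -> str:
    """Hypothetical: build a '<relative path>:<function>' entrypoint string."""
    rel = PurePosixPath(flow_file).relative_to(PurePosixPath(cwd))
    return f"{rel}:{flow_func}"


def resolves_in_image(entrypoint: str, workdir: str, files_in_image: set[str]) -> bool:
    """Check whether the entrypoint's file exists under the image WORKDIR."""
    path = entrypoint.rsplit(":", 1)[0]
    return str(PurePosixPath(workdir) / path) in files_in_image


if __name__ == "__main__":
    # Example checkout path on a GitHub-hosted runner
    ep = entrypoint_for(
        "/home/runner/work/repo/repo/src/flows/prefect_flow.py",
        "/home/runner/work/repo/repo/src",
        "ecs_flow",
    )
    print(ep)
    # COPY src/flows/prefect_flow.py ./  puts the file at /app/prefect_flow.py
    print(resolves_in_image(ep, "/app", {"/app/prefect_flow.py"}))
    # COPY src/flows ./flows             puts the file at /app/flows/prefect_flow.py
    print(resolves_in_image(ep, "/app", {"/app/flows/prefect_flow.py"}))
```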

Prefect Flows

Prefect flows define the data processing tasks. In this example, we use a basic flow that logs a message to confirm that the deployment works. You can extend this flow with more complex logic tailored to your needs. The ecs_flow in src/flows/prefect_flow.py looks like this:

from prefect import flow, task

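# log_prints=True forwards print() output to the Prefect logger,
# so the message appears in the flow run logs in Prefect Cloud
@task(log_prints=True)
def say_hello():
    print("Hello, Prefect on ECS!")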

@flow
def ecs_flow():
    say_hello()

if __name__ == "__main__":
    ecs_flow()

Creating a Prefect Deployment

The deployment is created by the create_deployment.py script (./src/create_deployment.py), which registers the flow with Prefect Cloud through the Python SDK's flow.deploy() method rather than the CLI. The deployment points at the work pool and at the image pushed to ECR, so when a run is triggered, the ECS worker launches it as a task from that image:

import argparse
from flows.prefect_flow import ecs_flow

def create_deployment(name, work_pool_name, image, build, push):
    ecs_flow.deploy(
        name,
        work_pool_name=work_pool_name,
        image=image,
        build=build,
        push=push,
    )

if __name__ == "__main__":
    # Parse input parameters
    parser = argparse.ArgumentParser()
    parser.add_argument("--name", required=True, help="Name of the deployment")
    parser.add_argument("--work_pool_name", required=True, help="Name of the work pool")
    parser.add_argument("--image", required=False, help="Docker image")
    # store_true flags: type=bool would treat any non-empty string, even "False", as True
    parser.add_argument("--build", action="store_true", help="Build the image before deploying")
    parser.add_argument("--push", action="store_true", help="Push the image after building")
    args = parser.parse_args()

    # Call create_deployment with parsed arguments
    create_deployment(args.name, args.work_pool_name, args.image, args.build, args.push)
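A note on boolean flags such as --build and --push: argparse's type=bool calls bool() on the raw string, and every non-empty string, including "False", is truthy. The standalone sketch below contrasts that with action="store_true", which turns the option into a real on/off switch; it uses only the standard library and throwaway parsers, not the project's script.

```python
import argparse


def parse_with_type_bool(argv: list[str]) -> bool:
    parser = argparse.ArgumentParser()
    # Pitfall: bool("False") is True because the string is non-empty
    parser.add_argument("--build", type=bool, default=False)
    return parser.parse_args(argv).build


def parse_with_store_true(argv: list[str]) -> bool:
    parser = argparse.ArgumentParser()
    # The flag is False unless it appears on the command line
    parser.add_argument("--build", action="store_true")
    return parser.parse_args(argv).build


if __name__ == "__main__":
    print(parse_with_type_bool(["--build=False"]))  # True, which is surprising
    print(parse_with_store_true([]))                # False
    print(parse_with_store_true(["--build"]))       # True
```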

Conclusion

By integrating Prefect with AWS ECS and using Terraform for infrastructure automation, we've built an orchestration environment in which each flow run executes as its own Fargate task. Prefect Cloud provides the interface to monitor and manage these runs, while GitHub Actions automates deployments on every push to main.

This setup allows for easy scaling and management of workflows in a real-world data engineering or DevOps environment. Whether you're automating ETL pipelines or orchestrating machine learning tasks, Prefect's versatility makes it a valuable tool in your stack.