Manage EventBridge Pipes with the ACK Pipes Controller
Create and manage EventBridge Pipes directly from Kubernetes
Amazon EventBridge Pipes connects sources to targets. It reduces the need for specialized knowledge and integration code when developing event driven architectures, fostering consistency across your company’s applications. To set up a pipe, you choose the source, add optional filtering, define optional enrichment, and choose the target for the event data.
In this tutorial you will learn how to create and manage an EventBridge Pipe to forward messages between two SQS queues from an Amazon Elastic Kubernetes (EKS) deployment.
Setup
Although it is not necessary to use Amazon Elastic Kubernetes Service (Amazon EKS) with ACK, this guide assumes that you
have access to an Amazon EKS cluster. If this is your first time creating an Amazon EKS cluster, see Amazon EKS
Setup.
For automated cluster creation using eksctl
, see Getting started with Amazon EKS -
eksctl
and create your cluster with
Amazon EC2 Linux managed nodes.
Prerequisites
This guide assumes that you have:
- Created an EKS cluster with Kubernetes version 1.24 or higher.
- AWS IAM permissions to create roles and attach policies to roles.
- AWS IAM permissions to manages queues and send messages to a queue.
- Installed the following tools on the client machine used to access your Kubernetes cluster:
Install the ACK service controller for Pipes
Log into the Helm registry that stores the ACK charts:
aws ecr-public get-login-password --region us-east-1 | helm registry login --username AWS --password-stdin public.ecr.aws
Deploy the ACK service controller for Amazon Pipes using the pipes-chart Helm chart. Resources should be created in the us-east-1
region:
helm install --create-namespace -n ack-system oci://public.ecr.aws/aws-controllers-k8s/pipes-chart --version=v1.0.0 --generate-name --set=aws.region=us-east-1
For a full list of available values to the Helm chart, please review the values.yaml file.
Configure IAM permissions
Once the service controller is deployed, you will need to configure the IAM permissions for the
controller to query the Pipes API. For full details, please review the AWS Controllers for Kubernetes documentation for
how to configure the IAM permissions. If you follow the examples in the documentation, use the value
of pipes
for SERVICE
.
Create an EventBridge Pipe
Create the source and target SQS queues
To keep the scope of this tutorial simple, the SQS queues and IAM permissions will be created with the AWS CLI. Alternatively, the ACK SQS Controller and ACK IAM Controller can be used to manage these resources with Kubernetes.
Execute the following command to define the environment variables used throughout the example.
${AWS_REGION}
and
${AWS_ACCOUNT_ID}
are already set. Otherwise please set these variables before executing the following steps. The value for ${AWS_REGION}
must also match the --set=aws.region
value used in the helm install
command above.export PIPE_NAME=pipes-sqs-to-sqs
export PIPE_NAMESPACE=pipes-example
export SOURCE_QUEUE=pipes-sqs-source
export TARGET_QUEUE=pipes-sqs-target
export PIPE_ROLE=pipes-sqs-to-sqs-role
export PIPE_POLICY=pipes-sqs-to-sqs-policy
Create the source and target queues.
aws sqs create-queue --queue-name ${SOURCE_QUEUE}
aws sqs create-queue --queue-name ${TARGET_QUEUE}
The output of above commands looks like
{
"QueueUrl": "https://sqs.us-east-1.amazonaws.com/1234567890/pipes-sqs-source"
}
{
"QueueUrl": "https://sqs.us-east-1.amazonaws.com/1234567890/pipes-sqs-target"
}
Create the Pipes IAM Role
Create an IAM role for the pipe to consume messages from the source queue and send messages to the target queue.
cat <<EOF > trust.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"aws:SourceAccount": "${AWS_ACCOUNT_ID}"
}
}
}
]
}
EOF
aws iam create-role --role-name ${PIPE_ROLE} --assume-role-policy-document file://trust.json
The output of above commands looks like
{
"Role": {
"Path": "/",
"RoleName": "pipes-sqs-to-sqs-role",
"RoleId": "ABCDU3F4JDBEUCMGT3XBH",
"Arn": "arn:aws:iam::1234567890:role/pipes-sqs-to-sqs-role",
"CreateDate": "2023-03-21T13:11:59+00:00",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"aws:SourceAccount": "1234567890"
}
}
}
]
}
}
}
Attach a policy to the role to give the pipe permissions to read and send messages.
cat <<EOF > policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": [
"arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${SOURCE_QUEUE}"
]
},
{
"Effect": "Allow",
"Action": [
"sqs:SendMessage"
],
"Resource": [
"arn:aws:sqs:${AWS_REGION}:${AWS_ACCOUNT_ID}:${TARGET_QUEUE}"
]
}
]
}
EOF
aws iam put-role-policy --role-name ${PIPE_ROLE} --policy-name ${PIPE_POLICY} --policy-document file://policy.json
If the command executes successfully, no output is generated.
Create the Pipe
Execute the following command to retrieve the ARNs for the resources created above needed for the Kubernetes manifest.
export SOURCE_QUEUE_ARN=$(aws --output json sqs get-queue-attributes --queue-url "https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SOURCE_QUEUE}" --attribute-names QueueArn | jq -r '.Attributes.QueueArn')
export TARGET_QUEUE_ARN=$(aws --output json sqs get-queue-attributes --queue-url "https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${TARGET_QUEUE}" --attribute-names QueueArn | jq -r '.Attributes.QueueArn')
export PIPE_ROLE_ARN=$(aws --output json iam get-role --role-name ${PIPE_ROLE} | jq -r '.Role.Arn')
Execute the following command to create a Kubernetes manifest for a pipe consuming messages from the source queue and sending messages matching the filter criteria to the target queue using the above created IAM role.
The EventBridge filter pattern will match any SQS message from the source queue with a JSON-stringified body
{\"from\":\"kubernetes\"}
. Alternatively, the filter pattern can be omitted to forward all messages from the source
queue.
kubectl create ns ${PIPE_NAMESPACE}
cat <<EOF > pipe-sqs-to-sqs.yaml
apiVersion: pipes.services.k8s.aws/v1alpha1
kind: Pipe
metadata:
name: $PIPE_NAME
spec:
name: $PIPE_NAME
source: $SOURCE_QUEUE_ARN
description: "SQS to SQS Pipe with filtering"
sourceParameters:
filterCriteria:
filters:
- pattern: "{\"body\":{\"from\":[\"kubernetes\"]}}"
sqsQueueParameters:
batchSize: 1
maximumBatchingWindowInSeconds: 1
target: $TARGET_QUEUE_ARN
roleARN: $PIPE_ROLE_ARN
EOF
kubectl -n ${PIPE_NAMESPACE} create -f pipe-sqs-to-sqs.yaml
The output of above commands looks like
namespace/pipes-example created
pipe.pipes.services.k8s.aws/pipes-sqs-to-sqs created
Describe Pipe Custom Resource
View the Pipe custom resource to verify it is in a RUNNING
state.
kubectl -n $PIPE_NAMESPACE get pipe $PIPE_NAME
The output of above commands looks like
NAME STATE SYNCED AGE
pipes-sqs-to-sqs RUNNING True 3m10s
Verify the Pipe filtering and forwarding is working
Execute the following command to send a message to the source queue with a body matching the pipe filter pattern.
aws sqs send-message --queue-url https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SOURCE_QUEUE} --message-body "{\"from\":\"kubernetes\"}"
The output of above commands looks like
{
"MD5OfMessageBody": "fde2da607356f1974691e48fa6a87157",
"MessageId": "f4157187-0308-420c-b69b-aa439e6be7e3"
}
Verify the message was consumed by the pipe, the filter pattern matched and the message was received by the target queue with
aws sqs receive-message --queue-url https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${TARGET_QUEUE}
The output of above commands looks like
{
"Messages": [
{
<snip>
"MD5OfBody": "d5255184c571cca2c78e76d6eea1745d",
"Body": "{\"messageId\":\"f4157187-0308-420c-b69b-aa439e6be7e3\",
<snip>
\"body\":\"{\\\"from\\\":\\\"kubernetes\\\"}\",\"attributes\":{\"ApproximateReceiveCount\":\"1\",
<snip>
\"eventSourceARN\":\"arn:aws:sqs:us-east-1:1234567890:pipes-sqs-source\",\"awsRegion\":\"us-east-1\"}"
}
]
}
Next steps
The ACK service controller for Amazon EventBridge Pipes is based on the Amazon EventBridge Pipes API.
Refer to API Reference for Pipes to find all the supported Kubernetes custom resources and fields.
Cleanup
Remove all the Pipes resources created in this tutorial using kubectl delete
command.
kubectl -n ${PIPE_NAMESPACE} delete -f pipe-sqs-to-sqs.yaml
kubectl delete ns ${PIPE_NAMESPACE}
The output of delete command should look like
pipe.pipes.services.k8s.aws "pipes-sqs-to-sqs" deleted
namespace "pipes-example" deleted
Remove the manually created SQS resources.
aws sqs delete-queue --queue-url https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${SOURCE_QUEUE}
aws sqs delete-queue --queue-url https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}/${TARGET_QUEUE}
If the command executes successfully, no output is generated.
Remove the manually created IAM resources.
aws iam delete-role-policy --role-name ${PIPE_ROLE} --policy-name ${PIPE_POLICY}
aws iam delete-role --role-name ${PIPE_ROLE}
If the command executes successfully, no output is generated.
To remove the Pipes ACK service controller, related CRDs, and namespaces, see ACK Cleanup.
To delete your EKS clusters, see Amazon EKS - Deleting a cluster.