AWS re:Invent 2022

2022-12-28

After a couple of years of cancelled bookings due to COVID-19, this year was the first time I’ve been able to attend AWS re:Invent. What a conference. Unlike any other event I’ve ever seen, the scale of this thing is wild, even for Vegas. Everyone warned me about travelling through LAX on the busiest travel day of the year, but overall things were pretty smooth. After over 24 hours of flights and layovers, the trip from Adelaide->Sydney->Los Angeles->Las Vegas ended in a wonderful Thanksgiving lunch with friends I hadn’t seen since 2019.

The conference was incredible. But here are five lessons for next time…


1. It’s pretty spread out

The conference is spread out over multiple properties, ranging from a five-minute walk to a shuttle bus ride away. Staying within walking distance of the exhibit hall makes it easy to attend the morning keynotes. I stayed at the Palazzo, which was probably the nicest hotel I’ve stayed at in Vegas. Bonus: AWS provide good rates if you get in early and book when you purchase a conference pass.

2. It’s not a big deal if you can’t reserve a seat for a talk

There are plenty of walk-up seats available for every talk. When I didn’t have a reserved seat, I got there 30-60 minutes early and never missed out.

3. Not everything is in the name

We use AWS Copilot heavily and were pretty disappointed that searching the program only turned up a couple of talks on the topic. But after going to a talk on ECS and watching the presenters use Copilot the whole time, despite it never being mentioned in the description, we realised we may have missed a lot of valuable sessions!

4. re:Play is a party

re:Play is the big party on Thursday night. It’s not a repeat viewing of previous sessions, not a board game night, and definitely not a movie night. Probably take it easy on Wednesday night in anticipation of this.

5. Leave Friday afternoon or Saturday

The exhibit hall closes on Thursday afternoon, the big party is Thursday night, and the talks wind up by lunchtime on Friday. All of this makes leaving on Friday afternoon or Saturday morning the right call.

RQ with SQS

2022-04-14

RQ is a great library for building a simple decoupled worker queue, which can invoke arbitrary functions from your code base.

As the name implies, it requires a Redis service. If you’re deploying on AWS, you might already be familiar with SQS and prefer to use that instead. Inspired by RQ, here’s how we do it with no dependencies beyond boto3…


Considerations

This comes with all the same caveats as RQ, in that it is very tightly coupled and absolutely not a substitute for a loosely-coupled pub-sub system (e.g. SNS->SQS).

There is also no priority management. You could build that, but it’s probably worth reaching for something like Celery at that point.

So what is it useful for? Really just one thing: executing an arbitrary function, with guaranteed at-least-once execution, plus retries and timeout.
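
The at-least-once guarantee, retries, and timeout all come from SQS itself rather than our code. As a hedged sketch (queue names and limits are illustrative, not from our actual setup), the worker queue might be created with a visibility timeout just above the Lambda timeout and a dead-letter queue for messages that keep failing:

import json

import boto3

sqs = boto3.client("sqs")

# Messages that fail repeatedly are parked here for inspection
dlq = sqs.create_queue(QueueName="worker-dlq")
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq["QueueUrl"], AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

queue = sqs.create_queue(
    QueueName="worker-queue",
    Attributes={
        # Must exceed the Lambda timeout, or a message can be redelivered
        # while the first attempt is still running
        "VisibilityTimeout": "910",
        # Deliver a message up to 3 times before giving up
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": 3}
        ),
    },
)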

Architecture

We’re on AWS, so we may as well use Lambda. In that situation, we would have the following:

1. Main application

A Lambda function running Flask, hooked up to API Gateway.

If you can’t deal with the cold starts or can’t deploy to Lambda for some other reason, you could just as easily run the Flask application in ECS, App Runner, etc.

2. Background worker

A Lambda function handling SQS events.

The key point is that both the main application and the background worker share the same code base. Ideally for simplicity, you deploy the same Docker image to both.
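
To make the split concrete, here is a minimal sketch of the main-application side. The route and the app.queue module are hypothetical; enqueue is the helper we define in the next section:

from flask import Flask, request

from app.queue import enqueue  # hypothetical home for the enqueue() helper

app = Flask(__name__)


@app.route("/signup", methods=["POST"])
def signup():
    email = request.json["email"]
    # Respond immediately; the worker sends the welcome email
    enqueue("app.emailer.send_email", email, "Welcome", "Thanks for signing up")
    return {"status": "ok"}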

Implementation

First, we need to define our function to queue the message to SQS:

import json
from os import environ

import boto3


def enqueue(func_name, *args, **kwargs):
    # Serialise the call and push it onto the worker queue
    payload = {"func_name": func_name, "args": args, "kwargs": kwargs}
    return boto3.client("sqs").send_message(
        QueueUrl=environ["WORKER_QUEUE_URL"],
        DelaySeconds=0,
        MessageBody=json.dumps(payload),
    )

Then we can call it anywhere we like, passing in the fully qualified path name of the function we want to call, plus any arguments:

enqueue("app.emailer.send_email", "person@gmail.com", "Hello", "Here is the body")

In this case, the function name we pass in is a simple function to send an email, looking like this:

def send_email(email_address, subject, body):
    # Send email via SMTP etc.
    ...

Create a new file, where the handler function will live. This is heavily inspired by RQ:

import importlib
import json
import logging
import threading
from os import environ


TIMEOUT_WARNING = int(environ["TIMEOUT_WARNING"])


def import_function(name):
    """Import a function from its fully qualified path, e.g. app.emailer.send_email"""
    name_bits = name.split(".")
    module_name = ".".join(name_bits[:-1])
    module = importlib.import_module(module_name)
    function_name = name_bits[-1]
    return getattr(module, function_name)


class TimeoutThread(threading.Thread):
    """A thread which sleeps for TIMEOUT_WARNING seconds and logs an
    error if it has not been stopped before then.
    """

    def __init__(self, lambda_payload):
        threading.Thread.__init__(self)
        self.lambda_payload = lambda_payload
        self._stop_event = threading.Event()

    def stop(self):
        self._stop_event.set()

    def run(self):
        self._stop_event.wait(TIMEOUT_WARNING)

        if self._stop_event.is_set():
            return

        # If we get to this point, the job overran and we log a warning
        logging.error("Lambda timeout warning triggered: %s", self.lambda_payload)


def handler(event, context):
    if len(event["Records"]) > 1:
        raise ValueError("Lambda should only process one event at a time")
    record = event["Records"][0]
    message = json.loads(record["body"])
    timeout_thread = TimeoutThread(message)
    timeout_thread.start()

    func = import_function(message["func_name"])
    args = message["args"]
    kwargs = message["kwargs"]
    try:
        func(*args, **kwargs)
    finally:
        # Always stop the watchdog, even if the job raised
        timeout_thread.stop()

Or you may want to wrap it in an app context:

from flask_client import create_app
...
app = create_app()
with app.app_context():
    func(*args, **kwargs)

The whole timeout thing is really only necessary if you’re having trouble getting error messages for timeouts. If the Lambda timeout is triggered, the function just shuts down; an internal timeout running inside Lambda gives you the chance to add some logging or send an event to Sentry first.

When deploying, you will want to set the timeout warning to less than the Lambda timeout. For example, if the Lambda timeout is 900 seconds, set the timeout warning to 880 seconds.

Deploy

Remember, all of this lives inside the same code base. We build just one Docker image, but execute it in different ways.

Running the Flask application is no different to usual. Probably just follow the instructions here.

To run the background worker, we need to configure Lambda to run the same Docker image, but using awslambdaric, the AWS Lambda Python Runtime Interface Client. It takes care of parsing the SQS payload and invoking the handler function.

After installing awslambdaric, we configure the Lambda function container with the following:

entry_point = ["python", "-m", "awslambdaric"]
command     = ["lambda.handler"]
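
If you are wiring this up by hand rather than through Terraform or CDK, here is a hedged boto3 sketch of the equivalent configuration (the function name and queue ARN are placeholders). Note the batch size of one, since the handler above refuses multi-record events:

import boto3

lambda_client = boto3.client("lambda")

lambda_client.update_function_configuration(
    FunctionName="background-worker",  # placeholder
    Timeout=900,
    # Internal warning fires 20 seconds before Lambda kills the process
    Environment={"Variables": {"TIMEOUT_WARNING": "880"}},
    ImageConfig={
        "EntryPoint": ["python", "-m", "awslambdaric"],
        "Command": ["lambda.handler"],
    },
)

# BatchSize=1 because handler() raises on multi-record events
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:...",  # the worker queue's ARN
    FunctionName="background-worker",
    BatchSize=1,
)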

CDK Patterns - App Runner

2021-07-03

AWS App Runner is fairly new and currently has no high-level CDK support. But it does have CloudFormation support, which means we can still use L1 constructs in CDK. Let’s look at how to do that.


First, the imports:

import * as path from "path";
import * as cdk from "@aws-cdk/core";
import * as iam from "@aws-cdk/aws-iam";
import * as apprunner from "@aws-cdk/aws-apprunner";
import { DockerImageAsset } from "@aws-cdk/aws-ecr-assets";

App Runner has two deployment methods: source code or container image. We are deploying a container image because this gives us full control over the build and deploy process.

const asset = new DockerImageAsset(this, "app", {
  directory: path.join(__dirname, "../app"),
});

Next we need to set up two IAM roles. The first role is used by App Runner to fetch the Docker image. I imagine this will be abstracted away when an L2 construct for App Runner is released.

const appRunnerEcrRole = new iam.Role(this, "appRunnerEcrRole", {
  assumedBy: new iam.ServicePrincipal("build.apprunner.amazonaws.com"),
});
asset.repository.grantPull(appRunnerEcrRole);

The second IAM role is assumed by the service itself when it runs. We need to grant it permission to (for example) access DynamoDB.

const appRunnerRole = new iam.Role(this, "appRunnerInstanceRole", {
  assumedBy: new iam.ServicePrincipal("tasks.apprunner.amazonaws.com"),
});

// table.grantFullAccess(appRunnerRole);

Now we define the service using the L1 construct.

let svc = new apprunner.CfnService(this, "appRunner", {
  instanceConfiguration: {
    instanceRoleArn: appRunnerRole.roleArn,
  },
  sourceConfiguration: {
    authenticationConfiguration: {
      accessRoleArn: appRunnerEcrRole.roleArn,
    },
    imageRepository: {
      imageIdentifier: asset.imageUri,
      imageRepositoryType: "ECR",
      imageConfiguration: {
        port: "3000",
      },
    },
    autoDeploymentsEnabled: false,
  },
});

The full AWS documentation for the App Runner L1 construct can be found here.

Our service automatically gets assigned a load balancer and a public URL secured with SSL/HTTPS. We want to see what that URL is after deployment:

new cdk.CfnOutput(this, "AppRunnerUrl", {
  value: svc.getAtt("ServiceUrl").toString(),
});

And that’s it! App Runner provides an affordable and simple way to deploy a container image to AWS. It is automatically assigned a domain with SSL, so you get a unique HTTPS URL without any extra work.

This makes it much more suitable for web applications than ECS, which would need to be connected to a Load Balancer or API Gateway in order to get an HTTPS URL.

One good use-case to explore would be automatic deploy previews. Using GitHub Actions, we can deploy our entire CDK stack, which now includes the front-end, every time a PR is opened.

But App Runner does not scale to zero, so we need to figure out how to make these preview environments self-destruct to avoid ongoing costs.
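
One possible approach, sketched here with boto3 (the stack naming scheme is invented for illustration): since a deployed CDK stack is just a CloudFormation stack, a job triggered when the PR closes can simply delete it:

import boto3

cfn = boto3.client("cloudformation")


def destroy_preview(pr_number: int) -> None:
    """Tear down the App Runner preview stack for a closed PR."""
    stack_name = f"preview-pr-{pr_number}"  # illustrative naming scheme
    cfn.delete_stack(StackName=stack_name)
    # Block until the stack (and its App Runner service) is gone
    cfn.get_waiter("stack_delete_complete").wait(StackName=stack_name)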

CDK Patterns - Step Functions

2021-07-02

We’re doing some numerical work in Lambda. There is some numerical instability and we want to make sure that we can reliably get the same results. We could use the AWS SDK to invoke Lambda in parallel…


import json
import threading
from time import time

import boto3

lambda_client = boto3.client("lambda")


def generate(lambda_payload):
    payload_bytes = bytes(json.dumps(lambda_payload), "UTF-8")
    response = lambda_client.invoke(
        FunctionName="arn:aws:lambda:ap-southeast-2:...",
        InvocationType="RequestResponse",
        Payload=payload_bytes,
        LogType="Tail",
    )
    if response["StatusCode"] == 200:
        payload = response["Payload"].read()
        output.append(payload)
        return payload
    else:
        raise Exception("Non-200 response")


output = []
threads = []

for idx in range(0, 180):
    x = threading.Thread(target=generate, args=(DEMO_PAYLOAD,))
    threads.append(x)


print(f"Executing {len(threads)} threads in parallel")

t1 = time()
for thread in threads:
    thread.start()

print("Running...")

for thread in threads:
    thread.join()
t2 = time()

print(output[0])
for line in output:
    if output[0] != line:
        print(line)
        raise Exception("Difference found")

print(f"Elapsed time for {len(threads)} threads: {t2-t1}")
print(f"Output length: {len(output)}")

But this is a bit brittle, and we need to keep track of everything in memory. An alternative would be to string together a system with SQS, but AWS Step Functions provides an easier way: a state machine that splits the input, fans the calculations out with Map states, then reduces and checks the results.

We will implement this with the CDK in TypeScript. Let’s get started.

Imports, nothing unusual here:

import * as cdk from "@aws-cdk/core";
import * as lambda from "@aws-cdk/aws-lambda";
import * as sfn from "@aws-cdk/aws-stepfunctions";
import * as tasks from "@aws-cdk/aws-stepfunctions-tasks";
import * as logs from "@aws-cdk/aws-logs";
import * as s3 from "@aws-cdk/aws-s3";
import { PythonFunction } from "@aws-cdk/aws-lambda-python";

Now we define all of our Lambda functions:

const dockerImage = lambda.DockerImageCode.fromImageAsset("..");

const splitterFunc = new PythonFunction(this, "SplitterFunc", {
    entry: "../lambda",
    index: "lambda_helpers.py",
    handler: "splitter",
    runtime: lambda.Runtime.PYTHON_3_8,
});

const reducerFunc = new PythonFunction(this, "ReducerFunc", {
    entry: "../lambda",
    index: "lambda_helpers.py",
    handler: "reducer",
    runtime: lambda.Runtime.PYTHON_3_8,
});

const dockerFunc = new lambda.DockerImageFunction(this, "clmath", {
    code: dockerImage,
    memorySize: 512,
    timeout: cdk.Duration.seconds(900),
    environment: {
        NUM_WORKERS: "1",
        HASH_ONLY: "TRUE",
    },
    currentVersionOptions: {
        removalPolicy: cdk.RemovalPolicy.RETAIN, // retain old versions
        retryAttempts: 1, // async retry attempts
    },
});

A quick detour to look at the Python code for our helper functions. In this case, our reducer just checks to make sure all results are identical.

import logging


def reducer(event, context):
    for payload in event:
        if payload != event[0]:
            logging.info(f"Difference found: {payload} - {event[0]}")
            return False
    return True


def splitter(event, context):
    return [event]*20

Back to the CDK, now we map our functions to LambdaInvoke steps…

const reduceJob = new tasks.LambdaInvoke(this, "Check Job", {
    lambdaFunction: reducerFunc,
    inputPath: "$",
    outputPath: "$",
});

const splitJob = new tasks.LambdaInvoke(this, "Split Job", {
    lambdaFunction: splitterFunc,
    inputPath: "$",
    outputPath: "$.Payload",
});

// Extract just the hash from each Lambda's output
const calcJob = new tasks.LambdaInvoke(this, "Calc Job", {
    lambdaFunction: dockerFunc,
    outputPath: "$.Payload.SHAH_HASH",
});
const calcMap = new sfn.Map(this, "Calc Map", {
    maxConcurrency: 0, // No limit, run as many concurrently as possible
});
calcMap.iterator(calcJob);

const splitMap = new sfn.Map(this, "Split Map", {
    maxConcurrency: 0, // No limit, run as many concurrently as possible
});

Now we string everything together:

const success = new sfn.Succeed(this, "Success");
const fail = new sfn.Fail(this, "Differences found");

const choiceFunc = new sfn.Choice(this, "Choice Func")
    .when(sfn.Condition.booleanEquals("$.Payload", true), success)
    .when(sfn.Condition.booleanEquals("$.Payload", false), fail);

const subWorkflow = calcMap.next(reduceJob).next(choiceFunc);
splitMap.iterator(splitJob.next(subWorkflow));

const definition = splitMap.next(new sfn.Succeed(this, "FinalSuccess"));

const logGroup = new logs.LogGroup(this, "lg");
new sfn.StateMachine(this, "clmath-standard", {
    definition,
    timeout: cdk.Duration.seconds(3600),
    tracingEnabled: true,
    logs: {
        destination: logGroup,
        level: sfn.LogLevel.ALL,
    },
});

Finally, we invoke it using Python+Boto3:

import json
from time import sleep, time

import boto3

client = boto3.client("stepfunctions")
t1 = time()
# Submit for execution
response = client.start_execution(
    stateMachineArn="arn:aws:states:ap-southeast-...",
    input=json.dumps(payload),
)
arn = response["executionArn"]

# Now poll until completion
print(f"Running... ExecutionArn: {arn}")
while True:
    response = client.describe_execution(executionArn=arn)
    status = response["status"]
    if status != "RUNNING":
        print(response)
        break
    sleep(1)
t2 = time()
print(f"Completed in {t2-t1} s")