Module supermoto.resources

Source code
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

import boto3

from .helpers import generate_instance_identity_document


@dataclass
class EcsCluster:
    cluster_name: str
    task_definitions: List[str]


def dynamo_table(table_name: str, id_attribute: str = "id", range_attribute: Optional[str] = None):
    """ Create a DynamoDB table with a string hash key and an optional string range key """
    ddb = boto3.client("dynamodb", region_name="eu-west-1")

    attrdefs = [{"AttributeName": id_attribute, "AttributeType": "S"}]
    keyschema = [{"AttributeName": id_attribute, "KeyType": "HASH"}]

    if range_attribute is not None:
        attrdefs.append({
            "AttributeName": range_attribute, "AttributeType": "S"
        })
        keyschema.append({
            "AttributeName": range_attribute, "KeyType": "RANGE"
        })

    ddb.create_table(
        AttributeDefinitions=attrdefs,
        TableName=table_name,
        KeySchema=keyschema,
        ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
    )


def sqs_queue(queue_name: str) -> Callable[[str], None]:
    """ Create the queue and return a function that sends messages to it """
    sqs = boto3.client("sqs", region_name="eu-west-1")
    # send_message requires the queue URL, not the queue name
    queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"]

    def sqs_sender(msg: str) -> None:
        sqs.send_message(QueueUrl=queue_url, MessageBody=msg)

    return sqs_sender


def s3_bucket(bucket_name: str) -> Callable[[str, bytes], None]:
    """ returns function you can use to populate initial stuff to bucket """
    client = boto3.client("s3", region_name="eu-west-1")
    client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
    )

    def s3_put(key: str, cont: bytes) -> None:
        client.put_object(
            Bucket=bucket_name,
            Key=key,
            Body=cont
        )

    return s3_put


def s3_ls(bucket_name: str) -> List[str]:
    """ list all files in the bucket """
    client = boto3.client("s3", region_name="eu-west-1")
    resp = client.list_objects(Bucket=bucket_name)
    return [ent["Key"] for ent in resp["Contents"]]


def s3_get_objects(bucket_name: str, prefix: Optional[str] = None) -> Dict[str, bytes]:
    """ Load all objects in the bucket (optionally filtered by prefix) into a dict keyed by object key """
    client = boto3.client("s3", region_name="eu-west-1")
    if prefix:
        resp = client.list_objects(Bucket=bucket_name, Prefix=prefix)
    else:
        resp = client.list_objects(Bucket=bucket_name)
    # "Contents" is absent when nothing matches
    keys = [ent["Key"] for ent in resp.get("Contents", [])]

    res = {}
    for key in keys:
        obj = client.get_object(Bucket=bucket_name, Key=key)
        res[key] = obj["Body"].read()
    return res


def sns_topic(topic_name: str) -> str:
    """ returns arn that you need to use for publishing """
    client = boto3.client("sns", region_name="eu-west-1")
    created = client.create_topic(Name=topic_name)
    return created["TopicArn"]


def ecs_ls(cluster_name: str, desired_status: str = "RUNNING") -> List[Dict[str, Any]]:
    """ List (and describe) all tasks in the ECS cluster that have the specified desired status """
    client = boto3.client("ecs", region_name="eu-west-1")
    tasks = client.list_tasks(cluster=cluster_name, desiredStatus=desired_status)

    arns = tasks.get("taskArns", [])
    if not arns:
        return []

    describe = client.describe_tasks(
        cluster=cluster_name,
        tasks=arns,
        include=["TAGS"]
    )
    return describe["tasks"]


def ecs_cluster(definition: EcsCluster):
    """ Create the ECS cluster, register its task definitions, and register one EC2 container instance """
    client = boto3.client("ecs", region_name="eu-west-1")
    ec2 = boto3.resource("ec2", region_name="eu-west-1")
    client.create_cluster(clusterName=definition.cluster_name)
    for td in definition.task_definitions:
        client.register_task_definition(
            family=td,
            containerDefinitions=[
                {
                    "name": "supermoto-taskdef-" + td,
                    "memory": 400,
                    "essential": True,
                    "environment": [
                        {"name": "AWS_ACCESS_KEY_ID", "value": "SOME_ACCESS_KEY"}
                    ],
                    "logConfiguration": {"logDriver": "json-file"},
                }
            ],
        )

    test_instance = ec2.create_instances(
        # EXAMPLE_AMI_ID from Moto
        ImageId="ami-12c6146b",
        MinCount=1,
        MaxCount=1,
    )[0]

    instance_id_document = json.dumps(
        generate_instance_identity_document(test_instance)
    )

    client.register_container_instance(
        cluster=definition.cluster_name, instanceIdentityDocument=instance_id_document
    )

Functions

def dynamo_table(table_name: str, id_attribute: str = 'id', range_attribute: Optional[str] = None)

Create a DynamoDB table with a string hash key and an optional string range key.
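
A minimal usage sketch, assuming moto's mocks are active (here via moto 5's mock_aws decorator, an assumption; older moto releases use per-service decorators) and illustrative names:

import boto3
from moto import mock_aws  # assumed moto >= 5; older releases use mock_dynamodb

from supermoto.resources import dynamo_table

@mock_aws
def test_orders_table():
    # hash key "order_id" plus range key "created_at", both strings
    dynamo_table("orders", id_attribute="order_id", range_attribute="created_at")

    ddb = boto3.client("dynamodb", region_name="eu-west-1")
    ddb.put_item(
        TableName="orders",
        Item={"order_id": {"S": "o-1"}, "created_at": {"S": "2024-01-01"}},
    )
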
def ecs_cluster(definition: EcsCluster)

Create the ECS cluster, register its task definitions, and register one EC2 container instance.

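A sketch of setting up a mocked cluster (same moto assumption as above; the cluster and family names are illustrative):

from moto import mock_aws  # assumed moto >= 5

from supermoto.resources import EcsCluster, ecs_cluster, ecs_ls

@mock_aws
def test_cluster_setup():
    ecs_cluster(EcsCluster(cluster_name="jobs", task_definitions=["worker"]))
    # the cluster exists with a registered container instance, but no tasks yet
    assert ecs_ls("jobs") == []
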
def ecs_ls(cluster_name: str, desired_status: str = 'RUNNING') -> List[Dict[str, Any]]

List (and describe) all tasks in the ECS cluster that have the specified desired status.

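For illustration, starting a task and then listing it (same moto assumption as above; also assumes moto's run_task schedules onto the container instance registered by ecs_cluster):

import boto3
from moto import mock_aws  # assumed moto >= 5

from supermoto.resources import EcsCluster, ecs_cluster, ecs_ls

@mock_aws
def test_running_tasks():
    ecs_cluster(EcsCluster(cluster_name="jobs", task_definitions=["worker"]))
    ecs = boto3.client("ecs", region_name="eu-west-1")
    ecs.run_task(cluster="jobs", taskDefinition="worker", count=1)

    tasks = ecs_ls("jobs")  # desired_status defaults to "RUNNING"
    assert len(tasks) == 1
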
def s3_bucket(bucket_name: str) -> Callable[[str, bytes], None]

Create the bucket and return a function for seeding it with initial objects.

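A sketch of seeding a mocked bucket (same moto assumption as above; the bucket name and key are illustrative):

from moto import mock_aws  # assumed moto >= 5

from supermoto.resources import s3_bucket, s3_ls

@mock_aws
def test_seed_bucket():
    put = s3_bucket("test-bucket")
    put("config/settings.json", b'{"env": "test"}')
    assert s3_ls("test-bucket") == ["config/settings.json"]
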
def s3_get_objects(bucket_name: str, prefix: Optional[str] = None) -> Dict[str, bytes]

Load all objects in the bucket (optionally filtered by prefix) into a dict keyed by object key.

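For example, reading back only the objects under a prefix (same moto assumption as above):

from moto import mock_aws  # assumed moto >= 5

from supermoto.resources import s3_bucket, s3_get_objects

@mock_aws
def test_prefix_filter():
    put = s3_bucket("test-bucket")
    put("in/a.txt", b"alpha")
    put("out/b.txt", b"bravo")

    assert s3_get_objects("test-bucket", prefix="in/") == {"in/a.txt": b"alpha"}
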
def s3_ls(bucket_name: str) -> List[str]

List the keys of all objects in the bucket.

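A short sketch (same moto assumption as above):

from moto import mock_aws  # assumed moto >= 5

from supermoto.resources import s3_bucket, s3_ls

@mock_aws
def test_listing():
    put = s3_bucket("test-bucket")
    put("a.txt", b"alpha")
    put("b.txt", b"bravo")
    assert sorted(s3_ls("test-bucket")) == ["a.txt", "b.txt"]
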
def sns_topic(topic_name: str) -> str

Create the topic and return the ARN needed for publishing.

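For illustration, publishing to the mocked topic via the returned ARN (same moto assumption as above):

import boto3
from moto import mock_aws  # assumed moto >= 5

from supermoto.resources import sns_topic

@mock_aws
def test_publish():
    topic_arn = sns_topic("events")
    sns = boto3.client("sns", region_name="eu-west-1")
    sns.publish(TopicArn=topic_arn, Message="hello")
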
def sqs_queue(queue_name: str) -> Callable[[str], None]

Create the queue and return a function that sends messages to it.

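A sketch of sending and then receiving through the mocked queue (same moto assumption as above; the queue name is illustrative):

import boto3
from moto import mock_aws  # assumed moto >= 5

from supermoto.resources import sqs_queue

@mock_aws
def test_send():
    send = sqs_queue("jobs")
    send("job-1")

    sqs = boto3.client("sqs", region_name="eu-west-1")
    url = sqs.get_queue_url(QueueName="jobs")["QueueUrl"]
    msgs = sqs.receive_message(QueueUrl=url).get("Messages", [])
    assert msgs[0]["Body"] == "job-1"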

Classes

class EcsCluster (cluster_name: str, task_definitions: List[str])

Dataclass describing an ECS cluster to create: the cluster name and the task definition families to register for it.


Class variables

var cluster_name : str
var task_definitions : List[str]