Wednesday, November 1, 2023

Use Snowflake with Amazon MWAA to orchestrate data pipelines


This blog post is co-written with James Sun from Snowflake.

Customers rely on data from different sources such as mobile applications, clickstream events from websites, historical data, and more to infer meaningful patterns to optimize their products, services, and processes. With a data pipeline, which is a set of tasks used to automate the movement and transformation of data between different systems, you can reduce the time and effort needed to gain insights from the data. Apache Airflow and Snowflake have emerged as powerful technologies for data management and analysis.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed workflow orchestration service for Apache Airflow that you can use to set up and operate end-to-end data pipelines in the cloud at scale. The Snowflake Data Cloud provides a single source of truth for all your data needs and allows your organizations to store, analyze, and share large amounts of data. The Apache Airflow open-source community provides over 1,000 pre-built operators (plugins that simplify connections to services) for Apache Airflow to build data pipelines.

In this post, we provide an overview of orchestrating your data pipeline using Snowflake operators in your Amazon MWAA environment. We define the steps needed to set up the integration between Amazon MWAA and Snowflake. The solution provides an end-to-end automated workflow that includes data ingestion, transformation, analytics, and consumption.

Overview of solution

The following diagram illustrates our solution architecture.

Solution Overview

The data used for transformation and analysis is based on the publicly available New York Citi Bike dataset. The data (zipped files), which includes rider demographics and trip data, is copied from the public Citi Bike Amazon Simple Storage Service (Amazon S3) bucket into your AWS account. The data is decompressed and stored in a different S3 bucket (transformed data can be stored in the same S3 bucket where data was ingested, but for simplicity, we're using two separate S3 buckets). The transformed data is then made accessible to Snowflake for data analysis. The output of the queried data is published to Amazon Simple Notification Service (Amazon SNS) for consumption.

Amazon MWAA uses a directed acyclic graph (DAG) to run the workflows. In this post, we run three DAGs:

The following diagram illustrates this workflow.

DAG run workflow

See the GitHub repo for the DAGs and other files related to the post.

Note that in this post, we're using a DAG to create the Snowflake connection, but you can also create the Snowflake connection using the Airflow UI or CLI.
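For reference, the CLI route might look like the following sketch. The connection ID matches the one DAG1 creates later in this post; the host, credentials, and extra values are placeholders, not values from the repo:

```shell
# Sketch only -- replace every placeholder with your own Snowflake details.
airflow connections add 'snowflake_conn_accountadmin' \
    --conn-type 'snowflake' \
    --conn-host 'xxxxxx.snowflakecomputing.com' \
    --conn-login 'my_snowflake_user' \
    --conn-password 'my_snowflake_password' \
    --conn-schema 'mwaa_schema' \
    --conn-extra '{"account": "xxxxxx", "database": "mwaa_db", "role": "accountadmin", "warehouse": "dev_wh"}'
```

On Amazon MWAA you don't get shell access to the scheduler, which is one reason the DAG-based approach in this post is convenient.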

Prerequisites

To deploy the solution, you should have a basic understanding of Snowflake and Amazon MWAA with the following prerequisites:

  • An AWS account in an AWS Region where Amazon MWAA is supported.
  • A Snowflake account with admin credentials. If you don't have an account, sign up for a 30-day free trial. Select the Snowflake enterprise edition for the AWS Cloud platform.
  • Access to Amazon MWAA, Secrets Manager, and Amazon SNS.
  • In this post, we're using two S3 buckets, called airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID. Amazon S3 supports global buckets, which means that each bucket name must be unique across all AWS accounts in all the Regions within a partition. If the S3 bucket name is already taken, choose a different S3 bucket name. Create the S3 buckets in your AWS account. We upload content to the S3 buckets later in the post. Replace ACCOUNT_ID with your own AWS account ID or any other unique identifier. The bucket details are as follows:
    • airflow-blog-bucket-ACCOUNT_ID – The top-level bucket for Amazon MWAA-related files.
    • airflow-blog-bucket-ACCOUNT_ID/requirements – The bucket used for storing the requirements.txt file needed to deploy Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags – The bucket used for storing the DAG files to run workflows in Amazon MWAA.
    • airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries – The bucket used for storing the Snowflake SQL queries.
    • citibike-tripdata-destination-ACCOUNT_ID – The bucket used for storing the transformed dataset.

When implementing the solution in this post, replace references to airflow-blog-bucket-ACCOUNT_ID and citibike-tripdata-destination-ACCOUNT_ID with the names of your own S3 buckets.

Set up the Amazon MWAA environment

First, you create an Amazon MWAA environment. Before deploying the environment, upload the requirements file to the airflow-blog-bucket-ACCOUNT_ID/requirements S3 bucket. The requirements file is based on Amazon MWAA version 2.6.3. If you're testing on a different Amazon MWAA version, update the requirements file accordingly.

Complete the following steps to set up the environment:

  1. On the Amazon MWAA console, choose Create environment.
  2. Provide a name of your choice for the environment.
  3. Choose Airflow version 2.6.3.
  4. For the S3 bucket, enter the path of your bucket (s3://airflow-blog-bucket-ACCOUNT_ID).
  5. For the DAGs folder, enter the DAGs folder path (s3://airflow-blog-bucket-ACCOUNT_ID/dags).
  6. For the requirements file, enter the requirements file path (s3://airflow-blog-bucket-ACCOUNT_ID/requirements/requirements.txt).
  7. Choose Next.
  8. Under Networking, choose your existing VPC or choose Create MWAA VPC.
  9. Under Web server access, choose Public network.
  10. Under Security groups, leave Create new security group selected.
  11. For the Environment class, Encryption, and Monitoring sections, leave all values as default.
  12. In the Airflow configuration options section, choose Add custom configuration value and configure two values:
    1. Set Configuration option to secrets.backend and Custom value to airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend.
    2. Set Configuration option to secrets.backend_kwargs and Custom value to {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}.
    Configuration options for Secrets Manager
  13. In the Permissions section, leave the default settings and choose Create a new role.
  14. Choose Next.
  15. When the Amazon MWAA environment is available, assign S3 bucket permissions to the AWS Identity and Access Management (IAM) execution role (created during the Amazon MWAA setup).

MWAA execution role
This will direct you to the created execution role on the IAM console.

For testing purposes, you can choose Add permissions and add the managed AmazonS3FullAccess policy to the user instead of providing restricted access. For this post, we provide only the required access to the S3 buckets.

  1. On the drop-down menu, choose Create inline policy.
  2. For Select Service, choose S3.
  3. Under Access level, specify the following:
    1. Expand List level and select ListBucket.
    2. Expand Read level and select GetObject.
    3. Expand Write level and select PutObject.
  4. Under Resources, choose Add ARN.
  5. On the Text tab, provide the following ARNs for S3 bucket access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID (use your own bucket).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID (use your own bucket).
    3. arn:aws:s3:::tripdata (this is the public S3 bucket where the Citi Bike dataset is stored; use the ARN as specified here).
  6. Under Resources, choose Add ARN.
  7. On the Text tab, provide the following ARNs for S3 object access:
    1. arn:aws:s3:::airflow-blog-bucket-ACCOUNT_ID/* (make sure to include the asterisk).
    2. arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID/* (make sure to include the asterisk).
    3. arn:aws:s3:::tripdata/* (this is the public S3 bucket where the Citi Bike dataset is stored; use the ARN as specified here).
  8. Choose Next.
  9. For Policy name, enter S3ReadWrite.
  10. Choose Create policy.
  11. Finally, provide Amazon MWAA with permission to access Secrets Manager secret keys.

This step provides the Amazon MWAA execution role for your Amazon MWAA environment with read access to the secret key in Secrets Manager.

The execution role should have the policies MWAA-Execution-Policy*, S3ReadWrite, and SecretsManagerReadWrite attached to it.

MWAA execution role policies

When the Amazon MWAA environment is available, you can sign in to the Airflow UI from the Amazon MWAA console using the Open Airflow UI link.

Airflow UI access

Set up an SNS topic and subscription

Next, you create an SNS topic and add a subscription to the topic. Complete the following steps:

  1. On the Amazon SNS console, choose Topics in the navigation pane.
  2. Choose Create topic.
  3. For Topic type, select Standard.
  4. For Name, enter mwaa_snowflake.
  5. Leave the rest as default.
  6. After you create the topic, navigate to the Subscriptions tab and choose Create subscription.
    SNS topic
  7. For Topic ARN, choose mwaa_snowflake.
  8. Set the protocol to Email.
  9. For Endpoint, enter your email ID (you will get a notification in your email to accept the subscription).

By default, only the topic owner can publish and subscribe to the topic, so you need to modify the Amazon MWAA execution role access policy to allow Amazon SNS access.

  1. On the IAM console, navigate to the execution role you created earlier.
  2. On the drop-down menu, choose Create inline policy.
    MWAA execution role SNS policy
  3. For Select service, choose SNS.
  4. Under Actions, expand Write access level and select Publish.
  5. Under Resources, choose Add ARN.
  6. On the Text tab, specify the ARN arn:aws:sns:<<region>>:<<your_account_ID>>:mwaa_snowflake.
  7. Choose Next.
  8. For Policy name, enter SNSPublishOnly.
  9. Choose Create policy.

Configure a Secrets Manager secret

Next, we set up Secrets Manager, which is a supported alternative backend for storing Snowflake connection information and credentials.

To create the connection string, the Snowflake host and account name are required. Log in to your Snowflake account, and on the Worksheets menu, choose the plus sign and select SQL worksheet. Using the worksheet, run the following SQL commands to find the host and account name.

Run the following query for the host name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$ALLOWLIST()))))
SELECT REPLACE(VALUE:host,'"','') AS HOST
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

Run the following query for the account name:

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$ALLOWLIST()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT';

Next, we configure the secret in Secrets Manager.

  1. On the Secrets Manager console, choose Store a new secret.
  2. For Secret type, select Other type of secret.
  3. Under Key/value pairs, choose the Plaintext tab.
  4. In the text field, enter the following code and modify the string to reflect your Snowflake account information:

{"host": "<<snowflake_host_name>>", "account": "<<snowflake_account_name>>", "user": "<<snowflake_username>>", "password": "<<snowflake_password>>", "schema": "mwaa_schema", "database": "mwaa_db", "role": "accountadmin", "warehouse": "dev_wh"}

For instance:

{"host": "xxxxxx.snowflakecomputing.com", "account": "xxxxxx", "user": "xxxxx", "password": "*****", "schema": "mwaa_schema", "database": "mwaa_db", "role": "accountadmin", "warehouse": "dev_wh"}

The values for the database name, schema name, and role should be as mentioned earlier. The account, host, user, password, and warehouse can differ based on your setup.

Secret information

Choose Next.

  1. For Secret name, enter airflow/connections/snowflake_accountadmin.
  2. Leave all other values as default and choose Next.
  3. Choose Store.

Make a note of the Region in which the secret was created, shown under Secret ARN. We later define it as a variable in the Airflow UI.
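The secret must be named exactly airflow/connections/snowflake_accountadmin because the Secrets Manager backend configured earlier joins the connections_prefix (or variables_prefix) with the connection ID (or variable key) to form the secret name it looks up. The following is a minimal illustration of that naming convention, not Airflow's actual implementation:

```python
# Illustration of how the Secrets Manager backend resolves secret names from
# the backend_kwargs configured earlier:
# {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}
def secret_path(prefix: str, name: str, sep: str = "/") -> str:
    """Build the Secrets Manager secret name the backend will look up."""
    return f"{prefix}{sep}{name}"

# The connection ID used by the DAGs maps to the secret created above.
print(secret_path("airflow/connections", "snowflake_accountadmin"))
# An Airflow variable would resolve the same way under the variables prefix.
print(secret_path("airflow/variables", "sec_key_region"))
```

If either prefix in backend_kwargs changes, the secret names must change to match.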

Configure Snowflake access permissions and IAM role

Next, log in to your Snowflake account. Make sure the account you are using has account administrator access. Create a SQL worksheet. Using the worksheet, create a warehouse named dev_wh.

The following is an example SQL command:

CREATE OR REPLACE WAREHOUSE dev_wh 
 WITH WAREHOUSE_SIZE = 'xsmall' 
 AUTO_SUSPEND = 60 
 INITIALLY_SUSPENDED = true
 AUTO_RESUME = true
 MIN_CLUSTER_COUNT = 1
 MAX_CLUSTER_COUNT = 5;

For Snowflake to read data from and write data to an S3 bucket referenced in an external (S3 bucket) stage, a storage integration is required. Follow the steps outlined in Option 1: Configuring a Snowflake Storage Integration to Access Amazon S3 (only perform Steps 1 and 2, as described in this section).

Configure access permissions for the S3 bucket

While creating the IAM policy, a sample policy document is required (see the following code), which provides Snowflake with the required permissions to load or unload data using a single bucket and folder path. The bucket name used in this post is citibike-tripdata-destination-ACCOUNT_ID. You should modify it to reflect your bucket name.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::citibike-tripdata-destination-ACCOUNT_ID"
    }
  ]
}

Create the IAM role

Next, you create the IAM role to grant privileges on the S3 bucket containing your data files. After creation, record the Role ARN value located on the role summary page.

Snowflake IAM role

Configure variables

Finally, configure the variables that will be accessed by the DAGs in Airflow. Log in to the Airflow UI and on the Admin menu, choose Variables and the plus sign.

Airflow variables

Add four variables with the following key/value pairs:

  • Key aws_role_arn with value <<snowflake_aws_role_arn>> (the ARN for role mysnowflakerole noted earlier)
  • Key destination_bucket with value <<bucket_name>> (for this post, the bucket used is citibike-tripdata-destination-ACCOUNT_ID)
  • Key target_sns_arn with value <<sns_Arn>> (the SNS topic in your account)
  • Key sec_key_region with value <<region_of_secret_deployment>> (the Region where the secret in Secrets Manager was created)

The following screenshot illustrates where to find the SNS topic ARN.

SNS topic ARN

The Airflow UI will now have the variables defined, which will be referred to by the DAGs.

Airflow variables list

Congratulations, you have completed all the configuration steps.

Run the DAGs

Let's test run the DAGs. To recap:

  • DAG1 (create_snowflake_connection_blog.py) – Creates the Snowflake connection in Apache Airflow. This connection will be used to authenticate with Snowflake. The Snowflake connection string is stored in Secrets Manager, which is referenced in the DAG file.
  • DAG2 (create-snowflake_initial-setup_blog.py) – Creates the database, schema, storage integration, and stage in Snowflake.
  • DAG3 (run_mwaa_datapipeline_blog.py) – Runs the data pipeline, which will unzip files from the source public S3 bucket and copy them to the destination S3 bucket. The next task will create a table in Snowflake to store the data. Then the data from the destination S3 bucket will be copied into the table using a Snowflake stage. After the data is successfully copied, a view will be created in Snowflake, on top of which the SQL queries will be run.
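The unzip step of DAG3 can be sketched as follows. This is a simplified, in-memory illustration under stated assumptions: the helper name is made up for this sketch, and the S3 download/upload calls (for example with boto3 get_object/put_object) are omitted, so it is not the exact code from the repo:

```python
import io
import zipfile

def extract_zip_members(zip_bytes: bytes) -> dict[str, bytes]:
    """Unzip an archive held in memory and return {member_name: file_bytes}.

    In the DAG, zip_bytes would be downloaded from the public tripdata S3
    bucket, and the returned files would be uploaded to the destination
    bucket; those S3 calls are intentionally left out of this sketch.
    """
    extracted = {}
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for member in archive.namelist():
            if not member.endswith("/"):  # skip directory entries
                extracted[member] = archive.read(member)
    return extracted

# Minimal demonstration with an in-memory archive (hypothetical file name).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as z:
    z.writestr("202310-citibike-tripdata.csv", "ride_id,started_at\n1,2023-10-01")
files = extract_zip_members(buf.getvalue())
print(sorted(files))  # ['202310-citibike-tripdata.csv']
```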

To run the DAGs, complete the following steps:

  1. Upload the DAGs to the S3 bucket airflow-blog-bucket-ACCOUNT_ID/dags.
  2. Upload the SQL query files to the S3 bucket airflow-blog-bucket-ACCOUNT_ID/dags/mwaa_snowflake_queries.
  3. Log in to the Apache Airflow UI.
  4. Locate DAG1 (create_snowflake_connection_blog), un-pause it, and choose the play icon to run it.

You can view the run state of the DAG using the Grid or Graph view in the Airflow UI.

Dag1 run

After DAG1 runs, the Snowflake connection snowflake_conn_accountadmin is created on the Admin, Connections menu.

  1. Locate and run DAG2 (create-snowflake_initial-setup_blog).

Dag2 run

After DAG2 runs, the following objects are created in Snowflake:

  • The database mwaa_db
  • The schema mwaa_schema
  • The storage integration mwaa_citibike_storage_int
  • The stage mwaa_citibike_stg

Before running the final DAG, the trust relationship on the IAM role needs to be updated.

  1. Log in to your Snowflake account using your admin account credentials.
  2. Open the SQL worksheet you created earlier and run the following command:
DESC INTEGRATION mwaa_citibike_storage_int;

mwaa_citibike_storage_int is the name of the integration created by DAG2 in the previous step.

From the output, record the property value of the following two properties:

  • STORAGE_AWS_IAM_USER_ARN – The IAM user created for your Snowflake account.
  • STORAGE_AWS_EXTERNAL_ID – The external ID that is needed to establish a trust relationship.

Now we grant the Snowflake IAM user permissions to access bucket objects.

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose the role mysnowflakerole.
  3. On the Trust relationships tab, choose Edit trust relationship.
  4. Modify the policy document with the DESC STORAGE INTEGRATION output values you recorded. For example:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::5xxxxxxxx:user/mgm4-s- ssca0079"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "AWSPARTNER_SFCRole=4_vsarJrupIjjJh77J9Nxxxx/j98="
        }
      }
    }
  ]
}

The AWS role ARN and ExternalId will be different for your environment based on the output of the DESC STORAGE INTEGRATION query.

Trust relationship
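If you prefer to script this edit, a small helper can assemble the trust policy from the two recorded values. The helper name and the placeholder ARN and external ID below are illustrative assumptions, not values from the post's repo:

```python
import json

def build_trust_policy(storage_aws_iam_user_arn: str,
                       storage_aws_external_id: str) -> str:
    """Return the IAM trust policy JSON that lets the Snowflake-owned IAM
    user assume the role, scoped by the integration's external ID."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": storage_aws_iam_user_arn},
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "sts:ExternalId": storage_aws_external_id
                    }
                },
            }
        ],
    }
    return json.dumps(policy, indent=2)

# Placeholders -- use the DESC INTEGRATION mwaa_citibike_storage_int output.
print(build_trust_policy("arn:aws:iam::111122223333:user/example-user",
                         "EXAMPLE_EXTERNAL_ID"))
```

The resulting JSON could then be applied with the IAM console as described above, or with the update-assume-role-policy IAM API.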

  1. Locate and run the final DAG (run_mwaa_datapipeline_blog).

At the end of the DAG run, the data is ready for querying. In this example, the query (finding the top start and destination stations) is run as part of the DAG and the output can be seen in the Airflow XComs UI.

Xcoms

In the DAG run, the output is also published to Amazon SNS and, based on the subscription, an email notification is sent out with the query output.

Email
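The notification step amounts to formatting the query rows into a message body before publishing to the topic. The following sketch makes a few assumptions: the row shape (station name, trip count), the helper name, and the final boto3 publish call are illustrative, not taken from the repo:

```python
def format_top_stations(rows: list[tuple[str, int]]) -> str:
    """Format (station_name, trip_count) rows into a readable message body."""
    lines = ["Top Citi Bike start stations:"]
    for rank, (station, count) in enumerate(rows, start=1):
        lines.append(f"{rank}. {station}: {count} trips")
    return "\n".join(lines)

# Hypothetical rows standing in for the DAG's query output.
message = format_top_stations([("W 21 St & 6 Ave", 1250),
                               ("Broadway & W 58 St", 1100)])
print(message)
# In the DAG, the message would then be published with something like
# boto3.client("sns").publish(TopicArn=target_sns_arn, Message=message).
```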

Another method to visualize the results is directly from the Snowflake console using the Snowflake worksheet. The following is an example query:

SELECT START_STATION_NAME,
       COUNT(START_STATION_NAME) C
FROM MWAA_DB.MWAA_SCHEMA.CITIBIKE_VW
GROUP BY START_STATION_NAME
ORDER BY C DESC
LIMIT 10;

Snowflake visual

There are different ways to visualize the output based on your use case.

As we saw, DAG1 and DAG2 need to be run only one time to set up the Amazon MWAA connection and Snowflake objects. DAG3 can be scheduled to run every week or month. With this solution, the user examining the data doesn't have to log in to either Amazon MWAA or Snowflake. You can have an automated workflow triggered on a schedule that will ingest the latest data from the Citi Bike dataset and provide the top start and destination stations for the given dataset.

Clean up

To avoid incurring future charges, delete the AWS resources (IAM users and roles, Secrets Manager secrets, Amazon MWAA environment, SNS topics and subscription, S3 buckets) and Snowflake resources (database, stage, storage integration, view, tables) created as part of this post.

Conclusion

In this post, we demonstrated how to set up an Amazon MWAA connection for authenticating to Snowflake as well as to AWS using AWS user credentials. We used a DAG to automate creating the Snowflake objects such as database, tables, and stage using SQL queries. We also orchestrated the data pipeline using Amazon MWAA, which ran tasks related to data transformation as well as Snowflake queries. We used Secrets Manager to store Snowflake connection information and credentials and Amazon SNS to publish the data output for end consumption.

With this solution, you have an automated end-to-end orchestration of your data pipeline encompassing ingestion, transformation, analysis, and data consumption.

To learn more, refer to the following resources:


About the authors

Payal Singh is a Partner Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping partners and customers modernize and migrate their applications to AWS.

James Sun is a Senior Partner Solutions Architect at Snowflake. He actively collaborates with strategic cloud partners like AWS, supporting product and service integrations as well as the development of joint solutions. He has held senior technical positions at tech companies such as EMC, AWS, and MapR Technologies. With over 20 years of experience in storage and data analytics, he also holds a PhD from Stanford University.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.

Manuj Arora is a Sr. Solutions Architect for Strategic Accounts in AWS. He focuses on Migration and Modernization capabilities and offerings in AWS. Manuj has worked as a Partner Success Solutions Architect in AWS over the last 3 years and worked with partners like Snowflake to build solution blueprints that are leveraged by the customers. Outside of work, he enjoys traveling, playing tennis, and exploring new places with family and friends.
