Continuous data ingestion with Snowpipe Auto Ingest and Snowpipe REST API in Snowflake (AWS)

Chayan Shrang Raj
13 min read · Dec 16, 2023


As we learned about the COPY INTO command in the previous article, there are trade-offs to balance between using Snowpipe and the COPY command. We will dive deeper into them in the upcoming sections.

In this article, we will get hands-on with Snowpipe and explore its quirks when loading data into Snowflake tables. So how does Snowpipe work?

A pipe is a named, first-class Snowflake object that contains a COPY statement used by Snowpipe. The COPY statement identifies the source location of the data files (i.e., a stage) and a target table.

Snowpipe loads data from files as soon as they are available in a stage. The data is loaded according to the COPY statement defined in a pipe.

Snowpipe Architecture (Image credits: Author)

Let’s connect — LinkedIn
Let’s build something — GitHub

To get the most out of Snowpipe, keep your organization's loading costs under control, and build maintainable data ingestion processes, it is crucial to understand the subtle differences between the COPY and Snowpipe approaches. The Snowflake documentation lists further differences in load semantics, and it is definitely worth checking out.

COPY vs SNOWPIPE (Image credits: Author)

Snowpipe relies on the cloud vendor-specific system for event distribution, such as AWS SQS or SNS, Azure Event Grid, or GCP Pub/Sub. This setup requires the corresponding privileges in the cloud account to deliver event notifications from the source bucket to Snowpipe. The following table indicates the cloud storage service support for automated Snowpipe and Snowpipe REST API calls from Snowflake accounts hosted on each cloud platform:

Image credits: Snowflake

Automated data loads use event notifications in cloud storage to tell Snowpipe when new data files are available. Snowpipe then queues these files and loads them into the target table. This process is continuous, serverless, and follows the parameters specified in a designated pipe object. There are two ways to use Snowpipe, namely Snowpipe auto-ingest and the Snowpipe REST API.

Now it is time to get our hands dirty and set up Snowpipe in three scenarios:

  • Load files from AWS S3 bucket using Snowpipe Auto-ingest
  • Load files from AWS S3 bucket using Snowpipe REST API call
  • Load files from AWS S3 using the Snowpipe REST API, triggered automatically by an AWS Lambda function

I am covering different methods because I want to build an understanding of the different services and scenarios in which we can leverage Snowpipe. Feel free to jump to the section you find most valuable or, better, stick around till the end!

Scenario 1:
  • File format: CSV
  • Data Load Technique: SNOWPIPE AUTO INGEST
  • Source: AWS S3 bucket
  • Loading type: Continuous data load

As mentioned in the previous article, when dealing with cloud services there has to be a secure authentication and authorization process between the Snowflake user and the cloud service. Every cloud platform has a similar procedure to create roles, grant read, write, and delete privileges, and associate that role with Snowflake.
One of the upsides of using storage integrations for authentication is that Snowflake creates a single IAM user that is referenced by all S3 storage integrations in your Snowflake account, so several external stage objects can reference different buckets and paths while using the same storage integration for authentication.
We will carry on with the integration created in the last post, 'aws_snowflake'.

Image credits: Author

Follow this link to create an AWS IAM role and attach the required policies to it. After that, we need to set up the message queue notification service for the target S3 bucket so that Snowflake can trigger a COPY event whenever a new file arrives in the bucket.

In the diagram below, we can see a general idea of the automatic data ingestion flow and interaction between Snowflake and AWS:

Image credits: Snowflake

A typical ingestion process follows these steps:

  1. Data files are loaded into an external named stage. The stage is created with proper authentication for S3, the URL of the source S3 bucket, and other parameters.
  2. When a new file lands in the S3 bucket, an S3 event notification is triggered and sent to Snowpipe via an AWS SQS queue. Snowflake puts the metadata of the files that are ready to load into the queue (important: only the metadata, not the original files).
  3. A Snowflake-managed virtual warehouse loads data from the queued files into the target table using the COPY INTO statement defined in the specified pipe.

Currently, there are no files in the S3 bucket, but it will be filled with the customer_csv files that we used in the previous article.

NOTE: Cloud providers charge for data transferred out of their own network. To recover these expenses, Snowflake charges a per-byte fee when you unload data from Snowflake (hosted on AWS, GCP, or Microsoft Azure) into an external stage in a different region or different cloud provider. Snowflake does not charge for data ingress (i.e. when loading data into Snowflake).

NOTE: Currently, accessing cloud storage in a government region using a storage integration is limited to Snowflake accounts hosted in the same government region.

NOTE: For security reasons, if you create a new storage integration (or recreate an existing storage integration using the CREATE OR REPLACE STORAGE INTEGRATION syntax), the resulting integration has a different external ID and so it cannot resolve the trust relationship unless the trust policy is modified.

Following the creation of your cloud storage integration and referencing that integration in the specified stage, we create a PIPE object as shown below:

Image credits: Author

The 'aws_pipe' pipe has automatic data ingestion enabled with AUTO_INGEST=TRUE. It loads data from the 'AWS_LOAD' stage object, which references the external S3 bucket on AWS, and copies it into the 'customer_csv' table defined under the custom schema SNOWPIPE_AWS inside the SNOWPIPE_AWS_DATABASE database.
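The pipe itself was created in the worksheet shown in the image. For readers who prefer to script the setup, here is a minimal Python sketch that builds equivalent CREATE STAGE and CREATE PIPE statements. The object names mirror the article, but the bucket URL and the CSV file-format options (SKIP_HEADER = 1) are assumptions, and run_ddl expects an already-open snowflake-connector-python connection.

```python
# Sketch: generate the stage and pipe DDL used in Scenario 1.
# Object names follow the article; the bucket URL and CSV options are assumptions.


def stage_ddl(stage: str, url: str, integration: str) -> str:
    """CREATE STAGE for an S3 location that authenticates through a storage integration."""
    return (
        f"CREATE OR REPLACE STAGE {stage}\n"
        f"  URL = '{url}'\n"
        f"  STORAGE_INTEGRATION = {integration}\n"
        "  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"  # assumed CSV options
    )


def pipe_ddl(pipe: str, table: str, stage: str) -> str:
    """CREATE PIPE whose AUTO_INGEST flag lets S3 event notifications trigger loads."""
    return (
        f"CREATE OR REPLACE PIPE {pipe}\n"
        "  AUTO_INGEST = TRUE\n"
        f"  AS COPY INTO {table}\n"
        f"  FROM @{stage}"
    )


def run_ddl(conn, statements) -> None:
    """Execute statements on an open snowflake.connector connection (not called here)."""
    with conn.cursor() as cur:
        for sql in statements:
            cur.execute(sql)


if __name__ == "__main__":
    schema = "SNOWPIPE_AWS_DATABASE.SNOWPIPE_AWS"
    print(stage_ddl(f"{schema}.AWS_LOAD", "s3://<your-bucket>/<path>/", "aws_snowflake"))
    print(pipe_ddl(f"{schema}.AWS_PIPE", f"{schema}.CUSTOMER_CSV", f"{schema}.AWS_LOAD"))
```

Keeping the statement builders separate from execution makes the generated SQL easy to review before anything touches the account.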

For ease of use, Snowpipe SQS queues are created and managed by Snowflake. The SHOW PIPES command output displays the Amazon Resource Name (ARN) of your SQS queue. Note the ARN of the SQS queue for the stage in the notification_channel column. Copy the ARN to a convenient location.
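If you would rather not copy the ARN by hand, a sketch like the following reads it from the SHOW PIPES output. It assumes a snowflake-connector-python connection, and notification_channel is a hypothetical helper that works on plain column names and rows, so it can be checked without a Snowflake account.

```python
# Sketch: read the SQS ARN (notification_channel) for a pipe from SHOW PIPES.


def notification_channel(columns, rows, pipe_name):
    """Return the notification_channel of pipe_name; columns are SHOW PIPES column names."""
    name_idx = columns.index("name")
    channel_idx = columns.index("notification_channel")
    for row in rows:
        if row[name_idx].upper() == pipe_name.upper():
            return row[channel_idx]
    raise LookupError(f"pipe {pipe_name!r} not found in SHOW PIPES output")


def fetch_notification_channel(conn, schema, pipe_name):
    """Run SHOW PIPES on an open snowflake.connector connection and extract the ARN."""
    with conn.cursor() as cur:
        cur.execute(f"SHOW PIPES IN SCHEMA {schema}")
        # cursor.description holds one metadata entry per column; index 0 is the name
        columns = [col[0] for col in cur.description]
        return notification_channel(columns, cur.fetchall(), pipe_name)
```

For example, fetch_notification_channel(conn, "SNOWPIPE_AWS_DATABASE.SNOWPIPE_AWS", "AWS_PIPE") would return the ARN to paste into the S3 event notification.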

Next step is to configure an event notification for your AWS S3 bucket. Follow this link to create it. Complete the fields as follows:

  • Name: Name of the event notification.
  • Events: Select the ObjectCreate (All) option.
  • Destination: Select SQS Queue from the radio buttons.
  • SQS: Select Add SQS queue ARN from the radio buttons.
  • SQS queue ARN: Paste the SQS queue ARN (notification channel) from the SHOW PIPES output.
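The same event notification can be scripted with boto3. This is a minimal sketch: sqs_notification_config and apply_notification are illustrative helper names, and the Id value is arbitrary. Note that put_bucket_notification_configuration replaces the bucket's entire existing notification configuration, so merge in any notifications you already have.

```python
# Sketch: S3 event notification that sends ObjectCreated events to the Snowpipe SQS queue.


def sqs_notification_config(queue_arn: str, config_id: str = "snowpipe-auto-ingest") -> dict:
    """Build the NotificationConfiguration dict expected by boto3."""
    return {
        "QueueConfigurations": [
            {
                "Id": config_id,  # arbitrary label
                "QueueArn": queue_arn,  # notification_channel from SHOW PIPES
                "Events": ["s3:ObjectCreated:*"],  # all object-create events
            }
        ]
    }


def apply_notification(bucket: str, config: dict) -> None:
    """Overwrite the bucket's notification configuration (requires boto3 and AWS credentials)."""
    import boto3  # imported here so building the config needs no AWS dependencies

    boto3.client("s3").put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration=config
    )
```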

Snowpipe with auto-ingest is now configured!

When new data files are added to the S3 bucket, the event notification informs Snowpipe to load them into the target table defined in the pipe.

It's time to upload a CSV file to the predefined S3 bucket and check the customer_csv table. I will upload the 'customer_1.csv' file to the S3 bucket. Since this process has a temporal factor, I will display the output with timestamps to monitor the process completion.

NOTE: Working with timestamps can be tricky: depending on your region, your time zone may differ from Snowflake's, which makes it harder to line up events. Amazon does not allow you to change the time zone for your account, or in our case for our S3 buckets. To tackle that issue, Snowflake allows changing the time zone for your current session or even for the whole account. I changed mine like this:

Image credits: Author

Now I have the same time zone for both AWS and Snowflake, but because of daylight saving time, Snowflake shows a time 1 hour behind. That is not an issue.
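Because the pipe status reports its timestamps in UTC (the trailing Z), another option is to convert them on the client side. This sketch uses only the standard library; the parsing formats are an assumption based on the timestamps this pipe returns, and the helper names are hypothetical.

```python
# Sketch: convert Snowpipe's UTC timestamps into a local time zone on the client side.
from datetime import datetime, timedelta, timezone, tzinfo


def parse_snowflake_utc(ts: str) -> datetime:
    """Parse values such as '2023-12-11T10:49:05.953Z'; fractional seconds are optional."""
    fmt = "%Y-%m-%dT%H:%M:%S.%fZ" if "." in ts else "%Y-%m-%dT%H:%M:%SZ"
    return datetime.strptime(ts, fmt).replace(tzinfo=timezone.utc)


def to_local(ts: str, tz: tzinfo) -> datetime:
    """Return the same instant expressed in the given time zone."""
    return parse_snowflake_utc(ts).astimezone(tz)


if __name__ == "__main__":
    utc_plus_one = timezone(timedelta(hours=1))  # fixed UTC+1 offset, for illustration only
    print(to_local("2023-12-11T10:49:05.953Z", utc_plus_one).isoformat())
```

With a DST-aware zone, for example to_local(ts, ZoneInfo("Europe/Stockholm")), the offset follows daylight saving time automatically; that zone name is only an illustration, since the article does not state which time zone was used.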

Note the time this file was loaded into the S3 bucket.

Image credits: Author

Then, we check the status of our pipe by using the command:

SELECT SYSTEM$PIPE_STATUS( 'SNOWPIPE_AWS_DATABASE.SNOWPIPE_AWS.AWS_PIPE' );

Keeping in mind the one hour difference in timestamps, the pipe object returns the following JSON object:

{
  "executionState": "RUNNING",
  "pendingFileCount": 0,
  "lastIngestedTimestamp": "2023-12-11T10:49:05.953Z",
  "lastIngestedFilePath": "customer_1.csv",
  "notificationChannelName": "arn:aws:sqs:us-east-1:046927018749:sf-snowpipe-AIDAQV3IRPL64U3KQ62EU-x_EgStMgw3jOeWfe6YQC2g",
  "numOutstandingMessagesOnChannel": 1,
  "lastReceivedMessageTimestamp": "2023-12-11T10:49:05.78Z",
  "lastForwardedMessageTimestamp": "2023-12-11T10:49:06.205Z",
  "lastPulledFromChannelTimestamp": "2023-12-11T10:51:30.752Z",
  "lastForwardedFilePath": "sfpipe-data/customer_1.csv"
}

Two of the fields above are worth a closer look:

1. lastIngestedTimestamp
Timestamp when the most recent file was loaded successfully by Snowpipe into the destination table.

2. lastIngestedFilePath
Path of the file loaded at the timestamp specified in lastIngestedTimestamp.
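To run this check from a script instead of a worksheet, a small sketch can call SYSTEM$PIPE_STATUS through snowflake-connector-python and pick out the fields discussed above. The helper names are hypothetical, and fetch_pipe_status assumes an open connection.

```python
# Sketch: summarize SYSTEM$PIPE_STATUS output from Python.
import json


def summarize_pipe_status(raw: str) -> dict:
    """Pick a few fields out of the JSON string returned by SYSTEM$PIPE_STATUS."""
    status = json.loads(raw)
    return {
        "state": status.get("executionState"),
        "pending": status.get("pendingFileCount", 0),
        "last_file": status.get("lastIngestedFilePath"),
        "last_ingested_at": status.get("lastIngestedTimestamp"),
    }


def fetch_pipe_status(conn, pipe: str) -> dict:
    """Query the status of a fully qualified pipe over an open snowflake.connector connection."""
    with conn.cursor() as cur:
        # The connector's default pyformat paramstyle binds %s on the client side
        cur.execute("SELECT SYSTEM$PIPE_STATUS(%s)", (pipe,))
        return summarize_pipe_status(cur.fetchone()[0])
```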

The pipe status indicates that our data has been successfully loaded into the 'customer_csv' table, and if we now check the load history, we see the same:

Image credits: Author
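The load history can also be queried directly. Below is a sketch that builds a query against the INFORMATION_SCHEMA.COPY_HISTORY table function, which also reports loads performed by Snowpipe; the selected columns, the one-hour window, and the helper name are choices made for this example, not the author's query.

```python
# Sketch: build a COPY_HISTORY query that also covers Snowpipe loads.


def copy_history_sql(database: str, table: str, hours: int = 1) -> str:
    """Return a query listing loads into `table` during the last `hours` hours."""
    if not isinstance(hours, int) or hours <= 0:
        raise ValueError("hours must be a positive integer")
    return (
        "SELECT file_name, last_load_time, row_count, status, first_error_message\n"
        f"FROM TABLE({database}.INFORMATION_SCHEMA.COPY_HISTORY(\n"
        f"  TABLE_NAME => '{table}',\n"
        f"  START_TIME => DATEADD(hours, -{hours}, CURRENT_TIMESTAMP())))\n"
        "ORDER BY last_load_time DESC"
    )


if __name__ == "__main__":
    print(copy_history_sql("SNOWPIPE_AWS_DATABASE", "SNOWPIPE_AWS.CUSTOMER_CSV"))
```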

Hooray! Any data files loaded into the S3 bucket will automatically get ingested using Snowpipe.

Scenario 2:
  • File format: CSV
  • Data Load Technique: SNOWPIPE REST API
  • Source: AWS S3 bucket
  • Loading type: Continuous data load

There are two options for calling the Snowpipe REST API to load data into Snowflake tables.

Option 1: Using a Client to Call the REST API
Use a client, such as the Java or Python SDK, to call the REST API:
- Call a REST endpoint with a list of files to load once they are staged.
- Retrieve the load history.

Option 2: Using AWS Lambda to Call the REST API
Automate Snowpipe by using an AWS Lambda function to call the REST API. A Lambda function can call the REST API to load data from files stored in Amazon S3 only.
- Create an AWS Lambda function that calls the Snowpipe REST API to load data from your external (i.e. S3) stage.
- Retrieve the load history.
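For the "Retrieve the load history" step, Snowpipe exposes an insertReport endpoint alongside insertFiles. This sketch only builds the request with the standard library (it does not send it); the function name is hypothetical, the JWT comes from the key pair setup, and beginMark is optional, echoing the nextBeginMark value returned by a previous report.

```python
# Sketch: build (but do not send) a request to Snowpipe's insertReport endpoint.
import urllib.parse
import urllib.request
import uuid


def insert_report_request(account: str, pipe: str, jwt: str, begin_mark=None):
    """GET request that asks Snowpipe for recent load events of a fully qualified pipe."""
    params = {"requestId": str(uuid.uuid4())}  # random id to trace the call
    if begin_mark is not None:
        params["beginMark"] = begin_mark  # nextBeginMark from a previous report
    url = (
        f"https://{account}.snowflakecomputing.com/v1/data/pipes/{pipe}/insertReport?"
        + urllib.parse.urlencode(params)
    )
    return urllib.request.Request(
        url,
        method="GET",
        headers={"Authorization": f"BEARER {jwt}", "Accept": "application/json"},
    )
```

Sending it is then a matter of json.load(urllib.request.urlopen(req)) and reading the reported files.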

I will be working with AWS Lambda, a serverless compute service that runs your code when it is triggered by an event.

The workflow is as below:

  1. Create a named stage object where your data files will be staged. Snowpipe supports both internal (Snowflake) stages and external stages, i.e. S3 buckets.
  2. Create a pipe object using CREATE PIPE.
  3. Configure security for the user who will execute the continuous data load. If you plan to restrict Snowpipe data loads to a single user, you only need to configure key pair authentication for the user once. After that, you only need to grant access control privileges on the database objects used for each data load.
  4. Install a client SDK (Java or Python) for calling the Snowpipe public REST endpoints.

We have already completed steps 1 and 2. Next, we set up key pair authentication for the Snowpipe REST endpoints. You need to create an RSA public and private key pair and assign the public key to your Snowflake user, as described in this link.

NOTE: Use Command Prompt (Windows) or Terminal (macOS) to create the keys, not PowerShell. Command Prompt works correctly with the OpenSSL key-generation commands, and you can also test the validity of your keys by following this.
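As an alternative to the OpenSSL commands in the linked guide, the key pair can also be generated in Python with the cryptography package. This is a sketch under that swap: the 2048-bit key size matches the Snowflake documentation's examples, the passphrase and user name are placeholders, and alter_user_sql simply strips the PEM header, footer, and line breaks so the key body can be used in ALTER USER ... SET RSA_PUBLIC_KEY.

```python
# Sketch: generate an encrypted PKCS#8 key pair with the `cryptography` package
# (a swap for the OpenSSL commands) and format the public key for ALTER USER.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa


def generate_key_pair(passphrase: bytes):
    """Return (encrypted private key PEM, public key PEM) for a new 2048-bit RSA key."""
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    private_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.BestAvailableEncryption(passphrase),
    )
    public_pem = key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return private_pem, public_pem


def alter_user_sql(user: str, public_pem: bytes) -> str:
    """Strip the PEM header/footer and line breaks; RSA_PUBLIC_KEY takes the bare key body."""
    body = "".join(
        line for line in public_pem.decode("ascii").splitlines() if not line.startswith("-----")
    )
    return f"ALTER USER {user} SET RSA_PUBLIC_KEY='{body}';"


if __name__ == "__main__":
    _private_pem, public_pem = generate_key_pair(b"<your-passphrase>")  # placeholder passphrase
    print(alter_user_sql("<user_login_name>", public_pem))
```

In practice you would also write the private key bytes to rsa_key.p8 and keep them secret; only the public key body goes to Snowflake.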

Once key pair authentication is up and running, keep in mind that the Snowpipe REST endpoints require key pair authentication with a JSON Web Token (JWT). JWTs are signed using a public/private key pair with RSA encryption, and every API request needs this authorization token to establish the connection with your Snowflake account. You can generate a JWT for each session as follows; by default, it is valid for 1 hour. We could also create the JWT with Python, but the SnowSQL command is very straightforward.

snowsql --private-key-path rsa_key.p8 --generate-jwt -a <Snowflake account> -u <user>
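Since the JWT can also be created in Python, here is a sketch of the token Snowflake expects for key pair authentication, built with the standard library plus the cryptography package: an RS256 signature over claims whose iss is ACCOUNT.USER.SHA256:<fingerprint> and whose sub is ACCOUNT.USER, where the fingerprint is the base64 SHA-256 digest of the DER-encoded public key. Check the exact account identifier format against Snowflake's key pair authentication docs for your account; the helper names here are hypothetical.

```python
# Sketch: build a Snowflake key-pair JWT (RS256) with the standard library plus `cryptography`.
import base64
import hashlib
import json
import time

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa


def _b64url(data: bytes) -> str:
    """Base64url without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def public_key_fingerprint(private_key) -> str:
    """'SHA256:' + base64 of the SHA-256 digest of the DER-encoded public key."""
    der = private_key.public_key().public_bytes(
        serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo
    )
    return "SHA256:" + base64.b64encode(hashlib.sha256(der).digest()).decode("ascii")


def snowflake_jwt(private_key, account: str, user: str, lifetime_s: int = 3600) -> str:
    """Return a signed JWT; the lifetime is capped at one hour."""
    if not 0 < lifetime_s <= 3600:
        raise ValueError("lifetime_s must be between 1 and 3600 seconds")
    qualified = f"{account.upper()}.{user.upper()}"
    now = int(time.time())
    header = {"alg": "RS256", "typ": "JWT"}
    payload = {
        "iss": f"{qualified}.{public_key_fingerprint(private_key)}",
        "sub": qualified,
        "iat": now,
        "exp": now + lifetime_s,
    }
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    # RS256 = RSASSA-PKCS1-v1_5 signature with SHA-256
    signature = private_key.sign(signing_input.encode("ascii"), padding.PKCS1v15(), hashes.SHA256())
    return f"{signing_input}.{_b64url(signature)}"


if __name__ == "__main__":
    demo_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)  # throwaway key
    print(snowflake_jwt(demo_key, "<account_identifier>", "<user_login_name>"))
```

In a real run, the key would come from rsa_key.p8 via serialization.load_pem_private_key instead of the throwaway demo key.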

Now we will invoke the Snowpipe REST API using Postman. Postman is an API platform for building and using APIs.

Submit a request to the insertFiles REST endpoint to load the staged data files. This call does not actually load the data; it triggers the pipe run that loads the data from the internal or external stage. Using the format below (described in the documentation linked here), we submit the externally staged 'customer_1.csv' file stored in the S3 bucket.

https://{account}.snowflakecomputing.com/v1/data/pipes/{pipeName}/insertFiles?requestId={requestId}

Image credits: Author

The request uses the following parameters:

account
Account identifier for your Snowflake account.

pipeName
Case-sensitive, fully-qualified pipe name.

requestId
String used to track requests through the system. Snowflake recommends providing a random string with each request, e.g. a UUID.
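Outside Postman, the same insertFiles call can be built with Python's standard library. This sketch constructs the POST request without sending it; the function name is hypothetical, the body lists file paths relative to the stage, and the header set (a BEARER JWT plus JSON content type) is an assumption based on the Snowpipe REST documentation.

```python
# Sketch: build (but do not send) an insertFiles request with the standard library.
import json
import urllib.request
import uuid


def insert_files_request(account: str, pipe: str, jwt: str, paths):
    """POST request asking a fully qualified pipe to load the listed stage-relative paths."""
    url = (
        f"https://{account}.snowflakecomputing.com/v1/data/pipes/{pipe}/insertFiles"
        f"?requestId={uuid.uuid4()}"  # random id for tracking, as recommended
    )
    body = json.dumps({"files": [{"path": p} for p in paths]}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"BEARER {jwt}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )
```

Passing the request to urllib.request.urlopen sends it; a successful response only means Snowflake has recorded the file list, not that the rows are already in the table.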

Scenario 3:
  • File format: CSV
  • Data Load Technique: SNOWPIPE REST API (AWS LAMBDA TRIGGER)
  • Source: AWS S3 bucket
  • Loading type: Continuous data load

In this scenario, we create a Lambda function that calls the Snowpipe REST API to load data from an external stage. The function is deployed in a serverless manner to the AWS account where it is hosted. Events you define in Lambda (e.g. when files in your S3 bucket are updated) invoke the Lambda function and run the Python code.

You can choose whatever development environment you like; I will be working on an EC2 instance, since it also lets me sharpen my skills in a Linux environment. The sample Python code is below:

from snowflake.ingest import SimpleIngestManager
from snowflake.ingest import StagedFile
from requests import HTTPError
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import PrivateFormat
from cryptography.hazmat.primitives.serialization import NoEncryption
from cryptography.hazmat.backends import default_backend
from urllib.parse import unquote_plus

import os

# Load the encrypted private key; its public key must already be registered on the Snowflake user
with open("./rsa_key.p8", 'rb') as pem_in:
    pemlines = pem_in.read()
    private_key_obj = load_pem_private_key(pemlines,
                                           os.environ['PRIVATE_KEY_PASSPHRASE'].encode(),
                                           default_backend())

# SimpleIngestManager expects the private key as an unencrypted PKCS#8 PEM string
private_key_text = private_key_obj.private_bytes(
    Encoding.PEM, PrivateFormat.PKCS8, NoEncryption()).decode('utf-8')

# Initialized once per Lambda execution environment and reused across invocations
ingest_manager = SimpleIngestManager(account='<account_identifier>',
                                     host='<account_identifier>.snowflakecomputing.com',
                                     user='<user_login_name>',
                                     pipe='<db_name>.<schema_name>.<pipe_name>',
                                     private_key=private_key_text)


def handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded (e.g. spaces arrive as '+')
        key = unquote_plus(record['s3']['object']['key'])

        print("Bucket: " + bucket + " Key: " + key)
        # List of files in the stage specified in the pipe definition,
        # each wrapped in a StagedFile(path, size); the size is optional
        staged_file_list = [StagedFile(key, None)]

        print('Pushing file list to ingest REST API')
        try:
            resp = ingest_manager.ingest_files(staged_file_list)
            # 'SUCCESS' means Snowflake accepted the file list, not that loading has finished
            print(resp['responseCode'])
        except HTTPError as e:
            print(e)
            raise

The code above needs to be adapted with your own values for these parameters: private_key, account='<account_identifier>', host='<account_identifier>.snowflakecomputing.com', user='<user_login_name>', and pipe='<db_name>.<schema_name>.<pipe_name>'.
For staged_file_list, the path you specify must be relative to the stage where the files are located. Include the complete name for each file, including the file extension.
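Because the path must be relative to the stage, a handler whose stage URL points at a sub-folder of the bucket needs to strip that prefix from the S3 object key. Here is a hypothetical helper for that; the 'sfpipe-data/' prefix used in the example is only an assumption suggested by the lastForwardedFilePath in the Scenario 1 pipe status.

```python
# Sketch: turn S3 event records into stage-relative paths for StagedFile.
from urllib.parse import unquote_plus


def staged_paths(event: dict, stage_prefix: str = "") -> list:
    """Return object keys from an S3 event, decoded and made relative to stage_prefix."""
    paths = []
    for record in event.get("Records", []):
        # Keys in S3 notifications are URL-encoded, e.g. spaces arrive as '+'
        key = unquote_plus(record["s3"]["object"]["key"])
        if stage_prefix and key.startswith(stage_prefix):
            key = key[len(stage_prefix):]
        paths.append(key)
    return paths
```

Inside the handler, this could feed the ingest call as [StagedFile(p, None) for p in staged_paths(event, "sfpipe-data/")].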

Next, we create a Lambda deployment package by following this link. Since the Python code has dependencies, we pack them all into one zip file and upload it to the Lambda function.
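The packaging step itself can be scripted. Here is a minimal sketch with Python's zipfile module, assuming the dependencies were installed into a local package/ directory with pip --target; Lambda expects them at the root of the archive, next to the handler file. The directory and file names are placeholders.

```python
# Sketch: zip installed dependencies plus the handler file into a Lambda deployment package.
import os
import zipfile


def build_deployment_zip(package_dir: str, handler_file: str, zip_path: str) -> str:
    """Write dependencies (at the archive root) and the handler module into zip_path."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                full_path = os.path.join(root, name)
                # Store paths relative to package_dir so imports resolve from the root
                zf.write(full_path, os.path.relpath(full_path, package_dir))
        zf.write(handler_file, os.path.basename(handler_file))
    return zip_path
```

For example, build_deployment_zip("package", "SnowpipeLambdaCode.py", "Snowpipe.zip") produces an archive that matches the --handler SnowpipeLambdaCode.handler setting used when creating the function.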

NOTE: I ran into a mismatch between the requirements of the Python dependencies and the Lambda runtime environment, and this mismatch causes the code to break. The reason is described in this link. The error looks like this:

Unable to import module 'lambda_function': /lib64/libc.so.6: version 'GLIBC_2.28'

This error arises due to a compatibility issue between the version of the GNU C Library (GLIBC) required by the Python connector and the one available in the AWS Lambda execution environment.

The Snowflake Python Connector utilizes the shared libraries from the GLIBC library. These shared libraries must be compatible with those present in the execution environment where your AWS Lambda function is running.

In this case, pip may install a wheel built for a newer manylinux platform (manylinux_2_28_x86_64), which requires GLIBC 2.28 or later, while the AWS Lambda execution environment is based on Amazon Linux, which ships an older GLIBC (the author's environment supported only up to GLIBC 2.17). This mismatch causes the error mentioned above.
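To confirm which GLIBC a given runtime actually provides, one option is to log it from inside the handler with platform.libc_ver(). This sketch, with hypothetical helper names, also compares that version against the one named in an error such as GLIBC_2.28.

```python
# Sketch: report the runtime's GLIBC version and compare it with a wheel's requirement.
import platform


def _version_tuple(version: str) -> tuple:
    return tuple(int(part) for part in version.split("."))


def glibc_at_least(required: str, actual: str) -> bool:
    """True if `actual` (e.g. '2.26') is the same as or newer than `required` (e.g. '2.28')."""
    return _version_tuple(actual) >= _version_tuple(required)


def runtime_glibc() -> str:
    """GLIBC version of the current process, or '' when it is not linked against glibc."""
    lib, version = platform.libc_ver()
    return version if lib == "glibc" else ""


if __name__ == "__main__":
    current = runtime_glibc()
    print("glibc:", current or "unknown")
    if current:
        print("satisfies GLIBC_2.28:", glibc_at_least("2.28", current))
```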

So whenever you install a library for the Lambda function in this tutorial, install it this way:

pip3 install \
--platform manylinux2010_x86_64 \
--implementation cp \
--only-binary=:all: --upgrade \
--target <your_env>/lib/python<version>/site-packages/ \
<package_name>

Now create the Lambda function with the proper IAM role, and add a permission that allows your S3 bucket to invoke the Lambda function.

aws lambda create-function \
--region <your region> \
--function-name <custom function name> \
--zip-file fileb://~/Snowpipe.zip \
--role arn:aws:iam::<aws_account_id>:role/lambda-s3-execution-role \
--handler SnowpipeLambdaCode.handler \
--runtime python3.6 \
--profile adminuser \
--timeout 10 \
--memory-size 1024

Then add the permission for the S3 bucket:

aws lambda add-permission \
--function-name IngestFile \
--region us-west-2 \
--statement-id enable-ingest-calls \
--action "lambda:InvokeFunction" \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::<SourceBucket> \
--source-account <aws_account_id> \
--profile adminuser

This creates the function, with your S3 bucket as its trigger:

Image credits: Author

For this to work, you also need to create an event notification that triggers the Lambda function as soon as any data file lands in S3:

Image credits: Author

Here I have created one SQS event; you can also create multiple message queues with SNS, or add Amazon EventBridge.
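The S3-to-Lambda trigger can be scripted with boto3 as well. In this sketch the .csv suffix filter is an assumption (it keeps non-CSV uploads from invoking the function), and the helper names are illustrative. Note that put_bucket_notification_configuration replaces the bucket's whole notification configuration, so include any existing notifications, and S3 will only accept the Lambda destination after the add-permission step has granted it invoke rights.

```python
# Sketch: S3 event notification that invokes the Lambda function on new .csv objects.


def lambda_notification_config(function_arn: str, suffix: str = ".csv") -> dict:
    """Build a boto3 NotificationConfiguration for ObjectCreated events."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": function_arn,
                "Events": ["s3:ObjectCreated:*"],
                # Assumed filter: only keys ending in `suffix` trigger the function
                "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}},
            }
        ]
    }


def apply_notification(bucket: str, config: dict) -> None:
    """Replace the bucket's notification configuration (requires boto3 and AWS credentials)."""
    import boto3

    boto3.client("s3").put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration=config
    )
```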

To see what is going on under the hood, we can use the logs, which you can access in Amazon CloudWatch: under Log groups, you will see a log group for your Lambda function.

Image credits: Author
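The same logs can be pulled programmatically. Here is a sketch using boto3's CloudWatch Logs filter_log_events, relying on Lambda's default /aws/lambda/<function-name> log group naming; the 15-minute window and the helper names are illustrative, and only the first page of results is read (follow nextToken for more).

```python
# Sketch: fetch recent log lines of the Lambda function from CloudWatch Logs.
import time


def log_group_for(function_name: str) -> str:
    """Lambda writes to /aws/lambda/<function-name> by default."""
    return f"/aws/lambda/{function_name}"


def quoted_pattern(phrase: str) -> str:
    """CloudWatch filter patterns need double quotes to match a multi-word phrase."""
    return f'"{phrase}"'


def recent_messages(function_name: str, phrase: str, minutes: int = 15) -> list:
    """Return matching log messages from the last `minutes` minutes (first page only)."""
    import boto3  # requires AWS credentials

    start_ms = int((time.time() - minutes * 60) * 1000)  # startTime is in epoch milliseconds
    resp = boto3.client("logs").filter_log_events(
        logGroupName=log_group_for(function_name),
        startTime=start_ms,
        filterPattern=quoted_pattern(phrase),
    )
    return [event["message"] for event in resp.get("events", [])]
```

For example, recent_messages("IngestFile", "Pushing file list to ingest REST API") would surface the handler's progress message after an upload.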

Next, I will upload a file to my S3 bucket configured with the 'ingestFile' Lambda function, then head back to see the function trigger and the Python code run:

Image credits: Author

When I uploaded the file at 18:05, the Lambda function was triggered and ran successfully: it reached the end of the code and printed the message 'Pushing file list to ingest REST API', as implemented in the code above.

And that is how it's done. This is a building block; real scenarios can involve a lot more complexity, but the fundamentals stay the same.

Thanks for reading!
