Snowpark for Python First Impressions
26.06.2022
Loading JSON files that are larger than 16MB using Snowpark for Python
As of this writing (June 2022), the long-awaited Snowpark for Python has finally been released as a public preview feature in Snowflake. I had a use case that had been waiting to be tested for a while, so as soon as the feature was publicly available, I jumped straight in.
Here I will explain how I set up the environment, my first approach that didn’t work (and why), and my second approach that did.
We have JSON files in Azure blob storage that are larger than 16MB, which is the upper limit for a VARIANT column in Snowflake. Since our JSON files have a nested structure, we can’t use the STRIP_OUTER_ARRAY option with the COPY INTO command while writing to the target table. We have to parse the JSON file first to extract the inner array from the nested structure.
As I was searching for a solution, the article Breaking the 16 MB Limit (Kinda) by Brad McNeely came up. In the article, the author provides a solution using Java User Defined Table Functions. I thought I would try to implement a similar solution using Snowpark for Python.
Snowpark for Python: Set up the environment
According to the Snowflake documentation at the time of this writing (June 2022), Python 3.8 is required. This is a bit of a bummer, since I already had a newer version of Python installed on my computer, but it was nothing that couldn’t be overcome: I just had to install the required version and point my development environment to it.
The packages inside Snowflake are provided by Anaconda and as such I had to accept the Snowflake Third Party Terms in my Snowflake account. This can be done only by a user with the ORGADMIN role so be sure that you either have that role granted to you or find the person who does so that the terms are accepted before starting any development.
I set up my Python virtual environment using virtualenv. Alternatively, I could have used Anaconda or Miniconda. In fact, Snowflake recommends using the conda environment on your local machine for development and testing. This allows you to take advantage of the Snowflake channel within Anaconda which mirrors a subset of the packages and versions that are supported in the Snowflake Python UDF environment as described in the Snowflake documentation in the chapter Using Third-Party Packages.
For my IDE, I chose Visual Studio Code, but other popular IDEs can be used, as well as Jupyter Notebooks, depending on your preference.
Finally, with the preparation done, I was able to install the Snowpark Python library with Pandas using pip:
pip install "snowflake-snowpark-python[pandas]"
Next, as described in the documentation, I created a dictionary named connection_parameters where I provided my connection parameters, including my Snowflake account, username, password, role, warehouse, database, and schema. Using this dictionary I created a session named mysession:
from snowflake.snowpark import Session
mysession = Session.builder.configs(connection_parameters).create()
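The connection_parameters dictionary itself is not shown above, so here is a minimal sketch of how it could be assembled. The dictionary keys correspond to the parameters listed in the text; the environment variable names (SNOWFLAKE_ACCOUNT and so on) and the helper build_connection_parameters are my own assumptions, chosen only to keep credentials out of the source code.

```python
import os

# Keys for the parameters listed in the text.
PARAMETER_KEYS = ("account", "user", "password", "role", "warehouse", "database", "schema")


def build_connection_parameters(env=os.environ):
    """Build the connection dictionary from environment variables.

    The SNOWFLAKE_<KEY> variable names are an assumption for this sketch.
    """
    missing = [key for key in PARAMETER_KEYS if f"SNOWFLAKE_{key.upper()}" not in env]
    if missing:
        raise KeyError(f"missing connection settings: {', '.join(missing)}")
    return {key: env[f"SNOWFLAKE_{key.upper()}"] for key in PARAMETER_KEYS}
```

The result of build_connection_parameters() can then be passed to Session.builder.configs() in place of a hand-written dictionary.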
The real work could now begin.
First approach that works only for files smaller than 16MB
As stated in my use case, we have JSON files with nested structures in Azure blob storage, some of which are larger than 16MB. A stage named mystage that points to the Azure blob storage location has been created in Snowflake. In my Snowpark application, I listed all the files in the stage and returned the result into a data frame named stage_files_df:
# get list of all files in stage into a data frame
stage_files_df = mysession.sql("list @mystage")
stage_files_df.show()
The resulting data frame stage_files_df is a Snowflake data frame (not to be confused with a Pandas data frame) that represents a relational dataset that is evaluated lazily: it only executes when a specific action is triggered. In my example, I’m using the show() method which is one of the action methods that will force the data frame to be evaluated.
I want to select the “name” column from the resulting data frame to create a list named file_names_list of the files in the stage. To do this, I use the select() method to specify the column name and the collect() method as the action method:
from snowflake.snowpark.functions import col

# take the "name" column from the data frame into a list
file_names_list = stage_files_df.select(col('"name"')).collect()
Then I loop through the list using the usual Python syntax:
for one_file in file_names_list:
Each element of the list, named one_file, is of the data type <class 'snowflake.snowpark.row.Row'> and holds the value of the name column for one file.
To parse out the file name into a variable named one_file_name, I convert the one_file variable into a dictionary using the asDict() method and select the name tag (alternatively, I could have parsed out the file name using regexp or other string manipulation methods):
one_file_name = one_file.asDict()['name']
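The string-manipulation alternative mentioned above could look roughly like this. It assumes the name value is a slash-separated path whose last segment is the file name; the helper file_name_from_stage_path is hypothetical and not part of Snowpark.

```python
def file_name_from_stage_path(stage_path: str) -> str:
    """Return the last segment of a slash-separated stage path."""
    # rstrip guards against a trailing slash; rsplit keeps only the final segment
    return stage_path.rstrip("/").rsplit("/", 1)[-1]
```

A plain split is enough here because only the last path segment matters; a regular expression would only be needed if the name had to be validated as well.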
I can now read the contents of the file. In the file, I have a nested JSON and I want to extract the “lines” tag into a data frame named df_lines. To do this, I use the read.json() and select() methods:
from snowflake.snowpark.functions import sql_expr

df_lines = mysession.read.json("@mystage/" + one_file_name).select(sql_expr("$1:lines"))
Here is where we have a problem. First we have to understand that the resulting data frame named df_lines is a Snowflake data frame. As explained earlier, a Snowflake data frame represents a relational dataset, so its contents are evaluated inside Snowflake and are subject to the same data type limits as any other table data. When the JSON file is read, the entire contents of the file end up in a single VARIANT value, which can’t hold more than 16MB. If the file is smaller than 16MB, the above statement works. If the file is larger than 16MB, the statement fails.
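Since the output of the list command also contains a size column (in bytes), one option is to decide per file which approach applies. The sketch below works on plain (name, size) tuples rather than Snowpark rows; the threshold of 16 * 1024 * 1024 bytes is my approximation of the 16MB limit, the helper name is hypothetical, and the file names are made up.

```python
VARIANT_LIMIT_BYTES = 16 * 1024 * 1024  # approximation of the 16MB limit (assumption)


def split_by_variant_limit(files, limit=VARIANT_LIMIT_BYTES):
    """Split (name, size_in_bytes) pairs into files below the limit and the rest."""
    small, large = [], []
    for name, size in files:
        (small if size < limit else large).append(name)
    return small, large


# made-up example values
small, large = split_by_variant_limit([("a.json", 2_000_000), ("b.json", 40_000_000)])
```

Files close to the threshold may still fail to load into a VARIANT, so in practice a more conservative limit is probably safer.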
Just for the sake of the exercise, if the file size is less than 16MB I can continue with the steps as below. First, I have to convert the Snowflake data frame to a Pandas data frame, because the Snowflake data frame has very few methods for data manipulation, while a Pandas data frame has many. From the Pandas data frame, I extract the first column and the first row, which is my VARIANT column representing all the contents of the file in one piece. I convert this to JSON using the json.loads() method, which gives me a JSON object that is now separated into multiple lines. I can convert this object back to a Pandas data frame using the json_normalize() method. Finally, I convert the Pandas data frame to a Snowflake data frame that can be saved to a Snowflake table named all_lines using the save_as_table() method. Here is the code for these steps:
import json
from pandas import json_normalize

# convert the previous Snowflake data frame to a Pandas data frame
df_lines_pandas = df_lines.to_pandas()
# extract the first element (first row, first column) of the Pandas data frame
one_column = df_lines_pandas.iloc[0, 0]
# create a new Pandas data frame that contains each record individually
df_lines_all_pandas = json_normalize(json.loads(one_column))
# convert the Pandas data frame to a Snowflake data frame
df_lines_all = mysession.create_dataframe(df_lines_all_pandas)
# write the Snowflake data frame to a table in Snowflake
df_lines_all.write.mode("overwrite").save_as_table("all_lines")
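To make the json.loads() and flattening step more concrete without a Snowflake connection, here is a standard-library sketch. It is not json_normalize() itself: the flatten helper is a hypothetical stand-in that mimics how nested dictionaries become dotted column names, and the JSON string is a made-up example of what the extracted VARIANT value might contain.

```python
import json

# made-up stand-in for the string held in the single VARIANT value
one_column = '[{"no": 1, "item": {"sku": "A", "qty": 2}}, {"no": 2, "item": {"sku": "B", "qty": 5}}]'


def flatten(record, prefix=""):
    """Flatten nested dictionaries into dotted keys, e.g. item.sku."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat


# one flat dictionary per element of the array, i.e. one row per record
rows = [flatten(line) for line in json.loads(one_column)]
```

Each entry in rows corresponds to one row of the final table, which is the shape the Pandas data frame takes before it is written to Snowflake.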
As mentioned earlier, this code works only for files that are less than 16MB in size. For larger files, I still had to look for another way that doesn’t try to read the entire contents of the file into a VARIANT column.
One way that works is to read the file as a character string using the basic Python open() and read() methods. This can be done in a Snowpark stored procedure, as explained in the next section.
Second approach using a Snowpark Stored Procedure
Continuing from just before the point of failure from above, once I have parsed out the file name into a variable named one_file_name from the list of all files in the stage, I will take an alternate approach:
one_file_name = one_file.asDict()['name']
I will add the file to the internal stage that will be accessed by the stored procedure that I will create later:
# add the current file to the internal stage
mysession.add_import("@mystage/" + one_file_name)
I also have to add the packages that I intend to use in the stored procedure that I will create later:
# add the packages that will be used by the stored procedure
mysession.add_packages(["pandas", "snowflake-snowpark-python"])
Next, I will create a stored procedure using the @sproc decorator:
import snowflake.snowpark
from snowflake.snowpark.functions import sproc

@sproc(name="read_file", is_permanent=True, stage_location="@mystage", replace=True, packages=["pandas", "snowflake-snowpark-python"])
I will be creating a permanent stored procedure, which means that the stored procedure will be available in Snowflake to be used even after the current session has ended. Below is the code of the stored procedure, which is actually written as a Python function, but due to the decorator used previously, it will also be created as a stored procedure in Snowflake:
def read_file(session: snowflake.snowpark.Session, one_file_name: str) -> str:
    import json
    import sys
    import pandas as pd
    from pandas import json_normalize

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    # open the file
    with open(import_dir + one_file_name, 'r') as file:
        # read the file into a character variable and convert to json
        data_json = json.loads(file.read())

    # extract the 'lines' tag and convert to a Pandas data frame
    df_lines_pandas = json_normalize(data_json['lines'])
    # convert the Pandas data frame to a Snowflake data frame
    df_lines = session.create_dataframe(df_lines_pandas)
    # write the Snowflake data frame to a table in Snowflake
    df_lines.write.mode("overwrite").save_as_table("all_lines")
    return 'Success'
Note that for the sake of simplicity, there is no error handling implemented in the procedure, although it can easily be added using try and except blocks, just as in any other Python code.
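As a rough idea of what such error handling could look like, here is a sketch of only the file-reading part, written so that it runs without Snowflake. The function name read_lines and the wording of the error messages are my own assumptions; inside the stored procedure, the status string would be the return value.

```python
import json


def read_lines(path: str):
    """Return (status, lines); status is 'Success' or an error description."""
    try:
        with open(path, "r") as file:
            data_json = json.loads(file.read())
        return "Success", data_json["lines"]
    except FileNotFoundError:
        return f"Error: file not found: {path}", []
    except json.JSONDecodeError as exc:
        return f"Error: invalid JSON: {exc}", []
    except (KeyError, TypeError):
        # the document parsed, but it has no top-level 'lines' tag
        return "Error: no 'lines' tag in file", []
```

Returning a status string instead of raising keeps the procedure's declared return type of str, at the cost of the caller having to inspect the message.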
The parameters that are passed to the stored procedure are the current session context and the one_file_name variable. The stored procedure must define the import_dir location, which is where the file to be read by the procedure is found, as explained in the documentation in the section Reading Files from a UDF.
Python open() and read() methods are used to read the contents of the file into a character variable. The following steps in the stored procedure are similar to the steps in the previous example. First, the character variable is converted to JSON using the json.loads() method, then the tag ‘lines’ is extracted and the result converted to a Pandas data frame using the json_normalize() method. Finally, the Pandas data frame is converted to a Snowflake data frame and the results written to a table in Snowflake.
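Because the snowflake_import_directory option is only set when the code runs inside Snowflake, the lookup fails with a KeyError on a local machine. The sketch below shows one way to fall back to a local folder for testing; the helper resolve_import_dir and its local_fallback parameter are my own assumptions, not part of the Snowflake documentation.

```python
import sys

IMPORT_DIRECTORY_NAME = "snowflake_import_directory"


def resolve_import_dir(local_fallback: str = "./") -> str:
    """Return Snowflake's import directory, or a local folder when run elsewhere."""
    # sys._xoptions is a plain dict in CPython; this key is only set inside Snowflake
    return sys._xoptions.get(IMPORT_DIRECTORY_NAME, local_fallback)
```

With this helper, the file-reading logic can be tried out locally against a copy of the file before the procedure is registered.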
After the stored procedure is created, we can call it from the Python session using the SQL call command:
res = mysession.sql("call read_file('" + one_file_name + "')").collect()
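Building the call statement by string concatenation breaks if a file name contains a single quote. A small helper along these lines could guard against that; the function build_call_statement is hypothetical, and it assumes the usual SQL convention of doubling single quotes (and escaping backslashes) inside a single-quoted string literal.

```python
def build_call_statement(file_name: str) -> str:
    """Build the call statement with the file name as an escaped SQL string literal."""
    # escape backslashes first, then double any single quotes
    escaped = file_name.replace("\\", "\\\\").replace("'", "''")
    return f"call read_file('{escaped}')"
```

The returned string can be passed to the sql() method in the same way as the concatenated statement.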
This now works for files of all sizes, including those that are larger than 16MB because the contents of the file are first read into a Python character variable which doesn’t impose such size limitations. This character variable is then transformed into a data frame and subsequently written to a table in Snowflake.
Of course, there are many ways to load JSON files with nested structures that are larger than 16MB into Snowflake, including, but not limited to, the Java method by Brad McNeely mentioned earlier, writing Azure functions, or using the azure-storage-blob library in Python.
Snowpark for Python was designed primarily for data scientists to use with scoring machine learning models and as such may not be the easiest or most efficient approach for this use case, although it does get the job done. Still, implementing this use case allowed me to learn the intricacies of Snowpark for Python and it was a valuable exercise for future reference, especially in terms of understanding the types of use cases where Snowpark for Python could be used to its full potential.
Senior Consultant and Snowflake Data Superhero