Integrating Google Dataproc with Snowflake: A Step-by-Step Guide



Google Dataproc is a fully managed cloud service on Google Cloud Platform (GCP) that simplifies big data processing. It utilizes popular open-source frameworks like Apache Spark and Hadoop, allowing users to process and analyze large datasets efficiently. With Dataproc, users can quickly create clusters, automate tasks, and integrate with other GCP services, making it a valuable tool for data engineers and scientists. Integrating Google Cloud’s Dataproc with Snowflake allows organisations to harness the power of both platforms for efficient data processing and storage. In this article, I will show how to prepare a Snowflake external access integration to access the Google Dataproc API.

Creating Snowflake external access integration

The first thing we need to do is create a network rule to represent the external network location.

CREATE OR REPLACE NETWORK RULE google_dataproc_network_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('dataproc.googleapis.com');

After that, we will create a security integration to hold the OAuth credentials required to authenticate to the external network location represented by the ‘google_dataproc_network_rule’ network rule.

CREATE OR REPLACE SECURITY INTEGRATION google_dataproc_oauth
  TYPE = API_AUTHENTICATION
  AUTH_TYPE = OAUTH2
  OAUTH_CLIENT_AUTH_METHOD = CLIENT_SECRET_POST
  OAUTH_CLIENT_ID = 'my-client-id'
  OAUTH_CLIENT_SECRET = 'my-client-secret'
  OAUTH_TOKEN_ENDPOINT = 'https://oauth2.googleapis.com/token'
  OAUTH_AUTHORIZATION_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth'
  OAUTH_ALLOWED_SCOPES = ('https://www.googleapis.com/auth/cloud-platform')
  ENABLED = TRUE;

As you can see in the command, ‘my-client-id’ and ‘my-client-secret’ are placeholders. To obtain these two values, we need to create an OAuth 2.0 client ID in GCP by following these steps:

  1. Go to the Google Cloud Console.
  2. Navigate to “APIs & Services” > “Credentials”.
  3. Click on “Create Credentials” and select “OAuth client ID”.
  4. If prompted, set up the consent screen. Here, you need to add a scope that grants full access to all GCP services (including Dataproc): add the https://www.googleapis.com/auth/cloud-platform scope.
  5. Choose “Web application” as the application type.
  6. Specify the name of the OAuth 2.0 client and then click SAVE.

For now, we will skip setting the redirect URI, because we will obtain it later. Once the OAuth 2.0 client is created, you will get a pop-up screen showing the “Client ID” and “Client Secret”. It is important to save this information now, since the secret cannot be retrieved afterwards. Once you have saved the credentials, go back to Snowflake, replace the placeholder parameters, and run the command.

After creating the security integration, we need to create a secret to represent the credentials held by ‘google_dataproc_oauth’. The secret must specify a refresh token with its OAUTH_REFRESH_TOKEN parameter. To obtain a refresh token from the service provider (in this case, the Google Dataproc API), you can either use a mechanism the provider offers or use Snowflake system functions. I will show you how to do this with Snowflake system functions.

Let’s first run the following command:

CREATE OR REPLACE SECRET oauth_token
  TYPE = OAUTH2
  API_AUTHENTICATION = google_dataproc_oauth;

This secret will hold the OAuth2 token that will be used to authenticate to Google Cloud APIs.

After that, execute the SYSTEM$START_OAUTH_FLOW function to retrieve a URL with which you can obtain a refresh token, specifying as its argument the name of the secret you created previously.

CALL SYSTEM$START_OAUTH_FLOW( 'my_db.my_schema.oauth_token' );

The function will generate a URL you can use to complete the OAuth consent process. Before we can open this URL, we need to set up a redirect URI in the OAuth 2.0 client ID in GCP. We will get the redirect URI from the URL the command generated: it is the URL-encoded value of the redirect_uri query parameter. Decode that value to obtain the redirect URI, then go back to GCP and add it to the “Authorized redirect URIs” of our OAuth 2.0 client ID.
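To make this concrete, here is a small Python sketch that extracts and URL-decodes the redirect_uri parameter. The URL, account name, and parameter values below are made up for illustration; your generated URL will differ.

```python
from urllib.parse import urlparse, parse_qs

# Hypothetical URL of the kind SYSTEM$START_OAUTH_FLOW generates
auth_url = (
    "https://accounts.google.com/o/oauth2/auth"
    "?client_id=my-client-id"
    "&redirect_uri=https%3A%2F%2Fmyorg-myaccount.snowflakecomputing.com%2Foauth%2Fcomplete-secret"
    "&response_type=code"
)

# parse_qs URL-decodes the parameter values for us
params = parse_qs(urlparse(auth_url).query)
redirect_uri = params["redirect_uri"][0]
print(redirect_uri)  # the decoded redirect URI to register in GCP
```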

Once we have added the redirect URI in GCP, we can click on the link provided by the SYSTEM$START_OAUTH_FLOW function and go through the OAuth2 consent process. When you’ve finished, leave the browser open on the last page of the process and copy all the text after the question mark in that page’s URL from the address bar.

Next, execute the SYSTEM$FINISH_OAUTH_FLOW function, passing the text you just copied from the browser address bar as its argument. Be sure to execute SYSTEM$FINISH_OAUTH_FLOW in the same session as SYSTEM$START_OAUTH_FLOW. It will update the secret you specified in SYSTEM$START_OAUTH_FLOW with the access token and refresh token it obtained from the OAuth server.

CALL SYSTEM$FINISH_OAUTH_FLOW( 'state=<remaining_url_text>' );
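To make the “text after the question mark” step concrete, here is a small Python sketch. The final URL and its parameter values are made up for illustration:

```python
# Hypothetical last-page URL of the consent process
final_url = (
    "https://myorg-myaccount.snowflakecomputing.com/oauth/complete-secret"
    "?state=abc123&code=4%2F0AbCdEf"
)

# SYSTEM$FINISH_OAUTH_FLOW expects everything after the first '?'
argument = final_url.split("?", 1)[1]
print(argument)  # state=abc123&code=4%2F0AbCdEf
```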

After that, we can create an external access integration using the network rule and secret.

CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION google_apis_access_integration
  ALLOWED_NETWORK_RULES = (google_dataproc_network_rule)
  ALLOWED_AUTHENTICATION_SECRETS = (oauth_token)
  ENABLED = TRUE;

Executing Google Dataproc job using Snowflake UDF

Once we have completed these steps, the rest is straightforward. Assuming you have all services set up on GCP for running Dataproc jobs, you can prepare a Snowflake UDF that runs a Dataproc job:

CREATE OR REPLACE FUNCTION google_dataproc_python(json_body VARCHAR)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = 3.10
HANDLER = 'run_dataproc_job'
EXTERNAL_ACCESS_INTEGRATIONS = (google_apis_access_integration)
PACKAGES = ('snowflake-snowpark-python','requests')
SECRETS = ('cred' = oauth_token)
AS
$$
import _snowflake
import requests
import json

session = requests.Session()

def run_dataproc_job(json_body):
  # Retrieve the OAuth2 access token from the secret referenced as 'cred'
  token = _snowflake.get_oauth_access_token('cred')
  # Dataproc jobs:submit endpoint; replace {your_project} with your GCP project ID
  url = "https://dataproc.googleapis.com/v1/projects/{your_project}/regions/europe-west2/jobs:submit"
  data = json.loads(json_body)
  response = session.post(url, json = data, headers = {"Authorization": "Bearer " + token})
  return response.json()
$$;

This function executes the Dataproc job in GCP. Keep in mind that oauth_token is not refreshed automatically, meaning that once it expires, you need to go through the Snowflake OAuth flow once again.
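The JSON string you pass to the UDF is a Dataproc jobs:submit request body. As a minimal sketch, here is how such a payload could be built for a PySpark job; the cluster name and GCS path are hypothetical, and the field names follow the Dataproc v1 REST API:

```python
import json

# Minimal Dataproc jobs:submit request body (cluster and file names are made up)
job_request = {
    "job": {
        "placement": {"clusterName": "my-cluster"},
        "pysparkJob": {
            "mainPythonFileUri": "gs://my-bucket/jobs/word_count.py"
        }
    }
}

# This JSON string is what you would pass to google_dataproc_python(json_body)
json_body = json.dumps(job_request)
print(json_body)
```

In Snowflake, you would then call the UDF with this string, e.g. SELECT google_dataproc_python('<payload>');.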

By following these steps, you can unlock the combined power of Google Dataproc and Snowflake, allowing you to orchestrate complex data pipelines and streamline your big data processing workflows.

Jan Marekovič
Senior Developer
