Transitioning from development to production
In this article, we'll walk through how to transition your data pipelines from local development to staging and production deployments.
Let's say we’ve been tasked with fetching the N most recent entries from Hacker News and splitting the data into two datasets: one containing all of the data about stories and one containing all of the data about comments. In order to make the pipeline maintainable and testable, we have two additional requirements:
- We must be able to run our data pipeline in local, staging, and production environments.
- We need to be confident that data won't be accidentally overwritten (for example, because a user forgot to change a configuration value).
Using a few Dagster concepts, we can easily tackle this task! Here's an overview of the main concepts we'll be using in this guide, followed by a short sketch of how they fit together:
- Assets - An asset is a software object that models a data asset. The prototypical example is a table in a database or a file in cloud storage.
- Resources - A resource is an object that models a connection to a (typically) external service. Resources can be shared between assets, and different implementations of resources can be used depending on the environment. For example, a resource may provide methods to send messages in Slack.
- I/O managers - An I/O manager is a special kind of resource that handles storing and loading assets. For example, if we wanted to store assets in S3, we could use Dagster’s built-in S3 I/O manager.
- Run config - Assets and resources sometimes require configuration to set certain values, like the password to a database. Run config allows you to set these values at run time. In this guide, we will also use an API to set some default run configuration.
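Here's that sketch: a minimal, self-contained example of an asset that uses a resource and run config. All of the names in it (GreetingConfig, FileStore, greeting) are illustrative and not part of this guide's pipeline.

from dagster import Config, ConfigurableResource, Definitions, asset

class GreetingConfig(Config):
    # Run config: a value supplied at run time, e.g. from the UI launchpad
    name: str

class FileStore(ConfigurableResource):
    # A resource modeling a connection to storage (here, just a local file)
    path: str

    def write(self, text: str) -> None:
        with open(self.path, "a") as f:
            f.write(text + "\n")

@asset  # An asset models a data asset, like a table or a file
def greeting(config: GreetingConfig, store: FileStore) -> None:
    store.write(f"hello, {config.name}")

defs = Definitions(assets=[greeting], resources={"store": FileStore(path="greetings.txt")})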
Using these Dagster concepts we will:
- Write three assets: the full Hacker News dataset, data about comments, and data about stories.
- Use Dagster's Snowflake I/O manager to store the datasets in Snowflake.
- Set up our Dagster code so that the configuration for the Snowflake I/O manager is automatically supplied based on the environment where the code is running.
Setup
To follow along with this guide, you can copy the full code example and install a few additional pip libraries:
dagster project from-example --name my-dagster-project --example development_to_production
cd my-dagster-project
pip install -e .
Part one: Local development
In this section we will:
- Write our assets
- Add run configuration for the Snowflake I/O manager
- Materialize assets in the Dagster UI (see the launch command below)
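If you're following along, the UI is served by the dagster dev command, run from the project directory:

dagster dev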
Let’s start by writing our three assets. We'll use Pandas DataFrames to interact with the data.
# assets.py
import pandas as pd
import requests

from dagster import Config, asset

# ITEM_FIELD_NAMES is a list of the column names in the Hacker News dataset.
# Filled in here with the item fields documented by the Hacker News API.
ITEM_FIELD_NAMES = [
    "id", "deleted", "type", "by", "time",
    "text", "dead", "parent", "poll", "kids",
    "url", "score", "title", "parts", "descendants",
]


class ItemsConfig(Config):
    base_item_id: int


@asset(
    io_manager_key="snowflake_io_manager",
)
def items(config: ItemsConfig) -> pd.DataFrame:
    """Items from the Hacker News API: each is a story or a comment on a story."""
    rows = []
    max_id = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json", timeout=5
    ).json()
    # Hacker News API is 1-indexed, so adjust range by 1
    for item_id in range(max_id - config.base_item_id + 1, max_id + 1):
        item_url = f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        rows.append(requests.get(item_url, timeout=5).json())

    result = pd.DataFrame(rows, columns=ITEM_FIELD_NAMES).drop_duplicates(subset=["id"])
    result.rename(columns={"by": "user_id"}, inplace=True)
    return result
@asset(
    io_manager_key="snowflake_io_manager",
)
def comments(items: pd.DataFrame) -> pd.DataFrame:
    """Comments from the Hacker News API."""
    return items[items["type"] == "comment"]


@asset(
    io_manager_key="snowflake_io_manager",
)
def stories(items: pd.DataFrame) -> pd.DataFrame:
    """Stories from the Hacker News API."""
    return items[items["type"] == "story"]
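Before introducing Snowflake, you can sanity-check these assets in-process. The sketch below is a local test harness of our own, not part of the guide's project: it swaps Dagster's built-in mem_io_manager in for the Snowflake I/O manager so no credentials are needed, and the base_item_id value is arbitrary.

# A quick local sanity check; not part of the example project
from dagster import RunConfig, materialize, mem_io_manager

result = materialize(
    [items, comments, stories],
    # Substitute the in-memory I/O manager so nothing is written to Snowflake
    resources={"snowflake_io_manager": mem_io_manager},
    # base_item_id controls how many recent items are fetched; keep it small
    run_config=RunConfig(ops={"items": ItemsConfig(base_item_id=10)}),
)
assert result.success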
Now we can add these assets to our Definitions object and materialize them via the UI as part of our local development workflow. We can pass in credentials to our SnowflakePandasIOManager.
# definitions.py
from dagster_snowflake_pandas import SnowflakePandasIOManager

from dagster import Definitions
from development_to_production.assets.hacker_news_assets import comments, items, stories

# Note that storing passwords in configuration is bad practice. It will be resolved later in the guide.
resources = {
    "snowflake_io_manager": SnowflakePandasIOManager(
        account="abc1234.us-east-1",
        user="me@company.com",
        # password in config is bad practice
        password="my_super_secret_password",
        database="LOCAL",
        schema="ALICE",
    ),
}

defs = Definitions(assets=[items, comments, stories], resources=resources)
Note that we have passwords in our configuration in this code snippet. This is bad practice, and we will resolve it shortly.
This results in an asset graph in which items is upstream of both comments and stories.
We can materialize the assets in the UI and confirm that the data appears in Snowflake as we expect.
While we define our assets as Pandas DataFrames, the Snowflake I/O manager automatically translates the data to and from Snowflake tables. The Python asset name determines the Snowflake table name. In this case, three tables will be created: ITEMS, COMMENTS, and STORIES.
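If you'd rather verify from outside Dagster, you can query the tables directly. This sketch assumes the snowflake-connector-python package and reuses the placeholder credentials from the definitions.py snippet above:

# An optional spot check; not part of the example project
import snowflake.connector

conn = snowflake.connector.connect(
    account="abc1234.us-east-1",
    user="me@company.com",
    password="my_super_secret_password",
    database="LOCAL",
    schema="ALICE",
)
for table in ("ITEMS", "COMMENTS", "STORIES"):
    # Each query returns a single row containing the row count
    [(count,)] = conn.cursor().execute(f"SELECT COUNT(*) FROM {table}").fetchall()
    print(f"{table}: {count} rows")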
Part two: Deployment
In this section we will:
- Modify the configuration for the Snowflake I/O manager to handle staging and production environments
- Discuss different options for managing a staging environment
Now that our assets work locally, we can start the deployment process! We'll first set up our assets for production, and then discuss the options for our staging deployment.
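As a preview of where we're headed, a common pattern is to define one set of resource configurations per environment and select between them with an environment variable. In the sketch below, the DAGSTER_DEPLOYMENT variable name and the production values (the env var names, database, and schema) are illustrative stand-ins:

# definitions.py (sketch)
import os

from dagster import Definitions, EnvVar
from dagster_snowflake_pandas import SnowflakePandasIOManager
from development_to_production.assets.hacker_news_assets import comments, items, stories

resources = {
    "local": {
        "snowflake_io_manager": SnowflakePandasIOManager(
            account="abc1234.us-east-1",
            user="me@company.com",
            password="my_super_secret_password",
            database="LOCAL",
            schema="ALICE",
        ),
    },
    "production": {
        "snowflake_io_manager": SnowflakePandasIOManager(
            account="abc1234.us-east-1",
            # Credentials are read from environment variables via EnvVar,
            # so no secrets live in code
            user=EnvVar("PRODUCTION_SNOWFLAKE_USER"),
            password=EnvVar("PRODUCTION_SNOWFLAKE_PASSWORD"),
            database="PRODUCTION",
            schema="HACKER_NEWS",
        ),
    },
}

# Choose the resource set based on where the code is running
deployment_name = os.getenv("DAGSTER_DEPLOYMENT", "local")

defs = Definitions(
    assets=[items, comments, stories],
    resources=resources[deployment_name],
)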