Dabbling with Dagster - Part 3
Software Defined Assets and wrap-up
I recently had a look at Fivetran’s new transformations feature, which actually contains a limited-scope orchestrator (focused on Fivetran syncs and dbt models). I was impressed: it implements Software Defined Assets as the default way of operating. You choose the dbt model you want materialised, and it either runs the model and its upstream dependencies in line with the Fivetran syncs on their normal cadence, or forces those syncs to run at the time you schedule your asset to be refreshed.
Fivetran’s transformations feature did a great job of linking the syncs with dbt transformations, provided you had used their packages in your dbt repository. One problem, though, was that there was no way of automatically limiting the scope of a Fivetran sync. Some of the larger Fivetran syncs (like Hubspot) load many tables into your DWH, and since you pay per active row with Fivetran, any extra inserts you don’t need cost you money unnecessarily. You may have many different assets that you want to refresh on different schedules, each depending on different tables from one Fivetran sync, but it’s not possible to limit the scope at the Fivetran sync level.
Here is what I would want to happen in Dagster when using Software Defined Assets:
I would import all of the Fivetran synced tables as individual assets.
I would also import all of my dbt models as individual assets - Dagster can already parse their dependencies automatically, either by compiling the project or from a manifest file.
The dependencies between the Fivetran assets and the dbt assets would then be in my Dagster DAG, either automatically (preferable but unlikely) or by defining them as code. As Dagster lets you use Python, I wondered if I could define the dependencies in a loop (if a Fivetran asset’s name matches a dbt source’s name, then the dbt asset depends on that Fivetran asset).
Then I would be able to schedule specific dbt models as the assets I want refreshed in a Dagster job, which, when run, would use the dependencies in my Dagster DAG to run ONLY the needed upstream dbt models and sync ONLY the needed upstream Fivetran tables (a subset of a whole Fivetran sync).
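The plan above can be sketched in plain Python without Dagster itself. This is a hypothetical illustration. The asset names, the `fivetran_tables` and `dbt_models` structures, and the exact-name matching rule are all assumptions made for the example. It is not Dagster's API, and it is not how Dagster actually resolves these links. The sketch first links dbt sources to Fivetran tables by name, then walks upstream from the target to find the minimal set of models and tables to run.

```python
from collections import deque

# Hypothetical Fivetran table assets, named "<schema>.<table>".
fivetran_tables = {"hubspot.deal", "hubspot.owner", "hubspot.calendar_event"}

# Hypothetical dbt models: the dbt sources each one reads and the models it refs.
dbt_models = {
    "stg_hubspot__deal": {"sources": {"hubspot.deal"}, "refs": set()},
    "stg_hubspot__owner": {"sources": {"hubspot.owner"}, "refs": set()},
    "deal_flow_enriched": {"sources": set(), "refs": {"stg_hubspot__deal", "stg_hubspot__owner"}},
}


def build_dependencies(fivetran_tables, dbt_models):
    """Return {asset: upstream assets}, linking dbt sources to Fivetran tables by name."""
    deps = {table: set() for table in fivetran_tables}
    for model, spec in dbt_models.items():
        # The loop from the plan: a dbt source whose name matches a Fivetran
        # table asset becomes a dependency on that Fivetran asset.
        matched = {source for source in spec["sources"] if source in fivetran_tables}
        deps[model] = set(spec["refs"]) | matched
    return deps


def upstream_closure(deps, target):
    """Return the target plus everything it transitively depends on."""
    selected, queue = {target}, deque([target])
    while queue:
        for parent in deps[queue.popleft()]:
            if parent not in selected:
                selected.add(parent)
                queue.append(parent)
    return selected


deps = build_dependencies(fivetran_tables, dbt_models)
print(sorted(upstream_closure(deps, "deal_flow_enriched")))
# ['deal_flow_enriched', 'hubspot.deal', 'hubspot.owner', 'stg_hubspot__deal', 'stg_hubspot__owner']
```

Note that `hubspot.calendar_event` is left out of the selection. That is exactly the table-level subsetting I was hoping for. Whether the ELT tool can actually honour such a subset is a separate question.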
I’m aware that this is very unlikely to work out exactly the way I’d want!
I noticed that in many of the example repos for Dagster with dbt Core, whole dbt projects were contained inside the Dagster repository. How do these stay in sync with the original dbt repository? I first tried to avoid this pattern by using GitPython to clone the repository when Dagster initialised, but this didn’t work, so I gave up and tried simply copying it in… Finally, after Git mysteriously failed to recognise what I’d done, and after dealing with errors in Dagster Cloud builds, I came across an error saying I had brought in a Git submodule improperly! Once I researched it, I realised that a submodule was one way I could bring the dbt repo into my Dagster one while keeping it in sync. These are the kinds of nuances that a SWE knows without thinking, but that a data person like me may struggle with.
With some tinkering (re-adding the submodule so the files showed up, pip installing the dbt Snowflake adapter, running dbt deps… 😅 - I’ll have to figure out how to make these steps happen in the Dagster Cloud build if I want to use this in prod), I managed to get the dbt Core assets to show up in my Dagster lineage. I didn’t have to write the loop I described above to define the dependencies between Fivetran and dbt assets - Dagster understood them from the naming, automatically 🪄.
When I spoke to some Dagster folks, they strongly encouraged me to avoid git submodules, as they cause “unmitigated pain and rage”. They helped me set up GitHub Actions instead to achieve what I previously had with git submodules (with the added benefit of it working consistently). Here’s the PR to see this and everything else I built in this post.
Let’s see what happens when I click materialize:
dagster._core.errors.DagsterInvalidSubsetError: When building job, the AssetsDefinition 'fivetran_sync_sociological_administrator' contains asset keys [AssetKey(['hubspot', 'calendar_event']), AssetKey(['hubspot', 'calendar_topic']), … (plus many more) …, AssetKey(['hubspot', 'owner'])]. This AssetsDefinition does not support subsetting. Please select all asset keys produced by this asset.
As I understand it, this means the whole Fivetran sync for Hubspot must run - it’s not possible to dynamically run only specific table syncs from inside it based on your DAG. I had thought this was unlikely to be possible, but it would have been nice if it had worked (I hope some Fivetran product folks are reading… it would be good not to have to hand-pick the synced tables in the Fivetran UI, and instead to have a dynamic option driven by the call from the orchestrator).
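To make the rule behind that error concrete, here is a minimal sketch of the check it implies. Everything in it is hypothetical: the `MultiAssetDef` class, `validate_selection`, and `InvalidSubsetError` are illustrative stand-ins and not Dagster's internals. The idea is that one definition produces many asset keys. If it cannot be subset, a selection may contain all of its keys or none of them, but never only some.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MultiAssetDef:
    """Hypothetical stand-in for one definition that produces several asset keys."""
    name: str
    keys: frozenset
    can_subset: bool


class InvalidSubsetError(Exception):
    pass


def validate_selection(selection, definitions):
    """Reject selections that take only part of a non-subsettable definition."""
    for definition in definitions:
        chosen = selection & definition.keys
        if chosen and chosen != definition.keys and not definition.can_subset:
            missing = sorted(definition.keys - chosen)
            raise InvalidSubsetError(
                f"{definition.name} does not support subsetting; also select {missing}"
            )


# One Fivetran sync produces every Hubspot table (only three shown here).
hubspot_sync = MultiAssetDef(
    name="fivetran_sync_sociological_administrator",
    keys=frozenset({"hubspot.deal", "hubspot.owner", "hubspot.calendar_event"}),
    can_subset=False,
)

try:
    # Selecting only the tables deal_flow_enriched needs trips the check.
    validate_selection({"hubspot.deal", "hubspot.owner", "deal_flow_enriched"}, [hubspot_sync])
except InvalidSubsetError as err:
    print(err)
```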
Let’s make a job that updates deal_flow_enriched, my target asset above, to see whether it handles the issue by automatically choosing to run the whole Fivetran sync… it does not. The same error occurs.
Let’s add the whole Fivetran Hubspot sync to the job. Now it works!
I’ve used the ‘|’ operator on the AssetSelections to choose the whole upstream lineage of deal_flow_enriched, plus the whole Hubspot asset group, which includes all of the Fivetran table syncs and therefore doesn’t cause subsetting. However, it does mean syncing more data than I need and running more dbt models than I need.
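In Dagster, a selection like this is typically built with something like `AssetSelection.keys("deal_flow_enriched").upstream() | AssetSelection.groups("hubspot")` and passed to `define_asset_job`. The exact asset key and group name in that snippet are assumptions. The sketch below is plain Python rather than Dagster code, and it only models what the `|` union does. The graph and the group membership are hypothetical: the group is assumed to hold every Hubspot table plus a staging model that the target does not need.

```python
# Hypothetical lineage: asset -> upstream assets.
deps = {
    "hubspot.deal": set(),
    "hubspot.owner": set(),
    "hubspot.calendar_event": set(),
    "stg_hubspot__deal": {"hubspot.deal"},
    "stg_hubspot__owner": {"hubspot.owner"},
    "stg_hubspot__calendar_event": {"hubspot.calendar_event"},
    "deal_flow_enriched": {"stg_hubspot__deal", "stg_hubspot__owner"},
}
# Hypothetical group membership for the Hubspot asset group.
groups = {
    "hubspot": {
        "hubspot.deal", "hubspot.owner", "hubspot.calendar_event",
        "stg_hubspot__deal", "stg_hubspot__owner", "stg_hubspot__calendar_event",
    }
}


def upstream(target):
    """Return the target plus all of its transitive upstream assets."""
    selected, stack = set(), [target]
    while stack:
        asset = stack.pop()
        if asset not in selected:
            selected.add(asset)
            stack.extend(deps[asset])
    return selected


# The '|' union: the target's whole lineage plus the whole Hubspot group.
lineage = upstream("deal_flow_enriched")
selection = lineage | groups["hubspot"]
extra = selection - lineage  # what gets synced or built beyond what's needed
print(sorted(extra))  # ['hubspot.calendar_event', 'stg_hubspot__calendar_event']
```

The `extra` set is the price of the workaround: the whole sync now satisfies the no-subsetting rule, at the cost of tables and models the target never reads.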
Sean from Dagster also helped me set up GitHub Actions in my dbt project that trigger a branch deployment build on my Dagster repository for each dbt PR, testing whether a dbt PR will break your Dagster production environment 🦾. Here is a metaplane-dbt PR where I delete the asset above, and a picture of the associated broken branch deployment in Dagster:
At this point, deal_flow_enriched is the only asset I’ve exposed at Metaplane. With Software Defined Assets (SDAs), I’ve been able to save cost by running only what’s used in production rather than a whole DAG. You can focus on just the asset you want updated, and its whole lineage can be run just in time.
It also makes operating easier: you know from your SLAs and stakeholder needs that certain assets must be refreshed at a specific cadence. SDAs let you simply schedule those assets at the required cadence without worrying about “lining up the dominoes” - Dagster does this for you. You don’t need to plan runs of a whole DAG, or of sections of a DAG, by working out which parts your target asset’s lineage needs. That way of operating invariably leads to bloated DAG runs, with many unneeded assets being materialised and refreshed. With SDAs, you can refresh exactly what you need, and no more, when you need it.
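"Lining up the dominoes" amounts to restricting the graph to the target's lineage and then ordering it so that each asset runs after its parents. Here is a minimal sketch using Python's standard-library `graphlib`. It is an illustration of the idea, not a description of how Dagster plans runs internally. The asset names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical lineage: asset -> upstream assets.
deps = {
    "hubspot.deal": set(),
    "hubspot.owner": set(),
    "hubspot.calendar_event": set(),
    "stg_hubspot__deal": {"hubspot.deal"},
    "stg_hubspot__owner": {"hubspot.owner"},
    "stg_hubspot__calendar_event": {"hubspot.calendar_event"},
    "deal_flow_enriched": {"stg_hubspot__deal", "stg_hubspot__owner"},
}


def run_order(deps, target):
    """Return the target's lineage, ordered so each asset comes after its parents."""
    lineage, stack = set(), [target]
    while stack:
        asset = stack.pop()
        if asset not in lineage:
            lineage.add(asset)
            stack.extend(deps[asset])
    # Only the target's lineage is sorted; unrelated assets never enter the plan.
    graph = {asset: deps[asset] for asset in lineage}
    return list(TopologicalSorter(graph).static_order())


order = run_order(deps, "deal_flow_enriched")
print(order)
```

The exact order of independent assets (for example, the two staging models) is not fixed. Only the parent-before-child constraint is guaranteed.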
We’ve saved time, and therefore money, on:
Snowflake costs from building tables that aren’t needed. Even within the Fivetran Hubspot dbt package, I’m only running models which my target asset depends on.
Fivetran costs from regularly running syncs that aren’t needed at this time.
Dagster Cloud costs - this job, which is sufficient for now, takes 5 minutes to run and is single-threaded. The job I showed in my last post, with the whole DAG and three Fivetran syncs, was multi-threaded and took 13 minutes. You would be concerned about running a 13-minute DAG every 15 minutes, but not a 5-minute one; in other scenarios, this fine-grained selection would allow a higher refresh cadence.
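Here is the arithmetic behind the 13-versus-5-minute comparison, as a quick sketch. It treats run time as fixed and ignores queueing and start-up overhead, both of which are simplifying assumptions. The helper name `schedule_headroom` is made up for this example.

```python
def schedule_headroom(run_minutes: float, interval_minutes: float) -> tuple[float, float]:
    """Return (share of each interval spent running, spare minutes before the next run)."""
    if run_minutes > interval_minutes:
        raise ValueError("runs would overlap: the job takes longer than its interval")
    return run_minutes / interval_minutes, interval_minutes - run_minutes


# Run times from this post, against the 15-minute cadence discussed above.
for job, minutes in [("whole DAG + three syncs", 13), ("deal_flow_enriched lineage", 5)]:
    share, spare = schedule_headroom(minutes, 15)
    print(f"{job}: busy {share:.0%} of each window, {spare:g} minutes spare")
```

The whole-DAG job is busy about 87% of each window, with 2 minutes of slack, so any slowdown risks overlapping runs. The 5-minute job uses a third of the window and leaves 10 minutes spare.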
If, in the future, Fivetran or other ELT providers allowed subsetting inside syncs, even more time and cost could be saved, and job definitions would become simpler.
Software Defined Assets allow you to quickly build large and complex DAGs, but only use the flows you need at any given point in time - you only think about the target assets you want refreshed and not the rest of the DAG.
One thing that would help in the future, if you had many SDA-based jobs running independently over overlapping parts of the DAG, would be knowing the state of the assets on the DAG across all jobs. For example, if a dbt model used in many jobs showed that it had been refreshed in the last 5 minutes, the next job that depends on it could choose not to refresh it, because it’s sufficiently up to date. This kind of statefulness is best provided as part of a service, like Dagster Cloud, and could add value over the open-source version.
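Here is a minimal sketch of that cross-job statefulness. It assumes a shared record of each asset's last materialisation time (the hypothetical `last_materialized` mapping) that every job can read. The function and the timestamps are illustrative and do not describe an existing Dagster feature. The example date is arbitrary.

```python
from datetime import datetime, timedelta, timezone


def assets_to_refresh(selection, last_materialized, max_age, now):
    """Skip assets whose last materialisation (by any job) is within max_age."""
    stale = set()
    for asset in selection:
        last = last_materialized.get(asset)  # None means never materialised
        if last is None or now - last > max_age:
            stale.add(asset)
    return stale


now = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)  # arbitrary example time
last_materialized = {
    "stg_hubspot__deal": now - timedelta(minutes=3),  # refreshed by another job
    "stg_hubspot__owner": now - timedelta(hours=2),
}
print(sorted(assets_to_refresh(
    {"stg_hubspot__deal", "stg_hubspot__owner", "deal_flow_enriched"},
    last_materialized, max_age=timedelta(minutes=5), now=now,
)))  # ['deal_flow_enriched', 'stg_hubspot__owner']
```

A real implementation would also need to refresh anything downstream of an asset it rebuilds. This sketch only checks each asset's own age.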
Conclusions & Recommendations
I wholeheartedly recommend Dagster over Airflow, especially if you haven’t committed to a specific orchestrator yet. The infrastructure is a breeze to use; developing in it has a steeper learning curve but comes with additional benefits. Dagster is a bit lacking in comprehensive documentation with good worked examples, but this should come in the next few months. Also, I’ve probably already banged my head against the wall solving many of the problems someone from a data background might come across, so feel free to use my repository and code. Even though I struggled a bit, you can see that my code is relatively concise and easy to read. My struggle was mostly in not knowing what to do - I learn much better from examples than from docs; I usually look at the docs after the examples, to fill in the gaps.
I know that Astronomer will want to have their say on the matter when they give me access to their lower-cost cloud offering. Airflow has also tried to enable Software Defined Assets in its own way, using its new datasets feature, but it seems less mature. The way Dagster inherited all of the dbt models and dependencies from my project and linked them with the Fivetran table syncs automatically is excellent. If you do ELT with dbt and other MDS tooling, I think Dagster is most likely your best choice.
At some point, I will take a look at Prefect, but I think I’m all orchestrated out for this year. I’m also very interested in Kestra, which seems to take a genuinely different approach: YAML instead of Python for configuration, which could let less technical folks contribute to an orchestrator.
For reference, Madison recently covered Prefect.
There will be some very small teams out there who just use Fivetran + Snowflake/BigQuery + dbt to run their data pipelines. If that’s you, I think any stand-alone orchestrator would be overkill and a waste of your very limited time. I would simply use the Fivetran transformations feature until your needs grow in complexity or scale.
If you’re part of a large team with a huge Airflow estate, using Astronomer probably makes sense, but also consider building new standalone DAGs, or rebuilding branches of existing DAGs, in something like Dagster. You can run both together relatively easily and call one from the other if needed.
Software Defined Assets could also give you a way to expose target assets elsewhere. Imagine if you had a new entity in Dagster called @interface which you could associate with an asset. This could automatically make the asset accessible via, say, GraphQL.
The backend to the interface could have a caching mechanism with configurable expiration, authorisation and RBAC. This could allow any asset in a Dagster graph to become a product in and of itself. You could move away from scheduled refreshes and have assets refreshed on demand when they have expired or are approaching expiration.
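To show the caching-with-expiration idea, here is a minimal sketch of what a hypothetical `interface` decorator might do. `interface` is not a Dagster API, and the parameters `ttl_seconds`, `refresh_margin`, and `clock` are invented for this illustration. Authorisation, RBAC, and the GraphQL layer are deliberately left out. The sketch covers only on-demand refresh when a cached value has expired or is close to expiring.

```python
import time
from functools import wraps


def interface(ttl_seconds, refresh_margin=0.0, clock=time.monotonic):
    """Hypothetical decorator: serve a cached asset value and rebuild it on demand
    once it has expired or is within refresh_margin seconds of expiring."""
    def decorator(materialize):
        cache = {"value": None, "expires_at": float("-inf")}

        @wraps(materialize)
        def get():
            now = clock()
            if now >= cache["expires_at"] - refresh_margin:
                cache["value"] = materialize()
                cache["expires_at"] = now + ttl_seconds
            return cache["value"]

        return get

    return decorator


# Usage with a fake clock so the behaviour is easy to follow.
fake_now = [0.0]
builds = []


@interface(ttl_seconds=60, refresh_margin=10, clock=lambda: fake_now[0])
def deal_flow_enriched():
    """Stand-in for materialising the asset; records when each rebuild happens."""
    builds.append(fake_now[0])
    return len(builds)


results = []
for t in (0, 30, 55):
    fake_now[0] = t
    results.append(deal_flow_enriched())
print(results)  # [1, 1, 2]
```

The request at t=30 is served from the cache. The request at t=55 falls within 10 seconds of the 60-second expiry, so it triggers a rebuild. That is the "approaching expiration" case.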
If only Dagster had someone as CTO who had prior experience of building such interface standards… if only 😁.
Thanks for reading davidj.substack! Subscribe for free to receive new posts and support my work.