I’m continuing straight on from where I left off yesterday, when I used dlt to extract data from the Bluesky APIs directly into DuckDB. I did evolve my code to the point where I dropped dlt and stored the data in Parquet instead - and I may come back to that method later in this series - but I want to look at this route today because of one of the advantages it offers: automatic creation of sqlmesh models from a dlt pipeline.
When dlt pipelines load data, they also load a lot of really useful metadata. Every load has a load_id and an insert timestamp. The load_id itself is a timestamp which seems to sit a few seconds, at most, before the insert timestamp - the gap being the time between the data entering the pipeline and it actually being written to the database. This is nice to have out of the box, but it’s what I would call standard data engineering practice.
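You can see that gap for yourself by comparing the two timestamps in the _dlt_loads table that dlt creates alongside the data. A quick sketch of the query - it assumes your dlt dataset landed in a DuckDB schema called bluesky, so substitute your own dataset name:

```sql
-- load_id is epoch seconds stored as text; inserted_at is when the load finished
SELECT
    load_id,
    inserted_at,
    inserted_at - to_timestamp(CAST(load_id AS DOUBLE)) AS load_to_insert_gap
FROM bluesky._dlt_loads
ORDER BY inserted_at DESC;
```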
What goes beyond this, though, is that dlt also tracks schema evolution with its schema version hashes. Loads that use the same schema share the same schema version hash. This is incredibly useful both for debugging when schema evolution breaks downstream code and for handling it gracefully. Schema evolution will happen; it’s just a question of whether you’re ready for it.
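To see this in action, you can group the loads in _dlt_loads by their hash - each new hash marks a point where the schema changed. Again, a sketch assuming the bluesky dataset schema:

```sql
-- Each distinct schema_version_hash is a schema the pipeline has loaded with
SELECT
    schema_version_hash,
    COUNT(*)         AS loads,
    MIN(inserted_at) AS first_seen,
    MAX(inserted_at) AS last_seen
FROM bluesky._dlt_loads
GROUP BY schema_version_hash
ORDER BY first_seen;
```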
One other cool thing I noticed dlt did automagically is handle nested arrays within the JSON, by breaking each one out into its own table with many records per primary key in the original data. If you look at the schema from the Bluesky API:
The labels arrays have been made into followers__labels and follows__labels tables, respectively.
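dlt ties these child tables back to their parents with a pair of generated keys: every parent row gets a _dlt_id, and every child row carries a matching _dlt_parent_id (plus a _dlt_list_idx for its position in the array). A rough sketch of stitching them back together - handle is just an illustrative column from the follower profiles:

```sql
-- Count labels per follower by joining the child table back to its parent
SELECT
    f.handle,                  -- illustrative profile column
    COUNT(l._dlt_id) AS label_count
FROM bluesky.followers AS f
LEFT JOIN bluesky.followers__labels AS l
    ON l._dlt_parent_id = f._dlt_id
GROUP BY f.handle
ORDER BY label_count DESC;
```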
So let’s run the command that is the title of this post and see what happens:
sqlmesh init -t dlt --dlt-pipeline bluesky duckdb
It worked straight away with no issue and completed within a couple of seconds.
It has automatically created incremental models for followers, followers__labels, follows and follows__labels, as well as for the _dlt_loads metadata table that sits in the same DuckDB schema.
The models are nicely formatted, with explicit casts, and they use the _dlt_load_id cast to a timestamp as the partition key for incrementality. This is a really nice workflow. I’ve done this kind of work with dbt before, where I had to build tens of staging tables off a source and there was no way to generate the models automatically. You only get this kind of benefit in dbt if you are using a dbt package for a connector provided by an ELT vendor like Fivetran or Airbyte, and those packages don’t typically handle incremental loading.
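To give a flavour of the output, here is a trimmed-down sketch of what one of the generated incremental models looks like. The model and schema names (bluesky_sqlmesh, bluesky) and the profile columns (did, handle) are illustrative stand-ins for my setup, but the shape - explicit casts, _dlt_load_id converted to a timestamp as the time column, and a filter against sqlmesh’s start/end macros - matches what the template produces:

```sql
-- Sketch of a generated incremental model; column and schema names are illustrative
MODEL (
  name bluesky_sqlmesh.incremental_followers,
  kind INCREMENTAL_BY_TIME_RANGE (
    time_column _dlt_load_time
  )
);

SELECT
  CAST(c.did AS TEXT) AS did,        -- illustrative Bluesky profile columns
  CAST(c.handle AS TEXT) AS handle,
  CAST(c._dlt_load_id AS TEXT) AS _dlt_load_id,
  CAST(c._dlt_id AS TEXT) AS _dlt_id,
  TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) AS _dlt_load_time
FROM bluesky.followers AS c
WHERE
  TO_TIMESTAMP(CAST(c._dlt_load_id AS DOUBLE)) BETWEEN @start_dt AND @end_dt
```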
Next, I’ll cover sqlmesh model types and how sqlmesh deals with schema evolution.