Source code for dbt_llm_tools.dbt_project

import glob
import json
import os
import re
from typing import Union

import yaml

from tinydb import TinyDB, Query

from dbt_llm_tools.types import DbtModelDirectoryEntry, DbtProjectDirectory

SOURCE_SEARCH_EXPRESSION = r"source\(['\"]*(.*?)['\"]*,\s*['\"]*(.*?)['\"]*\)"
REF_SEARCH_EXPRESSION = r"ref\(['\"]*(.*?)['\"]*\)"


[docs] class DbtProject: """ A class representing a DBT project. """ def __init__( self, dbt_project_root: str, database_path: str = ".local_storage/db.json", ) -> None: """ Initializes a dbt project parser object. Args: dbt_project_root (str): Root of the dbt prject database_path (str, optional): Path to the directory file that stores the parsed dbt project. Methods: parse: Parse the dbt project and store details in a manifest file. get_single_model: Get a single model by name. get_models: Get a list of models based on the provided filters. update_model_directory: Update a model in the directory. """ self.__project_root = dbt_project_root dbt_project_file = os.path.join(dbt_project_root, "dbt_project.yml") if not os.path.isfile(dbt_project_file): raise Exception("No dbt project found in the specified folder") self.__database_path = database_path os.makedirs(os.path.dirname(database_path), exist_ok=True) with open(dbt_project_file, encoding="utf-8") as f: project_config = yaml.safe_load(f) self.__model_paths = project_config.get("model-paths", ["models"]) self.__sql_files = self.__get_all_files("sql") self.__yaml_files = self.__get_all_files("yml") def __get_all_files(self, file_extension: str): """ Get all files of a certain type in the dbt project. Args: file_extension (str): The file extension to search for. Returns: list: A list of files with the specified extension. """ files = [] for path in self.__model_paths: files.extend( glob.glob( os.path.join( self.__project_root, path, "**", f"*.{file_extension}" ), recursive=True, ) ) return files def __find_upstream_references( self, file_path: str, recursive: bool = False, dependencies: list[str] = None ): """ Find upstream references in a SQL file. Args: file_path (str): The path to the SQL file. recursive (bool, optional): Whether to recursively search for upstream references. dependencies (list, optional): A list of dependencies to add to. Returns: list: A list of upstream references. """ if dependencies is None: dependencies = [] with open(file_path, encoding="utf-8") as f: file_contents = f.read() search_results = re.findall(REF_SEARCH_EXPRESSION, file_contents) unique_results = list(set(search_results)) if recursive: for result in unique_results: if result in dependencies or result in file_path: continue sub_file_path = next( (x for x in self.__sql_files if x.endswith(f"/{result}.sql")), None ) if sub_file_path is not None: dependencies = self.__find_upstream_references( file_path=sub_file_path, recursive=True, dependencies=dependencies, ) return dependencies + unique_results def __parse_sql_file(self, sql_file: str): """ Parse a SQL file and return a dictionary with the file metadata. Args: sql_file (str): The path to the SQL file. Returns: dict: A dictionary containing the parsed SQL file metadata. """ with open(sql_file, encoding="utf-8") as f: sql_contents = f.read() source_search = re.findall(SOURCE_SEARCH_EXPRESSION, sql_contents) sources = [{"name": match[0], "table": match[1]} for match in source_search] return { "type": "model", "absolute_path": sql_file, "relative_path": sql_file.replace(self.__project_root, ""), "name": os.path.basename(sql_file).replace(".sql", ""), "refs": self.__find_upstream_references(sql_file, False), "deps": self.__find_upstream_references(sql_file, True), "sources": sources, "sql_contents": sql_contents, } def __parse_yaml_files(self, yaml_files: list[str]): """ Extract documentation from the parsed yaml files. Args: yaml_files (list): A list of yaml files to parse. Returns: dict: A dictionary containing the parsed models. dict: A dictionary containing the parsed sources. """ models = {} sources = {} for yaml_path in yaml_files: with open(yaml_path, encoding="utf-8") as f: yaml_contents = yaml.safe_load(f) if yaml_contents is None: continue for model in yaml_contents.get("models", []): model["yaml_path"] = yaml_path models[model["name"]] = model for source in yaml_contents.get("sources", []): source["type"] = "source" source["yaml_path"] = yaml_path sources[source["name"]] = source return models, sources def __get_directory(self): """ Get the parsed directory from the directory file. Returns: dict: The parsed directory. """ with open(self.__database_path, encoding="utf-8") as f: return json.load(f) def __save_directory(self, directory): """ Save the parsed directory to a file. Args: directory (dict): The directory to save. """ db = TinyDB(self.__database_path, sort_keys=True, indent=4) Model = Query() # pylint: disable=invalid-name Source = Query() # pylint: disable=invalid-name for name, model in directory["models"].items(): if "name" in model: db.upsert(model, Model.name == name) for name, source in directory["sources"].items(): if "name" in source: db.upsert(source, Source.name == source["name"])
[docs] def parse(self) -> DbtProjectDirectory: """ Parse the dbt project and store details in a manifest file. Returns: dict: The parsed directory. """ source_sql_models = {} for sql_file in self.__sql_files: parsed_model = self.__parse_sql_file(sql_file) source_sql_models[parsed_model["name"]] = parsed_model documented_models, documented_sources = self.__parse_yaml_files( self.__yaml_files ) for model_name, model_dict in documented_models.items(): yaml_path = model_dict.pop("yaml_path") if model_name in source_sql_models: source_sql_models[model_name]["yaml_path"] = yaml_path source_sql_models[model_name]["documentation"] = model_dict else: source_sql_models[model_name] = { "yaml_path": yaml_path, "documentation": model_dict, } directory = { "models": source_sql_models, "sources": documented_sources, } self.__save_directory(directory) return directory
[docs] def get_single_model(self, model_name: str) -> Union[DbtModelDirectoryEntry, None]: """ Get a single model by name. Args: model_name (str): The name of the model to get. Returns: dict: The model object. """ if model_name is None: raise Exception("No model name provided") # directory = self.__get_directory() db = TinyDB(self.__database_path) Model = Query() # pylint: disable=invalid-name return db.get(Model.name == model_name)
[docs] def get_models( self, models: list[str] = None, included_folders: list[str] = None, excluded_folders: list[str] = None, ): """ Get a list of models based on the provided filters. Args: models (list, optional): A list of model names to get. included_folders (list, optional): A list of folders to include in the search for sql or yaml files. excluded_folders (list, optional): A list of folders to exclude from the search for sql or yaml files. Returns: list: A list of DbtModel objects. """ searched_models = [] db = TinyDB(self.__database_path) Model = Query() # pylint: disable=invalid-name File = Query() # pylint: disable=invalid-name if models is None and included_folders is None: searched_models = db.search(File.type == "model") for model in models or []: if model := db.get(Model.name == model): searched_models.append(model) for included_folder in included_folders or []: for model in db.search(File.type == "model"): if included_folder in model.get( "absolute_path", "" ) or included_folder in model.get("yaml_path", ""): searched_models.append(model) for excluded_folder in excluded_folders or []: for model in searched_models.copy(): if excluded_folder in model.get( "absolute_path", "" ) or excluded_folder in model.get("yaml_path", ""): searched_models.remove(model) return searched_models
[docs] def update_model_directory(self, model: dict): """ Update a model in the directory. Args: model (dict): The model to update. """ directory = self.__get_directory() if model["name"] in directory["models"]: directory["models"][model["name"]] = model self.__save_directory(directory)