Source code for abraxos.load

"""SQL loading utilities with error handling and retry logic."""

from __future__ import annotations

import typing as t
from collections.abc import Iterable
from typing import Literal

import pandas as pd

from abraxos import utils

__all__ = ['SqlInsert', 'SqlConnection', 'SqlEngine', 'ToSqlResult', 'to_sql', 'use_sql']


[docs] class SqlInsert(t.Protocol): """ Protocol for a SQL insert statement object (e.g., sqlalchemy.Insert). """ ...
[docs] class SqlConnection(t.Protocol): """ Protocol for a database connection that supports executing insert statements. """
[docs] def execute( self, insert: SqlInsert, records: Iterable[dict] ) -> None: """ Execute an insert statement with given records. """ ...
[docs] class SqlEngine(t.Protocol): """ Protocol for a database engine object that can provide connections. """
[docs] def connect(self) -> SqlConnection: """ Obtain a SQL connection from the engine. """ ...
[docs] class ToSqlResult(t.NamedTuple): """ Result of inserting a DataFrame into a database. Attributes ---------- errors : list of Exception Exceptions encountered during insertion. errored_df : pandas.DataFrame Rows that failed to be inserted. success_df : pandas.DataFrame Rows that were successfully inserted. """ errors: list[Exception] errored_df: pd.DataFrame success_df: pd.DataFrame
[docs] def to_sql( df: pd.DataFrame, name: str, con: SqlConnection | SqlEngine, *, if_exists: Literal['fail', 'replace', 'append'] = 'append', index: bool = False, chunks: int = 2, **kwargs: t.Any ) -> ToSqlResult: """ Writes a DataFrame to a SQL database table with error handling. Parameters ---------- df : pd.DataFrame The DataFrame to insert. name : str Name of the target table. con : SqlConnection or SqlEngine SQLAlchemy-like connection or engine object. if_exists : {'fail', 'replace', 'append'}, optional SQL behavior if the table already exists (default is 'append'). index : bool, optional Whether to include the DataFrame index in the output (default is False). chunks : int, optional Number of chunks to recursively split on failure (default is 2). **kwargs Additional arguments passed to `pandas.DataFrame.to_sql`. Returns ------- ToSqlResult A named tuple with lists of errors, failed rows, and successful rows. """ errors: list[Exception] = [] errored_dfs: list[pd.DataFrame] = [utils.clear(df)] success_dfs: list[pd.DataFrame] = [utils.clear(df)] try: df.to_sql(name, con, if_exists=if_exists, index=index, method='multi', **kwargs) return ToSqlResult([], utils.clear(df), df) except Exception: if len(df) > 1: for df_chunk in utils.split(df, chunks): result: ToSqlResult = to_sql( df_chunk, name, con, if_exists=if_exists, index=index, chunks=chunks, **kwargs ) errors.extend(result.errors) errored_dfs.append(result.errored_df) success_dfs.append(result.success_df) else: try: df.to_sql(name, con, if_exists=if_exists, index=index, method='multi', **kwargs) return ToSqlResult([], utils.clear(df), df) except Exception as e: return ToSqlResult([e], df, utils.clear(df)) return ToSqlResult(errors, pd.concat(errored_dfs), pd.concat(success_dfs))
def insert_df( df: pd.DataFrame, connection: SqlConnection, sql_query: SqlInsert ) -> ToSqlResult: """ Inserts a DataFrame into a database using a raw SQL insert statement. Parameters ---------- df : pd.DataFrame The DataFrame to insert. connection : SqlConnection A SQL connection object with an `execute` method. sql_query : SqlInsert A SQLAlchemy-compatible insert statement. Returns ------- ToSqlResult A result containing successful and errored inserts. """ records: list[dict] = utils.to_records(df) connection.execute(sql_query, records) return ToSqlResult([], utils.clear(df), df)
[docs] def use_sql( df: pd.DataFrame, connection: SqlConnection, sql_query: SqlInsert, chunks: int = 2 ) -> ToSqlResult: """ Executes user-provided SQL insert using `insert_df` with error handling. Parameters ---------- df : pd.DataFrame The DataFrame to insert. connection : SqlConnection SQL connection object. sql_query : SqlInsert SQL insert statement object. chunks : int, optional Number of chunks to split on failure (default is 2). Returns ------- ToSqlResult A result indicating which rows succeeded and which failed. """ errors: list[Exception] = [] errored_dfs: list[pd.DataFrame] = [utils.clear(df)] success_dfs: list[pd.DataFrame] = [utils.clear(df)] try: return insert_df(df, connection, sql_query) except Exception: if len(df) > 1: for df_chunk in utils.split(df, chunks): result: ToSqlResult = use_sql(df_chunk, connection, sql_query, chunks) errors.extend(result.errors) errored_dfs.append(result.errored_df) success_dfs.append(result.success_df) else: try: return insert_df(df, connection, sql_query) except Exception as e: return ToSqlResult([e], df, utils.clear(df)) return ToSqlResult( errors, pd.concat(errored_dfs), pd.concat(success_dfs) )