Source code for csverve.utils.utils

from typing import Any
from typing import List, Dict, Union

import pandas as pd  # type: ignore
from csverve.errors import CsverveDtypeError
from csverve.errors import CsverveMergeColumnMismatchException
from csverve.errors import CsverveMergeCommonColException
from csverve.errors import CsverveMergeDtypesEmptyMergeSet
from csverve.errors import CsverveMergeException
from csverve.errors import DtypesMergeException


def pandas_to_std_types(dtype: Any) -> str:
    """
    Map a pandas/python dtype onto one of the standard csverve type names.

    @param dtype: dtype given as a string (e.g. "int64"), as a type exposing
        __name__ (e.g. int), or as a dtype instance exposing .name
        (e.g. the values found in DataFrame.dtypes).
    @return: one of "bool", "int", "float", "str", "category".
    @raise CsverveDtypeError: if no usable name can be derived from dtype, or
        the name is not a known dtype.
    """
    std_dict = {
        "bool": "bool",
        "int64": "int",
        "int": "int",
        "Int64": "int",
        "float64": "float",
        "float": "float",
        "object": "str",
        "str": "str",
        "category": "category",
    }

    if not isinstance(dtype, str):
        # Types (int, float, ...) carry __name__; dtype *instances* do not,
        # but they describe themselves through .name, so fall back to that.
        name = getattr(dtype, '__name__', None)
        if name is None:
            name = getattr(dtype, 'name', None)
        if not isinstance(name, str):
            raise CsverveDtypeError('Unable to process dtype {}'.format(dtype))
        dtype = name

    if dtype not in std_dict:
        raise CsverveDtypeError('Unable to process dtype {}'.format(dtype))

    return std_dict[dtype]
def merge_dtypes(dtypes_all: List[Dict[str, str]]) -> Dict[str, str]:
    """
    Combine several column -> dtype mappings into a single mapping.

    A column may appear in more than one mapping as long as every occurrence
    carries the same dtype.

    @param dtypes_all: List of dtypes dictionaries, where key = column name, value = pandas dtype.
    @return: Merged dtypes dictionary.
    @raise CsverveMergeDtypesEmptyMergeSet: if dtypes_all is empty.
    @raise DtypesMergeException: if a column is seen with two different dtypes.
    """
    if not dtypes_all:
        raise CsverveMergeDtypesEmptyMergeSet("must provide dtypes to merge")

    combined: Dict[str, str] = {}

    for mapping in dtypes_all:
        for column, dtype in mapping.items():
            # first sighting of this column: just record it
            if column not in combined:
                combined[column] = dtype
                continue

            # seen before: the dtype has to agree with what was recorded
            if combined[column] != dtype:
                raise DtypesMergeException("dtypes not mergeable")

    return combined
def _validate_merge_cols(frames: List[pd.DataFrame], on: Union[List[str], str]) -> None:
    """
    Make sure frames look good, raise relevant exceptions.

    Checks, in order: that the `on` columns hold the same values in every
    frame, that each `on` column has one dtype across all frames, and that
    every other column present in all frames is equal between neighbouring
    frames.

    @param frames: list of pandas DataFrames to merge
    @param on: column name, or list of column names, common to all frames on which to merge
    @return:
    @raise CsverveMergeException: if `on` or `frames` is empty.
    @raise CsverveMergeColumnMismatchException: if the `on` columns differ in values or dtypes.
    @raise CsverveMergeCommonColException: if a shared non-merge column differs between frames.
    """
    if not on:
        raise CsverveMergeException("unable to merge if given nothing to merge on")

    if not frames:
        raise CsverveMergeException("unable to merge if given no frames to merge")

    # A bare column name must become a one-element list: otherwise the loops
    # and set() below would walk the string character by character.
    if isinstance(on, str):
        on = [on]

    # check that columns to be merged have identical values
    # NOTE(review): rows occurring more than once cancel out under
    # drop_duplicates(keep=False), so differences confined to duplicated keys
    # are not detected by this check.
    standard = frames[0][on].sort_values(on).reset_index(drop=True)
    for frame in frames[1:]:
        comp_df = frame[on].sort_values(on).reset_index(drop=True)
        if not pd.concat([standard, comp_df]).drop_duplicates(keep=False).empty:
            raise CsverveMergeColumnMismatchException("columns on which to merge must be identical")

    # check that columns to be merged have same dtypes
    for shared_col in on:
        if len({frame[shared_col].dtypes for frame in frames}) != 1:
            raise CsverveMergeColumnMismatchException("columns on which to merge must have same dtypes")

    # columns present in every frame, other than the merge keys, must match
    common_cols = set.intersection(*(set(frame.columns) for frame in frames))
    cols_to_check = list(common_cols - set(on))

    for frame1, frame2 in zip(frames[:-1], frames[1:]):
        if not frame1[cols_to_check].equals(frame2[cols_to_check]):
            raise CsverveMergeCommonColException("non-merged common cols must be identical")
def merge_frames(frames: List[pd.DataFrame], how: str, on: Union[List[str], str]) -> pd.DataFrame:
    """
    Takes in a list of pandas DataFrames, and merges into a single DataFrame.

    Frames are merged left to right. From each subsequent frame only the
    merge columns and the columns not already present in the accumulated
    result are used, so shared columns are not duplicated with suffixes.

    @param frames: List of pandas DataFrames.
    @param how: How to join DataFrames (inner, outer, left, right).
    @param on: Column name, or list of column names, to join on.
    @return: merged pandas DataFrame; if only one frame is given it is returned as is.
    @raise CsverveMergeException: if frames is empty.
    """
    if isinstance(on, str):
        on = [on]

    if not frames:
        raise CsverveMergeException("unable to merge if given no frames to merge")

    _validate_merge_cols(frames, on)

    merged_frame: pd.DataFrame = frames[0]

    for frame in frames[1:]:
        # Keep the frame's own column order so the output layout is
        # deterministic, and drop columns the result already carries.
        cols_to_use: List[str] = [
            col for col in frame.columns
            if col in on or col not in merged_frame.columns
        ]
        merged_frame = pd.merge(
            merged_frame,
            frame[cols_to_use],
            how=how,
            on=on,
        )

    return merged_frame