import os
from typing import List, Dict, Union
from warnings import warn
import csverve.utils as utils
import pandas as pd # type: ignore
from csverve.core import CsverveInput
from csverve.core import CsverveOutputDataFrame
from csverve.core import CsverveOutputFileStream
from csverve.core import IrregularCsverveInput
from csverve.errors import CsverveConcatException
[docs]def concatenate_csv_files_pandas(
in_filenames: Union[List[str], Dict[str, str]],
out_filename: str,
dtypes: Dict[str, str],
skip_header: bool = False,
drop_duplicates: bool = False,
**kwargs
) -> None:
"""
Concatenate gzipped CSV files.
@param in_filenames: List of gzipped CSV file paths, or a dictionary where the keys are file paths.
@param out_filename: Path of resulting concatenated gzipped CSV file and meta YAML.
@param dtypes: Dictionary of pandas dtypes, where key = column name, value = dtype.
@param skip_header: boolean, True = write header, False = don't write header.
@return:
"""
if kwargs.get('write_header') is not None:
raise DeprecationWarning('write_header has been deprecated and will be ignored, please use skip_header instead')
if isinstance(in_filenames, dict):
in_filenames = list(in_filenames.values())
data: List[CsverveInput] = [
CsverveInput(in_filename).read_csv() for in_filename in in_filenames
]
concat_data: pd.DataFrame = pd.concat(data, ignore_index=True)
if drop_duplicates:
concat_data = concat_data.drop_duplicates()
csvoutput: CsverveOutputDataFrame = CsverveOutputDataFrame(
concat_data, out_filename, dtypes, skip_header=skip_header
)
csvoutput.write_df()
[docs]def concatenate_csv_files_quick_lowmem(
inputfiles: List[str],
output: str,
dtypes: Dict[str, str],
columns: List[str],
skip_header: bool = False,
**kwargs
) -> None:
"""
Concatenate gzipped CSV files.
@param inputfiles: List of gzipped CSV file paths.
@param output: Path of resulting concatenated gzipped CSV file and meta YAML.
@param dtypes: Dictionary of pandas dtypes, where key = column name, value = dtype.
@param columns: List of column names for newly concatenated gzipped CSV file.
@param skip_header: boolean, True = write header, False = don't write header.
@return:
"""
if kwargs.get('write_header') is not None:
raise DeprecationWarning('write_header has been deprecated and will be ignored, please use skip_header instead')
csvoutput: CsverveOutputFileStream = CsverveOutputFileStream(
output, dtypes, skip_header=skip_header, columns=columns
)
csvoutput.write_data_streams(inputfiles)
[docs]def get_columns(infile):
return CsverveInput(infile).columns
[docs]def get_dtypes(infile):
return CsverveInput(infile).dtypes
[docs]def rewrite_csv_file(
filepath: str,
outputfile: str,
skip_header: bool = False,
dtypes: Dict[str, str] = None,
**kwargs
) -> None:
"""
Generate header less csv files.
@param filepath: File path of CSV.
@param outputfile: File path of header less CSV to be generated.
@param skip_header: boolean, True = write header, False = don't write header.
@param dtypes: Dictionary of pandas dtypes, where key = column name, value = dtype.
@return:
"""
if kwargs.get('write_header') is not None:
warn('write_header has been deprecated and will be ignored, please use skip_header instead', DeprecationWarning)
if os.path.exists(filepath + '.yaml'):
csvinput: Union[CsverveInput, IrregularCsverveInput] = CsverveInput(filepath)
df = csvinput.read_csv()
csvoutput_df = CsverveOutputDataFrame(
df, outputfile, skip_header=skip_header,
dtypes=csvinput.dtypes
)
csvoutput_df.write_df()
else:
assert dtypes
csvinput = IrregularCsverveInput(filepath, dtypes)
csvoutput_fs = CsverveOutputFileStream(
outputfile, skip_header=skip_header, columns=csvinput.columns,
dtypes=csvinput.dtypes
)
csvoutput_fs.rewrite_csv(filepath)
[docs]def merge_csv(
in_filenames: Union[List[str], Dict[str, str]],
out_filename: str,
how: str,
on: List[str],
skip_header: bool = False,
**kwargs
) -> None:
"""
Create one gzipped CSV out of multiple gzipped CSVs.
@param in_filenames: Dictionary containing file paths as keys
@param out_filename: Path to newly merged CSV
@param how: How to join DataFrames (inner, outer, left, right).
@param on: Column(s) to join on, comma separated if multiple.
@param skip_header: boolean, True = write header, False = don't write header
@return:
"""
if kwargs.get('write_header') is not None:
warn('write_header has been deprecated and will be ignored, please use skip_header instead', DeprecationWarning)
if isinstance(in_filenames, dict):
in_filenames = list(in_filenames.values())
data: List[CsverveInput] = [CsverveInput(infile) for infile in in_filenames]
dfs: List[str] = [csvinput.read_csv() for csvinput in data]
dtypes: List[Dict[str, str]] = [csvinput.dtypes for csvinput in data]
merged_data: pd.DataFrame = utils.merge_frames(dfs, how, on)
dtypes_: Dict[str, str] = utils.merge_dtypes(dtypes)
csvoutput: CsverveOutputDataFrame = CsverveOutputDataFrame(
merged_data, out_filename, dtypes_, skip_header=skip_header
)
csvoutput.write_df()
[docs]def concatenate_csv(inputfiles: List[str], output: str, skip_header: bool = False,
drop_duplicates: bool = False, **kwargs) -> None:
"""
Concatenate gzipped CSV files, dtypes in meta YAML files must be the same.
@param inputfiles: List of gzipped CSV file paths, or a dictionary where the keys are file paths.
@param output: Path of resulting concatenated gzipped CSV file and meta YAML.
@param skip_header: boolean, True = write header, False = don't write header.
@return:
"""
if kwargs.get('write_header') is not None:
warn('write_header has been deprecated and will be ignored, please use skip_header instead', DeprecationWarning)
if isinstance(inputfiles, dict):
inputfiles = list(inputfiles.values())
if inputfiles == []:
raise CsverveConcatException("nothing provided to concat")
inputs: List[CsverveInput] = [CsverveInput(infile) for infile in inputfiles]
dtypes: Dict[str, str] = utils.merge_dtypes([csvinput.dtypes for csvinput in inputs])
headers: List[bool] = [csvinput.header for csvinput in inputs]
columns: List[List[str]] = [csvinput.columns for csvinput in inputs]
low_memory: bool = True
if any(headers):
low_memory = False
if not all(columns[0] == elem for elem in columns):
low_memory = False
if drop_duplicates:
low_memory = False
if low_memory:
concatenate_csv_files_quick_lowmem(inputfiles, output, dtypes, columns[0], skip_header=skip_header)
else:
concatenate_csv_files_pandas(inputfiles, output, dtypes, skip_header=skip_header,
drop_duplicates=drop_duplicates)
[docs]def annotate_csv(
infile: str,
annotation_df: pd.DataFrame,
outfile,
annotation_dtypes,
on="cell_id",
skip_header: bool = False,
**kwargs
):
"""
TODO: fill this in
@param infile:
@param annotation_df:
@param outfile:
@param annotation_dtypes:
@param on:
@param skip_header:
@return:
"""
if kwargs.get('write_header') is not None:
warn('write_header has been deprecated and will be ignored, please use skip_header instead', DeprecationWarning)
csvinput = CsverveInput(infile)
metrics_df = csvinput.read_csv()
# get annotation rows that correspond to rows in on
reformed_annotation = annotation_df[annotation_df[on].isin(metrics_df[on])]
# do nothing if the annotation df is empty
if reformed_annotation.empty: # so we dont add NaNs
return write_dataframe_to_csv_and_yaml(metrics_df, outfile,
csvinput.dtypes,
skip_header=skip_header)
metrics_df = metrics_df.merge(reformed_annotation, on=on, how='outer')
csv_dtypes = csvinput.dtypes
for col, dtype in csv_dtypes.items():
if col in annotation_dtypes:
assert dtype == annotation_dtypes[col]
csv_dtypes.update(annotation_dtypes)
output = CsverveOutputDataFrame(metrics_df, outfile, csv_dtypes, skip_header=skip_header)
output.write_df()
[docs]def simple_annotate_csv(
in_f: str,
out_f: str,
col_name: str,
col_val: str,
col_dtype: str,
skip_header: bool = False,
**kwargs
) -> None:
"""
Simplified version of the annotate_csv method.
Add column with the same value for all rows.
@param in_f:
@param out_f:
@param col_name:
@param col_val:
@param col_dtype:
@param skip_header:
@return:
"""
if kwargs.get('write_header') is not None:
warn('write_header has been deprecated and will be ignored, please use skip_header instead', DeprecationWarning)
csvinput = CsverveInput(in_f)
metrics_df = csvinput.read_csv()
metrics_df[col_name] = col_val
csv_dtypes = csvinput.dtypes
csv_dtypes[col_name] = col_dtype
output = CsverveOutputDataFrame(metrics_df, out_f, csv_dtypes, skip_header=skip_header)
output.write_df()
[docs]def add_col_from_dict(
infile,
col_data,
outfile,
dtypes,
skip_header=False,
**kwargs
):
"""
TODO: fill this in
Add column to gzipped CSV.
@param infile:
@param col_data:
@param outfile:
@param dtypes:
@param skip_header:
@return:
"""
if kwargs.get('write_header') is not None:
warn('write_header has been deprecated and will be ignored, please use skip_header instead', DeprecationWarning)
csvinput = CsverveInput(infile)
csv_dtypes = csvinput.dtypes
csvinput = csvinput.read_csv()
for col_name, col_value in col_data.items():
csvinput[col_name] = col_value
dtypes = utils.merge_dtypes([csv_dtypes, dtypes])
output = CsverveOutputDataFrame(
csvinput, outfile, dtypes, skip_header=skip_header
)
output.write_df()
[docs]def write_dataframe_to_csv_and_yaml(
df: pd.DataFrame,
outfile: str,
dtypes: Dict[str, str],
skip_header: bool = False,
**kwargs
) -> None:
"""
Output pandas dataframe to a CSV and meta YAML files.
@param df: pandas DataFrame.
@param outfile: Path of CSV & YAML file to be written to.
@param dtypes: dictionary of pandas dtypes by column, keys = column name, value = dtype.
@param skip_header: boolean, True = skip writing header, False = write header
@return:
"""
if kwargs.get('write_header') is not None:
warn('write_header has been deprecated and will be ignored, please use skip_header instead', DeprecationWarning)
csvoutput: CsverveOutputDataFrame = CsverveOutputDataFrame(
df, outfile, dtypes, skip_header=skip_header
)
csvoutput.write_df()
[docs]def read_csv(infile: str, chunksize: int = None, usecols=None, dtype=None) -> pd.DataFrame:
"""
Read in CSV file and return as a pandas DataFrame.
Assumes a YAML meta file in the same path with the same name, with a .yaml extension.
YAML file structure is atop this file.
@param infile: Path to CSV file.
@param chunksize: Number of rows to read at a time (optional, applies to large datasets).
@param usecols: Restrict to specific columns (optional).
@param dtype: Override the dtypes on specific columns (optional).
@return: pandas DataFrame.
"""
return CsverveInput(infile).read_csv(chunksize=chunksize, usecols=usecols, dtype=dtype)
[docs]def remove_duplicates(
filepath: str, outputfile: str, skip_header: bool = False,
) -> None:
"""
remove duplicate rows
Assumes a YAML meta file in the same path with the same name, with a .yaml extension.
YAML file structure is atop this file.
@param filepath: Path to CSV file.
@param outputfile: Path to CSV file.
"""
csvinput = CsverveInput(filepath)
df = csvinput.read_csv()
df = df.drop_duplicates(keep='first')
csvoutput: CsverveOutputDataFrame = CsverveOutputDataFrame(
df, outputfile, csvinput.dtypes, skip_header=skip_header
)
csvoutput.write_df()