Source code for csverve.core.csverve_output_file_stream

import gzip
import shutil
from typing import List, Dict, TextIO, Any

from csverve.core import CsverveOutput
from csverve.errors import CsverveInputError


[docs]class CsverveOutputFileStream(CsverveOutput): def __init__( self, filepath: str, dtypes: Dict[str, str], columns: List[str], skip_header: bool = False, na_rep: str = 'NaN', sep: str = ',', ) -> None: """ CSV file and all related metadata. @param filepath: CSV file path. @param dtypes: Dictionary of pandas dtypes, where key = column name, value = dtype. @param header: boolean, True = write header, False = don't write header. @param na_rep: replace NaN with this value. @param columns: List of column names. """ super().__init__( filepath, dtypes, columns, skip_header=skip_header, na_rep=na_rep, sep=sep ) def _write_header_to_file(self, writer: TextIO) -> None: """ Write header. @param writer: TextIO. @return: """ assert self.columns header: str = ','.join(self.columns) header = header + '\n' writer.write(header)
[docs] def write_data_streams(self, csvfiles: List[str]) -> None: """ Write data streams. @param csvfiles: List of CSV files paths. @return: """ assert self.columns assert self.dtypes with gzip.open(self.filepath, 'wt') as writer: if not self.skip_header: self._write_header_to_file(writer) for csvfile in csvfiles: with gzip.open(csvfile, 'rt') as data_stream: shutil.copyfileobj( data_stream, writer, length=16 * 1024 * 1024 ) self.write_yaml()
@staticmethod def _file_type(filepath) -> str: if filepath.endswith('gz'): return 'gzip' elif filepath.endswith('csv'): return 'plain-text' else: raise CsverveInputError('Unsupported file type: {}'.format(filepath))
[docs] def rewrite_csv(self, csvfile: str) -> None: """ Rewrite CSV. @param csvfile: Filepath of CSV file. @return: """ assert self.columns assert self.dtypes filetype = self._file_type(csvfile) opener: Any = gzip.open if filetype == 'gzip' else open with gzip.open(self.filepath, 'wt') as writer: with opener(csvfile, 'rt') as data_stream: shutil.copyfileobj( data_stream, writer, length=16 * 1024 * 1024 ) self.write_yaml()