import logging
from typing import Optional

import numpy as np
import pandas as pd


class Dataset:
    """A wrapper class around a preprocessed pandas DataFrame.

    The CSV is read once in ``__init__``, passed through
    ``preprocess_dataset`` and the result is stored in ``self.dataframe``.
    """

    # Lower-cased `whyplay` answers that mark a participant as competitive.
    _COMPETITIVE_REASONS = ("improving", "winning", "all of the above")
    # Number of most frequent `whyplay` answers kept; the rest become "other".
    _TOP_WHYPLAY_REASONS = 5

    def __init__(self, dataset_filename: str) -> None:
        """Load and preprocess the dataset, then store it in self.dataframe.

        Args:
            dataset_filename (str): the path of the dataset, relative to the
                location of the file calling this function.

        Raises:
            ValueError: if `dataset_filename` is not a string.
            OSError: if `dataset_filename` does not end with ".csv".
            FileNotFoundError: if the CSV file does not exist.
        """
        self._require_str(dataset_filename, "dataset_filename")
        if not dataset_filename.endswith(".csv"):
            logging.error("dataset filename should be CSV")
            raise OSError(f"{dataset_filename} is not a CSV file.")

        try:
            raw_dataframe = pd.read_csv(
                dataset_filename, encoding="windows-1254"
            )
        except FileNotFoundError:
            logging.error("CSV file not found")
            raise  # bare raise keeps the original traceback intact
        else:
            logging.info("Dataframe successfully loaded")

        self.dataframe = self.preprocess_dataset(raw_dataframe)
        logging.info("Dataset class successfully initialised")

    @staticmethod
    def _require_dataframe(value, param_name: str) -> None:
        """Log and raise ValueError unless `value` is a pandas DataFrame."""
        if not isinstance(value, pd.DataFrame):
            logging.error(
                "parameter `%s` is not a pandas DataFrame", param_name
            )
            raise ValueError(f"{value} is not a pandas DataFrame")

    @staticmethod
    def _require_str(value, param_name: str) -> None:
        """Log and raise ValueError unless `value` is a string."""
        if not isinstance(value, str):
            logging.error("parameter `%s` is not a string", param_name)
            raise ValueError(f"{value} is not a string")

    def preprocess_dataset(self, raw_dataframe: pd.DataFrame) -> pd.DataFrame:
        """Preprocess the dataframe immediately after loading it.

        Steps, in order: drop unused columns, drop non-consenting rows,
        remove "Hours" outliers, then add the derived columns
        "Anxiety_score", "Is_narcissist" and "Is_competitive".

        Args:
            raw_dataframe (pd.DataFrame): raw dataframe as read from
                pd.read_csv(). It is not modified.

        Returns:
            pd.DataFrame: resulting preprocessed dataframe.

        Raises:
            ValueError: if `raw_dataframe` is not a pandas DataFrame.
        """
        self._require_dataframe(raw_dataframe, "raw_dataframe")

        dataframe = self._drop_unnecessary_columns(raw_dataframe)
        dataframe = self.remove_nonaccepting_rows(dataframe)
        dataframe = self.treat_outliers(dataframe, "Hours")

        dataframe["Anxiety_score"] = self.get_combined_anxiety_score(dataframe)
        dataframe["Is_narcissist"] = self.get_is_narcissist_col(dataframe)
        # NOTE: preprocess_whyplay also normalises the "whyplay" column of
        # `dataframe` in place (lower-casing, rare answers -> "other").
        dataframe["Is_competitive"] = self.preprocess_whyplay(dataframe)

        # more preprocessing goes here
        return dataframe

    def get_is_competitive_col(self, dataframe: pd.DataFrame) -> np.ndarray:
        """Derive the competitiveness flag from the "whyplay" column.

        Args:
            dataframe (pd.DataFrame): the dataframe; its "whyplay" column is
                compared against lower-case answers.

        Returns:
            np.ndarray: float array with one entry per row: 1.0 for
                "improving", "winning" or "all of the above", NaN for
                "other", and 0.0 for every other answer (including
                "having fun" and "relaxing").

        Raises:
            ValueError: if `dataframe` is not a pandas DataFrame.
        """
        self._require_dataframe(dataframe, "dataframe")

        reasons = dataframe["whyplay"]
        # A float array is used so that "unknown" can be represented as NaN.
        is_competitive_col = np.zeros(shape=len(dataframe))
        is_competitive_col[
            reasons.isin(self._COMPETITIVE_REASONS).to_numpy()
        ] = 1.0
        is_competitive_col[(reasons == "other").to_numpy()] = np.nan
        return is_competitive_col

    def _drop_unnecessary_columns(
        self, dataframe: pd.DataFrame
    ) -> pd.DataFrame:
        """Drop unnecessary columns from the dataset.

        Removes bookkeeping columns and the individual GAD/SWL/SPIN item
        columns. The input dataframe is not modified.

        Args:
            dataframe (pd.DataFrame): the dataframe.

        Returns:
            pd.DataFrame: a new dataframe without the dropped columns.

        Raises:
            ValueError: if `dataframe` is not a pandas DataFrame.
        """
        self._require_dataframe(dataframe, "dataframe")

        columns_to_drop = (
            [
                "League",
                "S. No.",
                "Timestamp",
                "highestleague",
                "earnings",
                "Birthplace",
                "Birthplace_ISO3",
            ]
            + [f"GAD{i}" for i in range(1, 8)]
            + ["GADE"]
            + [f"SWL{i}" for i in range(1, 6)]
            + [f"SPIN{i}" for i in range(1, 18)]
        )
        return dataframe.drop(columns_to_drop, axis="columns")

    def remove_nonaccepting_rows(
        self, dataframe: pd.DataFrame
    ) -> pd.DataFrame:
        """Remove rows where participants did not consent to data processing.

        Only rows whose "accept" value equals "Accept" are kept; the "accept"
        column itself is then dropped. The input dataframe is not modified.

        Args:
            dataframe (pd.DataFrame): the dataframe.

        Returns:
            pd.DataFrame: a new dataframe with consenting rows only.

        Raises:
            ValueError: if `dataframe` is not a pandas DataFrame.
        """
        self._require_dataframe(dataframe, "dataframe")

        # A boolean mask (rather than dropping by index label) stays correct
        # even when the index contains duplicate labels.
        accepted = dataframe[dataframe["accept"] == "Accept"]
        return accepted.drop(columns=["accept"])

    def preprocess_whyplay(self, dataframe: pd.DataFrame) -> np.ndarray:
        """Preprocess the whyplay column and return an Is_competitive column.

        The "whyplay" column of `dataframe` is modified in place: values are
        lower-cased and every value outside the five most frequent answers
        is replaced by "other".

        Args:
            dataframe (pd.DataFrame): the dataframe.

        Returns:
            np.ndarray: the Is_competitive column, as produced by
                get_is_competitive_col().

        Raises:
            ValueError: if `dataframe` is not a pandas DataFrame.
        """
        self._require_dataframe(dataframe, "dataframe")

        dataframe["whyplay"] = dataframe["whyplay"].str.lower()
        most_common_whyplay_reasons = list(
            dataframe.groupby("whyplay")
            .size()
            .sort_values(ascending=False)
            .head(self._TOP_WHYPLAY_REASONS)
            .index
        )
        dataframe.loc[
            ~dataframe["whyplay"].isin(most_common_whyplay_reasons), "whyplay"
        ] = "other"

        return self.get_is_competitive_col(dataframe)

    def treat_outliers(
        self, df: pd.DataFrame, colname: str, quantile: float = 0.99
    ) -> pd.DataFrame:
        """Drop rows whose `colname` value is at or above a quantile.

        Args:
            df (pd.DataFrame): the dataframe.
            colname (str): the numeric column to filter on.
            quantile (float, optional): quantile of `colname` used as the
                exclusive upper bound. Defaults to 0.99.

        Returns:
            pd.DataFrame: an independent copy containing only the rows with
                `df[colname]` strictly below the quantile value.
        """
        upper_bound = df[colname].quantile(quantile)
        # Copy so that later column assignments do not write into a slice of
        # the caller's frame (avoids SettingWithCopy issues).
        return df[df[colname] < upper_bound].copy()

    def get_combined_anxiety_score(self, dataframe: pd.DataFrame) -> pd.Series:
        """Get the combined anxiety score, as a column.

        This score is based on the GAD_T, SPIN_T and SWL_T columns. Each is
        min-max normalised with the fixed bounds below, SWL is inverted
        (1 - normalised value), and the mean of the three is returned.

        Args:
            dataframe (pd.DataFrame): the dataframe.

        Returns:
            pd.Series: the anxiety score column.

        Raises:
            ValueError: if `dataframe` is not a pandas DataFrame.
        """
        self._require_dataframe(dataframe, "dataframe")

        def normalise(column: pd.Series, low: float, high: float) -> pd.Series:
            # Divide by the range, not by the maximum, so that `high` maps
            # to exactly 1.0 even when `low` is non-zero (as for SWL).
            return (column - low) / (high - low)

        gad_normalised = normalise(dataframe["GAD_T"], 0, 21)
        spin_normalised = normalise(dataframe["SPIN_T"], 0, 68)
        swl_flipped = 1 - normalise(dataframe["SWL_T"], 5, 35)

        return (gad_normalised + spin_normalised + swl_flipped) / 3

    def get_is_narcissist_col(self, dataframe: pd.DataFrame) -> np.ndarray:
        """Get a boolean narcissist column.

        A Narcissism score of 1.0 (or below) is considered Not a Narcissist,
        while all values above that are considered Narcissist. Missing
        scores compare as False.

        Args:
            dataframe (pd.DataFrame): the dataframe

        Returns:
            np.ndarray: boolean array, True where "Narcissism" > 1.0.

        Raises:
            ValueError: if `dataframe` is not a pandas DataFrame.
        """
        self._require_dataframe(dataframe, "dataframe")
        return (dataframe["Narcissism"] > 1.0).to_numpy()

    def get_dataframe(self) -> pd.DataFrame:
        """A getter function for the dataframe.

        Returns:
            pd.DataFrame: the dataset.
        """
        return self.dataframe

    def get_sorted_column(
        self, colname: str, ascending: bool = True
    ) -> pd.Series:
        """Return a single column, sorted either ascending or descending.

        Args:
            colname (str): the column name.
            ascending (bool, optional): Sorting order. Defaults to True.

        Returns:
            pd.Series: The sorted column.

        Raises:
            ValueError: if `colname` is not a string or `ascending` is not
                a bool.
        """
        self._require_str(colname, "colname")
        if not isinstance(ascending, bool):
            logging.error("parameter `ascending` is not a bool")
            raise ValueError(f"{ascending} is not a bool")
        return self.dataframe[colname].sort_values(ascending=ascending)

    def get_unique_column_values(self, colname: str):
        """Return the unique values present in a column.

        The column is passed through ``explode()`` first, so list-like cells
        contribute each of their elements.

        Args:
            colname (str): the column name.

        Returns:
            array: the unique values present in the column.

        Raises:
            ValueError: if `colname` is not a string.
        """
        self._require_str(colname, "colname")
        return self.dataframe[colname].explode().unique()

    def get_category_counts(
        self, colname: str, ascending: Optional[bool] = None
    ) -> pd.Series:
        """Return a count of categorical values in the dataset.

        Args:
            colname (str): the column name.
            ascending (bool, optional): Direction to sort results. If set to
                None, the results are not sorted. Defaults to None.

        Returns:
            pd.Series: the counted categories.

        Raises:
            ValueError: if `colname` is not a string or `ascending` is
                neither a bool nor None.
        """
        self._require_str(colname, "colname")
        # `type(None)` is NoneType, so None must be tested by identity.
        if ascending is not None and not isinstance(ascending, bool):
            logging.error("parameter `ascending` is not a bool or None")
            raise ValueError(f"{ascending} is not a bool or None")

        grouped_size = self.dataframe.groupby(colname).size()
        if ascending is None:
            return grouped_size
        return grouped_size.sort_values(ascending=ascending)