# Stdlib imports
import logging
import os
from typing import Dict, Generator, Iterable, Optional, Union, cast
# Third-party imports
import numpy as np
import pandas as pd
# Project imports
from mllaunchpad.resource import DataSink, DataSource, Raw, get_user_pw
logger = logging.getLogger(__name__)
SUPPORTED_FILE_TYPES = ["csv", "euro_csv", "text_file", "binary_file"]
def get_connection_args(dbms_config: Dict) -> Dict:
"""Fill "_var"-suffixed configuration items from environment variables"""
if "options" not in dbms_config:
return {}
new_options = {}
key: str
for key, value in dbms_config["options"].items():
if key.endswith("_var"):
new_value = os.environ.get(value)
if new_value is not None:
new_key = key[:-4]
new_options[new_key] = new_value
logger.debug(
"Replaced connection parameter '%s' specifying environment"
"variable '%s' with parameter '%s'",
key,
value,
new_key,
)
else:
logger.warning(
"Environment variable '%s' not set (from config key '%s'). Leaving it as is.",
value,
key,
)
new_options[key] = value
else:
new_options[key] = value
return new_options
def _get_dict_without_keys(a_dict: Dict, without: Iterable) -> Dict:
return {k: v for k, v in a_dict.items() if k not in without}
def _create_sqlalchemy_engine(dbms_config: Dict):
    """Create an SQLAlchemy engine from a ``dbms:``-level configuration dict.
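
    A minimal sketch of the expected shape (values are illustrative; any keys
    other than ``type``, ``options`` and ``connection_string`` are passed
    through to ``sqlalchemy.create_engine`` as keyword arguments)::

        engine = _create_sqlalchemy_engine({
            "type": "sql",
            "connection_string": "sqlite:///my.db",
            "echo": True,   # forwarded as a create_engine keyword argument
            "options": {},  # forwarded as connect_args
        })
    """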
try:
# Third-party imports
import sqlalchemy
except ModuleNotFoundError as e:
        logger.error(
            "Please install the SQLAlchemy package to be able to use "
            "SqlDataSource/SqlDataSink."
        )
raise e
connection_string = dbms_config.get("connection_string")
connect_args = get_connection_args(dbms_config)
kw_args = _get_dict_without_keys(
dbms_config, ["type", "options", "connection_string"]
)
if "connect_args" in kw_args:
if connect_args:
raise ValueError(
"'options:' (used as 'connect_args') have been specified in "
"combination with an explicit 'connect_args:'. Please specify "
"either 'options:' or 'connect_args:', not both."
)
else:
kw_args["connect_args"] = connect_args
if "url" in kw_args:
if connection_string:
raise ValueError(
"'connection_string:' (used as 'url') has been specified in "
"combination with an explicit 'url:'. Please specify "
"either 'connection_string:' or 'url:', not both."
)
connection_string = kw_args["url"]
del kw_args["url"]
engine = sqlalchemy.create_engine(connection_string, **kw_args)
return engine
def fill_nas(
df: pd.DataFrame, as_generator: bool = False
) -> Union[pd.DataFrame, Generator]:
    """Replace null values with ``numpy.nan`` in a DataFrame or DataFrame generator.

    If ``as_generator`` is True, ``df`` is treated as an iterable of partial
    DataFrames, and a generator yielding the filled partial DataFrames is returned.
    """
    if as_generator:
def wrapped_iterator(data):
for partial_df in data:
partial_df.fillna(np.nan, inplace=True)
yield partial_df
return wrapped_iterator(df)
else:
df.fillna(np.nan, inplace=True)
return df
def ensure_dir_to(file_path):
    """Create the directory path leading up to ``file_path`` if it is missing."""
path = os.path.dirname(file_path)
if path != "" and not os.path.exists(path):
logger.info("Creating missing path to file `{}`.".format(file_path))
os.makedirs(path)
class SqlDataSource(DataSource):
"""DataSource for RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL (ODBC), and their dialects.
Uses `SQLAlchemy <https://docs.sqlalchemy.org/>`_ under the hood, and as such, manages a connection pool automatically.
Please configure the ``dbms:<name>:connection_string:``, which is a standard RFC-1738 URL with the syntax ``dialect[+driver]://[user:password@][host]/[dbname][?key=value..]``.
The exact URL is specific for the database you want to connect to.
Find examples for all supported database dialects `here <https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls>`_.
Depending on the dialect you want to use, you might need to install additional drivers and packages.
For example, for connecting to a kerberized Impala instance via ODBC, you need to:
#. Install `Impala ODBC drivers for your OS <https://www.cloudera.com/downloads/connectors/impala/odbc/2-6-10.html>`_,
    #. ``pip install winkerberos thrift_sasl pyodbc sqlalchemy`` (use pykerberos instead of winkerberos on non-Windows systems)
    If you are tasked with connecting to a particular database system and don't know where
    to start, researching how to connect to it from SQLAlchemy is a good starting point.
Other configuration in the ``dbms:`` section (besides ``connection_string:``) is optional,
but can be provided if deemed necessary:
* Any ``dbms:``-level settings other than ``type:``, ``connection_string:`` and ``options:`` will be passed as additional
keyword arguments to SQLAlchemy's `create_engine <https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine>`_.
* Any key-value pairs inside ``dbms:<name>:options: {}`` will be passed to SQLAlchemy as `connect_args <https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args>`_.
If you append ``_var`` to the end of an argument key, its value will be interpreted as an
environment variable name which ML Launchpad will attempt to get a value from.
This can be useful for information like passwords which you do not want to store in the configuration file.
Configuration example::
dbms:
# ... (other connections)
# Example for connecting to a kerberized Impala instance via ODBC:
my_connection: # NOTE: You can use the same connection for several datasources and datasinks
type: sql
connection_string: mssql+pyodbc:///default?&driver=Cloudera+ODBC+Driver+for+Impala&host=servername.somedomain.com&port=21050&authmech=1&krbservicename=impala&ssl=1&usesasl=1&ignoretransactions=1&usesystemtruststore=1
# pyodbc alternative: mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BCloudera+ODBC+Driver+for+Impala%7D%3BHOST%3Dservername.somedomain.com%3BPORT%3D21050%3BAUTHMECH%3D1%3BKRBSERVICENAME%3Dimpala%3BSSL%3D1%3BUSESASL%3D1%3BIGNORETRANSACTIONS%3D1%3BUSESYSTEMTRUSTSTORE%3D1
echo: True # example for an additional SQLAlchemy keyword argument (logs the SQL) -- these are optional
options: {} # used as `connect_args` when creating the SQLAlchemy engine
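            # Hypothetical illustration: a key ending in `_var` inside `options:`
            # is resolved from the named environment variable, e.g.
            # options: {password_var: MY_DB_PASSWORD}  # -> connect_args["password"]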
# ...
datasources:
# ... (other datasources)
my_datasource:
type: dbms.my_connection
            query: SELECT * FROM somewhere.my_table WHERE id = :id # fill `:id`-style parameters by calling `get_dataframe` with a `dict`
expires: 0 # generic parameter, see documentation on DataSources
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when fetching the query using `pandas.read_sql`
"""
serves = ["dbms.sql"]
def __init__(
self, identifier: str, datasource_config: Dict, dbms_config: Dict
):
super().__init__(identifier, datasource_config)
self.dbms_config = dbms_config
logger.info(
"Creating database connection engine for datasource {}...".format(
self.id
)
)
self.engine = _create_sqlalchemy_engine(dbms_config)
def get_dataframe(
        self, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> Union[pd.DataFrame, Generator]:
"""Get the data as pandas dataframe.
Null values are replaced by ``numpy.nan``.
Example::
my_df = data_sources["my_datasource"].get_dataframe({"id": 387})
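
            # Chunked reading (illustrative): with `chunksize` set, a generator
            # of DataFrame chunks is returned instead of a single DataFrame.
            for chunk in data_sources["my_datasource"].get_dataframe(
                {"id": 387}, chunksize=1000
            ):
                ...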
:param params: Query parameters to fill in query (e.g. replace query's `:id` parameter with value `387`)
:type params: optional dict
:param chunksize: Return an iterator where chunksize is the number of rows to include in each chunk.
:type chunksize: optional int
:return: DataFrame object, possibly cached according to config value of `expires:`
"""
# https://stackoverflow.com/questions/53793877/usage-error-in-pandas-read-sql-with-sqlalchemy#comment94441435_53793978
# Third-party imports
from sqlalchemy import text
query = self.config["query"]
params = params or {}
kw_options = self.options
logger.debug(
"Fetching query {} with params {}, chunksize {}, and options {}...".format(
query, params, chunksize, kw_options
)
)
df = pd.read_sql(
text(query),
con=self.engine,
params=params,
chunksize=chunksize,
**kw_options
)
return fill_nas(df, as_generator=chunksize is not None)
def get_raw(
        self, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> Raw:
"""Not implemented.
:raises NotImplementedError: Raw/blob format currently not supported.
"""
        raise NotImplementedError(
            "SqlDataSource currently does not support raw format/blobs. "
            'Use method "get_dataframe" for dataframes'
        )
class SqlDataSink(DataSink):
"""DataSink for RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL (ODBC), and their dialects.
Uses `SQLAlchemy <https://docs.sqlalchemy.org/>`_ under the hood, and as such, manages a connection pool automatically.
Please configure the ``dbms:<name>:connection_string:``, which is a standard RFC-1738 URL with the syntax ``dialect[+driver]://[user:password@][host]/[dbname][?key=value..]``.
The exact URL is specific for the database you want to connect to.
Find examples for all supported database dialects `here <https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls>`_.
Depending on the dialect you want to use, you might need to install additional drivers and packages.
For example, for connecting to a kerberized Impala instance via ODBC, you need to:
#. Install `Impala ODBC drivers for your OS <https://www.cloudera.com/downloads/connectors/impala/odbc/2-6-10.html>`_,
    #. ``pip install winkerberos thrift_sasl pyodbc sqlalchemy`` (use pykerberos instead of winkerberos on non-Windows systems)
    If you are tasked with connecting to a particular database system and don't know where
    to start, researching how to connect to it from SQLAlchemy is a good starting point.
Other configuration in the ``dbms:`` section (besides ``connection_string:``) is optional,
but can be provided if deemed necessary:
* Any ``dbms:``-level settings other than ``type:``, ``connection_string:`` and ``options:`` will be passed as additional
keyword arguments to SQLAlchemy's `create_engine <https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine>`_.
* Any key-value pairs inside ``dbms:<name>:options: {}`` will be passed to SQLAlchemy as `connect_args <https://docs.sqlalchemy.org/en/13/core/engines.html#sqlalchemy.create_engine.params.connect_args>`_.
If you append ``_var`` to the end of an argument key, its value will be interpreted as an
environment variable name which ML Launchpad will attempt to get a value from.
This can be useful for information like passwords which you do not want to store in the configuration file.
Configuration example::
dbms:
# ... (other connections)
# Example for connecting to a kerberized Impala instance via ODBC:
my_connection: # NOTE: You can use the same connection for several datasources and datasinks
type: sql
connection_string: mssql+pyodbc:///default?&driver=Cloudera+ODBC+Driver+for+Impala&host=servername.somedomain.com&port=21050&authmech=1&krbservicename=impala&ssl=1&usesasl=1&ignoretransactions=1&usesystemtruststore=1
# pyodbc alternative: mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BCloudera+ODBC+Driver+for+Impala%7D%3BHOST%3Dservername.somedomain.com%3BPORT%3D21050%3BAUTHMECH%3D1%3BKRBSERVICENAME%3Dimpala%3BSSL%3D1%3BUSESASL%3D1%3BIGNORETRANSACTIONS%3D1%3BUSESYSTEMTRUSTSTORE%3D1
echo: True # example for an additional SQLAlchemy keyword argument (logs the SQL) -- these are optional
options: {} # used as `connect_args` when creating the SQLAlchemy engine
# ...
datasinks:
# ... (other datasinks)
my_datasink:
type: dbms.my_connection
table: somewhere.my_table
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when storing the table using `my_df.to_sql`
"""
serves = ["dbms.sql"]
def __init__(
        self, identifier: str, datasink_config: Dict, dbms_config: Dict
):
        super().__init__(identifier, datasink_config)
self.dbms_config = dbms_config
logger.info(
"Creating database connection engine for datasource {}...".format(
self.id
)
)
self.engine = _create_sqlalchemy_engine(dbms_config)
    def put_dataframe(
self,
dataframe: pd.DataFrame,
        params: Optional[Dict] = None,
chunksize: Optional[int] = None,
) -> None:
"""Store the pandas dataframe as a table.
The default is not to store the dataframe's row index.
Configure the DataSink's options dict to pass keyword arguments to `df.to_sql`.
Example::
data_sinks["my_datasink"].put_dataframe(my_df)
:param dataframe: The pandas dataframe to store
:type dataframe: pandas DataFrame
:param params: Currently not implemented
:type params: optional dict
:param chunksize: Currently not implemented
        :type chunksize: optional int
"""
if params:
raise NotImplementedError("Parameters not supported yet")
if chunksize:
raise NotImplementedError("Buffered storing not supported yet")
table = self.config["table"]
kw_options = self.options
if "index" not in kw_options:
kw_options["index"] = False
logger.debug(
"Storing data in table {} with options {}...".format(
table, kw_options
)
)
dataframe.to_sql(table, con=self.engine, **kw_options)
    def put_raw(
        self, raw_data, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> None:
"""Not implemented.
:raises NotImplementedError: Raw/blob format currently not supported.
"""
        raise NotImplementedError(
            "SqlDataSink currently does not support raw format/blobs. "
            'Use method "put_dataframe" for raw data'
        )
def _get_oracle_connection(dbms_config: Dict):
    """Open a cx_Oracle connection from a ``dbms:``-level configuration dict.
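
    A minimal sketch of the expected config shape (values are illustrative; the
    user name and password are read from the named environment variables)::

        connection = _get_oracle_connection({
            "host": "host.example.com",
            "port": 1251,
            "user_var": "MY_USER_ENV_VAR",
            "password_var": "MY_PW_ENV_VAR",
            "service_name": "servicename.example.com",
            "options": {},  # forwarded as **kwargs to cx_Oracle.connect
        })
    """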
# Third-party imports
import cx_Oracle # Importing here avoids environment-specific dependencies
user, pw = get_user_pw(
dbms_config["user_var"], dbms_config["password_var"]
)
dsn_tns = cx_Oracle.makedsn(
dbms_config["host"],
dbms_config["port"],
service_name=dbms_config["service_name"],
)
logger.debug("Oracle connection string: %s", dsn_tns)
kw_options = dbms_config.get("options", {})
connection = cx_Oracle.connect(user, pw, dsn_tns, **kw_options)
return connection
class OracleDataSource(DataSource):
"""DataSource for Oracle database connections.
Creates a long-living connection on initialization.
Configuration example::
dbms:
# ... (other connections)
my_connection: # NOTE: You can use the same connection for several datasources and datasinks
type: oracle
host: host.example.com
port: 1251
user_var: MY_USER_ENV_VAR
password_var: MY_PW_ENV_VAR # optional
service_name: servicename.example.com
options: {} # used as **kwargs when initializing the DB connection
# ...
datasources:
# ... (other datasources)
my_datasource:
type: dbms.my_connection
            query: SELECT * FROM somewhere.my_table WHERE id = :id # fill `:id`-style parameters by calling `get_dataframe` with a `dict`
expires: 0 # generic parameter, see documentation on DataSources
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when fetching the query using `pandas.read_sql`
"""
serves = ["dbms.oracle"]
def __init__(
self, identifier: str, datasource_config: Dict, dbms_config: Dict
):
super().__init__(identifier, datasource_config)
self.dbms_config = dbms_config
logger.info(
"Establishing Oracle database connection for datasource {}...".format(
self.id
)
)
self.connection = _get_oracle_connection(dbms_config)
def get_dataframe(
        self, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> Union[pd.DataFrame, Generator]:
"""Get the data as pandas dataframe.
Null values are replaced by ``numpy.nan``.
Example::
data_sources["my_datasource"].get_dataframe({"id": 387})
:param params: Query parameters to fill in query (e.g. replace query's `:id` parameter with value `387`)
:type params: optional dict
:param chunksize: Return an iterator where chunksize is the number of rows to include in each chunk.
:type chunksize: optional int
:return: DataFrame object, possibly cached according to config value of `expires:`
"""
# TODO: maybe want to open/close connection on every method call (shouldn't happen often)
query = self.config["query"]
params = params or {}
kw_options = self.options
logger.debug(
"Fetching query {} with params {}, chunksize {}, and options {}...".format(
query, params, chunksize, kw_options
)
)
df = pd.read_sql(
query,
con=self.connection,
params=params,
chunksize=chunksize,
**kw_options
)
return fill_nas(df, as_generator=chunksize is not None)
def get_raw(
        self, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> Raw:
"""Not implemented.
:raises NotImplementedError: Raw/blob format currently not supported.
"""
        raise NotImplementedError(
            "OracleDataSource currently does not support raw format/blobs. "
            'Use method "get_dataframe" for dataframes'
        )
def __del__(self):
if hasattr(self, "connection"):
self.connection.close()
class OracleDataSink(DataSink):
"""DataSink for Oracle database connections.
Creates a long-living connection on initialization.
Configuration example::
dbms:
# ... (other connections)
my_connection: # NOTE: You can use the same connection for several datasources and datasinks
type: oracle
host: host.example.com
port: 1251
user_var: MY_USER_ENV_VAR
password_var: MY_PW_ENV_VAR # optional
service_name: servicename.example.com
options: {} # used as **kwargs when initializing the DB connection
# ...
datasinks:
# ... (other datasinks)
my_datasink:
type: dbms.my_connection
table: somewhere.my_table
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when storing the table using `my_df.to_sql`
"""
serves = ["dbms.oracle"]
def __init__(
self, identifier: str, datasink_config: Dict, dbms_config: Dict
):
super().__init__(identifier, datasink_config)
self.dbms_config = dbms_config
logger.info(
"Establishing Oracle database connection for datasource {}...".format(
self.id
)
)
self.connection = _get_oracle_connection(dbms_config)
    def put_dataframe(
self,
dataframe: pd.DataFrame,
        params: Optional[Dict] = None,
chunksize: Optional[int] = None,
) -> None:
"""Store the pandas dataframe as a table.
The default is not to store the dataframe's row index.
Configure the DataSink's options dict to pass keyword arguments to `df.to_sql`.
Example::
data_sinks["my_datasink"].put_dataframe(my_df)
:param dataframe: The pandas dataframe to store
:type dataframe: pandas DataFrame
:param params: Currently not implemented
:type params: optional dict
:param chunksize: Currently not implemented
        :type chunksize: optional int
"""
if params:
raise NotImplementedError("Parameters not supported yet")
if chunksize:
raise NotImplementedError("Buffered storing not supported yet")
# TODO: maybe want to open/close connection on every method call (shouldn't happen often)
table = self.config["table"]
kw_options = self.options
if "index" not in kw_options:
kw_options["index"] = False
logger.debug(
"Storing data in table {} with options {}...".format(
table, kw_options
)
)
dataframe.to_sql(table, con=self.connection, **kw_options)
    def put_raw(
        self, raw_data, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> None:
"""Not implemented.
:raises NotImplementedError: Raw/blob format currently not supported.
"""
        raise NotImplementedError(
            "OracleDataSink currently does not support raw format/blobs. "
            'Use method "put_dataframe" for raw data'
        )
def __del__(self):
if hasattr(self, "connection"):
self.connection.close()
class FileDataSource(DataSource):
"""DataSource for fetching data from files.
See :attr:`serves` for the available types.
Configuration example::
datasources:
# ... (other datasources)
my_datasource:
type: euro_csv # `euro_csv` changes separators to ";" and decimals to "," w.r.t. `csv`
path: /some/file.csv # Can be URL, uses `pandas.read_csv` internally
expires: 0 # generic parameter, see documentation on DataSources
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when fetching the data using `pandas.read_csv`
dtypes_path: ./some/file.dtypes # optional: location with the csv's column dtypes info
my_raw_datasource:
type: text_file # raw files can also be of type `binary_file`
path: /some/file.txt # Can be URL
expires: 0 # generic parameter, see documentation on DataSources
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when fetching the data using `fh.read`
When loading `csv` or `euro_csv` type formats, you can use the setting `dtypes_path`
to specify a location with dtypes description for the csv (usually generated earlier
by using :class:`FileDataSink`'s `dtypes_path` setting). These dtypes will be enforced when
reading the csv, which helps avoid problems when `pandas.read_csv` interprets data differently than you do.
Use `dtypes_path` to enforce dtype parity between csv datasinks and datasources.
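
    For illustration, a dtypes file is itself a small csv with ``columns`` and
    ``dtypes`` headers, e.g.::

        columns,dtypes
        id,int64
        name,str
        created_at,datetime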
Using the raw formats `binary_file` and `text_file`, you can read arbitrary data, as long as
it can be represented as a `bytes` or a `str` object, respectively. `text_file` uses UTF-8
encoding. Please note that while possible, it is not
recommended to persist `DataFrame`s this way, because by adding format-specific code to your
model, you're giving up your code's independence from the type of `DataSource`/`DataSink`.
Here's an example for unpickling an arbitrary object::
# config fragment:
datasources:
# ...
my_pickle_datasource:
type: binary_file
path: /some/file.pickle
tags: [train]
options: {}
# code fragment:
import pickle
# ...
# in predict/test/train code:
my_pickle = data_sources["my_pickle_datasource"].get_raw()
my_object = pickle.loads(my_pickle)
"""
serves = SUPPORTED_FILE_TYPES
def __init__(self, identifier: str, datasource_config: Dict):
super().__init__(identifier, datasource_config)
ds_type = datasource_config["type"]
if ds_type not in SUPPORTED_FILE_TYPES:
raise TypeError(
"{} is not a datasource file type (in datasource {}).".format(
repr(ds_type), repr(identifier)
)
)
self.type = ds_type
self.path = datasource_config["path"]
self.dtypes_path = datasource_config.get("dtypes_path")
def get_dataframe(
        self, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> Union[pd.DataFrame, Generator]:
"""Get data as a pandas dataframe.
Example::
data_sources["my_datasource"].get_dataframe()
:param params: Currently not implemented
:type params: optional dict
:param chunksize: Return an iterator where chunksize is the number of rows to include in each chunk.
:type chunksize: optional int
:return: DataFrame object, possibly cached according to config value of `expires:`
"""
if params:
raise NotImplementedError("Parameters not supported yet")
kw_options = self.options
logger.debug(
"Loading type {} file {} with dtypes_path {}, chunksize {} and options {}...".format(
self.type, self.path, self.dtypes_path, chunksize, kw_options
)
)
if self.dtypes_path is not None:
input_dtypes = pd.read_csv(self.dtypes_path).set_index("columns")
kw_options["dtype"] = {
item: input_dtypes.loc[item]["dtypes"]
for item in input_dtypes.index
if input_dtypes.loc[item]["dtypes"] != "datetime"
}
kw_options["parse_dates"] = [
item
for item in input_dtypes.index
if input_dtypes.loc[item]["dtypes"] == "datetime"
]
if self.type == "csv":
df = pd.read_csv(self.path, chunksize=chunksize, **kw_options)
elif self.type == "euro_csv":
df = pd.read_csv(
self.path,
sep=";",
decimal=",",
chunksize=chunksize,
**kw_options
)
else:
raise TypeError(
'Can only read csv files as dataframes. Use method "get_raw" for raw data'
)
return df
def get_raw(
        self, params: Optional[Dict] = None, chunksize: Optional[int] = None
) -> Raw:
"""Get data as raw (unstructured) data.
Example::
data_sources["my_raw_datasource"].get_raw()
:param params: Currently not implemented
:type params: optional dict
:param chunksize: Currently not implemented
        :type chunksize: optional int
:return: The file's bytes (binary) or string (text) contents, possibly cached according to config value of `expires:`
:rtype: bytes or str
"""
if params:
raise NotImplementedError("Parameters not supported yet")
if chunksize:
raise NotImplementedError("Buffered reading not supported yet")
kw_options = self.options
logger.debug(
"Loading raw {} {} with options {}...".format(
self.type, self.path, kw_options
)
)
raw: Raw
if self.type == "text_file":
with open(self.path, "r", encoding="utf-8") as txt_file:
raw = txt_file.read(**kw_options)
elif self.type == "binary_file":
with open(self.path, "rb") as bin_file:
raw = bin_file.read(**kw_options)
else:
raise TypeError(
"Can only read binary data or text strings as raw file. "
'Use method "get_dataframe" for dataframes'
)
return raw
class FileDataSink(DataSink):
"""DataSink for putting data into files.
See :attr:`serves` for the available types.
Configuration example::
datasinks:
# ... (other datasinks)
my_datasink:
type: euro_csv # `euro_csv` changes separators to ";" and decimals to "," w.r.t. `csv`
path: /some/file.csv # Can be URL, uses `df.to_csv` internally
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when fetching the data using `df.to_csv`
dtypes_path: ./some/file.dtypes # optional: location for saving the csv's column dtypes info
my_raw_datasink:
type: text_file # raw files can also be of type `binary_file`
path: /some/file.txt # Can be URL
tags: [train] # generic parameter, see documentation on DataSources and DataSinks
options: {} # used as **kwargs when writing the data using `fh.write`
When saving `csv` or `euro_csv` type formats, you can use the setting `dtypes_path`
to specify a location where to save dtypes descriptions for the csv (that you can use later
with :class:`FileDataSource`'s `dtypes_path` setting). These dtypes will be enforced when
reading the csv, which helps avoid problems when `pandas.read_csv` interprets data differently than you do.
Use `dtypes_path` to enforce dtype parity between csv datasinks and datasources.
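
    Illustrative roundtrip (the datasink and datasource names are placeholders
    for entries configured as shown above)::

        # training code: writes /some/file.csv plus ./some/file.dtypes
        data_sinks["my_datasink"].put_dataframe(my_df)
        # prediction code: reads the csv back with the recorded dtypes enforced
        my_df = data_sources["my_datasource"].get_dataframe()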
Using the raw formats `binary_file` and `text_file`, you can persist arbitrary data, as long as
it can be represented as a `bytes` or a `str` object, respectively. `text_file` uses UTF-8
encoding. Please note that while possible, it is not
recommended to persist `DataFrame`s this way, because by adding format-specific code to your
model, you're giving up your code's independence from the type of `DataSource`/`DataSink`.
Here's an example for pickling an arbitrary object::
# config fragment:
datasinks:
# ...
my_pickle_datasink:
type: binary_file
path: /some/file.pickle
tags: [train]
options: {}
# code fragment:
import pickle
# ...
# in predict/test/train code:
my_pickle = pickle.dumps(my_object)
data_sinks["my_pickle_datasink"].put_raw(my_pickle)
"""
serves = SUPPORTED_FILE_TYPES
def __init__(self, identifier: str, datasink_config: Dict):
super().__init__(identifier, datasink_config)
ds_type = datasink_config["type"]
if ds_type not in SUPPORTED_FILE_TYPES:
raise TypeError(
"{} is not a datasink file type (in datasink {}).".format(
repr(ds_type), repr(identifier)
)
)
self.type = ds_type
self.path = datasink_config["path"]
self.dtypes_path = datasink_config.get("dtypes_path")
    def put_dataframe(
self,
dataframe: pd.DataFrame,
        params: Optional[Dict] = None,
chunksize: Optional[int] = None,
) -> None:
"""Write a pandas dataframe to file and optionally the dtypes if included in the configuration.
The default is not to save the dataframe's row index.
Configure the DataSink's `options` dict to pass keyword arguments to `my_df.to_csv`.
If the directory path leading to the file does not exist, it will be created.
Example::
data_sinks["my_datasink"].put_dataframe(my_df)
:param dataframe: The pandas dataframe to save
:type dataframe: pandas DataFrame
:param params: Currently not implemented
:type params: optional dict
:param chunksize: Currently not implemented
        :type chunksize: optional int
"""
if params:
raise NotImplementedError("Parameters not supported yet")
if chunksize:
raise NotImplementedError("Buffered writing not supported yet")
kw_options = self.options
if "index" not in kw_options:
kw_options["index"] = False
logger.debug(
"Writing dataframe to type {} file {} with options {} and dtypes_path {}...".format(
self.type, self.path, kw_options, self.dtypes_path
)
)
if self.dtypes_path:
dtypes_file = pd.DataFrame(
dataframe.dtypes, columns=["dtypes"]
).rename_axis("columns")
dtypes_file.loc[
dtypes_file["dtypes"] == "object", "dtypes"
] = "str"
dtypes_file.loc[
dtypes_file["dtypes"] == "datetime64[ns]", "dtypes"
] = "datetime"
ensure_dir_to(self.dtypes_path)
dtypes_file.to_csv(self.dtypes_path)
if self.type == "csv":
ensure_dir_to(self.path)
dataframe.to_csv(self.path, **kw_options)
elif self.type == "euro_csv":
ensure_dir_to(self.path)
dataframe.to_csv(self.path, sep=";", decimal=",", **kw_options)
else:
raise TypeError(
'Can only write dataframes to csv file. Use method "put_raw" for raw data'
)
    def put_raw(
self,
raw_data: Raw,
        params: Optional[Dict] = None,
chunksize: Optional[int] = None,
) -> None:
"""Write raw (unstructured) data to file.
If the directory path leading to the file does not exist, it will be created.
Example::
data_sinks["my_raw_datasink"].put_raw(my_data)
:param raw_data: The data to save (bytes for binary, string for text file)
:type raw_data: bytes or str
:param params: Currently not implemented
:type params: optional dict
:param chunksize: Currently not implemented
        :type chunksize: optional int
"""
if params:
raise NotImplementedError("Parameters not supported yet")
if chunksize:
raise NotImplementedError("Buffered writing not supported yet")
kw_options = self.options
logger.debug(
"Writing raw {} file {} with options {}...".format(
self.type, self.path, kw_options
)
)
if self.type == "text_file":
if "encoding" not in kw_options:
kw_options["encoding"] = "utf-8"
ensure_dir_to(self.path)
with open(self.path, "w", **kw_options) as txt_file:
raw_str: str = cast(str, raw_data)
txt_file.write(raw_str)
elif self.type == "binary_file":
ensure_dir_to(self.path)
with open(self.path, "wb", **kw_options) as bin_file:
raw_bytes: bytes = cast(bytes, raw_data)
bin_file.write(raw_bytes)
else:
            raise TypeError(
                "Can only write binary data or text strings as raw file. "
                'Use method "put_dataframe" for dataframes'
            )