DataSources and DataSinks

DataSources and DataSinks are what loosely couples your model’s code to the data.

From your model’s code, instead of accessing your data locations directly, you access your data via the DataSources and DataSinks that are provided by ML Launchpad. To your code, all DataSources and DataSinks behave the same and are used the same way for the same data format (DataFrames, raw files, etc.).

For example, to obtain a pandas DataFrame, use the DataSource’s get_dataframe() method. Your code does not need to know whether your data was originally obtained from a database, file, or web service.
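
For illustration, here is a minimal sketch of what this looks like from inside your model code; the DataSource name my_datasource and the helper function are hypothetical, only the get_dataframe() call is part of ML Launchpad:

# code fragment (sketch):
def prepare_training_data(data_sources):
    # Works the same whether "my_datasource" is backed by a file, a database
    # or a web service -- only the configuration knows the difference.
    df = data_sources["my_datasource"].get_dataframe()
    return df.dropna()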

As DataSources and DataSinks are very similar, we will use the term DataSource below to refer to either.

Different subclasses of DataSources provide you with different kinds of connections. In the following sections, you can find lists of built-in and external DataSources and DataSinks.

Each DataSource subclass (e.g. FileDataSource, SqlDataSource) serves one or several types (e.g. csv, euro_csv, dbms.sql). You specify the type in your DataSource’s configuration. The same type can even be served by several different DataSource subclasses, in which case the last-imported plugin takes precedence.

Here is an example for a configured FileDataSource:

datasources:
  my_datasource:
    type: csv
    path: ./iris_train.csv
    expires: 0
    cache_size: 32  # optional
    options: {}
    tags: train

The parts of this example are:

  • datasources (or datasinks; optional): Can contain as many child elements (configured DataSources or DataSinks) as you like.

  • my_datasource: The name by which you want to refer to a specific configured DataSource. Used to get data, e.g.: data_sources["my_datasource"].get_dataframe(). This name is up to you to choose.

  • type (required in every DataSource): the type that a DataSource needs to serve in order to be chosen for you. In this case, when ML Launchpad looks up which DataSources serve the csv type, it finds FileDataSource and will use it.

  • path (specific to FileDataSource): The path of the file. Every DataSource has its own specific properties which are part of the DataSource’s documentation (see the next section for built-ins).

  • expires (required in every DataSource): This controls caching of the data. 0 means that the data expires immediately, -1 that it never expires, and any other number specifies the number of seconds after which the cached data expires and is fetched afresh from the source. Note that data is cached separately for each unique combination of params passed to get_dataframe(), up to the maximum cache size (see the sketch after this list).

  • cache_size (optional, default: 32, DataSources only): Maximum number of items to cache.

  • tags (required in every DataSource): a combination of one or several of the possible tags train, test and predict (use [brackets] around more than one tag). This determines the model function(s) the DataSource will be made available for.
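
The following hedged sketch illustrates how expires and cache_size interact with get_dataframe(); the DataSource name, the configured values and the query parameters are assumptions for illustration only:

# code fragment (sketch), assuming "my_db_source" is configured with
# `expires: 300` and `cache_size: 32`:
ds = data_sources["my_db_source"]
df_a = ds.get_dataframe({"id": 1})  # fetched from the source and cached
df_b = ds.get_dataframe({"id": 1})  # same params within 300 seconds: served from cache
df_c = ds.get_dataframe({"id": 2})  # different params: fetched and cached separately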

For more complete examples, have a look at the Tutorial or at the examples (download).

Please note that DataSources and DataSinks will be initialized for you by ML Launchpad depending on your configuration. Your code will just get “some” DataSource, but won’t have to import, initialize, or even know the name of the DataSource class that is used under the hood.
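
As a hedged illustration (the function below is a simplified stand-in, not the exact model interface; see the Tutorial for the real one), training code might look like this:

# code fragment (sketch):
def train(data_sources, data_sinks):
    train_df = data_sources["my_datasource"].get_dataframe()
    model = fit_some_model(train_df)  # `fit_some_model` is a placeholder for your own training logic
    data_sinks["my_datasink"].put_dataframe(train_df.describe())  # e.g. persist some statistics
    return model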

When you use, for example, several tables that reside in the same database, it would be tedious to configure the connection details in every one of the DataSources that correspond to those tables. Instead, you configure the connection only once: specify a separate dbms: section in your configuration, give each connection a name (e.g. my_connection), and refer to it from your datasource config with a type like dbms.my_connection. See SqlDataSource below for an example.

Built-in DataSources and DataSinks

When you pip install mllaunchpad, it comes with a number of built-in DataSources and DataSinks that are ready to use without needing to specify any plugins: [] in the Configuration.

Their documentation follows hereunder.

class mllaunchpad.datasources.FileDataSource(identifier: str, datasource_config: Dict)[source]

DataSource for fetching data from files.

See serves for the available types.

Configuration example:

datasources:
  # ... (other datasources)
  my_datasource:
    type: euro_csv  # `euro_csv` changes separators to ";" and decimals to "," w.r.t. `csv`
    path: /some/file.csv  # Can be URL, uses `pandas.read_csv` internally
    expires: 0    # generic parameter, see documentation on DataSources
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when fetching the data using `pandas.read_csv`
    dtypes_path: ./some/file.dtypes # optional: location with the csv's column dtypes info
  my_raw_datasource:
    type: text_file  # raw files can also be of type `binary_file`
    path: /some/file.txt  # Can be URL
    expires: 0    # generic parameter, see documentation on DataSources
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when fetching the data using `fh.read`

When loading csv or euro_csv type formats, you can use the setting dtypes_path to specify a location with dtypes description for the csv (usually generated earlier by using FileDataSink’s dtypes_path setting). These dtypes will be enforced when reading the csv, which helps avoid problems when pandas.read_csv interprets data differently than you do. Use dtypes_path to enforce dtype parity between csv datasinks and datasources.
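
As a hedged sketch of this round trip (the names csv_out and csv_in are hypothetical; assume both are configured with the same path: and dtypes_path:):

# code fragment (sketch):
def save_training_data(data_sinks, df):
    # With `dtypes_path:` configured for the sink, writing the csv also writes
    # the dtypes description next to it.
    data_sinks["csv_out"].put_dataframe(df)

def load_training_data(data_sources):
    # Reading the csv enforces the previously saved dtypes, so pandas.read_csv
    # cannot silently re-interpret columns (e.g. strip leading zeros from IDs).
    return data_sources["csv_in"].get_dataframe()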

Using the raw formats binary_file and text_file, you can read arbitrary data, as long as it can be represented as a bytes or a str object, respectively. text_file uses UTF-8 encoding. Please note that while possible, it is not recommended to persist DataFrames this way, because by adding format-specific code to your model, you’re giving up your code’s independence from the type of DataSource/DataSink. Here’s an example for unpickling an arbitrary object:

# config fragment:
datasources:
  # ...
  my_pickle_datasource:
    type: binary_file
    path: /some/file.pickle
    tags: [train]
    options: {}

# code fragment:
import pickle
# ...
# in predict/test/train code:
my_pickle = data_sources["my_pickle_datasource"].get_raw()
my_object = pickle.loads(my_pickle)
get_dataframe(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Get data as a pandas dataframe.

Example:

data_sources["my_datasource"].get_dataframe()
Parameters
  • params (optional dict) – Currently not implemented

  • chunksize (optional int) – Return an iterator where chunksize is the number of rows to include in each chunk.

Returns

DataFrame object, possibly cached according to config value of expires:
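
With chunksize, the result is an iterator of DataFrame chunks instead of a single DataFrame; a hedged sketch, where process is a placeholder for your own per-chunk logic:

for chunk in data_sources["my_datasource"].get_dataframe(chunksize=10000):
    process(chunk)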

get_raw(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Get data as raw (unstructured) data.

Example:

data_sources["my_raw_datasource"].get_raw()
Parameters
  • params (optional dict) – Currently not implemented

  • chunksize (optional bool) – Currently not implemented

Returns

The file’s bytes (binary) or string (text) contents, possibly cached according to config value of expires:

Return type

bytes or str

serves: List[str] = ['csv', 'euro_csv', 'text_file', 'binary_file']
class mllaunchpad.datasources.FileDataSink(identifier: str, datasink_config: Dict)[source]

DataSink for putting data into files.

See serves for the available types.

Configuration example:

datasinks:
  # ... (other datasinks)
  my_datasink:
    type: euro_csv  # `euro_csv` changes separators to ";" and decimals to "," w.r.t. `csv`
    path: /some/file.csv  # Can be URL, uses `df.to_csv` internally
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when fetching the data using `df.to_csv`
    dtypes_path: ./some/file.dtypes # optional: location for saving the csv's column dtypes info
  my_raw_datasink:
    type: text_file  # raw files can also be of type `binary_file`
    path: /some/file.txt  # Can be URL
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when writing the data using `fh.write`

When saving csv or euro_csv type formats, you can use the setting dtypes_path to specify a location where to save dtypes descriptions for the csv (that you can use later with FileDataSource’s dtypes_path setting). These dtypes will be enforced when reading the csv, which helps avoid problems when pandas.read_csv interprets data differently than you do. Use dtypes_path to enforce dtype parity between csv datasinks and datasources.

Using the raw formats binary_file and text_file, you can persist arbitrary data, as long as it can be represented as a bytes or a str object, respectively. text_file uses UTF-8 encoding. Please note that while possible, it is not recommended to persist DataFrames this way, because by adding format-specific code to your model, you’re giving up your code’s independence from the type of DataSource/DataSink. Here’s an example for pickling an arbitrary object:

# config fragment:
datasinks:
  # ...
  my_pickle_datasink:
    type: binary_file
    path: /some/file.pickle
    tags: [train]
    options: {}

# code fragment:
import pickle
# ...
# in predict/test/train code:
my_pickle = pickle.dumps(my_object)
data_sinks["my_pickle_datasink"].put_raw(my_pickle)
put_dataframe(dataframe: pandas.core.frame.DataFrame, params: Optional[Dict] = None, chunksize: Optional[int] = None) None[source]

Write a pandas dataframe to file and optionally the dtypes if included in the configuration. The default is not to save the dataframe’s row index. Configure the DataSink’s options dict to pass keyword arguments to my_df.to_csv. If the directory path leading to the file does not exist, it will be created.

Example:

data_sinks["my_datasink"].put_dataframe(my_df)
Parameters
  • dataframe (pandas DataFrame) – The pandas dataframe to save

  • params (optional dict) – Currently not implemented

  • chunksize (optional bool) – Currently not implemented

put_raw(raw_data: Union[str, bytes], params: Optional[Dict] = None, chunksize: Optional[int] = None) None[source]

Write raw (unstructured) data to file. If the directory path leading to the file does not exist, it will be created.

Example:

data_sinks["my_raw_datasink"].put_raw(my_data)
Parameters
  • raw_data (bytes or str) – The data to save (bytes for binary, string for text file)

  • params (optional dict) – Currently not implemented

  • chunksize (optional bool) – Currently not implemented

serves: List[str] = ['csv', 'euro_csv', 'text_file', 'binary_file']
class mllaunchpad.datasources.SqlDataSource(identifier: str, datasource_config: Dict, dbms_config: Dict)[source]

DataSource for RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL (ODBC), and their dialects.

Uses SQLAlchemy under the hood, and as such, manages a connection pool automatically.

Please configure the dbms:<name>:connection_string:, which is a standard RFC-1738 URL with the syntax dialect[+driver]://[user:password@][host]/[dbname][?key=value..]. The exact URL is specific to the database you want to connect to. Find examples for all supported database dialects in the SQLAlchemy documentation on database URLs.

Depending on the dialect you want to use, you might need to install additional drivers and packages. For example, for connecting to a kerberized Impala instance via ODBC, you need to:

  1. Install Impala ODBC drivers for your OS,

  2. pip install winkerberos thrift_sasl pyodbc sqlalchemy # use pykerberos for non-windows systems

If you are tasked with connecting to a particular database system, and don’t know where to start, researching on how to connect to it from SQLAlchemy will serve as a good starting point.

Other configuration in the dbms: section (besides connection_string:) is optional, but can be provided if deemed necessary:

  • Any dbms:-level settings other than type:, connection_string: and options: will be passed as additional keyword arguments to SQLAlchemy’s create_engine.

  • Any key-value pairs inside dbms:<name>:options: {} will be passed to SQLAlchemy as connect_args. If you append _var to the end of an argument key, its value will be interpreted as an environment variable name which ML Launchpad will attempt to get a value from. This can be useful for information like passwords which you do not want to store in the configuration file.

Configuration example:

dbms:
  # ... (other connections)
  # Example for connecting to a kerberized Impala instance via ODBC:
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: sql
    connection_string: mssql+pyodbc:///default?&driver=Cloudera+ODBC+Driver+for+Impala&host=servername.somedomain.com&port=21050&authmech=1&krbservicename=impala&ssl=1&usesasl=1&ignoretransactions=1&usesystemtruststore=1
    # pyodbc alternative: mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BCloudera+ODBC+Driver+for+Impala%7D%3BHOST%3Dservername.somedomain.com%3BPORT%3D21050%3BAUTHMECH%3D1%3BKRBSERVICENAME%3Dimpala%3BSSL%3D1%3BUSESASL%3D1%3BIGNORETRANSACTIONS%3D1%3BUSESYSTEMTRUSTSTORE%3D1
    echo: True  # example for an additional SQLAlchemy keyword argument (logs the SQL) -- these are optional
    options: {}  # used as `connect_args` when creating the SQLAlchemy engine
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM somewhere.my_table WHERE id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0    # generic parameter, see documentation on DataSources
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when fetching the query using `pandas.read_sql`
get_dataframe(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Get the data as pandas dataframe.

Null values are replaced by numpy.nan.

Example:

my_df = data_sources["my_datasource"].get_dataframe({"id": 387})
Parameters
  • params (optional dict) – Query parameters to fill in query (e.g. replace query’s :id parameter with value 387)

  • chunksize (optional int) – Return an iterator where chunksize is the number of rows to include in each chunk.

Returns

DataFrame object, possibly cached according to config value of expires:

get_raw(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Not implemented.

Raises

NotImplementedError – Raw/blob format currently not supported.

serves: List[str] = ['dbms.sql']
class mllaunchpad.datasources.SqlDataSink(identifier: str, datasource_config: Dict, dbms_config: Dict)[source]

DataSink for RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL (ODBC), and their dialects.

Uses SQLAlchemy under the hood, and as such, manages a connection pool automatically.

Please configure the dbms:<name>:connection_string:, which is a standard RFC-1738 URL with the syntax dialect[+driver]://[user:password@][host]/[dbname][?key=value..]. The exact URL is specific to the database you want to connect to. Find examples for all supported database dialects in the SQLAlchemy documentation on database URLs.

Depending on the dialect you want to use, you might need to install additional drivers and packages. For example, for connecting to a kerberized Impala instance via ODBC, you need to:

  1. Install Impala ODBC drivers for your OS,

  2. pip install winkerberos thrift_sasl pyodbc sqlalchemy # use pykerberos for non-windows systems

If you are tasked with connecting to a particular database system, and don’t know where to start, researching on how to connect to it from SQLAlchemy will serve as a good starting point.

Other configuration in the dbms: section (besides connection_string:) is optional, but can be provided if deemed necessary:

  • Any dbms:-level settings other than type:, connection_string: and options: will be passed as additional keyword arguments to SQLAlchemy’s create_engine.

  • Any key-value pairs inside dbms:<name>:options: {} will be passed to SQLAlchemy as connect_args. If you append _var to the end of an argument key, its value will be interpreted as an environment variable name which ML Launchpad will attempt to get a value from. This can be useful for information like passwords which you do not want to store in the configuration file.

Configuration example:

dbms:
  # ... (other connections)
  # Example for connecting to a kerberized Impala instance via ODBC:
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: sql
    connection_string: mssql+pyodbc:///default?&driver=Cloudera+ODBC+Driver+for+Impala&host=servername.somedomain.com&port=21050&authmech=1&krbservicename=impala&ssl=1&usesasl=1&ignoretransactions=1&usesystemtruststore=1
    # pyodbc alternative: mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BCloudera+ODBC+Driver+for+Impala%7D%3BHOST%3Dservername.somedomain.com%3BPORT%3D21050%3BAUTHMECH%3D1%3BKRBSERVICENAME%3Dimpala%3BSSL%3D1%3BUSESASL%3D1%3BIGNORETRANSACTIONS%3D1%3BUSESYSTEMTRUSTSTORE%3D1
    echo: True  # example for an additional SQLAlchemy keyword argument (logs the SQL) -- these are optional
    options: {}  # used as `connect_args` when creating the SQLAlchemy engine
# ...
datasinks:
  # ... (other datasinks)
  my_datasink:
    type: dbms.my_connection
    table: somewhere.my_table
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when storing the table using `my_df.to_sql`
put_dataframe(dataframe: pandas.core.frame.DataFrame, params: Optional[Dict] = None, chunksize: Optional[int] = None) None[source]

Store the pandas dataframe as a table. The default is not to store the dataframe’s row index. Configure the DataSink’s options dict to pass keyword arguments to df.to_sql.

Example:

data_sinks["my_datasink"].put_dataframe(my_df)
Parameters
  • dataframe (pandas DataFrame) – The pandas dataframe to store

  • params (optional dict) – Currently not implemented

  • chunksize (optional bool) – Currently not implemented

put_raw(raw_data, params: Optional[Dict] = None, chunksize: Optional[int] = None) None[source]

Not implemented.

Raises

NotImplementedError – Raw/blob format currently not supported.

serves: List[str] = ['dbms.sql']
class mllaunchpad.datasources.OracleDataSource(identifier: str, datasource_config: Dict, dbms_config: Dict)[source]

DataSource for Oracle database connections.

Creates a long-living connection on initialization.

Configuration example:

dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: oracle
    host: host.example.com
    port: 1251
    user_var: MY_USER_ENV_VAR
    password_var: MY_PW_ENV_VAR  # optional
    service_name: servicename.example.com
    options: {}  # used as **kwargs when initializing the DB connection
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM somewhere.my_table where id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0    # generic parameter, see documentation on DataSources
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when fetching the query using `pandas.read_sql`
get_dataframe(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Get the data as pandas dataframe.

Null values are replaced by numpy.nan.

Example:

data_sources["my_datasource"].get_dataframe({"id": 387})
Parameters
  • params (optional dict) – Query parameters to fill in query (e.g. replace query’s :id parameter with value 387)

  • chunksize (optional int) – Return an iterator where chunksize is the number of rows to include in each chunk.

Returns

DataFrame object, possibly cached according to config value of expires:

get_raw(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Not implemented.

Raises

NotImplementedError – Raw/blob format currently not supported.

serves: List[str] = ['dbms.oracle']
class mllaunchpad.datasources.OracleDataSink(identifier: str, datasink_config: Dict, dbms_config: Dict)[source]

DataSink for Oracle database connections.

Creates a long-living connection on initialization.

Configuration example:

dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: oracle
    host: host.example.com
    port: 1251
    user_var: MY_USER_ENV_VAR
    password_var: MY_PW_ENV_VAR  # optional
    service_name: servicename.example.com
    options: {}  # used as **kwargs when initializing the DB connection
# ...
datasinks:
  # ... (other datasinks)
  my_datasink:
    type: dbms.my_connection
    table: somewhere.my_table
    tags: [train] # generic parameter, see documentation on DataSources and DataSinks
    options: {}   # used as **kwargs when storing the table using `my_df.to_sql`
put_dataframe(dataframe: pandas.core.frame.DataFrame, params: Optional[Dict] = None, chunksize: Optional[int] = None) None[source]

Store the pandas dataframe as a table. The default is not to store the dataframe’s row index. Configure the DataSink’s options dict to pass keyword arguments to df.to_sql.

Example:

data_sinks["my_datasink"].put_dataframe(my_df)
Parameters
  • dataframe (pandas DataFrame) – The pandas dataframe to store

  • params (optional dict) – Currently not implemented

  • chunksize (optional bool) – Currently not implemented

put_raw(raw_data, params: Optional[Dict] = None, chunksize: Optional[int] = None) None[source]

Not implemented.

Raises

NotImplementedError – Raw/blob format currently not supported.

serves: List[str] = ['dbms.oracle']

External DataSources and DataSinks

The datasources here are not part of core ML Launchpad.

To be able to use them:

  1. install them if they support it, or copy them into your project’s source code directory;

  2. add their import statement to the plugins: section of your configuration, e.g.

plugins:  # Add this line if it's not already in your config
  - some_module.my_datasource  # for a file in the 'some_module' directory called 'my_datasource.py'

Their documentation follows hereunder.

class examples.impala_datasource.ImpalaDataSource(identifier, datasink_config, dbms_config)[source]

DataSource for fetching data from Impala queries.

EXPERIMENTAL

Creates a short-lived connection on every non-cached call to get_dataframe.

Configuration:

dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: impala
    # NOTE: The options below are examples for accessing a kerberized cluster.
    #       You can specify other options here, and they will be used for creating
    #       the connection. For a full list, refer to the documentation of
    #       `impyla.dbapi.connect`: https://pydoc.net/impyla/0.14.0/impala.dbapi/
    # NOTE 2: For user and password parameters, you can provide the parameter with
    #         the suffix "_var". For example, if you configure `ldap_password_var: MY_PW`,
    #         ML Launchpad will get the value of the environment variable `MY_PW` and pass
    #         it to `impyla.dbapi.connect`s `ldap_password` parameter. This is to avoid
    #         having to add passwords to the configuration file.
    host: host.example.com
    port: 21050
    database: my_db
    kerberos_service_name: impala
    auth_mechanism: GSSAPI
    use_ssl: true
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM my_table where id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0     # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}    # (optional) any other kwargs to pass to `pd.read_sql`

Note: In order to access a kerberized cluster, you need the correct packages. We found the following packages to work reliably:

$ pip install pykerberos thrift_sasl impyla pandas
(for Windows, use winkerberos instead of pykerberos)
get_dataframe(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Get data as a pandas dataframe.

Example:

data_sources["my_datasource"].get_dataframe({"id": 387})
Parameters
  • params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)

  • chunksize (optional bool) – Currently not implemented

Returns

DataFrame object, possibly cached according to expires-config

get_raw(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Not implemented.

Parameters
  • params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)

  • chunksize (optional bool) – Currently not implemented

Raises

NotImplementedError

Returns

Nothing, always raises NotImplementedError

serves: List[str] = ['dbms.impala']
class examples.records_datasource.RecordsDbDataSource(identifier: str, datasource_config: Dict, dbms_config: Optional[Dict])[source]

DataSource for a bunch of relational database types: RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL.

EXPERIMENTAL

See serves for the available types.

Creates a long-living connection on initialization.

Configuration:

dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: oracle  # see attribute serves for available types
    host: host.example.com
    port: 1251  # optional
    user_var: MY_USER_ENV_VAR
    password_var: MY_PW_ENV_VAR  # optional
    service_name: servicename.example.com  # optional
    options: {}  # used as **kwargs when initializing the DB connection
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM users.my_table where id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0     # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
get_dataframe(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Get data as a pandas dataframe.

Example:

data_sources["my_datasource"].get_dataframe({"id": 387})
Parameters
  • params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)

  • chunksize (optional bool) – Currently not implemented

Returns

DataFrame object, possibly cached according to expires-config

get_raw(params: Optional[Dict] = None, chunksize: Optional[int] = None)

Not implemented.

Parameters
  • params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)

  • chunksize (optional bool) – Currently not implemented

Raises

NotImplementedError

Returns

Nothing, always raises NotImplementedError

serves: List[str] = ['dbms.oracle', 'dbms.redshift', 'dbms.postgres', 'dbms.mysql', 'dbms.sqlite', 'dbms.ms_sql']