DataSources and DataSinks
DataSources and DataSinks loosely couple your model’s code to its data.
Instead of accessing your data locations directly from your model’s code,
you access your data via the DataSources and DataSinks that ML Launchpad provides.
To your code, all DataSources and DataSinks behave the same and are used
the same way for the same data format (DataFrames, raw files, etc.).
For example, to obtain a pandas DataFrame, use the DataSource’s get_dataframe() method.
Your code does not need to know
whether your data was originally obtained from a database, file, or web service.
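The decoupling described above can be pictured with a minimal sketch. The two classes below are hypothetical stand-ins, not ML Launchpad classes, and they return plain lists of dicts where a real DataSource would return a pandas DataFrame. The point is only that the model-side function stays the same regardless of which object sits behind the configured name.

```python
from typing import Dict, List


class InMemorySource:
    """Hypothetical stand-in for a file-backed DataSource."""

    def get_dataframe(self) -> List[dict]:
        # A real DataSource would return a pandas DataFrame here.
        return [{"sepal_length": 5.1, "species": "setosa"}]


class FakeDatabaseSource:
    """Hypothetical stand-in for a database-backed DataSource."""

    def get_dataframe(self) -> List[dict]:
        return [{"sepal_length": 6.3, "species": "virginica"}]


def count_rows(data_sources: Dict[str, object]) -> int:
    """Model-side code: it only knows the configured name and the method."""
    rows = data_sources["my_datasource"].get_dataframe()
    return len(rows)
```

Swapping `InMemorySource()` for `FakeDatabaseSource()` in the dict passed to `count_rows` requires no change to `count_rows` itself.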
As DataSources and DataSinks are very similar, we will
use the term DataSource below to mean either.
Different subclasses of DataSource provide you with different kinds of connections.
In the following sections, you can find
lists of built-in and external DataSources and DataSinks.
Each DataSource subclass (e.g. FileDataSource, SqlDataSource)
serves one or several types (e.g. csv, euro_csv, dbms.sql).
You specify the type in your DataSource’s configuration.
The same type can even be served by several different
DataSource subclasses, in which case the DataSource from the
last-imported plugin is used.
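One way to picture a type lookup in which the class imported last takes precedence is a dictionary that is filled in import order. This is only an illustrative sketch: the class names and the `build_type_registry` helper are hypothetical, and ML Launchpad’s actual lookup may be implemented differently.

```python
from typing import Dict, List


class FirstCsvSource:
    """Hypothetical DataSource class imported first."""
    serves: List[str] = ["csv", "euro_csv"]


class SecondCsvSource:
    """Hypothetical plugin class imported later; also serves `csv`."""
    serves: List[str] = ["csv"]


def build_type_registry(classes_in_import_order: List[type]) -> Dict[str, type]:
    """Map each served type to a class; later classes overwrite earlier ones."""
    registry: Dict[str, type] = {}
    for cls in classes_in_import_order:
        for type_name in cls.serves:
            registry[type_name] = cls
    return registry


registry = build_type_registry([FirstCsvSource, SecondCsvSource])
```

Here `registry["csv"]` resolves to `SecondCsvSource`, while `registry["euro_csv"]` still resolves to `FirstCsvSource`.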
Here is an example of a configured FileDataSource:

datasources:
  my_datasource:
    type: csv
    path: ./iris_train.csv
    expires: 0
    cache_size: 32  # optional
    options: {}
    tags: train
Where the parts of this example are:
- datasources (or datasinks; optional): Can contain as many child elements (configured DataSources or DataSinks) as you like.
- my_datasource: The name by which you want to refer to a specific configured DataSource. Used to get data, e.g.: data_sources["my_datasource"].get_dataframe(). This name is up to you to choose.
- type (required in every DataSource): The type that a DataSource needs to serve in order to be chosen for you. In this case, when ML Launchpad looks up which DataSources serve the csv type, it finds FileDataSource and will use it.
- path (specific to FileDataSource): The path of the file. Every DataSource has its own specific properties which are part of the DataSource’s documentation (see the next section for built-ins).
- expires (required in every DataSource): This controls caching of the data. 0 means that it expires immediately, -1 that it never expires, and any other number specifies the number of seconds after which the cached data expires and is fetched afresh from the source. Note that data is cached specifically for each unique combination of params passed to get_dataframe(), up to the maximum cache size.
- cache_size (optional, default: 32, DataSources only): Maximum number of items to cache.
- tags (required in every DataSource): A combination of one or several of the possible tags train, test and predict (use [brackets] around more than one tag). This determines the model function(s) the DataSource will be made available for.
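The caching rules for expires and cache_size can be sketched as a small cache keyed by the params dict. This is a simplified illustration, not ML Launchpad’s implementation: the `ExpiringCache` class, its `clock` argument, and the oldest-entry-first eviction are assumptions made for the example.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, Optional


class ExpiringCache:
    """Hypothetical cache mimicking the documented `expires` semantics."""

    def __init__(self, expires: int, cache_size: int = 32,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.expires = expires        # 0: never cache, -1: never expire, N: seconds
        self.cache_size = cache_size  # maximum number of cached param combinations
        self._clock = clock
        self._items: "OrderedDict[str, tuple]" = OrderedDict()

    def get(self, params: Optional[Dict[str, Any]], fetch: Callable[[], Any]) -> Any:
        # One cache entry per unique combination of params.
        key = repr(sorted((params or {}).items()))
        now = self._clock()
        if self.expires != 0 and key in self._items:
            stored_at, value = self._items[key]
            if self.expires == -1 or now - stored_at < self.expires:
                return value
        value = fetch()  # expired or missing: get the data afresh
        if self.expires != 0:
            self._items[key] = (now, value)
            self._items.move_to_end(key)
            while len(self._items) > self.cache_size:
                self._items.popitem(last=False)  # assumption: evict the oldest entry
        return value
```

With `expires=0` every call to `get` invokes `fetch`; with `expires=-1` each params combination is fetched once and then reused.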
For more complete examples, have a look at the Tutorial or at the examples (download).
Please note that DataSources and DataSinks will be initialized
for you by ML Launchpad depending on your configuration.
Your code will just get “some” DataSource, but won’t have to import, initialize, or
even know the name of the DataSource class that is used under the hood.
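How the tags setting could decide which configured DataSources a given model function receives is sketched below. The `names_for` helper and the example names are hypothetical; the sketch only mirrors the documented rule that tags holds either a single tag or a bracketed list of tags.

```python
from typing import Any, Dict, List


def names_for(config: Dict[str, Any], model_function: str) -> List[str]:
    """Return the names of datasources tagged for `model_function`."""
    selected = []
    for name, settings in config.get("datasources", {}).items():
        tags = settings["tags"]
        if isinstance(tags, str):  # a single tag such as `tags: train`
            tags = [tags]
        if model_function in tags:
            selected.append(name)
    return selected


# Parsed configuration, trimmed to the keys relevant for this sketch.
config = {
    "datasources": {
        "petals": {"type": "csv", "tags": "train"},
        "petals_test": {"type": "csv", "tags": ["train", "test"]},
    }
}
```

In this sketch, `names_for(config, "test")` selects only `petals_test`, and `names_for(config, "predict")` selects nothing.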
When you need to use e.g. several tables that reside in the same database, it
is useful not to have to configure the connection details for every one
of the DataSources that correspond to those tables, but to configure the
connection only once. For this, you specify a separate dbms: section in your
configuration where you give each connection a name (e.g. my_connection), which
you can refer to in your datasource config by a type like dbms.my_connection.
See SqlDataSource below for an example.
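The indirection from a datasource’s type to a shared dbms: connection can be illustrated with plain dictionaries. The `resolve_connection` helper, the datasource names, and the placeholder connection string are hypothetical, and the configuration is trimmed to the keys relevant here; ML Launchpad performs its own lookup internally.

```python
from typing import Any, Dict, Optional

# Parsed configuration: two datasources refer to one named connection.
config: Dict[str, Any] = {
    "dbms": {
        "my_connection": {"type": "sql", "connection_string": "sqlite://"},
    },
    "datasources": {
        "customers": {"type": "dbms.my_connection", "query": "SELECT * FROM customers"},
        "orders": {"type": "dbms.my_connection", "query": "SELECT * FROM orders"},
    },
}


def resolve_connection(datasource_name: str,
                       full_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the dbms settings a datasource refers to, or None."""
    type_name = full_config["datasources"][datasource_name]["type"]
    prefix, _, connection_name = type_name.partition(".")
    if prefix != "dbms" or not connection_name:
        return None  # not a dbms-backed datasource
    return full_config["dbms"][connection_name]
```

Both `customers` and `orders` resolve to the same connection settings, so the connection details are written only once.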
Built-in DataSources and DataSinks
When you pip install mllaunchpad, it comes with a number of built-in
DataSources and DataSinks that are ready to use without needing to specify
any plugins: [] in the Configuration.
Their documentation follows below.
-
class
mllaunchpad.datasources.
FileDataSource
(identifier: str, datasource_config: Dict)
DataSource for fetching data from files.
See serves for the available types.
Configuration example:
datasources:
  # ... (other datasources)
  my_datasource:
    type: euro_csv  # `euro_csv` changes separators to ";" and decimals to "," w.r.t. `csv`
    path: /some/file.csv  # Can be URL, uses `pandas.read_csv` internally
    expires: 0  # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when fetching the data using `pandas.read_csv`
    dtypes_path: ./some/file.dtypes  # optional: location with the csv's column dtypes info
  my_raw_datasource:
    type: text_file  # raw files can also be of type `binary_file`
    path: /some/file.txt  # Can be URL
    expires: 0  # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when fetching the data using `fh.read`
When loading csv or euro_csv type formats, you can use the setting dtypes_path to specify a location with a dtypes description for the csv (usually generated earlier by using FileDataSink’s dtypes_path setting). These dtypes will be enforced when reading the csv, which helps avoid problems when pandas.read_csv interprets data differently than you do. Use dtypes_path to enforce dtype parity between csv datasinks and datasources.
Using the raw formats binary_file and text_file, you can read arbitrary data, as long as it can be represented as a bytes or a str object, respectively. text_file uses UTF-8 encoding. Please note that while possible, it is not recommended to persist DataFrames this way, because by adding format-specific code to your model, you’re giving up your code’s independence from the type of DataSource/DataSink. Here’s an example for unpickling an arbitrary object:
# config fragment:
datasources:
  # ...
  my_pickle_datasource:
    type: binary_file
    path: /some/file.pickle
    tags: [train]
    options: {}

# code fragment:
import pickle
# ...
# in predict/test/train code:
my_pickle = data_sources["my_pickle_datasource"].get_raw()
my_object = pickle.loads(my_pickle)
-
get_dataframe
(params: Dict = None, chunksize: Optional[int] = None) Get data as a pandas dataframe.
Example:
data_sources["my_datasource"].get_dataframe()
- Parameters
params (optional dict) – Currently not implemented
chunksize (optional int) – Return an iterator where chunksize is the number of rows to include in each chunk.
- Returns
DataFrame object, possibly cached according to config value of expires:
-
get_raw
(params: Dict = None, chunksize: Optional[int] = None) Get data as raw (unstructured) data.
Example:
data_sources["my_raw_datasource"].get_raw()
- Parameters
params (optional dict) – Currently not implemented
chunksize (optional int) – Currently not implemented
- Returns
The file’s bytes (binary) or string (text) contents, possibly cached according to config value of expires:
- Return type
bytes or str
-
serves
: List[str] = ['csv', 'euro_csv', 'text_file', 'binary_file']
-
-
class
mllaunchpad.datasources.
FileDataSink
(identifier: str, datasink_config: Dict)
DataSink for putting data into files.
See serves for the available types.
Configuration example:
datasinks:
  # ... (other datasinks)
  my_datasink:
    type: euro_csv  # `euro_csv` changes separators to ";" and decimals to "," w.r.t. `csv`
    path: /some/file.csv  # Can be URL, uses `df.to_csv` internally
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when writing the data using `df.to_csv`
    dtypes_path: ./some/file.dtypes  # optional: location for saving the csv's column dtypes info
  my_raw_datasink:
    type: text_file  # raw files can also be of type `binary_file`
    path: /some/file.txt  # Can be URL
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when writing the data using `fh.write`
When saving csv or euro_csv type formats, you can use the setting dtypes_path to specify a location where the dtypes description for the csv is saved (which you can use later with FileDataSource’s dtypes_path setting). These dtypes will be enforced when reading the csv, which helps avoid problems when pandas.read_csv interprets data differently than you do. Use dtypes_path to enforce dtype parity between csv datasinks and datasources.
Using the raw formats binary_file and text_file, you can persist arbitrary data, as long as it can be represented as a bytes or a str object, respectively. text_file uses UTF-8 encoding. Please note that while possible, it is not recommended to persist DataFrames this way, because by adding format-specific code to your model, you’re giving up your code’s independence from the type of DataSource/DataSink. Here’s an example for pickling an arbitrary object:
# config fragment:
datasinks:
  # ...
  my_pickle_datasink:
    type: binary_file
    path: /some/file.pickle
    tags: [train]
    options: {}

# code fragment:
import pickle
# ...
# in predict/test/train code:
my_pickle = pickle.dumps(my_object)
data_sinks["my_pickle_datasink"].put_raw(my_pickle)
-
put_dataframe
(dataframe: pandas.core.frame.DataFrame, params: Dict = None, chunksize: Optional[int] = None) → None
Write a pandas dataframe to file and, if dtypes_path is configured, also its dtypes. The default is not to save the dataframe’s row index. Configure the DataSink’s options dict to pass keyword arguments to my_df.to_csv. If the directory path leading to the file does not exist, it will be created.
Example:
data_sinks["my_datasink"].put_dataframe(my_df)
- Parameters
dataframe (pandas DataFrame) – The pandas dataframe to save
params (optional dict) – Currently not implemented
chunksize (optional int) – Currently not implemented
-
put_raw
(raw_data: Union[str, bytes], params: Dict = None, chunksize: Optional[int] = None) → None
Write raw (unstructured) data to file. If the directory path leading to the file does not exist, it will be created.
Example:
data_sinks["my_raw_datasink"].put_raw(my_data)
- Parameters
raw_data (bytes or str) – The data to save (bytes for binary, string for text file)
params (optional dict) – Currently not implemented
chunksize (optional int) – Currently not implemented
-
serves
: List[str] = ['csv', 'euro_csv', 'text_file', 'binary_file']
-
-
class
mllaunchpad.datasources.
SqlDataSource
(identifier: str, datasource_config: Dict, dbms_config: Dict)
DataSource for RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL (ODBC), and their dialects.
Uses SQLAlchemy under the hood, and as such, manages a connection pool automatically.
Please configure the dbms:<name>:connection_string:, which is a standard RFC-1738 URL with the syntax dialect[+driver]://[user:password@][host]/[dbname][?key=value..]. The exact URL is specific for the database you want to connect to. Find examples for all supported database dialects here.
Depending on the dialect you want to use, you might need to install additional drivers and packages. For example, for connecting to a kerberized Impala instance via ODBC, you need to:
- Install Impala ODBC drivers for your OS,
- pip install winkerberos thrift_sasl pyodbc sqlalchemy  # use pykerberos for non-windows systems
If you are tasked with connecting to a particular database system, and don’t know where to start, researching how to connect to it from SQLAlchemy will serve as a good starting point.
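To make the parts of the connection_string syntax dialect[+driver]://[user:password@][host]/[dbname][?key=value..] more concrete, the following sketch takes such a URL apart with Python’s standard library. It does not use SQLAlchemy and says nothing about how SQLAlchemy itself parses URLs; the `describe_connection_string` helper and the example URL are made up for illustration.

```python
from urllib.parse import parse_qs, urlsplit


def describe_connection_string(url: str) -> dict:
    """Split a dialect[+driver]://user:password@host/dbname?key=value URL."""
    parts = urlsplit(url)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "dbname": parts.path.lstrip("/") or None,
        "query": parse_qs(parts.query),
    }


# Hypothetical example URL; the password is deliberately left out of the result.
info = describe_connection_string(
    "postgresql+psycopg2://scott:tiger@db.example.com:5432/mydb?sslmode=require"
)
```

For this URL, `info["dialect"]` is `postgresql`, `info["driver"]` is `psycopg2`, and `info["dbname"]` is `mydb`.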
Other configuration in the dbms: section (besides connection_string:) is optional, but can be provided if deemed necessary:
- Any dbms:-level settings other than type:, connection_string: and options: will be passed as additional keyword arguments to SQLAlchemy’s create_engine.
- Any key-value pairs inside dbms:<name>:options: {} will be passed to SQLAlchemy as connect_args. If you append _var to the end of an argument key, its value will be interpreted as an environment variable name which ML Launchpad will attempt to get a value from. This can be useful for information like passwords which you do not want to store in the configuration file.
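The _var convention can be pictured as a small preprocessing step over the options dict. The `resolve_var_keys` helper below is a hypothetical sketch, not ML Launchpad’s code; in particular, returning None for an unset environment variable is an assumption, as the text above does not say how a missing variable is handled.

```python
import os
from typing import Any, Dict, Mapping


def resolve_var_keys(options: Dict[str, Any],
                     environ: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Replace `<key>_var: ENV_NAME` entries by `<key>: <value of ENV_NAME>`."""
    resolved: Dict[str, Any] = {}
    for key, value in options.items():
        if key.endswith("_var"):
            # Assumption: an unset variable yields None instead of an error.
            resolved[key[: -len("_var")]] = environ.get(value)
        else:
            resolved[key] = value
    return resolved
```

For example, `resolve_var_keys({"password_var": "MY_PW"}, {"MY_PW": "s3cret"})` yields a dict with the key `password`, so the secret itself never has to appear in the configuration file.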
Configuration example:
dbms:
  # ... (other connections)
  # Example for connecting to a kerberized Impala instance via ODBC:
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: sql
    connection_string: mssql+pyodbc:///default?&driver=Cloudera+ODBC+Driver+for+Impala&host=servername.somedomain.com&port=21050&authmech=1&krbservicename=impala&ssl=1&usesasl=1&ignoretransactions=1&usesystemtruststore=1
    # pyodbc alternative: mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BCloudera+ODBC+Driver+for+Impala%7D%3BHOST%3Dservername.somedomain.com%3BPORT%3D21050%3BAUTHMECH%3D1%3BKRBSERVICENAME%3Dimpala%3BSSL%3D1%3BUSESASL%3D1%3BIGNORETRANSACTIONS%3D1%3BUSESYSTEMTRUSTSTORE%3D1
    echo: True  # example for an additional SQLAlchemy keyword argument (logs the SQL) -- these are optional
    options: {}  # used as `connect_args` when creating the SQLAlchemy engine
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM somewhere.my_table WHERE id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0  # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when fetching the query using `pandas.read_sql`
-
get_dataframe
(params: Dict = None, chunksize: Optional[int] = None)
Get the data as a pandas dataframe.
Null values are replaced by numpy.nan.
Example:
my_df = data_sources["my_datasource"].get_dataframe({"id": 387})
- Parameters
params (optional dict) – Query parameters to fill in query (e.g. replace query’s :id parameter with value 387)
chunksize (optional int) – Return an iterator where chunksize is the number of rows to include in each chunk.
- Returns
DataFrame object, possibly cached according to config value of expires:
-
get_raw
(params: Dict = None, chunksize: Optional[int] = None) Not implemented.
- Raises
NotImplementedError – Raw/blob format currently not supported.
-
serves
: List[str] = ['dbms.sql']
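The way a params dict fills a named placeholder such as :id in a query can be tried out with Python’s built-in sqlite3 module, which accepts the same named style. This sketch is a stand-in for illustration only: it uses neither SqlDataSource nor SQLAlchemy nor pandas, and the table contents are made up.

```python
import sqlite3

# Throwaway in-memory database with a small example table.
connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE my_table (id INTEGER, name TEXT)")
connection.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [(387, "iris"), (388, "rose")],
)

# The dict plays the role of the `params` argument of get_dataframe().
query = "SELECT id, name FROM my_table WHERE id = :id"
rows = connection.execute(query, {"id": 387}).fetchall()
connection.close()
```

Because the value is bound by the database driver rather than pasted into the SQL string, the same query text can be reused with different params.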
-
class
mllaunchpad.datasources.
SqlDataSink
(identifier: str, datasource_config: Dict, dbms_config: Dict)
DataSink for RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL (ODBC), and their dialects.
Uses SQLAlchemy under the hood, and as such, manages a connection pool automatically.
Please configure the dbms:<name>:connection_string:, which is a standard RFC-1738 URL with the syntax dialect[+driver]://[user:password@][host]/[dbname][?key=value..]. The exact URL is specific for the database you want to connect to. Find examples for all supported database dialects here.
Depending on the dialect you want to use, you might need to install additional drivers and packages. For example, for connecting to a kerberized Impala instance via ODBC, you need to:
- Install Impala ODBC drivers for your OS,
- pip install winkerberos thrift_sasl pyodbc sqlalchemy  # use pykerberos for non-windows systems
If you are tasked with connecting to a particular database system, and don’t know where to start, researching how to connect to it from SQLAlchemy will serve as a good starting point.
Other configuration in the dbms: section (besides connection_string:) is optional, but can be provided if deemed necessary:
- Any dbms:-level settings other than type:, connection_string: and options: will be passed as additional keyword arguments to SQLAlchemy’s create_engine.
- Any key-value pairs inside dbms:<name>:options: {} will be passed to SQLAlchemy as connect_args. If you append _var to the end of an argument key, its value will be interpreted as an environment variable name which ML Launchpad will attempt to get a value from. This can be useful for information like passwords which you do not want to store in the configuration file.
Configuration example:
dbms:
  # ... (other connections)
  # Example for connecting to a kerberized Impala instance via ODBC:
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: sql
    connection_string: mssql+pyodbc:///default?&driver=Cloudera+ODBC+Driver+for+Impala&host=servername.somedomain.com&port=21050&authmech=1&krbservicename=impala&ssl=1&usesasl=1&ignoretransactions=1&usesystemtruststore=1
    # pyodbc alternative: mssql+pyodbc:///?odbc_connect=DRIVER%3D%7BCloudera+ODBC+Driver+for+Impala%7D%3BHOST%3Dservername.somedomain.com%3BPORT%3D21050%3BAUTHMECH%3D1%3BKRBSERVICENAME%3Dimpala%3BSSL%3D1%3BUSESASL%3D1%3BIGNORETRANSACTIONS%3D1%3BUSESYSTEMTRUSTSTORE%3D1
    echo: True  # example for an additional SQLAlchemy keyword argument (logs the SQL) -- these are optional
    options: {}  # used as `connect_args` when creating the SQLAlchemy engine
# ...
datasinks:
  # ... (other datasinks)
  my_datasink:
    type: dbms.my_connection
    table: somewhere.my_table
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when storing the table using `my_df.to_sql`
-
put_dataframe
(dataframe: pandas.core.frame.DataFrame, params: Dict = None, chunksize: Optional[int] = None) → None
Store the pandas dataframe as a table. The default is not to store the dataframe’s row index. Configure the DataSink’s options dict to pass keyword arguments to df.to_sql.
Example:
data_sinks["my_datasink"].put_dataframe(my_df)
- Parameters
dataframe (pandas DataFrame) – The pandas dataframe to store
params (optional dict) – Currently not implemented
chunksize (optional int) – Currently not implemented
-
put_raw
(raw_data, params: Dict = None, chunksize: Optional[int] = None) → None
Not implemented.
- Raises
NotImplementedError – Raw/blob format currently not supported.
-
serves
: List[str] = ['dbms.sql']
-
class
mllaunchpad.datasources.
OracleDataSource
(identifier: str, datasource_config: Dict, dbms_config: Dict)
DataSource for Oracle database connections.
Creates a long-living connection on initialization.
Configuration example:
dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: oracle
    host: host.example.com
    port: 1251
    user_var: MY_USER_ENV_VAR
    password_var: MY_PW_ENV_VAR  # optional
    service_name: servicename.example.com
    options: {}  # used as **kwargs when initializing the DB connection
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM somewhere.my_table where id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0  # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when fetching the query using `pandas.read_sql`
-
get_dataframe
(params: Dict = None, chunksize: Optional[int] = None)
Get the data as a pandas dataframe.
Null values are replaced by numpy.nan.
Example:
data_sources["my_datasource"].get_dataframe({"id": 387})
- Parameters
params (optional dict) – Query parameters to fill in query (e.g. replace query’s :id parameter with value 387)
chunksize (optional int) – Return an iterator where chunksize is the number of rows to include in each chunk.
- Returns
DataFrame object, possibly cached according to config value of expires:
-
get_raw
(params: Dict = None, chunksize: Optional[int] = None) Not implemented.
- Raises
NotImplementedError – Raw/blob format currently not supported.
-
serves
: List[str] = ['dbms.oracle']
-
-
class
mllaunchpad.datasources.
OracleDataSink
(identifier: str, datasink_config: Dict, dbms_config: Dict)
DataSink for Oracle database connections.
Creates a long-living connection on initialization.
Configuration example:
dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: oracle
    host: host.example.com
    port: 1251
    user_var: MY_USER_ENV_VAR
    password_var: MY_PW_ENV_VAR  # optional
    service_name: servicename.example.com
    options: {}  # used as **kwargs when initializing the DB connection
# ...
datasinks:
  # ... (other datasinks)
  my_datasink:
    type: dbms.my_connection
    table: somewhere.my_table
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # used as **kwargs when storing the table using `my_df.to_sql`
-
put_dataframe
(dataframe: pandas.core.frame.DataFrame, params: Dict = None, chunksize: Optional[int] = None) → None
Store the pandas dataframe as a table. The default is not to store the dataframe’s row index. Configure the DataSink’s options dict to pass keyword arguments to df.to_sql.
Example:
data_sinks["my_datasink"].put_dataframe(my_df)
- Parameters
dataframe (pandas DataFrame) – The pandas dataframe to store
params (optional dict) – Currently not implemented
chunksize (optional int) – Currently not implemented
-
put_raw
(raw_data, params: Dict = None, chunksize: Optional[int] = None) → None
Not implemented.
- Raises
NotImplementedError – Raw/blob format currently not supported.
-
serves
: List[str] = ['dbms.oracle']
-
External DataSources and DataSinks
The datasources here are not part of core ML Launchpad.
To be able to use them:
- install them if they support it, or copy them into your project’s source code directory;
- add their import statement to the plugins: section of your configuration, e.g.

plugins:  # Add this line if it's not already in your config
  - some_module.my_datasource  # for a file in the 'some_module' directory called 'my_datasource.py'

Their documentation follows below.
-
class
examples.impala_datasource.
ImpalaDataSource
(identifier, datasink_config, dbms_config)
DataSource for fetching data from Impala queries.
EXPERIMENTAL
Creates a short-lived connection on every non-cached call to get_dataframe.
Configuration:
dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: impala
    # NOTE: The options below are examples for accessing a kerberized cluster.
    #       You can specify other options here, and they will be used for creating
    #       the connection. For a full list, refer to the documentation of
    #       `impyla.dbapi.connect`: https://pydoc.net/impyla/0.14.0/impala.dbapi/
    # NOTE 2: For user and password parameters, you can provide the parameter with
    #       the suffix "_var". For example, if you configure `ldap_password_var: MY_PW`,
    #       ML Launchpad will get the value of the environment variable `MY_PW` and pass
    #       it to `impyla.dbapi.connect`s `ldap_password` parameter. This is to avoid
    #       having to add passwords to the configuration file.
    host: host.example.com
    port: 21050
    database: my_db
    kerberos_service_name: impala
    auth_mechanism: GSSAPI
    use_ssl: true
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM my_table where id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0  # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
    options: {}  # (optional) any other kwargs to pass to `pd.read_sql`
Note: In order to access a kerberized cluster, you need the correct packages. We found the following packages to work reliably:
$ pip install pykerberos thrift_sasl impyla pandas
(for Windows, use winkerberos instead of pykerberos)
-
get_dataframe
(params: Dict = None, chunksize: Optional[int] = None)
Get data as a pandas dataframe.
Example:
data_sources["my_datasource"].get_dataframe({"id": 387})
- Parameters
params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)
chunksize (optional int) – Currently not implemented
- Returns
DataFrame object, possibly cached according to expires-config
-
get_raw
(params: Dict = None, chunksize: Optional[int] = None)
Not implemented.
- Parameters
params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)
chunksize (optional int) – Currently not implemented
- Raises
NotImplementedError –
- Returns
Nothing, always raises NotImplementedError
-
serves
: List[str] = ['dbms.impala']
-
-
class
examples.records_datasource.
RecordsDbDataSource
(identifier: str, datasource_config: Dict, dbms_config: Optional[Dict])
DataSource for a bunch of relational database types: RedShift, Postgres, MySQL, SQLite, Oracle, Microsoft SQL.
EXPERIMENTAL
See serves for the available types.
Creates a long-living connection on initialization.
Configuration:
dbms:
  # ... (other connections)
  my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
    type: oracle  # see attribute serves for available types
    host: host.example.com
    port: 1251  # optional
    user_var: MY_USER_ENV_VAR
    password_var: MY_PW_ENV_VAR  # optional
    service_name: servicename.example.com  # optional
    options: {}  # used as **kwargs when initializing the DB connection
# ...
datasources:
  # ... (other datasources)
  my_datasource:
    type: dbms.my_connection
    query: SELECT * FROM users.my_table where id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
    expires: 0  # generic parameter, see documentation on DataSources
    tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
-
get_dataframe
(params: Dict = None, chunksize: Optional[int] = None)
Get data as a pandas dataframe.
Example:
data_sources["my_datasource"].get_dataframe({"id": 387})
- Parameters
params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)
chunksize (optional int) – Currently not implemented
- Returns
DataFrame object, possibly cached according to expires-config
-
get_raw
(params: Dict = None, chunksize: Optional[int] = None)
Not implemented.
- Parameters
params (optional dict) – Query parameters to fill in query (e.g. :id with value 387)
chunksize (optional int) – Currently not implemented
- Raises
NotImplementedError –
- Returns
Nothing, always raises NotImplementedError
-
serves
: List[str] = ['dbms.oracle', 'dbms.redshift', 'dbms.postgres', 'dbms.mysql', 'dbms.sqlite', 'dbms.ms_sql']
-