Source code for examples.impala_datasource
import logging
import os
import pandas as pd
from mllaunchpad.resource import DataSource
logger = logging.getLogger(__name__)

# impyla is an optional dependency: only users of ImpalaDataSource need it, so a
# failed import must not make this whole module unimportable. ImportError (the
# parent of ModuleNotFoundError) also covers an installed-but-broken impyla,
# e.g. one whose own imports fail because of an incompatible dependency version.
try:
    from impala.dbapi import connect
except ImportError as impyla_import_error:
    logger.warning("Please install the impyla package to be able to use ImpalaDataSource.")
    # Keep the underlying cause available for troubleshooting broken installs.
    logger.debug("Importing impala.dbapi failed: %s", impyla_import_error)
class ImpalaDataSource(DataSource):
    """DataSource for fetching data from Impala queries.

    EXPERIMENTAL

    Creates a short-lived connection on every non-cached call to `get_dataframe`.

    Configuration::

        dbms:
          # ... (other connections)
          my_connection:  # NOTE: You can use the same connection for several datasources and datasinks
            type: impala
            # NOTE: The options below are examples for accessing a kerberized cluster.
            # You can specify other options here, and they will be used for creating
            # the connection. For a full list, refer to the documentation of
            # `impyla.dbapi.connect`: https://pydoc.net/impyla/0.14.0/impala.dbapi/
            # NOTE 2: For user and password parameters, you can provide the parameter with
            # the suffix "_var". For example, if you configure `ldap_password_var: MY_PW`,
            # ML Launchpad will get the value of the environment variable `MY_PW` and pass
            # it to `impyla.dbapi.connect`'s `ldap_password` parameter. This is to avoid
            # having to add passwords to the configuration file. If the environment
            # variable is not set, a warning is logged and the parameter is omitted.
            host: host.example.com
            port: 21050
            database: my_db
            kerberos_service_name: impala
            auth_mechanism: GSSAPI
            use_ssl: true
            # ...
        datasources:
          # ... (other datasources)
          my_datasource:
            type: dbms.my_connection
            query: SELECT * FROM my_table where id = :id  # fill `:params` by calling `get_dataframe` with a `dict`
            expires: 0     # generic parameter, see documentation on DataSources
            tags: [train]  # generic parameter, see documentation on DataSources and DataSinks
            options: {}    # (optional) any other kwargs to pass to `pd.read_sql`

    Note: In order to access a kerberized cluster, you need the correct packages.
    We found the following packages to work reliably::

        $ pip install pykerberos thrift_sasl impyla pandas

    (for Windows, use winkerberos instead of pykerberos)
    """

    serves = ['dbms.impala']

    def __init__(self, identifier, datasink_config, dbms_config):
        """Set up the datasource and resolve ``*_var`` connection parameters.

        :param identifier: Name of this datasource (passed to the base class).
        :param datasink_config: This datasource's own configuration section
            (``query``, ``options``, ...). Despite the name, it configures a
            datasource; the parameter name is kept for backward compatibility.
            It is handed to the base class, which `get_dataframe` presumably
            relies on to expose it as ``self.config`` -- TODO confirm in DataSource.
        :param dbms_config: The referenced ``dbms`` connection section.
        """
        super().__init__(identifier, datasink_config)
        self.dbms_config = self._resolve_env_vars(dbms_config)

    @staticmethod
    def _resolve_env_vars(dbms_config):
        """Return a copy of ``dbms_config`` with ``<param>_var`` entries resolved.

        An entry like ``ldap_password_var: MY_PW`` becomes
        ``ldap_password: <value of environment variable MY_PW>``. All other
        entries are copied unchanged. If the named environment variable is not
        set, a warning is logged and the entry is dropped, so the unresolved
        ``*_var`` name is not forwarded to ``connect`` as an unknown keyword.

        :param dbms_config: Mapping of connection parameters from the config.
        :return: New dict of connection parameters.
        """
        resolved = {}
        for key, value in dbms_config.items():
            if not key.endswith("_var"):
                resolved[key] = value
                continue

            env_value = os.environ.get(value)
            if env_value is None:
                logger.warning(
                    "Environment variable '%s' not set (from config key '%s')",
                    value,
                    key,
                )
                continue

            param = key[: -len("_var")]
            resolved[param] = env_value
            logger.debug(
                "Replaced Impala connection parameter '%s' specifying environment "
                "variable '%s' with parameter '%s'",
                key,
                value,
                param,
            )
        return resolved

    def get_dataframe(self, params=None, chunksize=None):
        """Get data as a pandas dataframe.

        Example::

            data_sources["my_datasource"].get_dataframe({"id": 387})

        :param params: Query parameters to fill in query (e.g. `:id` with value 387)
        :type params: optional dict
        :param chunksize: Currently not implemented
        :type chunksize: optional int
        :raises NotImplementedError: if a (truthy) `chunksize` is given
        :return: DataFrame object, possibly cached according to expires-config
        """
        if chunksize:
            raise NotImplementedError('Buffered reading not supported yet')

        logger.info(
            "Establishing Impala database connection for datasource %s...", self.id
        )
        # "type" selects this DataSource class in the config; it is not a
        # connection parameter, so it must not reach `connect`.
        conn_args = {k: v for k, v in self.dbms_config.items() if k != "type"}
        # One connection per call; the `with` block is relied upon to close it.
        with connect(**conn_args) as conn:
            query = self.config["query"]
            params = params or {}
            # `options: null` in the YAML yields None, which `**` cannot unpack.
            kw_options = self.config.get("options") or {}
            logger.debug(
                "Fetching query %s with params %s and options %s...",
                query,
                params,
                kw_options,
            )
            df = pd.read_sql(query, con=conn, params=params, **kw_options)
        return df

    def get_raw(self, params=None, chunksize=None):
        """Not implemented.

        :param params: Query parameters to fill in query (e.g. `:id` with value 387)
        :type params: optional dict
        :param chunksize: Currently not implemented
        :type chunksize: optional int
        :raises NotImplementedError: always
        :return: Nothing, always raises NotImplementedError
        """
        raise NotImplementedError(
            "ImpalaDataSource currently does not support raw format/blobs. "
            'Use method "get_dataframe" for dataframes'
        )