How to configure a Spark Datasource

This guide will walk you through the process of configuring a Spark Datasource from scratch, verifying that your configuration is valid, and adding it to your Data Context. By the end of this guide you will have a Spark Datasource which you can use in future workflows for creating Expectations and Validating data.

Steps

1. Import necessary modules and initialize your Data Context

from ruamel import yaml

import great_expectations as gx

data_context: gx.DataContext = gx.get_context()

The great_expectations module will give you access to your Data Context, which is the entry point for working with a Great Expectations project.

The yaml module from ruamel will be used to validate your Datasource's configuration. Great Expectations uses a Python dictionary representation of your Datasource configuration when you add your Datasource to your Data Context. However, Great Expectations saves configurations as yaml files, so when you validate your configuration you will first need to convert it from a Python dictionary to a yaml string.
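
For example, once you have built a configuration dictionary, the conversion is a single call (you will see it used against the full configuration in step 10). The dictionary below is just a placeholder to show the shape of the call:

example_config: dict = {"name": "my_datasource_name"}
yaml_string: str = yaml.dump(example_config)
print(yaml_string)  # name: my_datasource_name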

The Data Context that is initialized by get_context() will be the Data Context defined in your current working directory. It provides convenience methods that we will use to validate your Datasource configuration and to add your Datasource to your Great Expectations project once you have configured it.

2. Create a new Datasource configuration

A new Datasource can be configured in Python as a dictionary with a specific set of keys. We will build our Datasource configuration from scratch in this guide, although you can just as easily modify an existing one.

To start, create an empty dictionary. You will be populating it with keys as you go forward.

At this point, the configuration for your Datasource is merely:

datasource_config: dict = {}

However, from this humble beginning you will be able to build a full Datasource configuration.

The keys needed for your Datasource configuration

At the top level, your Datasource's configuration will need the following keys:

  • name: The name of the Datasource, which will be used to reference the Datasource in Batch Requests.
  • class_name: The name of the Python class instantiated by the Datasource. Typically, this will be the Datasource class.
  • module_name: The name of the module that contains the Class definition indicated by class_name.
  • execution_engine: A dictionary containing the class_name and module_name of the Execution Engine instantiated by the Datasource.
  • data_connectors: The configurations for any Data Connectors and their associated Data Assets that you want to have available when utilizing the Datasource.

In the following steps we will add those keys and their corresponding values to your currently empty Datasource configuration dictionary.

3. Name your Datasource

The first key that you will need to define for your new Datasource is its name. You will use this to reference the Datasource in future workflows. It can be anything you want it to be, but ideally you will name it something relevant to the data that it interacts with.

For the purposes of this example, we will name this Datasource:

"name": "my_datasource_name",  # Preferably name it something relevant

You should, however, name your Datasource something more relevant to your data.

At this point, your configuration should now look like:

datasource_config: dict = {
    "name": "my_datasource_name",  # Preferably name it something relevant
}

4. Specify the Datasource class and module

The class_name and module_name for your Datasource will almost always indicate the Datasource class found at great_expectations.datasource. You may replace this with a specialized subclass, or a custom class, but for almost all regular purposes these two default values will suffice. For the purposes of this guide, add those two values to their corresponding keys.

"class_name": "Datasource",
"module_name": "great_expectations.datasource"

Your full configuration should now look like:

datasource_config: dict = {
    "name": "my_datasource_name",  # Preferably name it something relevant
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
}

5. Add the Spark Execution Engine to your Datasource configuration

Your Execution Engine is where you will specify that you want this Datasource to use Spark in the backend. As with the Datasource top level configuration, you will need to provide the class_name and module_name that indicate the class definition and containing module for the Execution Engine that you will use.

For the purposes of this guide, these will consist of the SparkDFExecutionEngine found at great_expectations.execution_engine. The execution_engine key and its corresponding value will therefore look like this:

"execution_engine": {
"class_name": "SparkDFExecutionEngine",
"module_name": "great_expectations.execution_engine",
}

After adding the above snippet to your Datasource configuration, your full configuration dictionary should now look like:

datasource_config: dict = {
    "name": "my_datasource_name",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
}

6. Add a dictionary as the value of the data_connectors key

The data_connectors key should have a dictionary as its value. Each key/value pair in this dictionary will correspond to a Data Connector's name and configuration, respectively.

The keys in the data_connectors dictionary will be the names of the Data Connectors, which you will use to indicate which Data Connector to use in future workflows. As with your Datasource's name, you can use any value you want for a Data Connector's name, but ideally it will be something relevant to the data that the particular Data Connector provides. The one notable difference is that a Data Connector's name is defined as its key in the data_connectors dictionary rather than as the value of a name key.

The values for each of your data_connectors keys will be the Data Connector configurations that correspond to each Data Connector's name. You may define multiple Data Connectors in the data_connectors dictionary by including multiple key/value pairs.
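
For illustration, a data_connectors dictionary holding two Data Connectors might be shaped like the sketch below. The names here are hypothetical placeholders, and the inner configurations are filled in following the process described in the rest of this guide:

"data_connectors": {
    "name_of_my_inferred_data_connector": {
        # this Data Connector's configuration goes here
    },
    "name_of_my_second_data_connector": {
        # a second Data Connector, configured independently
    },
},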

For now, start by adding an empty dictionary as the value of the data_connectors key. We will begin populating it with Data Connector configurations in the next step.

Your current configuration should look like:

datasource_config: dict = {
    "name": "my_datasource_name",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {},
}

7. Configure your individual Data Connectors

For each Data Connector configuration, you will need to specify which type of Data Connector you will be using. When using Spark to work with data in a file system, the most likely ones will be the InferredAssetFilesystemDataConnector, the ConfiguredAssetFilesystemDataConnector, and the RuntimeDataConnector.

If you are working with Spark but not working with a file system, please see our cloud-specific guides for more information.

Reminder

If you are uncertain which Data Connector best suits your needs, please refer to our guide on how to choose which Data Connector to use.

Data Connector example configurations:

tip

The InferredAssetDataConnector is ideal for:

  • quickly setting up a Datasource and getting access to data
  • diving straight into working with Great Expectations
  • initial data discovery and introspection

However, the InferredAssetDataConnector allows less control over the definitions of your Data Assets than the ConfiguredAssetDataConnector provides.

If you are at the point of building a repeatable workflow, we encourage using the ConfiguredAssetDataConnector instead.

Remember, the key that you provide for each Data Connector configuration dictionary will be used as the name of the Data Connector. For this example, we will use the name name_of_my_inferred_data_connector but you may have it be anything you like.

At this point, your configuration should look like:

datasource_config: dict = {
    "name": "my_datasource_name",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {"name_of_my_inferred_data_connector": {}},
}

When defining an InferredAssetFilesystemDataConnector you will need to provide values for four keys in the Data Connector's configuration dictionary (the currently empty dictionary that corresponds to "name_of_my_inferred_data_connector" in the example above). These key/value pairs consist of:

  • class_name: The name of the Class that will be instantiated for this Data Connector.
  • base_directory: The string representation of the directory that contains your filesystem data.
  • default_regex: A dictionary that describes how the data should be grouped into Batches.
  • batch_spec_passthrough: A dictionary of values that are passed to the Execution Engine's backend.

Additionally, you may optionally choose to define:

  • glob_directive: A glob pattern that can be used to access source data files contained in subfolders of your base_directory. If this is not defined, the default value of * will cause your Data Connector to only look at files in the base_directory itself.

For this example, you will be using the InferredAssetFilesystemDataConnector as your class_name. This is a subclass of the InferredAssetDataConnector that is specialized to support filesystem Execution Engines, such as the SparkDFExecutionEngine. This key/value entry will therefore look like:

"class_name": "InferredAssetFilesystemDataConnector",
tip

Because we are using one of Great Expectations' built-in Data Connectors, an entry for module_name along with a default value will be provided when this Data Connector is initialized.

However, if you want to use a custom Data Connector, you will need to explicitly add a module_name key alongside the class_name key.

The value for module_name would then be set as the import path for the module containing your custom Data Connector, in the same fashion as you would provide class_name and module_name for a custom Datasource or Execution Engine.

For the base_directory key, you will want to put the relative path of your data from the folder that contains your Data Context. In this example, we will use the same path that was used in the Getting Started Tutorial, Step 2: Connect to Data. Since we are manually entering this value rather than letting the CLI generate it, the key/value pair will look like:

"base_directory": "../data",

With these values added, along with blank dictionaries for default_regex and batch_spec_passthrough (which we will populate in the following steps), your full configuration should now look like:

datasource_config: dict = {
    "name": "my_datasource_name",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {
        "name_of_my_inferred_data_connector": {
            "class_name": "InferredAssetFilesystemDataConnector",
            "base_directory": "../data",
            "default_regex": {},
            "batch_spec_passthrough": {},
        }
    },
}

Optional parameter: glob_directive

The glob_directive parameter is provided to give the Data Connector information about the directory structure to expect when identifying source data files to check against each Data Asset's default_regex. If you do not specify a value for glob_directive, a default value of "*" will be used. This will cause your Data Asset to check all files in the folder specified by base_directory to determine which should be returned as Batches for the Data Asset, but will ignore any files in subdirectories.

Overriding the glob_directive by providing your own value will allow your Data Connector to traverse subdirectories or otherwise alter which source data files are compared against your Data Connector's default_regex.

For example, assume your source data is in files contained by subdirectories of your base_directory, like so:

  • 2019/yellow_taxidata_2019_01.csv
  • 2020/yellow_taxidata_2020_01.csv
  • 2021/yellow_taxidata_2021_01.csv
  • 2022/yellow_taxidata_2022_01.csv

To include all of these files, you would need to tell the Data Connector to look for files that are nested one level deeper than the base_directory itself.

You would do this by setting the glob_directive key in your Data Connector config to a value of "*/*". This value will cause the Data Connector to look for regex matches against the file names for all files found in any subfolder of your base_directory. Such an entry would look like:

"glob_directive": "*.*"

The glob_directive parameter uses glob-style wildcard matching rather than regular expressions. You can also use it to limit the files that will be compared against the Data Connector's default_regex for a match. For example, to only permit .csv files to be checked for a match, you could specify the glob_directive as "*.csv". To only check for matches against the .csv files in subdirectories, you would use the value "*/*.csv", and so forth.
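
If you want to preview which files a given glob_directive would pick up, you can check it with Python's standard glob module before putting the value into your configuration. This is purely illustrative and not part of the Great Expectations API; the path below is the hypothetical ../data directory used in this guide:

import glob
import os

base_directory = "../data"
glob_directive = "*/*.csv"

# Lists the source data files that this glob_directive would expose to the Data Connector
print(glob.glob(os.path.join(base_directory, glob_directive)))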

In this guide's examples, all of our data is assumed to be in the base_directory folder. Therefore, you will not need to add an entry for glob_directive to your configuration. However, if you were to include the example glob_directive from above, your full configuration would currently look like:

datasource_config: dict = {
    "name": "my_datasource_name",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {
        "name_of_my_inferred_data_connector": {
            "class_name": "InferredAssetFilesystemDataConnector",
            "base_directory": "../data",
            "glob_directive": "*/*",
            "default_regex": {},
            "batch_spec_passthrough": {},
        }
    },
}

8. Configure the values for batch_spec_passthrough

The parameter batch_spec_passthrough is used to access some native capabilities of your Execution Engine. If you do not specify it, your Execution Engine will attempt to determine the values based on file extensions and defaults. If you do define it, it will contain two keys: reader_method and reader_options. These will correspond to a string and a dictionary, respectively.

"batch_spec_passthrough": {
"reader_method": "",
"reader_options": {},

Configuring your reader_method:

The reader_method is used to specify which of Spark's spark.read.* methods will be used to read your data. For our example, we are using .csv files as our source data, so we will specify csv (corresponding to spark.read.csv) as our reader_method, like so:

"reader_method": "csv",

Configuring your reader_options:

Start by adding a blank dictionary as the value of the reader_options parameter. This dictionary will hold two key/value pairs: header and inferSchema.

"reader_options": {
"header": "",
"inferSchema": "",
},

The first key is header, and the value should be either True or False. This will indicate to the Data Connector whether or not the first row of each source data file is a header row. For our example, we will set this to True.

"header": True,

The second key to include is inferSchema. Again, the value should be either True or False. This will indicate to the Data Connector whether or not the Execution Engine should attempt to infer the data type contained by each column in the source data files. Again, we will set this to True for the purpose of this guide's example.

"inferSchema": True,
caution
  • inferSchema will read datetime columns in as text columns.
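
For context, these reader_options are handed through to Spark's native CSV reader when a Batch is loaded. The snippet below is only a rough illustration of the equivalent native Spark call, not something Great Expectations requires you to run; the file path is a hypothetical example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Roughly what the SparkDFExecutionEngine will do with these reader_options
df = spark.read.csv(
    "../data/yellow_tripdata_sample_2020-01.csv",  # hypothetical source data file
    header=True,
    inferSchema=True,
)
df.printSchema()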

At this point, your batch_spec_passthrough configuration should look like:

"batch_spec_passthrough": {
"reader_method": "csv",
"reader_options": {
"header": True,
"inferSchema": True,
},
},

And your full configuration will look like:

datasource_config: dict = {
    "name": "my_datasource_name",  # Preferably name it something relevant
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {
        "name_of_my_inferred_data_connector": {
            "class_name": "InferredAssetFilesystemDataConnector",
            "base_directory": "../data",
            "default_regex": {},
            "batch_spec_passthrough": {
                "reader_method": "csv",
                "reader_options": {
                    "header": True,
                    "inferSchema": True,
                },
            },
        }
    },
}

9. Configure your Data Connector's Data Assets

In an Inferred Asset Data Connector for filesystem data, a regular expression is used to group the files into Batches for a Data Asset. This is done with the value we will define for the Data Connector's default_regex key. The value for this key will consist of a dictionary that contains two values:

  • pattern: This is the regex pattern that will define your Data Asset's potential Batch or Batches.
  • group_names: This is a list of names that correspond to the groups you defined in pattern's regular expression.

The pattern in default_regex will be matched against the files in your base_directory, and each matching file will become a Batch in a Data Asset named after the text captured by the pattern's first group. Any files whose first-group match is the same string will become Batches in the same Data Asset.

This means that when configuring your Data Connector's regular expression, you have the option to implement it so that the Data Connector is only capable of returning a single Batch per Data Asset, or so that it is capable of returning multiple Batches grouped into individual Data Assets. Each type of configuration is useful in certain cases, so we will provide examples of both.

tip

If you are uncertain as to which type of configuration is best for your use case, please refer to our guide on how to choose between working with a single or multiple Batches of data.

Because of the simple regex matching that groups files into Batches for a given Data Asset, it is actually quite straightforward to create a Data Connector whose Data Assets are only capable of providing a single Batch. All you need to do is define a regular expression consisting of a single group that captures a portion of your data files' names that is unique for each file.

The simplest way to do this is to define a group that consists of the entire file name.

For this example, let's assume we have the following files in our data directory:

  • yellow_tripdata_sample_2020-01.csv
  • yellow_tripdata_sample_2020-02.csv
  • yellow_tripdata_sample_2020-03.csv

In this case you could define the pattern key as follows:

"pattern": "(.*)\\.csv",

This regex will match the full name of any file that has the .csv extension, and will put everything prior to the .csv extension into a group.

Since each .csv file will necessarily have a unique name preceding its extension, the content that matches this pattern will be unique for each file. This will ensure that only one file is included as a Batch for each Data Asset.

To correspond to the single group that was defined in your regex, you will define a single entry in the list for the group_names key. Since the first group in an Inferred Asset Data Connector is used to generate names for the inferred Data Assets, you should name that group as follows:

"group_names": ["data_asset_name"],

Looking back at our sample files, this regex will result in the InferredAssetFilesystemDataConnector providing three Data Assets, which can be accessed by the portion of the file name that matches the first group in our regex. In future workflows you will be able to refer to one of these Data Assets in a Batch Request by providing one of the following data_asset_names:

  • yellow_tripdata_sample_2020-01
  • yellow_tripdata_sample_2020-02
  • yellow_tripdata_sample_2020-03

note

Since we did not include .csv in the first group of the regex we defined, the .csv portion of the filename will be dropped from the value that is recognized as a valid data_asset_name.
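
If you want to sanity-check how the pattern and group_names will carve a file name into a data_asset_name, you can experiment with Python's built-in re module. This is purely illustrative and not something Great Expectations asks you to run:

import re

pattern = r"(.*)\.csv"
file_name = "yellow_tripdata_sample_2020-01.csv"

match = re.match(pattern, file_name)
if match:
    # The first group becomes the inferred data_asset_name
    print(match.group(1))  # yellow_tripdata_sample_2020-01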

With all of these values put together into a single dictionary, your Data Connector configuration will look like this:

"name_of_my_inferred_data_connector": {
"class_name": "InferredAssetFilesystemDataConnector",
"base_directory": "../data",
"default_regex": {
"pattern": "(.*)\\.csv",
"group_names": ["data_asset_name"],
},
}

And the full configuration for your Datasource should look like:

datasource_config: dict = {
    "name": "my_datasource_name",  # Preferably name it something relevant
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {
        "name_of_my_inferred_data_connector": {
            "class_name": "InferredAssetFilesystemDataConnector",
            "base_directory": "../data",
            "default_regex": {
                "pattern": "(.*)\\.csv",
                "group_names": ["data_asset_name"],
            },
            "batch_spec_passthrough": {
                "reader_method": "csv",
                "reader_options": {
                    "header": True,
                    "inferSchema": True,
                },
            },
        }
    },
}

10. Test your configuration with .test_yaml_config(...)

Now that you have a full Datasource configuration, you can confirm that it is valid by testing it with the .test_yaml_config(...) method. To do this, execute the Python code:

data_context.test_yaml_config(yaml.dump(datasource_config))

When executed, test_yaml_config will instantiate the component described by the yaml configuration that is passed in and then run a self-check procedure to verify that the component works as expected.

For a Datasource, this includes:

  • confirming that the connection works
  • gathering a list of available Data Assets
  • verifying that at least one Batch can be fetched from the Datasource
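
If the configuration is invalid, test_yaml_config will generally surface an exception describing the problem. A small sketch of catching that while you iterate on the configuration (purely optional, using only the calls already shown in this guide) might look like:

try:
    data_context.test_yaml_config(yaml.dump(datasource_config))
except Exception as e:
    # Inspect the error, adjust datasource_config, and test again
    print(f"Datasource configuration check failed: {e}")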

For more information on the .test_yaml_config(...) method, please see our guide on how to configure DataContext components using test_yaml_config.

11. (Optional) Add more Data Connectors to your configuration

The data_connectors dictionary in your datasource_config can contain multiple entries. If you want to add additional Data Connectors, just go through the process starting at step 7 again.

12. Add your new Datasource to your Data Context

Now that you have verified that you have a valid configuration you can add your new Datasource to your Data Context with the command:

data_context.add_datasource(**datasource_config)

caution

If the value of datasource_config["name"] corresponds to a Datasource that is already defined in your Data Context, then using the above command will overwrite the existing Datasource.

tip

If you want to ensure that you only add a Datasource when it won't overwrite an existing one, you can use the following code instead:

# add_datasource only if it doesn't already exist in your Data Context
try:
    data_context.get_datasource(datasource_config["name"])
except ValueError:
    data_context.add_datasource(**datasource_config)
else:
    print(
        f"The datasource {datasource_config['name']} already exists in your Data Context!"
    )

Next Steps

Congratulations! You have fully configured a Datasource and verified that it can be used in future workflows to provide a Batch or Batches of data.

tip

For more information on using Batch Requests to retrieve data, please see our guide on how to get one or more Batches of data from a configured Datasource.

You can now move forward and create Expectations for your Datasource.