Skip to main content

How to connect to data on a filesystem using Spark

This guide will help you connect to your data stored on a filesystem using Spark. This will allow you to ValidateThe act of applying an Expectation Suite to a Batch. and explore your data.

Prerequisites: This how-to guide assumes you have:
  • Completed the Getting Started Tutorial
  • A working installation of Great Expectations
  • Have access to a working Spark installation
  • Have access to data on a filesystem

Steps​

1. Choose how to run the code in this guide​

Get an environment to run the code in this guide. Please choose an option below.

If you use the Great Expectations CLICommand Line Interface, run this command to automatically generate a pre-configured Jupyter Notebook. Then you can follow along in the YAML-based workflow below:

great_expectations datasource new

2. πŸ’‘ Instantiate your project's DataContext​

Import these necessary packages and modules.

from ruamel import yaml

import great_expectations as gx
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest

Please proceed only after you have instantiated your DataContext.

3. Configure your Datasource​

Using this example configuration, add in your path to a directory that contains some of your data:

datasource_yaml = rf"""
name: my_filesystem_datasource
class_name: Datasource
execution_engine:
class_name: SparkDFExecutionEngine
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetFilesystemDataConnector
base_directory: <YOUR_PATH>
default_regex:
group_names:
- data_asset_name
pattern: (.*)\.csv
"""

Run this code to test your configuration.

context.test_yaml_config(datasource_yaml)

If you specified a path containing CSV files you will see them listed as Available data_asset_names in the output of test_yaml_config().

Feel free to adjust your configuration and re-run test_yaml_config() as needed.

4. Save the Datasource configuration to your DataContext​

Save the configuration into your Data ContextThe primary entry point for a Great Expectations deployment, with configurations and methods for all supporting components. by using the add_datasource() function.

context.add_datasource(**yaml.load(datasource_yaml))

5. Test your new Datasource​

Verify your new DatasourceProvides a standard API for accessing and interacting with data from a wide variety of source systems. by loading data from it into a ValidatorUsed to run an Expectation Suite against data. using a Batch RequestProvided to a Datasource in order to create a Batch..

Add the path to your CSV in the path key under runtime_parameters in your BatchRequest.

batch_request = RuntimeBatchRequest(
datasource_name="my_filesystem_datasource",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="<YOUR_MEANGINGFUL_NAME>", # this can be anything that identifies this data_asset for you
runtime_parameters={"path": "<PATH_TO_YOUR_DATA_HERE>"}, # Add your path here.
batch_identifiers={"default_identifier_name": "default_identifier"},
)

Then load data into the Validator.

context.create_expectation_suite(
expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="test_suite"
)
print(validator.head())

πŸš€πŸš€ Congratulations! πŸš€πŸš€ You successfully connected Great Expectations with your data.

Additional Notes​

How to read-in multiple CSVs as a single Spark Dataframe​

More advanced configuration for reading in CSV files through the SparkDFExecutionEngine is possible through the batch_spec_passthrough parameter. batch_spec_passthrough allows for reader-methods to be directly specified, and backend-specific reader_options to be passed through to the actual reader-method, in this case spark.read.csv(). The following example shows how batch_spec_passthrough parameters can be added to the BatchRequest. However, the same parameters can be added to the Datasource configuration at the DataConnector level.

If you have a directory with 3 CSV files with each file having 10,000 lines each:

  taxi_data_files/yellow_tripdata_sample_2019-1.csv
taxi_data_files/yellow_tripdata_sample_2019-2.csv
taxi_data_files/yellow_tripdata_sample_2019-3.csv

You could write a BatchRequest that reads in the entire folder as a single Spark Dataframe by specifying the reader_method to be csv, header to be set to True in the reader_options.

batch_request = RuntimeBatchRequest(
datasource_name="my_filesystem_datasource",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="example_data_asset",
runtime_parameters={"path": "taxi_data_files"},
batch_identifiers={"default_identifier_name": 1234567890},
batch_spec_passthrough={"reader_method": "csv", "reader_options": {"header": True}},
)

Once that step is complete, then we can confirm that our Validator contains a BatchA selection of records from a Data Asset. with the expected 30,000 lines.

context.create_expectation_suite(
expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="test_suite"
)

print(validator.head())
print(validator.active_batch.data.dataframe.count()) # should be 30,000

To view the full scripts used in this page, see them on GitHub:

Next Steps​

Now that you've connected to your data, you'll want to work on these core skills: