Skip to main content

How to Use Great Expectations in EMR Serverless

This Guide demonstrates how to set up, initialize and run validations against your data on AWS EMR Serverless. We will cover case with RuntimeDataConnector and use S3 as metadata store.

0. Pre-requirements

  • Configure great_expectations.yaml and upload to your S3 bucket or generate it dynamically from code, notice critical moment, that you need to add endpoint_url to data_doc section
config_version: 3.0
datasources:
spark_s3:
module_name: great_expectations.datasource
class_name: Datasource
execution_engine:
module_name: great_expectations.execution_engine
class_name: SparkDFExecutionEngine
data_connectors:
default_inferred_data_connector_name:
class_name: InferredAssetS3DataConnector
bucket: bucket_name
prefix: data_folder/
default_regex:
pattern: (.*)
group_names:
- data_asset_name
default_runtime_data_connector_name:
batch_identifiers:
- runtime_batch_identifier_name
module_name: great_expectations.datasource.data_connector
class_name: RuntimeDataConnector
config_variables_file_path: great_expectations/uncommitted/config_variables.yml


plugins_directory: great_expectations/plugins/

stores:
expectations_S3_store:
class_name: ExpectationsStore
store_backend:
class_name: TupleS3StoreBackend
bucket: 'bucket_name'
prefix: 'great_expectations/expectations/'

validations_S3_store:
class_name: ValidationsStore
store_backend:
class_name: TupleS3StoreBackend
bucket: 'bucket_name'
prefix: 'great_expectations/uncommitted/validations/'

evaluation_parameter_store:
class_name: EvaluationParameterStore

checkpoint_S3_store:
class_name: CheckpointStore
store_backend:
class_name: TupleS3StoreBackend
bucket: 'bucket_name'
prefix: 'great_expectations/checkpoints/'

expectations_store_name: expectations_S3_store
validations_store_name: validations_S3_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_S3_store

data_docs_sites:
s3_site:
class_name: SiteBuilder
show_how_to_buttons: false
store_backend:
class_name: TupleS3StoreBackend
bucket: crystall-profiling-test
boto3_options:
endpoint_url: https://bucket_name.s3.region.amazonaws.com/
site_index_builder:
class_name: DefaultSiteIndexBuilder

anonymous_usage_statistics:
enabled: True

1. Install Great Expectations

Create a Dockerfile and build it to generate virtualenv archive and upload this tar.gz output to S3 bucket. At requirements.txt you should have great_expectations package and everything else what you want to install

FROM --platform=linux/amd64 amazonlinux:2 AS base

RUN yum install -y python3

ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY ./requirements.txt /
RUN python3 -m pip install --upgrade pip && \
python3 -m pip install -r requirements.txt --no-cache-dir

RUN mkdir /output && venv-pack -o /output/pyspark_ge.tar.gz

FROM scratch AS export
COPY --from=base /output/pyspark_ge.tar.gz /

When you will configure a job, it's necessary to define additional params to Spark properties:

--conf spark.archives=s3://bucket/folder/pyspark_ge.tar.gz#environment 
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

Then import necessary libs:

import boto3
import yaml
from pyspark import SQLContext

import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context.types.base import (
DataContextConfig,
S3StoreBackendDefaults,
)
from great_expectations.util import get_context

2. Set up Great Expectations

Here we initialize a Spark, and read great_expectations.yaml

if __name__ == "__main__":
### critical part to reinitialize spark context
sc = gx.core.util.get_or_create_spark_application()
spark = SQLContext(sc)

spark_file = "pyspark_df.parquet"
suite_name = "pandas_spark_suite"
session = boto3.Session()
s3_client = session.client("s3")
response = s3_client.get_object(
Bucket="bucket_name",
Key="bucket_name/great_expectations/great_expectations.yml",
)
config_file = yaml.safe_load(response["Body"])

3. Connect to your data

df_spark = spark.read.parquet("s3://bucket_name/data_folder/" + spark_file)

config = DataContextConfig(
config_version=config_file["config_version"],
datasources=config_file["datasources"],
expectations_store_name=config_file["expectations_store_name"],
validations_store_name=config_file["validations_store_name"],
evaluation_parameter_store_name=config_file["evaluation_parameter_store_name"],
plugins_directory="/great_expectations/plugins",
stores=config_file["stores"],
data_docs_sites=config_file["data_docs_sites"],
config_variables_file_path=config_file["config_variables_file_path"],
anonymous_usage_statistics=config_file["anonymous_usage_statistics"],
checkpoint_store_name=config_file["checkpoint_store_name"],
store_backend_defaults=S3StoreBackendDefaults(
default_bucket_name=config_file["data_docs_sites"]["s3_site"][
"store_backend"
]["bucket"]
),
)

context_gx = get_context(project_config=config)

4. Create Expectations

expectation_suite_name = suite_name
suite = context_gx.get_expectation_suite(suite_name)

batch_request = RuntimeBatchRequest(
datasource_name="spark_s3",
data_connector_name="default_inferred_data_connector_name",
data_asset_name="datafile_name",
batch_identifiers={"runtime_batch_identifier_name": "default_identifier"},
runtime_parameters={"path": "s3a://bucket_name/path_to_file.format"},
)
validator = context_gx.get_validator(
batch_request=batch_request,
expectation_suite_name=expectation_suite_name,
)
print(validator.head())
validator.expect_column_values_to_not_be_null(
column="passenger_count"
) ## add some test
validator.save_expectation_suite(discard_failed_expectations=False)

5. Validate your data

my_checkpoint_name = "in_memory_checkpoint"
python_config = {
"name": my_checkpoint_name,
"class_name": "Checkpoint",
"config_version": 1,
"run_name_template": "%Y%m%d-%H%M%S-my-run-name-template",
"action_list": [
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"},
},
{
"name": "store_evaluation_params",
"action": {"class_name": "StoreEvaluationParametersAction"},
},
],
"validations": [
{
"batch_request": {
"datasource_name": "spark_s3",
"data_connector_name": "default_runtime_data_connector_name",
"data_asset_name": "pyspark_df",
},
"expectation_suite_name": expectation_suite_name,
}
],
}
context_gx.add_checkpoint(**python_config)

results = context_gx.run_checkpoint(
checkpoint_name=my_checkpoint_name,
run_name="run_name",
batch_request={
"runtime_parameters": {"batch_data": df_spark},
"batch_identifiers": {
"runtime_batch_identifier_name": "default_identifier"
},
},
)

validation_result_identifier = results.list_validation_result_identifiers()[0]
context_gx.build_data_docs()

6. Congratulations!

Your data docs built on S3 and you can see index.html at the bucket

This documentation has been contributed by Bogdan Volodarskiy from Provectus