How to Use Great Expectations in AWS Glue
This guide demonstrates how to set up, initialize, and run validations against your data in an AWS Glue Spark Job. We will cover the case of using a RuntimeDataConnector and S3 as the metadata store.
0. Prerequisites
- Configure great_expectations.yml and upload it to your S3 bucket, or generate it dynamically from code (an upload sketch follows the config below)
 
config_version: 3.0
datasources:
  spark_s3:
    module_name: great_expectations.datasource
    class_name: Datasource
    execution_engine:
      module_name: great_expectations.execution_engine
      class_name: SparkDFExecutionEngine
    data_connectors:
      default_inferred_data_connector_name:
        class_name: InferredAssetS3DataConnector
        bucket: bucket_name
        prefix: data_folder_prefix/
        default_regex:
          pattern: (.*)
          group_names:
            - data_asset_name
      default_runtime_data_connector_name:
        batch_identifiers:
          - runtime_batch_identifier_name
        module_name: great_expectations.datasource.data_connector
        class_name: RuntimeDataConnector
config_variables_file_path: great_expectations/uncommitted/config_variables.yml
plugins_directory: great_expectations/plugins/
stores:
  expectations_S3_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/expectations/'
  validations_S3_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/uncommitted/validations/'
  evaluation_parameter_store:
    class_name: EvaluationParameterStore
  checkpoint_S3_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: 'bucket_name'
      prefix: 'path_from_root/great_expectations/checkpoints/'
expectations_store_name: expectations_S3_store
validations_store_name: validations_S3_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_S3_store
data_docs_sites:
  s3_site:
    class_name: SiteBuilder
    show_how_to_buttons: false
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: bucket_name
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
anonymous_usage_statistics:
  enabled: True
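If you prefer to upload the configuration from code rather than through the console, a minimal boto3 sketch could look like the following (the local filename, bucket, and key are placeholders consistent with the config above):

import boto3

s3_client = boto3.client("s3")
# Upload the local great_expectations.yml so the Glue job can read it at runtime
s3_client.upload_file(
    Filename="great_expectations.yml",
    Bucket="bucket_name",
    Key="great_expectations/great_expectations.yml",
)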
1. Install Great Expectations
To install the great_expectations module, add the following to your AWS Glue Spark Job Parameters (Glue version 2.0 or later is required):
  --additional-python-modules great_expectations
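If you create the job programmatically instead of through the console, a minimal boto3 sketch of setting this parameter could look like the following (the job name, IAM role, and script location are placeholders, not part of this guide):

import boto3

glue_client = boto3.client("glue")
glue_client.create_job(
    Name="ge-validation-job",  # placeholder job name
    Role="MyGlueServiceRole",  # placeholder IAM role
    GlueVersion="3.0",  # Glue 2.0 or later supports --additional-python-modules
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://bucket_name/scripts/ge_validation.py",  # placeholder
        "PythonVersion": "3",
    },
    DefaultArguments={"--additional-python-modules": "great_expectations"},
)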
Then import the necessary libraries:
import boto3
import yaml
from awsglue.context import GlueContext
from pyspark.context import SparkContext
import great_expectations as gx
from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context.types.base import (
    DataContextConfig,
    S3StoreBackendDefaults,
)
from great_expectations.util import get_context
2. Set up Great Expectations
Here we initialize Spark and Glue contexts and read great_expectations.yml from S3:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
s3_client = boto3.client("s3")
response = s3_client.get_object(
    Bucket="bucket_name", Key="great_expectations/great_expectations.yml"
)
config_file = yaml.safe_load(response["Body"])
3. Connect to your data
Build a DataContextConfig from the loaded YAML and create an in-memory Data Context:
config = DataContextConfig(
    config_version=config_file["config_version"],
    datasources=config_file["datasources"],
    expectations_store_name=config_file["expectations_store_name"],
    validations_store_name=config_file["validations_store_name"],
    evaluation_parameter_store_name=config_file["evaluation_parameter_store_name"],
    plugins_directory="/great_expectations/plugins",
    validation_operators=config_file.get("validation_operators"),  # not defined in the config above
    stores=config_file["stores"],
    data_docs_sites=config_file["data_docs_sites"],
    config_variables_file_path=config_file["config_variables_file_path"],
    anonymous_usage_statistics=config_file["anonymous_usage_statistics"],
    checkpoint_store_name=config_file["checkpoint_store_name"],
    store_backend_defaults=S3StoreBackendDefaults(
        default_bucket_name=config_file["data_docs_sites"]["s3_site"]["store_backend"][
            "bucket"
        ]
    ),
)
context_gx = get_context(project_config=config)
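Optionally, sanity-check the resulting context before moving on; for example, listing the datasources should show the spark_s3 datasource from the config:

# Should include the "spark_s3" datasource defined in great_expectations.yml
print(context_gx.list_datasources())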
4. Create Expectations
expectation_suite_name = "suite_name"
suite = context_gx.create_expectation_suite(expectation_suite_name)
batch_request = RuntimeBatchRequest(
    datasource_name="spark_s3",
    data_asset_name="datafile_name",
    batch_identifiers={"runtime_batch_identifier_name": "default_identifier"},
    data_connector_name="default_runtime_data_connector_name",  # the RuntimeDataConnector from the config
    runtime_parameters={"path": "s3a://bucket_name/path_to_file.format"},
)
validator = context_gx.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
print(validator.head())
validator.expect_column_values_to_not_be_null(
    column="passenger_count"
)  # add an expectation to the suite
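# More expectations can be added the same way before saving the suite; for
# example (the column and bounds below are illustrative, not part of this guide):
validator.expect_column_values_to_be_between(
    column="passenger_count", min_value=0, max_value=10
)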
validator.save_expectation_suite(discard_failed_expectations=False)
5. Validate your data
Create a temporary SimpleCheckpoint from the batch request and the suite, then run it:
checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
}
checkpoint = SimpleCheckpoint(
    f"_tmp_checkpoint_{expectation_suite_name}", context_gx, **checkpoint_config
)
results = checkpoint.run(result_format="SUMMARY", run_name="test")
validation_result_identifier = results.list_validation_result_identifiers()[0]
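In a Glue job you usually want the run to be marked as failed when validation does not pass. A minimal sketch of inspecting the checkpoint result (the exception type and message are illustrative) could be:

# Raise so the Glue job run is marked as failed when any validation fails
if not results.success:
    raise RuntimeError(f"Validation failed for suite {expectation_suite_name}")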
6. Congratulations!
Your Data Docs have been built in the S3 bucket; open index.html in the bucket to view the validation results.