![]() Highly available and fault-tolerant enterprise and web-scale software deployments.Cloud foundational services (compute, network, storage, and databases).Designing, developing, and deploying business software at scale.This code writes five files to the Amazon S3 location /' I need to create a supporting function called "py_create_emr_scripts" so lets take a look at this code. To do this I am going to define a new task called "create_emr_scripts" using the PythonOperator. In this walkthrough however, I am going to create those scripts using the same variables we have defined to make sure that those scripts change dynamically as our needs change. We could just create these outside of Apache Airflow and upload them, and this is an option. If we look at the steps we are looking to automate, the first one is to upload our Apache Hive and Presto scripts to a location on Amazon S3 where we can run them from our Amazon EMR steps. Genre_t = Variable.get( "emr_genre_table", default_var= "undefined") Genre = Variable.get( "emr_genre", default_var= "undefined") In the previous workflow I had hardcoded the genre so this time I wanted to add it as a variable meaning we could create a single workflow, parameterise it and then run it as many times as we needed, just having to change that variable "genre" and "genre_t" s3_dlake = Variable.get( "s3_dlake", default_var= "undefined")Įmr_db = Variable.get( "emr_db", default_var= "undefined")Įmr_output = Variable.get( "emr_output", default_var= "undefined") ![]() I next part of our workflow is the same, except this time we have added some more variables. We can take a look at the documentation for this operator at the Apache Airflow website, Amazon EMR OperatorsĪs part of our workflow, we want to create an Amazon EMR cluster, add some steps to run some of the Presto and Apache Hive queries, and then terminate the cluster so we need to add those operators (EmrCreateJobFlowOperator, EmrAddStepsOperator, EmrTerminateJobFlowOperator and EmrStepSensor) in our DAG from airflow import DAG, settings, secretsįrom _operator import PythonOperator, BranchPythonOperatorįrom _operator import DummyOperatorįrom _add_steps_operator import EmrAddStepsOperatorįrom _create_job_flow_operator import EmrCreateJobFlowOperatorįrom _terminate_job_flow_operator import EmrTerminateJobFlowOperatorįrom _step_sensor import EmrStepSensorįrom _rule import TriggerRule However, this time we are using Amazon EMR and if we look at the available Apache Airflow operators we can see that there is an Amazon EMR operator which will make our life easy. Not surprisingly, this workflow begins in a very similar way to the previous one. Clean up and shut down any resources so we can minimise the cost of running this operation.Move the export csv file to a new location in the data lake.Export the new table as a csv file (again using the scripts we already uploaded).Create a new table that just contains the information we are looking for (in this example, films of a particular genre).Create tables to import the movie and ratings data (using the scripts we uploaded).Check to see if a database exists and create it if it does not exist.Create our Apache Hive and Presto SQL scripts and upload those to a location on Amazon S3.As we are automating this, a lot of the stuff we would not need to do because we absorb that as part of the manual work (for example, I already have a database called XX, so I do not need to re-create that) we need to build into the workflow. To recap: We are using the Movielens dataset, loaded it into our data lake on Amazon S3 and we have been asked to a) create a new table with a subset of the information we care about, in this instance a particular genre of films, and b) create a new file with the same subset of information available in the data lake.Īs part of the set of manual steps we are trying to automate, we are using Amazon EMR (again as for the previous post, if you want to see those manual steps, refer to the documentation in the GitHub repository) together with some Apache Hive and Presto SQL scripts to create tables and export files. ![]() All the code so you can reproduce this yourself can be found in the GitHub repository here. Make sure you recap the setup from Part One. In this post, Part Two, we will do the same thing but automate the same example ELT workflow using Amazon EMR. In Part One, we automated an example ELT workflow on Amazon Athena using Apache Airflow. ![]()
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. ArchivesCategories |