Traditional ETL tools are playing catch up in handling semi-structured and unstructured data that is generated continuously from multiple sources. This raw data needs to be transformed into intelligence in real time. Spark is a great tool for building ETL pipelines to continuously clean, process and aggregate stream data before loading to a data store. These 10 concepts are learnt from a lot of research done over the past one year in building complex Spark streaming ETL applications to deliver real time business intelligence. This is my contribution to the Big Data Developer community in consolidating key learnings that would benefit the community by and large, we are going to discuss 10 important concepts that will accelerate your transition from using traditional ETL tool to Apache Spark for ETL
Unlike many traditional ETL tools, Spark’s uses a master/worker architecture. The master creates, schedules, distributes and executes the tasks on the workers.
- Data structures
RDD (Resilient Distributed Data) is the basic data structure in Spark. The name signifies that the data is recoverable from failures and is distributed across all the nodes. A Dataframe, is conceptually equivalent to a relational database table, has features like schema and built in query optimization. It is recommended to use Dataframe API as much as possible.
- Spark configurations
To be able to create an RDD/Dataframe and perform operations on it, every spark application must make a connection to the spark cluster by creating an object called SparkContext. Spark provides flexibility to configure properties like number of executors, number of cores per executor, and amount of memory per executor for each application. These properties are set by creating a configuration object called SparkConf, which will be used to create SparkContext
- RDD transformations
Transformations are functions applied on a RDD to create one or more RDDs. The operations are applied in parallel on the RDD partitions. map transformation is one of the most widely used transformation, however it is important to understand the benefits of using mapPartitions function to achieve better performance
map vs mapPartions
When we apply a map function on an RDD, it passes each element of the RDD through the function itself. We may create static values/objects that are used while applying the function on each element of the RDD. There are scenarios where we don’t want to create an object (size limitations or operation expense) or we cannot serialize an object to the worker nodes, for each element of the RDD. If we have 100k elements in an RDD, the function is invoked 100k times. map function is not the best option in this case.
Conversely, mapPartitions allows us to create the static values/objects once for each partition of data and invoke the function once per partition, thereby gaining performance improvements by being able to apply the function on more than one element. If we are creating database connections or creating an object that takes up executor resources(costly), mapPartitions is recommended to use.
- Lazy evaluation
Typically, an ETL job comprises of transformations that are applied on the input data before loading the final data into the target data store. Let’s assume we have an ETL with one step to extract the data into spark environment, five steps of transformations and one step to load the data into an external datastore. In Spark, the transformations are not executed/materialized until the action to load data into the external datastore is called. Spark generates a DAG (graph/lineage) of all the transformations, makes many optimization decisions based on the generated DAG and breaks the DAG into jobs, stages and tasks that get distributed to the workers. The tasks are serialized when sent from the driver to the executor. The executors de-serializes the tasks using the assembly jar shipped to each executor. The workers are also responsible of loading the part of the final output data to the external datastore in parallel.
- Caching and broadcasting data
An RDD is a collection of partitions that are immutable subsets and are distributed across the nodes in the cluster. Spark distributes the tasks generated from the DAG graph to the workers to be applied on the partitions of data they hold. To speed up transformations that are iterative in nature, the data can be cached in the worker nodes using cache() or persist(). Certain transformations require static data like lookup data available to all workers locally. This is achieved by broadcasting the data to all the workers using broadcast() function.
It is a programming module used to load data into Spark from variety of sources and provides a very easy interface to run SQL Queries on loaded data. This feature gives traditional ETL developers an easy transition into Spark programming.
Each worker node in Hadoop cluster has compute resources like memory and CPU. YARN is the resource manager in Hadoop and is the ultimate authority in allocating/managing the resources to spark jobs. Each node also has a node manager, responsible for monitoring resource usage of individual containers/executors, track overall node’s health and report to the resource manager(YARN) periodically. When a Spark Job is launched, a yarn application master is initiated in one of the nodes in the cluster. Now the application master is responsible for negotiating the resources with YARN. Once the resources are allocated to the application master, it requests the node managers to launch yarn containers/executors with the amount of memory and CPU specified in the spark-submit. After the job is successfully launched, YARN provides the ability to look at the logs using the application id of the spark job.
- Spark History Server
For every spark job submitted on the cluster, a web UI is launched to display useful information like list of jobs/stages/tasks, memory/disk utilization and other information of each executor assigned to the job. If the Spark event log is enabled, the information is available even after the spark application finishes. For spark streaming applications that run for indefinite period of time, enabling event log will increase the disk utilization rapidly. So, it is recommended to turn off spark history server event logging for long running jobs or have a script that restarts the streaming job periodically and clean the event logs.
Spark submit is used to launch a spark application on cluster. A typical project may contain more than one job and an ETL job for semi-structured/unstructured data may need to import helper functions to perform operations on the data. To package all the library dependencies and to keep all ETL jobs in a project in one place, a build tool like SBT is used to generate an assembly jar. Launching a spark job is done using a shell script with details like job name (class name), assembly jar file path and spark configurations to control the resources allocated to the job. A sample spark-submit shell script looks like:
spark-submit –class path.to.your.class –deploy-mode cluster –master yarn –executor-memory 2g –num-executors 3 path.to.your.assemblyJar &