Tired of maintaining complex SQL queries! It's Spark's turn

First Spark SQL

It's hard to say what Apache Spark is, but this article describes Spark's (probably the most important feature) ** Spark SQL **. If you don't know this, you won't understand the rest of Spark, and Spark SQL alone has a huge range of applications for real-world apps.

When to use

This is the purpose of Spark SQL in a nutshell.

Using multiple tabular data as materials, process and aggregate and output different tabular data

For example

If it is the sales data of the store →
Graphing how sales differ by date or product,
If you are running a website →
What kind of search keyword did the visitor enter, what kind of page transition did each visit, and what kind of page transition did / did not reach the purchase
For software development →
Aggregate how the number of test executions and the number of failures have changed,

There are various things. I think that every company is doing it, even if it has skill in the method.

Well, when the data is small, ** Excel or Google Spreadsheet is enough **, but as the size increases, it gradually becomes unmanageable.

If it is an old and huge system, it will be a category of processing called "batch processing", but this kind of thing makes you want to flexibly change the way of data processing. I think it's a common story to try to divide what you were just looking at by date by time of day or take the weather into consideration.

Also, it is often the case that the processing is ** multi-stage **. It is a pattern such as creating daily data from detailed sales data, processing it into monthly data, and finally making a graph of the same month of the previous year.

Although it is an image diagram, it seems that data processing is multi-stage.

DataFlow.jpg

The good thing about Spark

Now, when this kind of data processing is necessary and the data size is too large for Excel to keep up with, I have come to believe in Spark. Especially if the current process applies next.

Next, I will explain in a little more detail.

If you are forcibly doing it with SQL

This is a case where the target data is output by making full use of SQL VIEW, JOIN, GROUP BY and so-called stored procedures. I think everyone is aware of the poor maintainability in this case.

SELECT
  C.CustomerID, C.CustomerName,
  C.CustomerType, C.Address1, C.City,
  C.State, S.TotalSales
FROM
  Customers C
INNER JOIN
  (SELECT CustomerID, SUM(Sales) as TotalSales FROM Sales GROUP BY CustomerID) S
ON
  C.CustomerID = S.CustomerID

This is easy to find on the Web, but I'm sure there are countless more horrifying SQLs out there. Moreover, since I want to change the way data is cut, there are many cases where I often end up maintaining such strange SQL. I would be grateful if you could comment if you feel that this is no longer possible to maintain.

Compared to SQL, Spark has these advantages.

When data is fetched by SQL and then aggregated by a normal program

I think this is also a common pattern. It is a type that gently fetches data with a very simple SQL, then loops to fetch the data, and makes full use of ʻArray, HashMap`, etc. to aggregate the data. A simple aggregation is fine, but this method also increases memory consumption and execution time as the amount of data increases, and a complicated analysis flow (there are many intermediate data in the above figure, and the flow branches). If there are multiple final data), the code will be complicated.

In the case of Spark, load distribution is easy because ** clusters can be formed ** quite easily. Of course, it is impossible to aggregate data that essentially requires you to see all the data, but if you understand the habits, it is a big point that you can support clusters in the same way as when developing and testing with a single node.

As an image,

It is a flow. It is very useful for table JOIN.

Languages that can be used

Scala seems to be the original implementation of Spark,

Is available. Is it Java or Python in terms of affinity from general apps?

Query example

With Spark SQL, you can comfortably write aggregate code as if you were focusing on SQL keywords and functions. The following is a sample that quickly cuts out ** "Spark SQL-like oozing" in Java **. Please read while imagining what kind of coding style it will be.

withColumn

withColumn dynamically creates a new column using the data of an existing column. This creates a new column, datetime, which is a string of millisecond integer times in the timestamp column in the yyyy-MM-dd HH: mm: ss format.

python


        data = data.withColumn("datetime", date_format(
            col("timestamp")
                .divide(1000)
                .cast(DataTypes.TimestampType), "yyyy-MM-dd HH:mm:ss")
        ));

groupBy

groupBy divides the target record into groups with the same key value and aggregates within that group to create a new table, similar to that in SQL.

This groups all records with the same name column, and each one

To create a new table.

python


            Dataset<Row> data = data.groupBy("name").agg(
                        sum("score").as("totalScore"),
                        avg("duration").as("durationAvg"),
                        stddev("duration").as("durationStddev"),
                        count("*").as("count"));

In this way, common processing can be done with the one provided by Spark, and if it is not in time, you can write the processing with the power of Java.

Wouldn't it be better than trying hard to maintain the painful SQL?

If you write in Java like this, the JAR after build will be deployed to the cluster and distributed processing will be possible! Or, of course, you can pay the fee only when you put it on AWS Lambda and perform the aggregation process.

RDD and Dataset

This is where it's easy to trip over when learning Spark SQL. Both are classes for dealing with huge table "wind" data, and I'm not sure at first what's different (I was the same), but to keep in mind:

If you look up Spark related information, you will often see DataFrame, which is an alias for Dataset \ <Row >. Row is a class that can be used for general-purpose row data without specifying a type.

Relationship with Hadoop

Hadoop is the origin of this kind of distributed processing framework rather than Spark, and Spark also uses the Hadoop File System behind the scenes. However, while Hadoop proceeds while saving the result to disk for each series of processing, in the case of Spark, the data is only in memory unless explicitly saved, so if you save only the really important intermediate data during processing You can gain a lot of speed just by doing it. However, it may not be possible for Spark to analyze a huge amount of data that cannot be overcome by adding up the memory of all the nodes in the cluster.

Bad place

I've only praised Spark so far, but of course there are some things that don't work. The most annoying thing is that the error message is unfriendly, especially vulnerable to type errors (I tried to read it as an int but the actual data was a string, etc.). In such a case, be careful as you will get a suspicious error message that you can not think of with common sense.

This is an example of an error message when you make a slight mistake in the type. You generate the source code generated.java and compile it ... I don't intend to write a process that requires such a complicated technique, but I think there is some reason.

python


Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 562, Column 35: 
failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 562, Column 35:
A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import at
 org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1304) at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1376) at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1373) at 
org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) at

Before I get used to it, I tend to think that "Spark that can't give a decent error message is shit", but I'm used to it. Spark is more valuable than it is to make up for these shortcomings. It may be a little better if the version upgrade progresses, but in some cases I'm thinking about fixing it myself and contributing.

You there who are tired of complex SQL queries! Spark is good!

Recommended Posts

Tired of maintaining complex SQL queries! It's Spark's turn