Offloading your Informix data in Spark, Part 4
Add external data sources, bring more insights!
Where are we? Where are we going?
In the first three parts of this series, you learned how to offload your Informix data into Spark and start working with it there.
You could argue that everything you have done so far is doable with a
traditional relational database. And you would be completely right. Now
it’s time to discover some of the power of Spark by adding two other data
sources.
More data, why?
Yesterday morning, as you were celebrating International Coffee Day, you
overheard the vice president of sales talking to the new sales guy. They were
discussing sales zones, where people make money, and the retail shops in
the area. As a quick reminder, your company sells sports equipment wholesale to retail shops around the United States. Although you did not fully understand their sales lingo, you were pretty upset by their limited knowledge of the data. You had to jump in: “I can analyze the sales data, cross it with the median revenue per ZIP code and the population size for each area; you can then see which ZIP codes need more attention.”
You knew you would make a few new friends with such a statement. Now you need to deliver.
Even more data!
Your first reflex, visiting IBM’s developerWorks, is a good one. Nevertheless, you need two additional datasets: the population per ZIP code and the revenue per ZIP code.
For those datasets, rely on two U.S. government agencies: the United States Census Bureau and the Internal Revenue Service (IRS). For the purposes of this tutorial, a curated version of the Census Bureau data is used because the raw data is a bit challenging to understand.
Experiment with the IRS
With the IRS data, the first exercise is to
find the number of households above a pre-defined revenue of $75,000/year
in your sales area using the original IRS dataset.
You can download the code and data from GitHub. For this part, the labs
are in the net.jgp.labs.informix2spark.l4xx
package. The data
is in the data directory. The IRS provides a technical explanation of the
dataset. It is named 14zpdoc.doc
and available in the
repository’s data directory.
Basically, each area is defined by its ZIP code. Each area is divided into
six adjusted gross income (AGI) groups, based on revenue brackets:
- Below $25,000
- $25,000 to below $50,000
- $50,000 to below $75,000
- $75,000 to below $100,000
- $100,000 to below $200,000
- $200,000 or more
The target for this tutorial is the three upper income groups.
All the code is provided to give you a global view.
package net.jgp.labs.informix2spark.l400;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HouseholdsAboveMedianRevenuePerZipApp {

  public static void main(String[] args) {
    HouseholdsAboveMedianRevenuePerZipApp app = new HouseholdsAboveMedianRevenuePerZipApp();
Run the application by passing the ZIP code to analyze.
    app.start(27514);
  }

  private void start(int zip) {
You create a Spark session in local mode.
SparkSession spark = SparkSession
    .builder()
    .appName("Number of households in your ZIP Code")
    .master("local")
    .getOrCreate();
It is simple to load CSV files in Spark: use the format() method with the csv argument. The CSV loader accepts many options (as you know, CSV can be tricky). The IRS files follow a fairly simple scenario, where you use two options:
- The first row in the file is a header.
- Have Spark infer the schema so it can determine the datatypes to use.
If you don’t have Spark infer the schema, all the columns of this dataframe will be considered strings. CSV parsing has many more options, and each option is explained in my blog.
Finally, note that the filename uses a wildcard. Spark loads all the files matching 14zpallagi*.csv in the data directory. You can use glob-style patterns here: 14zpallagi*.csv reads all the files starting with 14zpallagi and ending with .csv, while 14zpallagi-part[1-3].csv reads 14zpallagi-part1.csv, 14zpallagi-part2.csv, and 14zpallagi-part3.csv.
String filename = "data/14zpallagi*.csv";
Dataset<Row> df = spark
    .read()
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(filename);
You can now inspect the loaded data:
df.printSchema();
First, look at the long schema. It is long because the IRS generously shares 127 columns (most of them are trimmed from the output below).
root
 |-- STATEFIPS: integer (nullable = true)
 |-- STATE: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- agi_stub: integer (nullable = true)
 |-- N1: double (nullable = true)
 |-- mars1: double (nullable = true)
 |-- MARS2: double (nullable = true)
 |-- MARS4: double (nullable = true)
 |-- PREP: double (nullable = true)
 |-- N2: double (nullable = true)
 |-- NUMDEP: double (nullable = true)
 |-- TOTAL_VITA: double (nullable = true)
 |-- VITA: double (nullable = true)
 |-- TCE: double (nullable = true)
 |-- A00100: double (nullable = true)
 |-- N02650: double (nullable = true)
…
Look at a sample of the data. The sample() method takes up to three parameters: whether sampling is done with replacement, a fraction, and (optionally) a seed.
df.sample(true, 0.01, 4589).show(2);
System.out.println("Dataframe has " + df.count() + " rows and "
    + df.columns().length + " columns.");
I limited the output to two rows!
+---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+ |STATEFIPS|STATE|zipcode|agi_stub| N1| mars1| MARS2|MARS4| PREP| N2| NUMDEP|TOTAL_VITA|VITA|TCE| A00100| N02650| A02650| N00200| A00200| N00300| A00300| N00600| A00600| N00650| A00650| N00700| A00700| N00900| A00900| N01000| A01000|N01400| A01400| N01700| A01700| SCHF|N02300|A02300| N02500| A02500| N26270| A26270| N02900| A02900|N03220|A03220|N03300| A03300|N03270| A03270|N03150| A03150|N03210|A03210|N03230|A03230|N03240| A03240| N04470| A04470| A00101| N18425| A18425|N18450|A18450| N18500| A18500| N18300| A18300| N19300| A19300| N19700| A19700| N04800| A04800| N05800| A05800| N09600| A09600|N05780|A05780| N07100| A07100| N07300| A07300|N07180|A07180|N07230|A07230|N07240|A07240|N07220|A07220|N07260|A07260| N09400| A09400|N85770|A85770|N85775|A85775|N09750|A09750| N10600| A10600|N59660|A59660|N59720|A59720|N11070|A11070|N10960|A10960|N11560|A11560| N06500| A06500| N10300| A10300| N85530| A85530| N85300| A85300| N11901| A11901| N11902| A11902| +---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+ | 1| AL| 0| 6|51270.0|4160.0|45860.0|910.0|39250.0|148020.0|50820.0| 80.0|80.0|0.0|2.2652783E7|51270.0|2.3073614E7|44660.0|1.1051815E7|40610.0|264670.0|32640.0|791136.0|31530.0|620013.0|29090.0|96411.0|13720.0|855431.0|31500.0|2331256.0|9340.0|444369.0|14820.0|833058.0|2630.0| 520.0|1842.0|11220.0|292923.0|22620.0|4633716.0|22380.0|420831.0|1080.0| 267.0|2770.0|86011.0|8520.0|92498.0|1140.0|10827.0| 0.0| 0.0| 0.0| 
0.0|1660.0|61679.0|47840.0|2499208.0|2.1437993E7|45930.0|761404.0|1440.0|4867.0|45630.0|168515.0|47820.0|962132.0|36310.0|456194.0|45490.0|920293.0|51240.0|1.9692443E7|51230.0|5274998.0|15590.0|77387.0| 0.0| 0.0|21020.0|54852.0|15290.0|25542.0|3230.0|1747.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|1410.0| 459.0|14770.0|124954.0| 0.0| 0.0| 0.0| 0.0| 100.0| 219.0|50690.0|5185930.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|51170.0|5204318.0|51230.0|5472274.0|19090.0|33930.0|25450.0|89886.0|28340.0|774418.0|15560.0|243494.0| | 1| AL| 35004| 5| 590.0| 40.0| 530.0| 0.0| 300.0| 1660.0| 550.0| 0.0| 0.0|0.0| 74554.0| 590.0| 75493.0| 560.0| 64835.0| 260.0| 150.0| 140.0| 236.0| 130.0| 149.0| 350.0| 310.0| 100.0| 1671.0| 100.0| 364.0| 60.0| 1459.0| 150.0| 3991.0| 0.0| 0.0| 0.0| 90.0| 1796.0| 40.0| 1408.0| 240.0| 939.0| 30.0| 8.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 130.0| 132.0| 0.0| 0.0| 0.0| 0.0| 450.0| 9296.0| 57663.0| 430.0| 2268.0| 0.0| 0.0| 420.0| 353.0| 450.0| 2766.0| 400.0| 2900.0| 420.0| 2321.0| 590.0| 56931.0| 590.0| 9612.0| 0.0| 0.0| 0.0| 0.0| 310.0| 448.0| 40.0| 3.0| 120.0| 66.0| 70.0| 99.0| 0.0| 0.0| 190.0| 249.0| 0.0| 0.0| 60.0| 228.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 590.0| 10176.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 60.0| 59.0| 0.0| 0.0| 580.0| 9179.0| 580.0| 9419.0| 0.0| 0.0| 0.0| 0.0| 200.0| 625.0| 380.0| 1553.0| +---------+-----+-------+--------+-------+------+-------+-----+-------+--------+-------+----------+----+---+-----------+-------+-----------+-------+-----------+-------+--------+-------+--------+-------+--------+-------+-------+-------+--------+-------+---------+------+--------+-------+--------+------+------+------+-------+--------+-------+---------+-------+--------+------+------+------+-------+------+-------+------+-------+------+------+------+------+------+-------+-------+---------+-----------+-------+--------+------+------+-------+--------+-------+--------+-------+--------+-------+--------+-------+-----------+-------+---------+-------+-------+------+------+-------+-------+-------+-------+------+------+------+------+------+------+------+------+------+------+-------+--------+------+------+------+------+------+------+-------+---------+------+------+------+------+------+------+------+------+------+------+-------+---------+-------+---------+-------+-------+-------+-------+-------+--------+-------+--------+ only showing top 2 rows The dataframe has 166719 rows and 127 columns.
You do not have to clean your dataset, but I find the result more readable. To clean it, filter on the ZIP code and drop the extra columns.
Dataset<Row> df2 = df.filter(df.col("zipcode").equalTo(zip));

String[] colsToDrop = { "STATEFIPS", "mars1", "MARS2", "MARS4", "PREP", "N2",
    "NUMDEP", "TOTAL_VITA", "VITA", "TCE", "A00100", "N02650",
    "N00200", "A00200", "N00300", "A00300", "N00600", "A00600",
    "N00650", "A00650", "N00700", "A00700", "N00900", "A00900",
    "N01000", "A01000", "N01400", "A01400", "N01700", "A01700",
    "SCHF", "N02300", "A02300", "N02500", "A02500", "N26270",
    "A26270", "N02900", "A02900", "N03220", "A03220", "N03300",
    "A03300", "N03270", "A03270", "N03150", "A03150", "N03210",
    "A03210", "N03230", "A03230", "N03240", "A03240", "N04470",
    "A04470", "A00101", "N18425", "A18425", "N18450", "A18450",
    "N18500", "A18500", "N18300", "A18300", "N19300", "A19300",
    "N19700", "A19700", "N04800", "A04800", "N05800", "A05800",
    "N09600", "A09600", "N05780", "A05780", "N07100", "A07100",
    "N07300", "A07300", "N07180", "A07180", "N07230", "A07230",
    "N07240", "A07240", "N07220", "A07220", "N07260", "A07260",
    "N09400", "A09400", "N85770", "A85770", "N85775", "A85775",
    "N09750", "A09750", "N10600", "A10600", "N59660", "A59660",
    "N59720", "A59720", "N11070", "A11070", "N10960", "A10960",
    "N11560", "A11560", "N06500", "A06500", "N10300", "A10300",
    "N85530", "A85530", "N85300", "A85300", "N11901", "A11901",
    "N11902", "A11902" };
for (String colName : colsToDrop) {
  df2 = df2.drop(colName);
}

df2.printSchema();
df2.show();
System.out.println("Dataframe has " + df2.count() + " rows and "
    + df2.columns().length + " columns.");
Now look at the results. You get something a lot more intelligible.
root
 |-- STATE: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- agi_stub: integer (nullable = true)
 |-- N1: double (nullable = true)
 |-- A02650: double (nullable = true)

+-----+-------+--------+------+--------+
|STATE|zipcode|agi_stub|    N1|  A02650|
+-----+-------+--------+------+--------+
|   NC|  27514|       1|3590.0| 42542.0|
|   NC|  27514|       2|2030.0| 74332.0|
|   NC|  27514|       3|1040.0| 65651.0|
|   NC|  27514|       4| 800.0| 71410.0|
|   NC|  27514|       5|1690.0|249042.0|
|   NC|  27514|       6|1650.0|843353.0|
+-----+-------+--------+------+--------+

Dataframe has 6 rows and 5 columns.
With this smaller dataset, you can see that you are interested in the records where agi_stub is greater than 3. You want to group them by ZIP code and sum the number of returns in column N1. In Spark code, this gives:
Dataset<Row> df3 = df2.filter(df2.col("agi_stub").$greater(3));
df3 = df3
    .groupBy("zipcode")
    .sum("N1")
    .withColumnRenamed("sum(N1)", "households");
df3.show();
  }
}
And you get:
+-------+----------+
|zipcode|households|
+-------+----------+
|  27514|    4140.0|
+-------+----------+
What do these results mean? In this ZIP code, 4,140 tax returns were filed by people making more than $75,000. It is not a 1:1 mapping to households, but it gives you a good idea of the potential.
Did it Spark?
The revenue dataset is almost 200MB, of which you will use just a fraction. The curated census data is only 400KB. Does it make sense to onboard these datasets into your Informix® database, which you primarily use for your sales and warehouse transactions? Probably not. Don’t get me wrong, Informix is perfectly capable of handling these datasets, but is that its role?
As you progress along your data scientist path, you will add more and more datasets and experiment with them (maybe using dedicated tools like IBM Watson Studio).
However, you probably do not want all these datasets in your production
database for each experiment.
Back to business
After showing these first results to your sales folks, you worked out, as a team, a good index for potential revenue.
Basically, the idea is:
- Find the best sales area.
- Consider its key figures as the reference: population (ref_pop), the current revenue you are making in this area (ref_rev), and income (ref_inc).
- Adjust the potential revenue for each ZIP code by comparing its population (pop), revenue (rev), and average income (inc) to the reference.
Note: The metric is referred to as an “index” (plural: indices) in the rest of this article. Do not confuse this index with a database index.
For each area or ZIP code, apply:
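Reconstructed from the Spark code later in this section, the index and the potential revenue for a ZIP code work out to:

index = ((ref_rev / ref_pop) / (rev / pop)) × (pop / ref_pop) × (inc / ref_inc)
potential revenue = rev × index

In other words, an area scores higher when it has a large population and a high income per inhabitant relative to the reference area, but little revenue per inhabitant so far.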
So, this is nice, but how do you do this in Spark?
From business to development
You can find this example in the net.jgp.labs.informix2spark.l420 package, in the application called SalesTargetApp.
Initialization
You can walk through the code, starting with the imports, then the initialization.
package net.jgp.labs.informix2spark.l420;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import java.math.BigDecimal;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

import net.jgp.labs.informix2spark.utils.Config;
import net.jgp.labs.informix2spark.utils.ConfigManager;
import net.jgp.labs.informix2spark.utils.InformixJdbcDialect;
import net.jgp.labs.informix2spark.utils.K;
import scala.collection.Seq;

public class SalesTargetApp {

  SparkSession spark;

  public SalesTargetApp() {
    init();
  }

  private void init() {
    this.spark = SparkSession
        .builder()
        .appName("Sales Target")
        .master("local")
        .getOrCreate();
  }

  public static void main(String[] args) {
    SalesTargetApp app = new SalesTargetApp();
    app.start();
  }
As with all the previous examples, everything begins with the start() method.
private void start() {
  Dataset<Row> householdDf = getHouseholdDataframe();
  Dataset<Row> populationDf = getPopulationDataframe();
  Dataset<Row> indexDf = joinHouseholdPopulation(householdDf, populationDf);
  Dataset<Row> salesDf = getSalesData();
You are building four dataframes by calling methods. Let’s dig into them.
Household data
This method is very similar to the first experiment in this tutorial with
the IRS data. Start by reading the CSV files.
private Dataset<Row> getHouseholdDataframe() {
  String filename = "data/14zpallagi*.csv";
  Dataset<Row> df = spark.read().format("csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(filename);
As in SQL, select the columns that interest you. This is an alternative to dropping all the columns you do not want.
  df = df.select(
      df.col("zipcode"),
      df.col("agi_stub"),
      df.col("N1"),
      df.col("A02650"),
In the last column, you want the total revenue on all tax returns for each
group. Remember, the IRS splits the data into six groups.
df.col("N1").multiply(df.col("A02650")));
This operation creates a column at the end of the dataframe, named (N1 * A02650). That is not a descriptive column name, so you decide to rename it income. df.columns() returns the list of column names, and df.columns()[df.columns().length - 1] provides the name of the last column.
df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "income");
Because you are not interested in analyzing each AGI group per ZIP code, you need to group the different AGI categories by ZIP code. While doing this, you can also sum all the incomes into a column renamed total_income.
  df = df.groupBy("zipcode").sum("income");
  df = df.withColumnRenamed(df.columns()[df.columns().length - 1], "total_income");
  return df;
}
If you displayed the dataframe at this point, you would see:
+-------+------------+
|zipcode|total_income|
+-------+------------+
|  35071| 4.7763307E8|
|  36525|   6306850.0|
|  36538|    201690.0|
|  85253|9.45181039E9|
|  85321|  1.290294E7|
+-------+------------+
only showing top 5 rows
Population
Loading the population dataframe is straightforward with all the experience
you gained so far.
private Dataset<Row> getPopulationDataframe() {
  String filename = "data/2010+Census+Population+By+Zipcode+(ZCTA).csv";
  Dataset<Row> df = spark.read().format("csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(filename);
There is another way to rename columns, but you need to know the column names to do it (and here, you do).
  df = df.withColumnRenamed("Zip Code ZCTA", "zipcode");
  df = df.withColumnRenamed("2010 Census Population", "pop");
  return df;
}
And your dataframe looks like:
+-------+-----+
|zipcode|  pop|
+-------+-----+
|   1001|16769|
|   1002|29049|
|   1003|10372|
|   1005| 5079|
|   1007|14649|
+-------+-----+
only showing top 5 rows
Joining household income and population
As you learned in Part 3, you join the two datasets on the ZIP code with an outer join, which means you will potentially end up with some null values.
private Dataset<Row> joinHouseholdPopulation(
    Dataset<Row> householdDf,
    Dataset<Row> populationDf) {
  Dataset<Row> df = householdDf
      .join(
          populationDf,
          householdDf.col("zipcode").equalTo(populationDf.col("zipcode")),
          "outer")
      .drop(populationDf.col("zipcode"))
Now, create a new column named income_per_inh. This column contains the total income divided by the population, which gives you an estimate of the income per inhabitant.
      .withColumn(
          "income_per_inh",
          householdDf.col("total_income").divide(populationDf.col("pop")));
  return df;
}
withColumn() allows you to create a new column in your dataframe, using one or multiple columns as well as functions. withColumn() might become your favorite method as you do more and more transformations. It is easier to use than trying to find the last column to rename.
Here’s how this new dataframe looks now:
+-------+------------+-----+------------------+
|zipcode|total_income|  pop|    income_per_inh|
+-------+------------+-----+------------------+
|   1088|   1144910.0|  670|1708.8208955223881|
|   1238|  7.228838E7| 6047|11954.420373739043|
|   1342|   4992920.0| 1492| 3346.461126005362|
|   2122|1.09356174E9|23479| 46576.16338004174|
|   2142| 1.0935586E8| 3141| 34815.61922954473|
+-------+------------+-----+------------------+
only showing top 5 rows
Sales data
For the sales data, reuse the code you wrote for Part 3 of this tutorial series. You just need to put it in a
method.
private Dataset<Row> getSalesData() {
  …
  return salesDf;
}
Remember, all the code is available on GitHub.
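If you do not have Part 3 at hand, here is a minimal, hypothetical sketch of how a JDBC read from Informix looks in Spark. The URL, credentials, and table name below are placeholders, not the repository's actual configuration; the real getSalesData() is more involved and ends up with the zipcode and revenue columns shown next, and the class also imports a custom InformixJdbcDialect to help Spark talk to Informix.

// Minimal sketch only -- connection details and table are placeholders.
Dataset<Row> ordersDf = spark
    .read()
    .format("jdbc")
    .option("url",
        "jdbc:informix-sqli://localhost:9088/stores_demo:INFORMIXSERVER=ol_informix1210")
    .option("dbtable", "orders")
    .option("user", "informix")
    .option("password", "password")
    .load();
// The repository's method then joins and aggregates the relevant tables
// to return salesDf with a zipcode and a revenue column.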
Your sales dataframe now contains the ZIP code and the revenues you are
making.
+-------+-------+
|zipcode|revenue|
+-------+-------+
|  94062|1390.00|
|  94040| 562.00|
|  94022| 448.00|
|  19898|1131.00|
|  74006|1614.00|
+-------+-------+
only showing top 5 rows
Combining all the datasets, forecasting sales
You now have all the datasets and are ready to build the indices you
defined with the sales team.
Dataset<Row> salesIndexDf = salesDf
    .join(indexDf, salesDf.col("zipcode").equalTo(indexDf.col("zipcode")), "left")
    .drop(indexDf.col("zipcode"));
Calculate the revenue per inhabitant.
salesIndexDf = salesIndexDf.withColumn("revenue_by_inh", salesIndexDf.col("revenue") .divide(salesIndexDf.col("pop")));
Now sort to identify the best sales area.
salesIndexDf = salesIndexDf.orderBy(col("revenue_by_inh").desc());
Extract the “best row.” The best row contains all the values to use as a
reference from the sales team’s best sector.
Row bestRow = salesIndexDf.first();
double bestRevenuePerInhabitant =
    ((BigDecimal) bestRow.getAs("revenue_by_inh")).doubleValue();
int populationOfBestRevenuePerInhabitant = bestRow.getAs("pop");
double incomeOfBestRevenuePerInhabitant = bestRow.getAs("income_per_inh");
Next, create a column in your dataframe. You know you can use the withColumn() method. But what if you want to add a column with a specific value? You can do this by taking a numeric column, dividing it by itself (so you get 1), and then multiplying the result by the value.
salesIndexDf = salesIndexDf.withColumn(
    "best_revenue_per_inh",
    salesIndexDf.col("pop").divide(salesIndexDf.col("pop"))
        .multiply(bestRevenuePerInhabitant));
Or you can use the lit()
static function:
salesIndexDf = salesIndexDf.withColumn(
    "pop_of_best", lit(populationOfBestRevenuePerInhabitant));
salesIndexDf = salesIndexDf.withColumn(
    "income_of_best", lit(incomeOfBestRevenuePerInhabitant));
Now you are ready to create the three indices.
salesIndexDf = salesIndexDf.withColumn(
    "idx_revenue",
    salesIndexDf.col("best_revenue_per_inh")
        .divide(salesIndexDf.col("revenue_by_inh")));
salesIndexDf = salesIndexDf.withColumn(
    "idx_pop",
    salesIndexDf.col("pop").divide(salesIndexDf.col("pop_of_best")));
salesIndexDf = salesIndexDf.withColumn(
    "idx_income",
    salesIndexDf.col("income_per_inh").divide(salesIndexDf.col("income_of_best")));
It is now time to create the final index for each area, which is the product of the three indices.
salesIndexDf = salesIndexDf.withColumn(
    "index",
    salesIndexDf.col("idx_revenue").multiply(salesIndexDf.col("idx_pop")
        .multiply(salesIndexDf.col("idx_income"))));
And apply the index to the existing revenue.
salesIndexDf = salesIndexDf.withColumn(
    "potential_revenue",
    salesIndexDf.col("revenue").multiply(salesIndexDf.col("index")));
You can drop a few columns to enhance the output. You could have dropped more, but you also need your report to look scientific enough. Finally, make sure you order by the potential revenue, in descending order.
salesIndexDf = salesIndexDf
    .drop("idx_income")
    .drop("idx_pop")
    .drop("idx_revenue")
    .drop("income_of_best")
    .drop("pop_of_best")
    .drop("best_revenue_per_inh")
    .orderBy(salesIndexDf.col("potential_revenue").desc());
You can look at the resulting data.
salesIndexDf.show();
}
+-------+-------+-----+------------------+------------------+------------------+
|zipcode|revenue|  pop|    income_per_inh|             index| potential_revenue|
+-------+-------+-----+------------------+------------------+------------------+
|  94025|  84.00|40526| 840368.1256477323|1610.5247342457083|135284.07767663948|
|  08540|1499.97|47115|469565.43117903004| 68.11481294046366|102170.17596630729|
|  94086|1200.00|45697| 244836.9227739239| 41.76194133319635| 50114.32959983562|
|  94062|1390.00|25876| 738260.2450146854| 34.85768977158333| 48452.18878250083|
|  80219| 232.00|61296|104358.72308144088| 165.6588621914614|38432.856028419046|
|  94022| 448.00|18500|1081220.6994594594| 80.96352858991159|36271.660808280394|
|  94040| 562.00|32996|257082.76791126197|  48.8167513518355| 27435.01425973155|
|  32256| 438.00|38483|142462.90881688017| 47.21447575076384|20679.940378834563|
|  85016| 584.00|33896| 94057.98914326174| 18.13799917867292|10592.591520344986|
|  94063|5592.00|30949| 59561.31635917154|               1.0|            5592.0|
|  94085| 450.00|21247| 95544.70842942533| 9.395047814603645|  4227.77151657164|
|  74006|1614.00|25750| 63162.90718446602|2.5434469314609442| 4105.123347377964|
|  08002| 654.00|22274|59319.770584538026| 4.410905752434176| 2884.732362091951|
|  60406| 824.00|25460| 36702.44422623723|2.8300506455361445| 2331.961731921783|
|  94026|1451.80| null|              null|              null|              null|
|  19898|1131.00| null|              null|              null|              null|
+-------+-------+-----+------------------+------------------+------------------+
Results and verification
Your reference was the sales in ZIP code 94063, where you sold for $5,592.
This area has an index of 1.
Based on the index you built, you can see that the best potential is in ZIP
code 94025, which is Menlo Park, Calif. (funny, that’s the birthplace of
Informix). The data shows that the income per inhabitant is among the highest in the nation here. Therefore, it makes sense that the highest sales potential is in ZIP code 94025.
The area with the least potential is 60406, Blue
Island, Ill., a less populated district, south of Chicago.
You can look at the top five ZIP codes with the most potential: Menlo Park, Calif.; Princeton, N.J.; Sunnyvale, Calif.; Redwood City, Calif.; and Denver, Colo. Your sales manager will probably conclude that it makes sense to increase the sales effort in those areas.
What you learned
In Part 4 of this tutorial series, you learned:
- How to perform advanced analytics. Even though your enterprise might not have all the data internally, you can download external datasets from government agencies or Open Data portals.
- Spark performs equally well on data coming from an RDBMS or from external files.
- As with all APIs and languages, there are different ways to obtain the same result. It's best to work toward consistency and clarity above all; this will help your code maintenance.
- How to verify your results. Your findings are in line with the gut feeling that California is where people have more money to spend on sports equipment.