08 Marginal and conditional distributions

%%html
<iframe width="700" height="400" src="https://www.youtube.com/embed/Iw9fEYIpPMA/" frameborder="0" allowfullscreen></iframe>
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import seaborn as sns
import findspark

findspark.init()
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName("statistics").master("local").getOrCreate()

Figure 1 from the Khan Academy video above: the two-way table of counts that the dataset below reproduces.

dataset = {
    "#": ["80-100", "60-79", "40-59", "20-39", "0-19"],
    "0-20": [0, 0, 2, 10, 2],
    "21-40": [4, 20, 4, 2, 0],
    "41-60": [16, 30, 32, 8, 0],
    ">60": [20, 10, 32, 0, 8],
}
df = pd.DataFrame(dataset).set_index("#")
df.loc["Total"] = df.sum()  # column totals as a new row (df.append is deprecated)
df
        0-20  21-40  41-60  >60
#
80-100     0      4     16   20
60-79      0     20     30   10
40-59      2      4     32   32
20-39     10      2      8    0
0-19       2      0      0    8
Total     14     30     86   70
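As a cross-check, the same table (Total row included) can be rebuilt from raw observations; a minimal sketch, expanding the counts above into hypothetical one-row-per-observation data for pd.crosstab (row order in the result may differ):

# Sketch (hypothetical raw data): expand the counts back into one row per
# observation, then let pd.crosstab rebuild the two-way table with margins.
rows, cols = df.index[:-1], df.columns
raw = pd.DataFrame(
    [(r, c) for r in rows for c in cols for _ in range(int(df.loc[r, c]))],
    columns=["score", "time"],
)
pd.crosstab(raw["score"], raw["time"], margins=True, margins_name="Total")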
sdf = spark.createDataFrame(zip(*dataset.values()), schema=list(dataset.keys()))
sdf.createOrReplaceTempView("sdf_table")  # registerTempTable is deprecated
sdf.show()
+------+----+-----+-----+---+
|     #|0-20|21-40|41-60|>60|
+------+----+-----+-----+---+
|80-100|   0|    4|   16| 20|
| 60-79|   0|   20|   30| 10|
| 40-59|   2|    4|   32| 32|
| 20-39|  10|    2|    8|  0|
|  0-19|   2|    0|    0|  8|
+------+----+-----+-----+---+
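The temp view registered above is not queried again in this section; as a sketch, the column totals (the Total row computed earlier) can also be produced with Spark SQL against it (backticks are needed because the column names start with digits or symbols):

spark.sql(
    """
    SELECT SUM(`0-20`)  AS `0-20`,
           SUM(`21-40`) AS `21-40`,
           SUM(`41-60`) AS `41-60`,
           SUM(`>60`)   AS `>60`
    FROM sdf_table
    """
).show()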
The column totals expressed as percentages of all 200 observations give the marginal distribution over the columns:

df.loc["Percentage"] = df.loc["Total"] / df.loc["Total"].sum() * 100
df
             0-20  21-40  41-60   >60
#
80-100        0.0    4.0   16.0  20.0
60-79         0.0   20.0   30.0  10.0
40-59         2.0    4.0   32.0  32.0
20-39        10.0    2.0    8.0   0.0
0-19          2.0    0.0    0.0   8.0
Total        14.0   30.0   86.0  70.0
Percentage    7.0   15.0   43.0  35.0
df["Total"] = df.sum(axis=1)
df["Percentage"] = df["Total"] / df["Total"].sum() * 100
df.loc["Total"]["Percentage"] = np.nan
df.loc["Percentage"]["Percentage"] = np.nan
df
             0-20  21-40  41-60   >60  Total  Percentage
#
80-100        0.0    4.0   16.0  20.0   40.0        20.0
60-79         0.0   20.0   30.0  10.0   60.0        30.0
40-59         2.0    4.0   32.0  32.0   70.0        35.0
20-39        10.0    2.0    8.0   0.0   20.0        10.0
0-19          2.0    0.0    0.0   8.0   10.0         5.0
Total        14.0   30.0   86.0  70.0  200.0         NaN
Percentage    7.0   15.0   43.0  35.0  100.0         NaN
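Both marginal distributions should each sum to 100%; a quick sanity check on the table above, using label slices on this df:

# Both marginal percentage vectors sum to 100.
col_marginal = df.loc["Percentage", "0-20":">60"]     # marginal over the columns
row_marginal = df.loc["80-100":"0-19", "Percentage"]  # marginal over the rows
print(col_marginal.sum(), row_marginal.sum())  # 100.0 100.0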
sdf = sdf.withColumn(
    "Total", F.col("0-20") + F.col("21-40") + F.col("41-60") + F.col(">60")
)
# the grand total is fetched to the driver with collect() and used as a literal
sdf = sdf.withColumn(
    "Percentage", F.col("Total") / sdf.select(F.sum("Total")).collect()[0][0] * 100
)
sdf.show()
sdf.show()
+------+----+-----+-----+---+-----+----------+
|     #|0-20|21-40|41-60|>60|Total|Percentage|
+------+----+-----+-----+---+-----+----------+
|80-100|   0|    4|   16| 20|   40|      20.0|
| 60-79|   0|   20|   30| 10|   60|      30.0|
| 40-59|   2|    4|   32| 32|   70|      35.0|
| 20-39|  10|    2|    8|  0|   20|      10.0|
|  0-19|   2|    0|    0|  8|   10|       5.0|
+------+----+-----+-----+---+-----+----------+
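The collect() round trip above can be avoided; a minimal sketch of the same Percentage column computed with an unpartitioned window over the sdf defined above:

from pyspark.sql import Window

# An empty partitionBy() spans the whole DataFrame, so F.sum("Total")
# over it yields the grand total (200) on every row.
w = Window.partitionBy()
sdf.withColumn("Percentage", F.col("Total") / F.sum("Total").over(w) * 100).show()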
The last two rows of the pandas table are the marginal distribution over the columns:

marginal_dist_v_df = df.iloc[-2:]
marginal_dist_v_df
             0-20  21-40  41-60   >60  Total  Percentage
#
Total        14.0   30.0   86.0  70.0  200.0         NaN
Percentage    7.0   15.0   43.0  35.0  100.0         NaN
The Total and Percentage columns are the marginal distribution over the rows:

marginal_dist_h_df = df.loc[:, "Total":]
marginal_dist_h_df
            Total  Percentage
#
80-100       40.0        20.0
60-79        60.0        30.0
40-59        70.0        35.0
20-39        20.0        10.0
0-19         10.0         5.0
Total       200.0         NaN
Percentage  100.0         NaN
sdf.select("#", "Total", "Percentage").show()
+------+-----+----------+
|     #|Total|Percentage|
+------+-----+----------+
|80-100|   40|      20.0|
| 60-79|   60|      30.0|
| 40-59|   70|      35.0|
| 20-39|   20|      10.0|
|  0-19|   10|       5.0|
+------+-----+----------+
A conditional distribution fixes one value of the other variable. Conditioning on the 41-60 column, the distribution over the rows is that column renormalized to sum to 100%:

conditional_dist_df = pd.DataFrame(df.loc[:, "41-60"])
conditional_dist_df = conditional_dist_df.iloc[:-2]  # drop the Total and Percentage rows
conditional_dist_df["Percentage"] = (
    conditional_dist_df["41-60"] / conditional_dist_df["41-60"].sum() * 100
)
conditional_dist_df
        41-60  Percentage
#
80-100   16.0   18.604651
60-79    30.0   34.883721
40-59    32.0   37.209302
20-39     8.0    9.302326
0-19      0.0    0.000000
sdf.select("#", "41-60").withColumn(
    "Percentage", F.col("41-60") / sdf.select(F.sum("41-60")).collect()[0][0] * 100
).show()
+------+-----+------------------+
|     #|41-60|        Percentage|
+------+-----+------------------+
|80-100|   16|  18.6046511627907|
| 60-79|   30|34.883720930232556|
| 40-59|   32|  37.2093023255814|
| 20-39|    8|  9.30232558139535|
|  0-19|    0|               0.0|
+------+-----+------------------+
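Conditioning works in the other direction too; as a sketch, the conditional distribution over the columns given the 60-79 row, read off the pandas table:

# Condition on the 60-79 row instead of a column and renormalize.
row = df.loc["60-79", "0-20":">60"]
print(row / row.sum() * 100)  # 0.0, 33.33..., 50.0, 16.66... — sums to 100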