01 Identifying individuals, variables and categorical variables in a data set

<iframe width="700" height="400" src="https://www.youtube.com/embed/EqeVXI4WNHM/" frameborder="0" allowfullscreen></iframe>
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import seaborn as sns
from sklearn import preprocessing
import os
# Have spark-submit fetch the Kafka connector packages (Scala 2.12 artifacts,
# version 3.1.3) when PySpark launches its JVM; the variable is set here,
# before the SparkSession below is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(
    [
        "--packages",
        ",".join(
            [
                "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.3",
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.3",
            ]
        ),
        "pyspark-shell",
    ]
)

import findspark

from pyspark.context import SparkContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession

# Local-mode Spark session used by every Spark cell in this notebook;
# getOrCreate() returns the already-running session if there is one.
spark = (
    SparkSession.builder
    .master("local")
    .appName("statistics")
    .getOrCreate()
)
import json
import uuid
from confluent_kafka import Producer
from ksql import KSQLAPI
# REST client for the ksqlDB server expected at localhost:8088.
client = KSQLAPI(url="http://localhost:8088")
Identifying individuals, variables and categorical variables in a data set, fig. 1
Identifying individuals, variables and categorical variables in a data set, fig. 2



# Nutrition facts for six drinks, stored column-wise: every key maps to a list
# holding one value per drink, all lists in the same row order.
dataset = {
    "Drink": [
        "Brewed coffee",
        "Caffe latte",
        "Caffe mocha",
        "Cappuccino",
        "Iced brewed coffee",
        "Chai latte",
    ],
    "Type": ["Hot", "Hot", "Hot", "Hot", "Cold", "Hot"],
    "Calories": [4, 100, 170, 60, 60, 120],
    "Sugars (g)": [0, 14, 27, 8, 15, 25],
    "Caffein (mg)": [260, 75, 95, 75, 120, 60],
}

# Later cells transpose the columns with zip(*dataset.values()), which
# silently truncates to the shortest list; fail loudly on ragged columns.
if len({len(column) for column in dataset.values()}) != 1:
    raise ValueError("all dataset columns must have the same length")

Pandas DataFrame

# pandas view of the dataset: one row per drink, indexed by the drink name.
df = pd.DataFrame(data=dataset)
df = df.set_index(keys="Drink")
Type Calories Sugars (g) Caffein (mg)
Brewed coffee Hot 4 0 260
Caffe latte Hot 100 14 75
Caffe mocha Hot 170 27 95
Cappuccino Hot 60 8 75
Iced brewed coffee Cold 60 15 120
Chai latte Hot 120 25 60

Spark DataFrame

# Spark view of the same data: transpose the column-wise dict into row tuples
# and let Spark infer each column's type from the Python values.
sdf = spark.createDataFrame(
    list(zip(*dataset.values())),
    schema=list(dataset),
)
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|
|     Brewed coffee| Hot|       4|         0|         260|
|       Caffe latte| Hot|     100|        14|          75|
|       Caffe mocha| Hot|     170|        27|          95|
|        Cappuccino| Hot|      60|         8|          75|
|Iced brewed coffee|Cold|      60|        15|         120|
|        Chai latte| Hot|     120|        25|          60|

Kafka producer

# Publish each drink as one JSON message to the Kafka topic.
# NOTE(review): `bootstrap_servers` and `topic` are not defined in the visible
# cells — confirm they are set earlier in the notebook.
p = Producer({'bootstrap.servers': bootstrap_servers})
for row in zip(*dataset.values()):
    record = dict(zip(dataset.keys(), row))
    record_key = str(uuid.uuid4())
    record_value = json.dumps(record)

    # The unique key was generated but never attached to the message before.
    p.produce(topic, key=record_key, value=record_value)
    # Serve delivery callbacks and free internal queue space without blocking.
    p.poll(0)

# produce() only enqueues messages; flush() blocks until all of them are
# delivered, so the stream readers that follow can see every record.
p.flush()

KSQL stream

                     columns_type=['`Drink` string','`Type` string', '`Calories` bigint','`Sugars (g)` bigint', '`Caffein (mg)` bigint'],
res = client.query(f"select * from {topic}")
while True:
    except RuntimeError:
[{"header":{"queryId":"transient_STATS0101E908401_8697743092071727740","schema":"`Drink` STRING, `Type` STRING, `Calories` BIGINT, `Sugars (g)` BIGINT, `Caffein (mg)` BIGINT"}},

{"row":{"columns":["Brewed coffee","Hot",4,0,260]}},

{"row":{"columns":["Caffe latte","Hot",100,14,75]}},

{"row":{"columns":["Caffe mocha","Hot",170,27,95]}},


{"row":{"columns":["Iced brewed coffee","Cold",60,15,120]}},

{"row":{"columns":["Chai latte","Hot",120,25,60]}},


Spark stream

Read stream

# Subscribe to the Kafka topic starting from the earliest offset, so records
# produced before this query started are read too. Each message becomes a row
# with binary key/value plus topic/partition/offset/timestamp metadata.
df_raw = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', bootstrap_servers)
    .option("startingOffsets", "earliest")
    .option('subscribe', topic)
    .load()
)
df_raw.printSchema()
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Write stream

# Decode each Kafka value as a JSON string and parse it with the schema of the
# batch DataFrame `sdf`, so streamed rows carry the same columns and types.
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')
query = (
    df_json.select(F.from_json(df_json.json, sdf.schema).alias('stream_data'))
    .writeStream
    .queryName(topic)
    .trigger(once=True)
    .format("memory")
    .start()
)
# trigger(once=True) processes the available data in a single batch and then
# stops; wait for it so the in-memory table named `topic` is populated before
# it is queried.
query.awaitTermination()
spark.sql(f"select * from {topic}").show(truncate=False)
|stream_data                            |
|{Brewed coffee, Hot, 4, 0, 260}        |
|{Caffe latte, Hot, 100, 14, 75}        |
|{Caffe mocha, Hot, 170, 27, 95}        |
|{Cappuccino, Hot, 60, 8, 75}           |
|{Iced brewed coffee, Cold, 60, 15, 120}|
|{Chai latte, Hot, 120, 25, 60}         |

Find and Replace


# Find-and-replace encoding: map "Type" labels to 1 ("Hot") / 0 ("Cold") in a
# new frame; replace() without inplace returns a modified copy, leaving `df`
# untouched.
df_fr = df.replace({"Type": {"Hot": 1, "Cold": 0}})
Type Calories Sugars (g) Caffein (mg)
Brewed coffee 1 4 0 260
Caffe latte 1 100 14 75
Caffe mocha 1 170 27 95
Cappuccino 1 60 8 75
Iced brewed coffee 0 60 15 120
Chai latte 1 120 25 60


# Spark find-and-replace: recode "Type" as 1 for "Hot" and 0 for "Cold".
# Any other value becomes null, because there is no .otherwise() branch.
sdf_fr = sdf.withColumn(
    "Type", F.when(F.col("Type") == "Hot", 1).when(F.col("Type") == "Cold", 0)
)
sdf_fr.show()
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|
|     Brewed coffee|   1|       4|         0|         260|
|       Caffe latte|   1|     100|        14|          75|
|       Caffe mocha|   1|     170|        27|          95|
|        Cappuccino|   1|      60|         8|          75|
|Iced brewed coffee|   0|      60|        15|         120|
|        Chai latte|   1|     120|        25|          60|

Label Encoding


# Label-encode "Type" via a pandas categorical. Categories are sorted
# ("Cold", "Hot"), so "Hot" gets code 1 — hence the column name Type_Hot.
# assign() returns a new frame and leaves `df` unchanged.
df_lc = df.assign(Type_Hot=df["Type"].astype("category").cat.codes)
Type Calories Sugars (g) Caffein (mg) Type_Hot
Brewed coffee Hot 4 0 260 1
Caffe latte Hot 100 14 75 1
Caffe mocha Hot 170 27 95 1
Cappuccino Hot 60 8 75 1
Iced brewed coffee Cold 60 15 120 0
Chai latte Hot 120 25 60 1


# Same label encoding with scikit-learn: LabelEncoder sorts the classes
# ("Cold", "Hot") and numbers them 0 and 1, so "Hot" maps to 1.
df_lc = df.assign(
    Type_Hot=preprocessing.LabelEncoder().fit_transform(df["Type"])
)
Type Calories Sugars (g) Caffein (mg) Type_Hot
Brewed coffee Hot 4 0 260 1
Caffe latte Hot 100 14 75 1
Caffe mocha Hot 170 27 95 1
Cappuccino Hot 60 8 75 1
Iced brewed coffee Cold 60 15 120 0
Chai latte Hot 120 25 60 1


sdf_lc = StringIndexer(inputCol="Type", outputCol="Type_Cold").fit(sdf).transform(sdf)
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|Type_Cold|
|     Brewed coffee| Hot|       4|         0|         260|      0.0|
|       Caffe latte| Hot|     100|        14|          75|      0.0|
|       Caffe mocha| Hot|     170|        27|          95|      0.0|
|        Cappuccino| Hot|      60|         8|          75|      0.0|
|Iced brewed coffee|Cold|      60|        15|         120|      1.0|
|        Chai latte| Hot|     120|        25|          60|      0.0|

One-Hot Encoding


# One-hot encode "Type" into Type_Cold / Type_Hot indicator columns.
# dtype=int pins 0/1 integers: since pandas 2.0 the default dtype is bool,
# which would render True/False instead.
df_ohc = pd.get_dummies(df, columns=["Type"], prefix="Type", dtype=int)
Calories Sugars (g) Caffein (mg) Type_Cold Type_Hot
Brewed coffee 4 0 260 0 1
Caffe latte 100 14 75 0 1
Caffe mocha 170 27 95 0 1
Cappuccino 60 8 75 0 1
Iced brewed coffee 60 15 120 1 0
Chai latte 120 25 60 0 1


# One-hot encode "Type" with scikit-learn. The encoder returns a sparse matrix
# by default; .toarray() densifies it without relying on the sparse-output
# parameter, whose name differs across scikit-learn versions.
encoder = preprocessing.OneHotEncoder()
x = encoder.fit_transform(df["Type"].to_numpy().reshape(-1, 1)).toarray()
# Fitted categories are sorted ("Cold", "Hot"); derive the column names from
# them instead of hard-coding the order.
type_columns = [f"Type_{category}" for category in encoder.categories_[0]]
df_ohc = pd.concat(
    [
        df.drop(columns="Type"),
        # Reuse df's index so each encoded row aligns with its drink rather
        # than with a fresh 0..n-1 RangeIndex.
        pd.DataFrame(x, columns=type_columns, index=df.index),
    ],
    axis=1,
)
Calories Sugars (g) Caffein (mg) Type_Cold Type_Hot
Brewed coffee 4 0 260 0.0 1.0
Caffe latte 100 14 75 0.0 1.0
Caffe mocha 170 27 95 0.0 1.0
Cappuccino 60 8 75 0.0 1.0
Iced brewed coffee 60 15 120 1.0 0.0
Chai latte 120 25 60 0.0 1.0


# One-hot encode the StringIndexer output from `sdf_lc`. With the default
# dropLast=True the last index (1.0, "Cold") maps to the all-zero vector, so
# two categories yield length-1 vectors.
sdf_ohc = (
    OneHotEncoder(inputCol="Type_Cold", outputCol="Type_Vec")
    .fit(sdf_lc)
    .transform(sdf_lc)
)
sdf_ohc.show()
|             Drink|Type|Calories|Sugars (g)|Caffein (mg)|Type_Cold|     Type_Vec|
|     Brewed coffee| Hot|       4|         0|         260|      0.0|(1,[0],[1.0])|
|       Caffe latte| Hot|     100|        14|          75|      0.0|(1,[0],[1.0])|
|       Caffe mocha| Hot|     170|        27|          95|      0.0|(1,[0],[1.0])|
|        Cappuccino| Hot|      60|         8|          75|      0.0|(1,[0],[1.0])|
|Iced brewed coffee|Cold|      60|        15|         120|      1.0|    (1,[],[])|
|        Chai latte| Hot|     120|        25|          60|      0.0|(1,[0],[1.0])|