PySpark

YARN

Python SOLUTION code (pySpark – YARN wordcount.py):

 

from __future__ import print_function

import sys
from operator import add

from pyspark import SparkContext


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount.py <file>", file=sys.stderr)
        sys.exit(-1)

    # Create the SparkContext and build the word counts
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)

    # Collect the results to the driver and print them
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    sc.stop()
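
To actually run the script on YARN, it would typically be submitted with spark-submit; a minimal sketch, assuming the script is saved locally as wordcount.py and is pointed at the frostroad.txt file uploaded later in these exercises:

$ spark-submit --master yarn --deploy-mode client wordcount.py /loudacre/frostroad.txt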

 

 

Spark Shell

Python SOLUTION code (pySpark – SPARK-SHELL):

devDF = spark.read.json("/loudacre/devices.json")

 

devDF.printSchema()

 

devDF.show(5)

 

rows = devDF.take(5)

for row in rows: print(row)

 

devDF.count()

 

makeModelDF = devDF.select("make","model")

makeModelDF.printSchema()

 

makeModelDF.show()

 

devDF.select("devnum","make","model").here("make = 'Ronin'").show()

 

 

 

 

 

Scala SOLUTION code (scalaSpark – SPARK-SHELL):

val devDF = spark.read.json("/loudacre/devices.json")

 

devDF.printSchema

 

devDF.show(5)

 

val rows = devDF.take(5)

rows.foreach(println)

 

devDF.count()

 

val makeModelDF = devDF.select("make","model")

makeModelDF.printSchema

 

makeModelDF.show

 

devDF.select("devnum","make","model").where("make = 'Ronin'").show

 

 

DataFrames and Schemas

Python SOLUTION code (pySpark - DataFrames):

accountsDF = spark.read.table("accounts")

 

accountsDF.printSchema()

 

accountsDF.where("zipcode = '94913'").write.option("header","true").csv("/loudacre/accounts_zip94913")

 

test1DF = spark.read.option("header","true").csv("/loudacre/accounts_zip94913")

test2DF = spark.read.option("header","true").option("inferSchema","true").csv("/loudacre/accounts_zip94913")

 

test1DF.printSchema()

test2DF.printSchema()

 

# upload the data file

# hdfs dfs -put $DEVDATA/devices.json /loudacre/

 

# create a DataFrame based on the devices.json file

devDF = spark.read.json("/loudacre/devices.json")

devDF.printSchema()

 

from pyspark.sql.types import *

 

devColumns = [

   StructField("devnum",LongType()),

   StructField("make",StringType()),

   StructField("model",StringType()),

   StructField("release_dt",TimestampType()),

   StructField("dev_type",StringType())]

 

devSchema = StructType(devColumns)

 

devDF = spark.read.schema(devSchema).json("/loudacre/devices.json")

devDF.printSchema()

devDF.show()

 

devDF.write.parquet("/loudacre/devices_parquet") 

 

# $ parquet-tools schema hdfs://master-1/loudacre/devices_parquet

 

spark.read.parquet("/loudacre/devices_parquet").printSchema() 
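
As an optional aside (not part of the exercise), the devices DataFrame could also be saved as a metastore table, in the same way the accounts table is read elsewhere in these solutions; the table name devices below is just an example:

# Save the DataFrame as a metastore table and read it back
devDF.write.mode("overwrite").saveAsTable("devices")
spark.read.table("devices").printSchema()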

 

 

Scala SOLUTION code (scalaSpark - DataFrames):

 

val accountsDF = spark.read.table("accounts")

 

accountsDF.printSchema

 

accountsDF.where("zipcode = '94913'").write.option("header","true").csv("/loudacre/accounts_zip94913")

 

val test1DF = spark.read.option("header","true").csv("/loudacre/accounts_zip94913")

val test2DF = spark.read.option("header","true").option("inferSchema","true").csv("/loudacre/accounts_zip94913")

 

test1DF.printSchema

test2DF.printSchema

 

 

// upload the data file

// hdfs dfs -put $DEVDATA/devices.json /loudacre/

 

// create a DataFrame based on the devices.json file

val devDF = spark.read.json("/loudacre/devices.json")

devDF.printSchema

 

import org.apache.spark.sql.types._

 

val devColumns = List(

  StructField("devnum",LongType),

  StructField("make",StringType),

  StructField("model",StringType),

  StructField("release_dt",TimestampType),

  StructField("dev_type",StringType))

 

val devSchema = StructType(devColumns)

 

val devDF = spark.read.schema(devSchema).json("/loudacre/devices.json")

devDF.printSchema

devDF.show

 

devDF.write.parquet("/loudacre/devices_parquet") 

 

// $ parquet-tools schema hdfs://master-1/loudacre/devices_parquet/

 

spark.read.parquet("/loudacre/devices_parquet").printSchema()

 

 

Analyzing Data with DataFrame Queries

 

Python SOLUTION code (pySpark - Analyze):

# Create a DataFrame based on the Hive accounts table

accountsDF = spark.read.table("accounts")

 

# Perform a simple query using both syntaxes for column reference

accountsDF.select(accountsDF["first_name"]).show()

accountsDF.select(accountsDF.first_name).show()

 

# Create a column reference referring to the first_name column in the accounts table

fnCol = accountsDF["first_name"]

 

# Create and use a column expression to select users named Lucy in the first_name column

lucyCol = (fnCol == "Lucy")

accountsDF.select(accountsDF.first_name,accountsDF.last_name,lucyCol).show()

accountsDF.where(lucyCol).show(5)

accountsDF.where(fnCol == "Lucy").show(5)

accountsDF.select("city", "state", accountsDF.phone_number.substr(1,3)).show(5)

accountsDF.select("city", "state", accountsDF.phone_number.substr(1,3).alias("area_code")).show(5)

 

accountsDF.where(accountsDF.first_name.substr(1,2) == accountsDF.last_name.substr(1,2)).select("first_name","last_name").show(5)

 

accountsDF.groupBy("last_name").count().show(5)

accountsDF.groupBy("last_name","first_name").count().show(5)

 

 

baseDF = spark.read.parquet("/loudacre/base_stations.parquet")

 

 

accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF, baseDF.zip==accountsDF.zipcode).show()

 

# ------ Count active devices ---------

 

# Load accountdevice data to HDFS in another terminal window

# $ hdfs dfs -put $DEVDATA/accountdevice/ /loudacre/

 

# Create a DataFrame from the account device data

accountDeviceDF = spark.read.option("header","true").option("inferSchema","true").csv("/loudacre/accountdevice")

 

# Create a DataFrame with only active accounts

activeAccountsDF = accountsDF.where(accountsDF.acct_close_dt.isNull())

 

# Create a DataFrame with device model IDs for only devices used by active accounts

activeAcctDevsDF = activeAccountsDF.join(accountDeviceDF, activeAccountsDF.acct_num == accountDeviceDF.account_id).select("device_id")

 

# Sum up the total number of each device model

sumDevicesDF = activeAcctDevsDF.groupBy("device_id").count().withColumnRenamed("count","active_num")

 

# Order by count

orderDevicesDF = sumDevicesDF.orderBy(sumDevicesDF.active_num.desc())

 

# create a DataFrame based on the devices.json file

devDF = spark.read.json("/loudacre/devices.json")

 

# Join the list of device model totals with the list of devices

# to get the make and model for each device

joinDevicesDF = orderDevicesDF.join(devDF, orderDevicesDF.device_id == devDF.devnum)

 

# Write out the data with the correct columns

# use overwrite mode so solution can be run multiple times

joinDevicesDF.select("device_id","make","model",joinDevicesDF.active_num).write.mode("overwrite").save("/loudacre/top_devices")

 

# Review exercise results

# $ parquet-tools head hdfs://master-1/loudacre/top_devices
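
Besides parquet-tools, the written results can also be checked directly from the shell; a quick sketch (the output was written in the default Parquet format):

spark.read.parquet("/loudacre/top_devices").show(5)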

 

 

Scala SOLUTION code (scalaSpark - Analyze):

 

// Create a DataFrame based on the Hive accounts table

val accountsDF = spark.read.table("accounts")

 

// Perform a simple query using both syntaxes for column reference

accountsDF.select(accountsDF("first_name")).show

accountsDF.select($"first_name").show

 

// Create a column reference referring to the first_name column in the accounts table

val fnCol = accountsDF("first_name")

 

// Create and use a column expression to select users named Lucy in the first_name column

val lucyCol = (fnCol === "Lucy")

accountsDF.select($"first_name",$"last_name",lucyCol).show

accountsDF.where(lucyCol).show(5)

accountsDF.where(fnCol === "Lucy").show(5)

 

accountsDF.select($"city", $"state",$"phone_number".substr(1,3)).show(5)

accountsDF.select($"city", $"state",$"phone_number".substr(1,3).alias("area_code")).show(5)

 

accountsDF.where($"first_name".substr(1,2) === $"last_name".substr(1,2)).select("first_name","last_name").show(5)

 

accountsDF.groupBy("last_name").count.show(5)

accountsDF.groupBy("last_name","first_name").count.show(5)

 

val baseDF = spark.read.parquet("/loudacre/base_stations.parquet")

 

accountsDF.select("acct_num","first_name","last_name","zipcode").join(baseDF, $"zip" === $"zipcode").show()

 

 

// ------ Count active devices ---------

 

// Load accountdevice data to HDFS in another terminal window

// $ hdfs dfs -put $DEVDATA/accountdevice/ /loudacre/

 

// Create a DataFrame from the account device data

val accountDeviceDF = spark.read.option("header","true").option("inferSchema","true").csv("/loudacre/accountdevice")

 

// Create a DataFrame with only active accounts

val activeAccountsDF = accountsDF.where(accountsDF("acct_close_dt").isNull)

 

// Create a DataFrame with device model IDs for only devices used by active accounts

val activeAcctDevsDF =  activeAccountsDF.join(accountDeviceDF,activeAccountsDF("acct_num") === accountDeviceDF("account_id")).select("device_id")

 

// Sum up the total number of each device model

val sumDevicesDF = activeAcctDevsDF.groupBy("device_id").count().withColumnRenamed("count","active_num")

 

// Order by count

val orderDevicesDF = sumDevicesDF.orderBy($"active_num".desc)

 

// create a DataFrame based on the devices.json file

val devDF = spark.read.json("/loudacre/devices.json")

 

// Join the list of device model totals with the list of devices

// to get the make and model for each device

val joinDevicesDF = orderDevicesDF.join(devDF,orderDevicesDF("device_id") === devDF("devnum"))

 

// Write out the data with the correct columns

// use overwrite mode so solution can be run multiple times

joinDevicesDF.select("device_id","make","model","active_num").write.mode("overwrite").save("/loudacre/top_devices")

 

// Review exercise results

// $ parquet-tools head hdfs://master-1/loudacre/top_devices

 

 

Working With RDDs

Shell SOLUTION code (Shell - rdds):

 

hdfs dfs -put $DEVDATA/frostroad.txt /loudacre/

hdfs dfs -put $DEVDATA/makes*.txt /loudacre/
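
To confirm the uploads, the target directory can be listed:

hdfs dfs -ls /loudacre/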

 

Python SOLUTION code (pySpark - RDDs):

 

# $ hdfs dfs -put $DEVDATA/frostroad.txt /loudacre/

 

# Create an RDD based on a data file

myRDD = sc.textFile("/loudacre/frostroad.txt")

 

# Count the number of elements in the RDD

myRDD.count()

 

# Return all the elements in the RDD as a list of strings

lines = myRDD.collect()

 

# Loop through and display the elements of the returned array

for line in lines: print(line)

 

# $ hdfs dfs -put $DEVDATA/makes*.txt /loudacre/

 

# Read, union, and de-duplicate mobile device makes

makes1RDD = sc.textFile("/loudacre/makes1.txt")

for make in makes1RDD.collect(): print(make)

 

makes2RDD = sc.textFile("/loudacre/makes2.txt")

for make in makes2RDD.collect(): print(make)

 

allMakesRDD = makes1RDD.union(makes2RDD)

for make in allMakesRDD.collect(): print(make)

 

distinctMakesRDD = allMakesRDD.distinct()

for make in distinctMakesRDD.collect(): print(make)

 

# Optional: explore other transformations

for make in makes2RDD.zip(makes1RDD).collect(): print(make)

for make in makes2RDD.intersection(makes1RDD).collect(): print(make)

for make in makes2RDD.subtract(makes1RDD).collect(): print(make)
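
If the de-duplicated list of makes should be kept, an RDD can also be written back to HDFS; a minimal sketch (the output path makes_distinct is just an example):

# Save the distinct makes as text files under the given HDFS directory
distinctMakesRDD.saveAsTextFile("/loudacre/makes_distinct")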

 

 

Scala SOLUTION code (scalaSpark - RDDs):

// $ hdfs dfs -put $DEVDATA/frostroad.txt /loudacre/

 

// create an RDD based on a data file

val myRDD = sc.textFile("/loudacre/frostroad.txt")

 

// count the number of elements in the RDD

myRDD.count

 

// return all the elements in the RDD as an array of strings

val lines = myRDD.collect

 

// Loop through and display the elements of the returned array

for(line <- lines) println(line)

 

// $ hdfs dfs -put $DEVDATA/makes*.txt /loudacre/

 

// Read, union, and de-duplicate mobile device makes

val makes1RDD = sc.textFile("/loudacre/makes1.txt")

for (make <- makes1RDD.collect()) println(make)

 

val makes2RDD = sc.textFile("/loudacre/makes2.txt")

for (make <- makes2RDD.collect()) println(make)

 

val allMakesRDD = makes1RDD.union(makes2RDD)

for (make <- allMakesRDD.collect()) println(make)

 

val distinctMakesRDD = allMakesRDD.distinct

for (make <- distinctMakesRDD.collect()) println(make)

 

// Optional: explore other transformations

makes2RDD.zip(makes1RDD).collect.foreach(println)

makes2RDD.intersection(makes1RDD).collect.foreach(println)

makes2RDD.subtract(makes1RDD).collect.foreach(println)

 

 

 
