下你所需,载你所想!
IT技术源码资料下载网站

Apache Spark结构化Streaming流编程指南

:其他软件 2020-09-08 07:59:55

Apache Spark结构化Streaming流编程指南

Scala语言
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()

import spark.implicits._
接下来,让我们创建一个流数据框架,该数据框架表示从在localhost:9999上侦听的服务器接收的文本数据,并对数据框架进行转换以计算字数。
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
此linesDataFrame表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此它目前未接收任何数据。接下来,我们使用将该DataFrame转换为String的Dataset .as[String],以便我们可以应用该flatMap操作将每一行拆分为多个单词。结果words数据集包含所有单词。最后,我们wordCounts通过对数据集中的唯一值进行分组并对其进行计数来定义DataFrame。请注意,这是一个流数据帧,它表示流的运行字数。
现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算start()。
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
执行此代码后,流计算将在后台开始。该query对象是该活动流查询的句柄,我们已决定等待查询终止,awaitTermination()以防止查询处于活动状态时退出该过程。
要实际执行此示例代码,可以在自己的Spark应用程序中编译代码,也可以在 下载Spark之后直接 运行示例。我们正在展示后者。您首先需要通过使用以下命令将Netcat(在大多数类Unix系统中找到的一个小实用程序)作为数据服务器运行
$ nc -lk 9999
然后,在另一个终端中,您可以通过使用
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
然后,将对运行netcat服务器的终端中键入的任何行进行计数并每秒打印一次。它将类似于以下内容。
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
...

# TERMINAL 2: RUNNING StructuredNetworkWordCount
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
Java语言
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
import java.util.Iterator;
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.getOrCreate();
接下来,让我们创建一个流数据框架,该数据框架表示从在localhost:9999上侦听的服务器接收的文本数据,并对数据框架进行转换以计算字数。
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// Split the lines into words
Dataset words = lines
.as(Encoders.STRING())
.flatMap((FlatMapFunction) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// Generate running word count
Dataset wordCounts = words.groupBy("value").count();
此linesDataFrame表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此它目前未接收任何数据。接下来,我们使用将该DataFrame转换为String的Dataset .as(Encoders.STRING()),以便我们可以应用该flatMap操作将每一行拆分为多个单词。结果words数据集包含所有单词。最后,我们wordCounts通过对数据集中的唯一值进行分组并对其进行计数来定义DataFrame。请注意,这是一个流数据帧,它表示流的运行字数。
现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算start()。
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
执行此代码后,流计算将在后台开始。该query对象是该活动流查询的句柄,我们已决定等待查询终止,awaitTermination()以防止查询处于活动状态时退出该过程。
要实际执行此示例代码,可以在自己的Spark应用程序中编译代码,也可以在 下载Spark之后直接 运行示例。我们正在展示后者。您首先需要通过使用以下命令将Netcat(在大多数类Unix系统中找到的一个小实用程序)作为数据服务器运行
$ nc -lk 9999
然后,在另一个终端中,您可以通过使用
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
然后,将对运行netcat服务器的终端中键入的任何行进行计数并每秒打印一次。它将类似于以下内容。
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
...

TERMINAL 2: RUNNING JavaStructuredNetworkWordCount
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
Python语言
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
接下来,让我们创建一个流数据框架,该数据框架表示从在localhost:9999上侦听的服务器接收的文本数据,并对数据框架进行转换以计算字数。
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
此linesDataFrame表示一个包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此它目前未接收任何数据。接下来,我们使用了两个内置的SQL函数-split和explode,将每行拆分为多行,每行各有一个单词。另外,我们使用该函数alias将新列命名为“ word”。最后,我们wordCounts通过对数据集中的唯一值进行分组并对其进行计数来定义DataFrame。请注意,这是一个流数据帧,它表示流的运行字数。
现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算start()。
# Start running the query that prints the running counts to the console
query = wordCounts
.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
执行此代码后,流计算将在后台开始。该query对象是该活动流查询的句柄,我们已决定等待查询终止,awaitTermination()以防止查询处于活动状态时退出该过程。
要实际执行此示例代码,可以在自己的Spark应用程序中编译代码,也可以在 下载Spark之后直接 运行示例。我们正在展示后者。您首先需要通过使用以下命令将Netcat(在大多数类Unix系统中找到的一个小实用程序)作为数据服务器运行
$ nc -lk 9999
然后,在另一个终端中,您可以通过使用
./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
然后,将对运行netcat服务器的终端中键入的任何行进行计数并每秒打印一次。它将类似于以下内容。
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
...

# TERMINAL 2: RUNNING structured_network_wordcount.py
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
R语言
sparkR.session(appName = "StructuredNetworkWordCount")
接下来,让我们创建一个流数据框架,该数据框架表示从在localhost:9999上侦听的服务器接收的文本数据,并对数据框架进行转换以计算字数。
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)
# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# Generate running word count
wordCounts <- count(group_by(words, "word"))
此linesSparkDataFrame表示包含流文本数据的无界表。该表包含一列名为“值”的字符串,流文本数据中的每一行都成为表中的一行。请注意,由于我们正在设置转换,并且尚未开始转换,因此它目前未接收任何数据。接下来,我们有一个带有两个SQL函数的SQL表达式-split和explode,将每一行拆分成多行,每行各有一个单词。另外,我们将新列命名为“ word”。最后,我们wordCounts通过按SparkDataFrame中的唯一值进行分组并对其进行计数来定义SparkDataFrame。请注意,这是一个流式SparkDataFrame,它表示流的运行字数。
现在,我们对流数据进行了查询。剩下的就是实际开始接收数据并计算计数了。为此,我们将其设置outputMode("complete")为在每次更新计数时将完整的计数集(由指定)打印到控制台。然后使用开始流计算start()。
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
执行此代码后,流计算将在后台开始。该query对象是该活动流查询的句柄,我们已决定等待查询终止,awaitTermination()以防止查询处于活动状态时退出该过程。
要实际执行此示例代码,可以在自己的Spark应用程序中编译代码,也可以在 下载Spark之后直接 运行示例。我们正在展示后者。您首先需要通过使用以下命令将Netcat(在大多数类Unix系统中找到的一个小实用程序)作为数据服务器运行
$ nc -lk 9999
然后,在另一个终端中,您可以通过使用
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
然后,将对运行netcat服务器的终端中键入的任何行进行计数并每秒打印一次。它将类似于以下内容。
# TERMINAL 1:
# Running Netcat
$ nc -lk 9999
apache spark
apache hadoop
...

# TERMINAL 2: RUNNING structured_network_wordcount.R
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 1|
| spark| 1|
+------+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache| 2|
| spark| 1|
|hadoop| 1|
+------+-----+
...
程式设计模型
结构化流传输中的关键思想是将实时数据流视为被连续添加的表。这导致了一个新的流处理模型,该模型与批处理模型非常相似。您将像在静态表上一样将流计算表示为类似于批处理的标准查询,Spark 在无界输入表上将其作为增量查询运行。让我们更详细地了解此模型。
基本概念
将输入数据流视为“输入表”。流上到达的每个数据项都像是将新行附加到输入表中。
作为表格流
对输入的查询将生成“结果表”。在每个触发间隔(例如,每1秒钟),新行将附加到输入表中,并最终更新结果表。无论何时更新结果表,我们都希望将更改后的结果行写入外部接收器。
模型
“输出”定义为写到外部存储器的内容。可以在不同的模式下定义输出:
完整模式 -整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。
追加模式 -仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。
更新模式 -仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则等同于追加模式。