【Sparkstreaming_01】
文章目录
- SparkStreaming
- 1.流处理 /实时计算
- 2.批处理/离线计算
- SparkStreaming简单介绍:
- SparkStreaming数据源:
- 总结:
- SparkStreaming运行(工作原理):
- 流式处理:
- spark程序入口总结:
- idea里创建SparkStreaming代码:
- 1.如何构建DStream
- 如何构建DStream:两种
- 总结: 什么是Receiver:
- 2.转换算子
- sparkstreaming处理数据的方式:
- 1.为什么要指定checkpoint
- 2.checkpoint 目录 生产上 得指定到 hdfs上进行存储
- 2.1 checkpoint 的作用 针对sparkstreaming来说 :
- 需求: wc案例结果写mysql里面
- 1.第一种方式:某个东西没有进行序列化
- 2.正确写法:
- 3.连接池:
- 4.sparksql的方式写出
SparkStreaming
spark 提供的实时计算的模块:SparkStreaming、structuredStreaming
1.流处理 /实时计算
实时:storm、flink (来一条数据处理一条数据 ) event 真正的实时计算
近实时:
SparkStreaming 来一批数据处理一批数据 源源不断的来 mini-batch
flink t0 级 : 实时处理用sql 方式进行开发
structuredStream : 离线去开发实时通过 df api/sql
SparkStreaming
双流join :api:
flink、ss =》 api
SparkStreaming code 很多 =》 api join state
状态+ eventtime+watermaker =》 延迟数据处理:
1.processtime + udf
2.eventtime+watermaker
数据和离线 对不上 处理数据的时候 延迟数据 丢了 没有进行处理
状态、checkpoint
2.批处理/离线计算
1.一次性处理某一个批次的数据 数据是有始有终的
流处理:
水龙头 数据是远远不断的来 数据没有始终
技术选型:
1.生产上主要占比:
SparkStreaming、structuredStreaming 10% spark
flink 90%
storm 2% 几乎不用
开发角度 :code/sql 处理 实时计算
业务角度:
1.实时指标:flink和structuredStreaming差不多
2.实时数仓:1.代码上 差不多【缺点:不好维护】
2.sql文件: flinksql 维护实时数仓好维护
SparkStreaming简单介绍:
1.Spark Streaming is an extension of the core Spark API
sss开发 与sparkcore 算子开发 差不多
2.spark Streaming 数据源:Kafka, Kinesis, or TCP sockets =》 input
3.处理: 算子的方式进行处理 =》 todo
- pushed out to filesystems, databases, and live dashboards. =》 output
SparkStreaming数据源:
kafka **** 流式引擎 + kafka cp
flume 可以使用 一般不用 没有数据缓冲的作用
hdfs 很少使用
tcp sockets =》 测试 +运营商数据(采集数据 )
总结:
建议不要使用flume 缓冲能力很弱 之后数据计算 直接把数据干到 spark里面 会导致 spark计算程序挂掉
SparkStreaming运行(工作原理):
- receives live input data streams 接收数据
2.divides the data into batches 把接收数据 拆分成batches
eg:
按照时间进行拆分
sparkstreaming =》 kafka :
1.5s钟处理一次数据
2.会5s接收的数据切分成batch
3.把batch 交给 sparkengine 处理
4.处理完结果 也是 batch
sparkstreaming编程模型:DStream
a DStream is represented as a sequence of RDDs.(一个rdd的集合)
sparkcore:rdd
sparksql :ds、df
Stream data =》 按照时间(5s一个批次) data 拆分一个一个的 batch
一个一个的rdd
DStream 就是由一串 rdd构成
流式处理:
对 DStream进行转换操作
实际上就是对 DStream里面的rdd进行操作
对rdd进行操作就是对 rdd里面分区的元素进行操作
spark程序入口总结:
sparkstreaming :StreamingContext
sparkcore: sparkcontext
sparksql: sparksession
idea里创建SparkStreaming代码:
1.引入
org.apache.spark
spark-streaming_2.12
3.2.1
2.构建程序入口
打印的东西:
1.spark处理 当前批次的数据的结果
2.不能处理 累计批次的数据
累计批次:多个批次之间又联系的
1.如何构建DStream
如何构建DStream:两种
1.外部数据源【kafka】
2.高阶算子方式转换
1.Input DStreams 【输入流 kafka】 *****
2.Receivers 【接收流 测试使用 生产上不用】: 为面试准备
并不是所有的接收数据都需要接收器
1.Receivers:底层逐级调用的类
socketTextStream:
socketStream
SocketInputDStream
master =》 local[2] => local[1] code能否处理数据:不能
sparkstreaming: 1 cpu =》 1 core
1.接收流式数据 ok
2.流式数据 切分成 batch进行处理 no ok cpu不够 数据没有资源进行处理
sparkstreaming要求:
n > 1 in local mode if you have receivers to get data
master cpu 个数 一定要大于Receiver 数量
总结: 什么是Receiver:
指的就是 ReceiverInputDStream(接收器)
2.转换算子
1.transform ***
2.updateStateByKey
sparkstreaming处理数据的方式:
1.默认仅仅是计算当前批次的数据 只是计算10s一个批次的数据
需求:
统计 从现在时间点开始 b出现的累计次数?
updateStateByKey 用于解决 有状态问题
对于 累计批次的需求? 官方引出一个概念 状态
状态:State:
1.有状态 前后批次有联系的
2.无状态 前后批次是没有联系的
累计批次的需求?
1.updateStateByKey 算子解决
1.Define the state
2.Define the state update function
注意:
3.得指定 The checkpoint directory has not been set
1.为什么要指定checkpoint
1.维护 当前批次和以前的累计批次的数据state
2.checkpoint 目录 生产上 得指定到 hdfs上进行存储
存在问题:
1.checkpoint 每个批次都会产生 文件 =》 hdfs 扛不住 挂掉的风险
2.1 checkpoint 的作用 针对sparkstreaming来说 :
1.作用:
1.为了容错
2.恢复作业【实时计算作业 挂掉之后 可以恢复起来】
2.checkpoint存储的东西:
1.Metadata 元数据
Configuration 作业里面配置信息
DStream operations 作业code里面的算子操作
Incomplete batches 未完成的批次
2.Data
每个批次里面真正传过来的数据 +stateful(状态)
3.使用场景
1.Usage of stateful transformations
2.Recovering from failures of the driver running the application(恢复作业)
4.如何正确使用checkpint?
如果你想要 恢复application 需要 正确编写 checkpoint设置代码
注意:
checkpoint缺点:
1.小文件多
2.修改代码程序就用不了【修改业务逻辑代码】
checkpoint 用不了生产上 =》 累计批次指标统计问题 updateStateByKey这个算子 也用不了!!!
那么如何实现 累计批次统计需求?
一: 100%来处理
1.把每个批次数据 写到外部存储
2.然后利用外部存储系统再统计即可
二:90%都没有解决
checkpoint 【解决 checkpoint 导致修改代码 报错问题+小文件问题解决】
三:面试:
3.输出算子:
1.print
2.foreachRDD =》 db
需求: wc案例结果写mysql里面
1.mysql创建一个表
create table wc(
word varchar(10),
cnt int(10)
);
Serialization stack:
- object not serializable (class: java.lang.Object, value: java.lang.Object@4c03b7b3)
- writeObject data (class: java.util.HashMap)
- object (class java.util.HashMap, {UTF-8=java.lang.Object@4c03b7b3})
- field (class: com.mysql.jdbc.ConnectionImpl, name: charsetConverterMap, type: interface java.util.Map)
- object (class com.mysql.jdbc.JDBC4Connection, com.mysql.jdbc.JDBC4Connection@2d0220cb)
- element of array (index: 0)
1.第一种方式:某个东西没有进行序列化
1.MySQL连接驱动没有进行序列化 【做不了】
2.ClosureCleaner:Closure 闭包的意思
闭包:方法内使用了方法外的变量
2.正确写法:
rdd.foreachPartition{
mysql 连接次数 会减少 rdd有多少个分区 就有多少个连接
}
3.连接池:
如果 rdd.foreachPartition 写数据 存储性能问题: 【一般不用,可以使用!!!】
1.可以使用连接池
2.rdd.coalse =》 减少rdd分区数