当前位置: 首页 > news >正文

【Sparkstreaming_01】

文章目录

  • SparkStreaming
    • 1.流处理 /实时计算
    • 2.批处理/离线计算
    • SparkStreaming简单介绍:
    • SparkStreaming数据源:
      • 总结:
    • SparkStreaming运行(工作原理):
      • 流式处理:
    • spark程序入口总结:
    • idea里创建SparkStreaming代码:
    • 1.如何构建DStream
      • 如何构建DStream:两种
      • 总结: 什么是Receiver:
      • 2.转换算子
      • sparkstreaming处理数据的方式:
        • 1.为什么要指定checkpoint
        • 2.checkpoint 目录 生产上 得指定到 hdfs上进行存储
      • 2.1 checkpoint 的作用 针对sparkstreaming来说 :
    • 需求: wc案例结果写mysql里面
      • 1.第一种方式:某个东西没有进行序列化
      • 2.正确写法:
      • 3.连接池:
      • 4.sparksql的方式写出

SparkStreaming

​ spark 提供的实时计算的模块:SparkStreaming、structuredStreaming

1.流处理 /实时计算

​ 实时:storm、flink (来一条数据处理一条数据 ) event 真正的实时计算
​ 近实时:
​ SparkStreaming 来一批数据处理一批数据 源源不断的来 mini-batch

	flink 	t0 级 : 实时处理用sql 方式进行开发 
	structuredStream  : 离线去开发实时通过 df api/sql 
	SparkStreaming 
 	双流join  :api:  
			flink、ss   =》 api 
			SparkStreaming code 很多 =》 api join state 
	
	状态+ eventtime+watermaker =》 延迟数据处理: 
			1.processtime + udf
			2.eventtime+watermaker 
	数据和离线 对不上 处理数据的时候 延迟数据 丢了 没有进行处理
	状态、checkpoint 

2.批处理/离线计算

​ 1.一次性处理某一个批次的数据 数据是有始有终的

流处理:
​ 水龙头 数据是远远不断的来 数据没有始终

技术选型:
1.生产上主要占比:
SparkStreaming、structuredStreaming 10% spark
flink 90%
storm 2% 几乎不用

	开发角度 :code/sql 处理 实时计算
	业务角度:
		1.实时指标:flink和structuredStreaming差不多
		2.实时数仓:1.代码上 差不多【缺点:不好维护】
					2.sql文件: flinksql 维护实时数仓好维护

SparkStreaming简单介绍:

1.Spark Streaming is an extension of the core Spark API
sss开发 与sparkcore 算子开发 差不多
2.spark Streaming 数据源:Kafka, Kinesis, or TCP sockets =》 input
3.处理: 算子的方式进行处理 =》 todo

  1. pushed out to filesystems, databases, and live dashboards. =》 output

SparkStreaming数据源:

kafka **** 流式引擎 + kafka cp
​ flume 可以使用 一般不用 没有数据缓冲的作用
​ hdfs 很少使用
​ tcp sockets =》 测试 +运营商数据(采集数据 )

总结:

​ 建议不要使用flume 缓冲能力很弱 之后数据计算 直接把数据干到 spark里面 会导致 spark计算程序挂掉

SparkStreaming运行(工作原理):

  1. receives live input data streams 接收数据
    2.divides the data into batches 把接收数据 拆分成batches
eg: 
	按照时间进行拆分 
sparkstreaming =》 kafka : 
	1.5s钟处理一次数据  
	2.会5s接收的数据切分成batch
	3.把batch 交给 sparkengine 处理 
	4.处理完结果 也是 batch 

sparkstreaming编程模型:DStream 
a DStream is represented as a sequence of RDDs.(一个rdd的集合)
sparkcore:rdd 
sparksql :ds、df 

Stream data =》 按照时间(5s一个批次) data 拆分一个一个的 batch
一个一个的rdd

DStream 就是由一串 rdd构成

流式处理:

​ 对 DStream进行转换操作
​ 实际上就是对 DStream里面的rdd进行操作
​ 对rdd进行操作就是对 rdd里面分区的元素进行操作

spark程序入口总结:

​ sparkstreaming :StreamingContext
​ sparkcore: sparkcontext
​ sparksql: sparksession

idea里创建SparkStreaming代码:

​ 1.引入


​ org.apache.spark
​ spark-streaming_2.12
​ 3.2.1

​ 2.构建程序入口

打印的东西:
1.spark处理 当前批次的数据的结果
2.不能处理 累计批次的数据
累计批次:多个批次之间又联系的

1.如何构建DStream

如何构建DStream:两种

​ 1.外部数据源【kafka】
​ 2.高阶算子方式转换

​ 1.Input DStreams 【输入流 kafka】 *****
​ 2.Receivers 【接收流 测试使用 生产上不用】: 为面试准备
​ 并不是所有的接收数据都需要接收器

1.Receivers:底层逐级调用的类 
	socketTextStream: 
		socketStream
			SocketInputDStream

	master =》 local[2] => local[1] code能否处理数据:不能
sparkstreaming: 1  cpu =》 1 core 
	1.接收流式数据 ok
	2.流式数据 切分成 batch进行处理 no ok  cpu不够 数据没有资源进行处理
sparkstreaming要求: 
n > 1 in local mode if you have receivers to get data
master  cpu 个数 一定要大于Receiver 数量

总结: 什么是Receiver:

指的就是 ReceiverInputDStream(接收器)

2.转换算子

​ 1.transform ***
​ 2.updateStateByKey

sparkstreaming处理数据的方式:

​ 1.默认仅仅是计算当前批次的数据 只是计算10s一个批次的数据

需求:
统计 从现在时间点开始 b出现的累计次数?
updateStateByKey 用于解决 有状态问题

对于 累计批次的需求? 官方引出一个概念 状态 
状态:State: 
	1.有状态	前后批次有联系的 
	2.无状态    前后批次是没有联系的

累计批次的需求?
1.updateStateByKey 算子解决
1.Define the state
2.Define the state update function

注意:
3.得指定 The checkpoint directory has not been set

1.为什么要指定checkpoint

​ 1.维护 当前批次和以前的累计批次的数据state

2.checkpoint 目录 生产上 得指定到 hdfs上进行存储

​ 存在问题:
​ 1.checkpoint 每个批次都会产生 文件 =》 hdfs 扛不住 挂掉的风险

2.1 checkpoint 的作用 针对sparkstreaming来说 :

1.作用: 
	1.为了容错 
	2.恢复作业【实时计算作业 挂掉之后 可以恢复起来】
2.checkpoint存储的东西:
	1.Metadata 元数据 
		Configuration  作业里面配置信息
		DStream operations  作业code里面的算子操作
		Incomplete batches  未完成的批次
	2.Data
		每个批次里面真正传过来的数据 +stateful(状态)
3.使用场景
	1.Usage of stateful transformations
	2.Recovering from failures of the driver running the application(恢复作业)
4.如何正确使用checkpint? 
	如果你想要 恢复application 需要 正确编写 checkpoint设置代码 
注意: 
	checkpoint缺点:
		1.小文件多
		2.修改代码程序就用不了【修改业务逻辑代码】
checkpoint 用不了生产上 =》 累计批次指标统计问题 updateStateByKey这个算子 也用不了!!!
那么如何实现 累计批次统计需求? 
	一: 100%来处理 
		1.把每个批次数据 写到外部存储 
		2.然后利用外部存储系统再统计即可
	二:90%都没有解决 
		checkpoint 【解决 checkpoint 导致修改代码 报错问题+小文件问题解决】
	三:面试: 

3.输出算子:
1.print
2.foreachRDD =》 db

需求: wc案例结果写mysql里面

1.mysql创建一个表

create table wc(
word varchar(10),
cnt int(10)
);

Serialization stack:
- object not serializable (class: java.lang.Object, value: java.lang.Object@4c03b7b3)
- writeObject data (class: java.util.HashMap)
- object (class java.util.HashMap, {UTF-8=java.lang.Object@4c03b7b3})
- field (class: com.mysql.jdbc.ConnectionImpl, name: charsetConverterMap, type: interface java.util.Map)
- object (class com.mysql.jdbc.JDBC4Connection, com.mysql.jdbc.JDBC4Connection@2d0220cb)
- element of array (index: 0)

1.第一种方式:某个东西没有进行序列化

​ 1.MySQL连接驱动没有进行序列化 【做不了】

2.ClosureCleaner:Closure 闭包的意思 
闭包:方法内使用了方法外的变量

2.正确写法:

​ rdd.foreachPartition{
​ mysql 连接次数 会减少 rdd有多少个分区 就有多少个连接
​ }

3.连接池:

如果 rdd.foreachPartition 写数据 存储性能问题: 【一般不用,可以使用!!!】
1.可以使用连接池
2.rdd.coalse =》 减少rdd分区数

4.sparksql的方式写出

相关文章:

  • 旅游便宜的网站建设/怎么制作网站详细流程
  • 网站左悬浮代码/电话投放小网站
  • 湖南seo优化/c盘优化大师
  • 做网站毕设任务书/seo外链推广
  • 融资平台公司是什么意思/网站seo重庆
  • 做免费小说网站怎样赚钱/网络优化seo薪酬
  • Express做后端服务详细步骤,从零到一
  • 华为OD机试 - 网上商城优惠活动
  • 【C语言练习】 二进制中1的个数
  • 校招求职HR常问问题汇总
  • 响应式流的核心机制——背压机制
  • prometheus的remotewrite解析
  • 基于 TensorFlow 的深度学习图像识别模型的自动化测试(完整代码+数据)
  • Android 设置搜索功能
  • Linux关于 gdb 调试器的使用
  • 数字IC设计、验证、FPGA笔试必会 - Verilog经典习题 (八)使用generate…for语句简化代码
  • 【寒假每日一题】洛谷 P8254 [NOI Online 2022 入门组] 王国比赛
  • 【SpringBoot 学习】52、SpringBoot 使用 grpc 实现远程服务调用