第1关:车辆相关信息实时计算
任务描述
本关卡属于警务大数据中的实时计算模块,对车辆相关信息进行实时计算,实时计算出车辆在线数、车辆活跃数、致命故障数量、严重故障数量、一般故障数量、轻微故障数量、房车报警数量、旅行车报警数量、桥跑车报警数量、跑车报警数量、敞篷车报警数量等,为下一关的实时数据可视化提供数据基础。
本关任务:编写一个 SparkStructuredStreaming
程序完成车辆相关信息实时统计。
package net.educoder.spark
import org.apache.spark.sql.SparkSession
object KafkaSparkStreaming {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").appName("demo").getOrCreate()
spark.sparkContext.setLogLevel("error")
/** ******begin *********/
val straem = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("subscribe", "demo")
.load()
val frame = straem.selectExpr("CAST(value AS STRING)")
import spark.implicits._
val query = frame.as[String].map(x => {
val arr = x.split("\t")
val carType = arr(1).toInt
val warning = arr(3).toInt
val fault = arr(4).toInt
val speed = arr(5).toInt
// 在线数、活跃数、致命故障数量、严重故障数量、一般故障数量、轻微故障数量、房车报警数量、旅行车报警数量、桥跑车报警数量、跑车报警数量、敞篷车报警数量
val online = 1
val activeCount = if (speed > 0) 1 else 0
val fault_4 = if (fault == 4) 1 else 0
val fault_3 = if (fault == 3) 1 else 0
val fault_2 = if (fault == 2) 1 else 0
val fault_1 = if (fault == 1) 1 else 0
val warning_5 = if (warning == 1 || carType == 5) 1 else 0
val warning_4 = if (warning == 1 || carType == 4) 1 else 0
val warning_3 = if (warning == 1 || carType == 3) 1 else 0
val warning_2 = if (warning == 1 || carType == 2) 1 else 0
val warning_1 = if (warning == 1 || carType == 1) 1 else 0
event(online, activeCount, fault_4, fault_3, fault_2, fault_1, warning_5, warning_4, warning_3, warning_2, warning_1, 0)
})
query.groupBy("flag")
.sum("onlineCount", "activeCount", "fault_4", "fault_3", "fault_2", "fault_1", "warning_4", "warning_4", "warning_3", "warning_2", "warning_1")
.map(x => {
var result = ""
for (i <- 1 to 11) {
result = result + x.get(i) + ","
}
result.substring(0, result.length - 1)
})
.writeStream.outputMode("complete")
.format("kafka")
.option("kafka.bootstrap.servers", "127.0.0.1:9092")
.option("checkpointLocation", "/root/sparkStreming")
.option("topic", "demo2")
.start().awaitTermination()
/** ******end *********/
}
// 在线数、活跃数、致命故障数量、严重故障数量、一般故障数量、轻微故障数量、房车报警数量、旅行车报警数量、桥跑车报警数量、跑车报警数量、敞篷车报警数量
case class event(onlineCount: Int, activeCount: Int, fault_4: Int, fault_3: Int, fault_2: Int, fault_1: Int, warning_5: Int, warning_4: Int, warning_3: Int, warning_2: Int, warning_1: Int, flag: Int)
}
第2关:实时数据可视化
任务描述
本关卡属于警务大数据中的实时可视化模块,在上一关的数据基础上进行实时数据展示,让车辆监控人员更容易地制定出监控策略、更可靠地保障交通安全、发挥交通基础设施效能、提升交通系统运行效率和管理水平,为通畅的公众出行和可持续的经济发展服务。
本关任务:使用WebSocket
完成Kafka
数据推送及数据可视化。
WebSocketServer.java
package net.educoder.app.service;
import net.educoder.app.utils.kafkaClient;
import org.springframework.stereotype.Component;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@ServerEndpoint("/websocket")
@Component
public class WebSocketServer {
/**
* 需求如下:
* 客户端与Websocket服务端建立连接之后,启动 kafkaClient 线程,将当前 session 作为参数传入
*/
/********** begin **********/
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
//启动 kafkaClient 线程
new Thread(new kafkaClient(session)).start();
}
/********** end **********/
}
kafkaClient.java
package net.educoder.app.utils;
import com.alibaba.fastjson.JSON;
import net.educoder.app.entity.Event;
import net.educoder.app.service.WebSocketServer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import javax.websocket.Session;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
public class kafkaClient implements Runnable {
private Session session;
public kafkaClient(Session session) {
this.session = session;
}
/********** begin **********/
@Override
public void run() {
// 1. 创建 Properties 对象
Properties props = new Properties();
// 2. 配置连接 kafka 的参数
/**
* bootstrap.servers:127.0.0.1:9092
* group.id:my_group
* enable.auto.commit:true
* auto.commit.interval.ms:1000
* key.deserializer:org.apache.kafka.common.serialization.StringDeserializer
* value.deserializer:org.apache.kafka.common.serialization.StringDeserializer
*/
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "my_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//3. 创建 KafkaConsumer 对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//4. 订阅名为 demo2 的 topic
consumer.subscribe(Arrays.asList("demo2"));
//5. 死循环,不断消费订阅的数据
while (true) {
//6. 使用 KafkaConsumer 拉取数据
ConsumerRecords<String, String> records = consumer.poll(100);
//7. 遍历数据,将数据封装到 Event 对象中,使用 fastjson 将 Event 对象转换成 JSON 字符串,最后调用 session.getBasicRemote().sendText(String msg); 将数据推送到前端页面
/**
*
* kafka消费的数据如下:
* 在线数,活跃数,致命故障数量,严重故障数量,一般故障数量,轻微故障数量,房车报警数量,旅行车报警数量,桥跑车报警数量,跑车报警数量,敞篷车报警数量
* 3608335,1802435,25809,63260,15879,38612,77507,29697,10542,67913,42963
* 1745818,1365579,29449,46912,58208,29464,46830,55611,90398,94499,89332
* 3768443,2243235,32830,12980,26930,61768,44310,20354,11672,91021,52017
*
*
* Event对象属性如下:
* private String onlineCount; 在线数
* private String activeCount; 活跃数
* private String fault4Count; 致命故障数量
* private String fault3Count; 严重故障数量
* private String fault2Count; 一般故障数量
* private String fault1Count; 轻微故障数量
* private String warning5Count; 房车报警数量
* private String warning4Count; 旅行车报警数量
* private String warning3Count; 桥跑车报警数量
* private String warning2Count; 跑车报警数量
* private String warning1Count; 敞篷车报警数量
*
* Event 对象有参构造如下:
* public Event(String onlineCount, String activeCount, String fault4Count, String fault3Count, String fault2Count, String fault1Count, String warning5Count, String warning4Count, String warning3Count, String warning2Count, String warning1Count){....}
*
*/
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
String[] arr = value.split(",");
if (arr.length == 11) {
// 在线数、活跃数、致命故障数量、严重故障数量、一般故障数量、轻微故障数量、房车报警数量、旅行车报警数量、桥跑车报警数量、跑车报警数量、敞篷车报警数量
Event event = new Event(arr[0], arr[1], arr[2], arr[3], arr[4], arr[5], arr[6], arr[7], arr[8], arr[9], arr[10]);
String s = JSON.toJSONString(event);
try {
session.getBasicRemote().sendText(s);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
/********** end **********/
}
index.html
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>智慧警务-交通大数据监控平台</title>
<link rel="stylesheet" th:href="@{css/common.css}">
<link rel="stylesheet" th:href="@{css/map.css}">
</head>
<body>
<div class="data">
<div class="data-title">
<div class="title-left fl"></div>
<div class="title-center fl"></div>
<div class="title-right fr"></div>
</div>
<div class="data-content">
<div class="con-left fl">
<div class="left-top">
<div class="info">
<div class="info-title">实时统计</div>
<img th:src="@{img/bj-1.png}" alt="" class="bj-1">
<img th:src="@{img/bj-2.png}" alt="" class="bj-2">
<img th:src="@{img/bj-3.png}" alt="" class="bj-3">
<img th:src="@{img/bj-4.png}" alt="" class="bj-4">
<div class="info-main">
<div class="info-1">
<div class="info-img fl">
<img th:src="@{img/info-img-1.png}" alt="">
</div>
<div class="info-text fl">
<p>车辆总数(辆)</p>
<p id="CountNum">12,457</p>
</div>
</div>
<div class="info-2">
<div class="info-img fl">
<img th:src="@{img/info-img-2.png}" alt="">
</div>
<div class="info-text fl">
<p>当前在线数(辆)</p>
<p id="OnlineNum">12,457</p>
</div>
</div>
<div class="info-3">
<div class="info-img fl">
<img th:src="@{img/info-img-3.png}" alt="">
</div>
<div class="info-text fl">
<p>当前活跃数(辆)</p>
<p id="activeNum">12,457</p>
</div>
</div>
<div class="info-4">
<div class="info-img fl">
<img th:src="@{img/info-img-4.png}" alt="">
</div>
<div class="info-text fl">
<p>当前活跃率(%)</p>
<p id="liveness">83</p>
</div>
</div>
</div>
</div>
<div class="top-bottom">
<div class="title"></div>
<img th:src="@{img/bj-1.png}" alt="" class="bj-1">
<img th:src="@{img/bj-2.png}" alt="" class="bj-2">
<img th:src="@{img/bj-3.png}" alt="" class="bj-3">
<img th:src="@{img/bj-4.png}" alt="" class="bj-4">
<div id="echarts_1" class="charts"></div>
</div>
</div>
<div class="left-bottom">
<div class="title"></div>
<img th:src="@{img/bj-1.png}" alt="" class="bj-1">
<img th:src="@{img/bj-2.png}" alt="" class="bj-2">
<img th:src="@{img/bj-3.png}" alt="" class="bj-3">
<img th:src="@{img/bj-4.png}" alt="" class="bj-4">
<div id="echarts_2" class="charts"></div>
</div>
</div>
<div class="con-center fl">
<div class="map-num">
<p>实时行驶车辆(辆)</p>
<div class="num" id="runningNum">
<span>1</span>
<span>,</span>
<span>2</span>
<span>3</span>
<span>4</span>
<span>,</span>
<span>5</span>
<span>6</span>
<span>7</span>
</div>
</div>
<div class="cen-top" id="map"></div>
<div class="cen-bottom">
<div class="title"></div>
<img th:src="@{img/bj-1.png}" alt="" class="bj-1">
<img th:src="@{img/bj-2.png}" alt="" class="bj-2">
<img th:src="@{img/bj-3.png}" alt="" class="bj-3">
<img th:src="@{img/bj-4.png}" alt="" class="bj-4">
<div id="echarts_3" class="charts"></div>
</div>
</div>
<div class="con-right fr">
<div class="right-top">
<div class="title">汽车故障统计</div>
<img th:src="@{img/bj-1.png}" alt="" class="bj-1">
<img th:src="@{img/bj-2.png}" alt="" class="bj-2">
<img th:src="@{img/bj-3.png}" alt="" class="bj-3">
<img th:src="@{img/bj-4.png}" alt="" class="bj-4">
<div id="echarts_5" class="charts"></div>
</div>
<div class="right-center">
<div class="title">电池报警车辆统计</div>
<img th:src="@{img/bj-1.png}" alt="" class="bj-1">
<img th:src="@{img/bj-2.png}" alt="" class="bj-2">
<img th:src="@{img/bj-3.png}" alt="" class="bj-3">
<img th:src="@{img/bj-4.png}" alt="" class="bj-4">
<div id="echarts_6" class="charts"></div>
</div>
<div class="right-bottom">
<div class="title"></div>
<img th:src="@{img/bj-1.png}" alt="" class="bj-1">
<img th:src="@{img/bj-2.png}" alt="" class="bj-2">
<img th:src="@{img/bj-3.png}" alt="" class="bj-3">
<img th:src="@{img/bj-4.png}" alt="" class="bj-4">
</div>
</div>
</div>
</div>
</body>
<script th:src="@{js/jquery-2.1.1.min.js}"></script>
<script th:src="@{js/echarts.min.js}"></script>
<script th:src="@{js/china.js}"></script>
<script th:src="@{js/echarts.js}"></script>
<script type="text/javascript" th:inline="javascript">
var socket;
if (typeof(WebSocket) == "undefined") {
console.log("您的浏览器不支持WebSocket");
} else {
console.log("您的浏览器支持WebSocket");
//实现化WebSocket对象,指定要连接的服务器地址与端口 建立连接
var href = window.location.href;
var arr = href.split(":");
var ip = arr[1];
var port = arr[2];
socket = new WebSocket("ws://" + ip + ":" + port + "/websocket");
//打开事件
socket.onopen = function () {
console.log("Socket 已打开");
};
//获得消息事件
socket.onmessage = function (msg) {
/********** begin **********/
//1.获取 WebSocket 服务端推送过来的数据并将其转换成 JSON 对象并命名为 d
var d = JSON.parse(msg.data);
//2.从 JSON 对象 d 中获取carCount、onlineCount、activeCount、active 并将其分别替换 id 为 CountNum、OnlineNum、activeNum 的文本内容
$("#CountNum").text(d.carCount);
$("#OnlineNum").text(d.onlineCount);
$("#activeNum").text(d.activeCount)
$("#liveness").text(d.active)
var runningNum = numberHandle(d.activeCount);
$("#runningNum").html(runningNum)
//3.汽车故障图表生成,创建一个存储4种故障类型数量的数组(fault4Count、fault3Count、fault2Count、fault1Count),调用 fault(Array arr) 函数生成图表
var faultList = new Array(d.fault4Count, d.fault3Count, d.fault2Count, d.fault1Count)
fault(faultList)
//4.汽车电池警告图表生成,创建一个存储5种电池警告类型数量的数组(warning5Count、warning4Count、warning3Count、warning2Count,warning1Count),调用 warning(Array arr) 函数生成图表
var warningList = new Array(d.warning5Count, d.warning4Count, d.warning3Count, d.warning2Count, d.warning1Count)
warning(warningList)
/********** end **********/
};
//关闭事件
socket.onclose = function () {
console.log("Socket已关闭");
};
//发生了错误事件
socket.onerror = function () {
alert("Socket发生了错误");
}
}
function numberHandle(str) {
var arr = str.split("");
var result = "";
for (var x = 0; x < arr.length; x++) {
result += "<span>" + arr[x] + "</span>"
}
return result;
}
function fault(dataList) {
var myChart = echarts.init(document.getElementById('echarts_5'));
var xData = function () {
var data = ['致命故障', '严重故障', '一般故障', '轻微故障'];
return data;
}();
option = {
tooltip: {
show: "true",
trigger: 'item',
backgroundColor: 'rgba(0,0,0,0.4)', // 背景
padding: [8, 10], //内边距
// extraCssText: 'box-shadow: 0 0 3px rgba(255, 255, 255, 0.4);', //添加阴影
formatter: function (params) {
if (params.seriesName != "") {
return params.name + ' : ' + params.value + ' 辆';
}
},
},
grid: {
borderWidth: 0,
top: 20,
bottom: 35,
left: 55,
right: 30,
textStyle: {
color: "#fff"
}
},
xAxis: [{
type: 'category',
axisTick: {
show: false
},
axisLine: {
show: true,
lineStyle: {
color: '#363e83',
}
},
axisLabel: {
inside: false,
textStyle: {
color: '#bac0c0',
fontWeight: 'normal',
fontSize: '12',
},
},
data: xData,
}, {
type: 'category',
axisLine: {
show: false
},
axisTick: {
show: false
},
axisLabel: {
show: false
},
splitArea: {
show: false
},
splitLine: {
show: false
},
data: xData,
}],
yAxis: {
type: 'value',
axisTick: {
show: false
},
axisLine: {
show: true,
lineStyle: {
color: '#32346c',
}
},
splitLine: {
show: true,
lineStyle: {
color: '#32346c ',
}
},
axisLabel: {
textStyle: {
color: '#bac0c0',
fontWeight: 'normal',
fontSize: '12',
},
formatter: '{value}',
},
},
series: [{
type: 'bar',
itemStyle: {
normal: {
show: true,
color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [{
offset: 0,
color: '#00c0e9'
}, {
offset: 1,
color: '#3b73cf'
}]),
barBorderRadius: 50,
borderWidth: 0,
},
emphasis: {
shadowBlur: 15,
shadowColor: 'rgba(105,123, 214, 0.7)'
}
},
zlevel: 2,
barWidth: '20%',
data: dataList
},
{
name: '',
type: 'bar',
xAxisIndex: 1,
zlevel: 1,
itemStyle: {
normal: {
color: '#121847',
borderWidth: 0,
shadowBlur: {
shadowColor: 'rgba(255,255,255,0.31)',
shadowBlur: 10,
shadowOffsetX: 0,
shadowOffsetY: 2,
},
}
},
barWidth: '20%'
}
]
};
myChart.setOption(option);
window.addEventListener("resize", function () {
myChart.resize();
});
}
function warning(dataList) {
var myChart = echarts.init(document.getElementById('echarts_6'));
var xData = function () {
var data = ['房车', '旅行轿车', '轿跑车', '跑车', '敞篷车'];
return data;
}();
option = {
// backgroundColor: "#141f56",
tooltip: {
show: "true",
trigger: 'item',
backgroundColor: 'rgba(0,0,0,0.4)', // 背景
padding: [8, 10], //内边距
// extraCssText: 'box-shadow: 0 0 3px rgba(255, 255, 255, 0.4);', //添加阴影
formatter: function (params) {
if (params.seriesName != "") {
return params.name + ' : ' + params.value + ' 辆';
}
},
},
grid: {
borderWidth: 0,
top: 20,
bottom: 35,
left: 55,
right: 30,
textStyle: {
color: "#fff"
}
},
xAxis: [{
type: 'category',
axisTick: {
show: false
},
axisLine: {
show: true,
lineStyle: {
color: '#363e83',
}
},
axisLabel: {
inside: false,
textStyle: {
color: '#bac0c0',
fontWeight: 'normal',
fontSize: '12',
},
},
data: xData,
}, {
type: 'category',
axisLine: {
show: false
},
axisTick: {
show: false
},
axisLabel: {
show: false
},
splitArea: {
show: false
},
splitLine: {
show: false
},
data: xData,
}],
yAxis: {
type: 'value',
axisTick: {
show: false
},
axisLine: {
show: true,
lineStyle: {
color: '#32346c',
}
},
splitLine: {
show: true,
lineStyle: {
color: '#32346c ',
}
},
axisLabel: {
textStyle: {
color: '#bac0c0',
fontWeight: 'normal',
fontSize: '12',
},
formatter: '{value}',
},
},
series: [{
type: 'bar',
itemStyle: {
normal: {
show: true,
color: new echarts.graphic.LinearGradient(0, 0, 0, 1, [{
offset: 0,
color: '#00c0e9'
}, {
offset: 1,
color: '#3b73cf'
}]),
barBorderRadius: 50,
borderWidth: 0,
},
emphasis: {
shadowBlur: 15,
shadowColor: 'rgba(105,123, 214, 0.7)'
}
},
zlevel: 2,
barWidth: '20%',
data: dataList,
},
{
name: '',
type: 'bar',
xAxisIndex: 1,
zlevel: 1,
itemStyle: {
normal: {
color: '#121847',
borderWidth: 0,
shadowBlur: {
shadowColor: 'rgba(255,255,255,0.31)',
shadowBlur: 10,
shadowOffsetX: 0,
shadowOffsetY: 2,
},
}
},
barWidth: '20%'
}
]
};
// 使用刚指定的配置项和数据显示图表。
myChart.setOption(option);
window.addEventListener("resize", function () {
myChart.resize();
});
}
</script>
</html>
版权声明:本文为qq_48664727原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。