SpringCloud-Day9-elasticsearch-数据聚合、自动补全、数据同步、集群
elasticsearch-数据聚合、自动补全、数据同步、集群
1.elasticsearch-数据聚合
1.1聚合的分类
1.2DSL实现聚合
1.2.1 DSL实现Bucket聚合
1.2.2 DSL实现Metrics聚合
1.3RestAPI实现
2.elasticsearch-自动补全
2.1拼音分词器
2.2自定义分词器
2.3自动补全查询
2.4实现酒店搜索框自动补全
3.elasticsearch-数据同步
3.1数据同步思路分析
3.2实现elasticsearch与数据库数据同步
声明exchange、queue、RoutingKey
package cn.itcast.hotel.constants;
/**
 * Names of the RabbitMQ exchange, queues and routing keys used for hotel messages.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class MqConstants {

    /** Exchange name. */
    public static final String HOTEL_EXCHANGE = "hotel.topic";

    /** Queue that listens for hotel inserts and updates. */
    public static final String HOTEL_INSERT_QUEUE = "hotel.insert.queue";

    /** Queue that listens for hotel deletions. */
    public static final String HOTEL_DELETE_QUEUE = "hotel.delete.queue";

    /** Routing key for insert or update messages. */
    public static final String HOTEL_INSERT_KEY = "hotel.insert";

    /** Routing key for delete messages. */
    public static final String HOTEL_DELETE_KEY = "hotel.delete";

    /** Prevents instantiation; callers only read the static constants. */
    private MqConstants() {
    }
}
package cn.itcast.hotel.config;
import cn.itcast.hotel.constants.MqConstants;
import com.sun.org.apache.bcel.internal.generic.NEW;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the hotel exchange, the insert and delete
 * queues, and the bindings between them.
 */
@Configuration
public class MqConfig {

    /**
     * Declares the topic exchange named {@link MqConstants#HOTEL_EXCHANGE}.
     *
     * @return a durable exchange that is not auto-deleted
     */
    @Bean
    public TopicExchange topicExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE, durable, autoDelete);
    }

    /**
     * Declares the queue named {@link MqConstants#HOTEL_INSERT_QUEUE}.
     *
     * @return a durable queue
     */
    @Bean
    public Queue insertQueue() {
        final boolean durable = true;
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE, durable);
    }

    /**
     * Declares the queue named {@link MqConstants#HOTEL_DELETE_QUEUE}.
     *
     * @return a durable queue
     */
    @Bean
    public Queue deleteQueue() {
        final boolean durable = true;
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE, durable);
    }

    /**
     * Binds the insert queue to the exchange with routing key
     * {@link MqConstants#HOTEL_INSERT_KEY}.
     *
     * @return the binding for the insert queue
     */
    @Bean
    public Binding insertQueueBinding() {
        Queue queue = insertQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstants.HOTEL_INSERT_KEY);
    }

    /**
     * Binds the delete queue to the exchange with routing key
     * {@link MqConstants#HOTEL_DELETE_KEY}.
     *
     * @return the binding for the delete queue
     */
    @Bean
    public Binding deleteQueueBinding() {
        Queue queue = deleteQueue();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
在酒店的新增、修改、删除业务中发送消息
/**
 * REST endpoints under {@code hotel} for reading and modifying hotels.
 *
 * <p>Each modifying endpoint calls the hotel service first and then sends the
 * hotel id to {@link MqConstants#HOTEL_EXCHANGE} with the matching routing key.
 */
@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Looks up a single hotel.
     *
     * @param id hotel id taken from the path
     * @return whatever {@code hotelService.getById} returns for that id
     */
    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id) {
        return hotelService.getById(id);
    }

    /**
     * Returns one page of hotels.
     *
     * @param page page number, defaults to 1
     * @param size page size, defaults to 1
     * @return the total count and the records of the requested page
     */
    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ) {
        Page<Hotel> hotelPage = hotelService.page(new Page<>(page, size));
        return new PageResult(hotelPage.getTotal(), hotelPage.getRecords());
    }

    /**
     * Saves a hotel, then sends its id with the insert routing key.
     *
     * @param hotel hotel taken from the request body
     */
    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel) {
        hotelService.save(hotel);
        // The id is read after save() has been called.
        rabbitTemplate.convertAndSend(
                MqConstants.HOTEL_EXCHANGE,
                MqConstants.HOTEL_INSERT_KEY,
                hotel.getId());
    }

    /**
     * Updates a hotel by id, then sends its id with the insert routing key.
     *
     * @param hotel hotel taken from the request body; its id must not be null
     * @throws InvalidParameterException if the hotel has no id
     */
    @PutMapping
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(
                MqConstants.HOTEL_EXCHANGE,
                MqConstants.HOTEL_INSERT_KEY,
                hotel.getId());
    }

    /**
     * Removes a hotel by id, then sends the id with the delete routing key.
     *
     * @param id hotel id taken from the path
     */
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(
                MqConstants.HOTEL_EXCHANGE,
                MqConstants.HOTEL_DELETE_KEY,
                id);
    }
}
完成消息监听,并更新elasticsearch中数据
package cn.itcast.hotel.mq;
import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Receives hotel messages from RabbitMQ and forwards the hotel id to
 * {@link IHotelService}.
 */
@Component
public class HotelListener {

    @Autowired
    private IHotelService hotelService;

    /**
     * Handles messages from {@link MqConstants#HOTEL_INSERT_QUEUE}
     * (hotel inserted or updated).
     *
     * @param id hotel id carried by the message
     */
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id) {
        hotelService.insertById(id);
    }

    /**
     * Handles messages from {@link MqConstants#HOTEL_DELETE_QUEUE}
     * (hotel deleted).
     *
     * @param id hotel id carried by the message
     */
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDeleteOrUpdate(Long id) {
        hotelService.deleteById(id);
    }
}
/**
 * Deletes the document with the given id from the "hotel" index.
 *
 * @param id hotel id, used as the document id
 * @throws RuntimeException wrapping any {@link IOException} raised by the client
 */
@Override
public void deleteById(Long id) {
    // Building the request performs no I/O, so it sits outside the try block.
    DeleteRequest deleteRequest = new DeleteRequest("hotel", id.toString());
    try {
        client.delete(deleteRequest, RequestOptions.DEFAULT);
    } catch (IOException ioe) {
        throw new RuntimeException(ioe);
    }
}
/**
 * Loads the hotel with the given id and writes it to the "hotel" index as a
 * JSON document whose id is the hotel id.
 *
 * <p>If no hotel is found for the id, nothing is indexed and the method returns
 * normally.
 *
 * @param id id of the hotel to index
 * @throws RuntimeException wrapping any {@link IOException} raised by the client
 */
@Override
public void insertById(Long id) {
    try {
        // 0. Load the hotel by id.
        Hotel hotel = getById(id);
        if (hotel == null) {
            // The row may be gone by the time this runs (e.g. deleted after the
            // message was sent). Without this guard hotel.getId() below throws a
            // NullPointerException. NOTE(review): an exception escaping a listener
            // may cause the message to be redelivered, depending on the container's
            // requeue settings - confirm against the project configuration.
            return;
        }
        // Convert the entity into the document type.
        HotelDoc hotelDoc = new HotelDoc(hotel);
        // 1. Prepare the request, using the hotel id as the document id.
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2. Attach the JSON document.
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3. Send the request.
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
4.elasticsearch-集群
4.1搭建ES集群
4.2集群脑裂问题
4.3集群故障转移
4.4集群分布式存储
4.5集群分布式查询
版权声明:本文为qq_45498432原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。