Elasticsearch 搜索实战

一：创建 DB 表并插入数据（下面只给出建表语句）：

-- Source table for the Elasticsearch import: PositionServiceImpl runs "SELECT * from position",
-- reads every column of each row as a string and indexes the row into the "position" index.
-- NOTE(review): no PRIMARY KEY is declared and `id` is a nullable DOUBLE, so duplicate or
-- missing ids are not prevented at the database level.
-- NOTE(review): CHARSET=utf8 is MySQL's 3-byte utf8 (utf8mb3); 4-byte characters such as
-- emoji cannot be stored - consider utf8mb4 if the data may contain them.
CREATE TABLE `position` ( `companyName` varchar(300) DEFAULT NULL, `id` double DEFAULT NULL, `positionAdvantage` varchar(300) DEFAULT NULL, `companyId` double DEFAULT NULL, `positionName` varchar(240) DEFAULT NULL, `salary` varchar(120) DEFAULT NULL, `salaryMin` double DEFAULT NULL, `salaryMax` double DEFAULT NULL, `salaryMonth` double DEFAULT NULL, `education` varchar(60) DEFAULT NULL, `workYear` varchar(60) DEFAULT NULL, `jobNature` varchar(120) DEFAULT NULL, `chargeField` blob, `createTime` datetime DEFAULT NULL, `email` varchar(300) DEFAULT NULL, `publishTime` varchar(150) DEFAULT NULL, `isEnable` double DEFAULT NULL, `isIndex` double DEFAULT NULL, `city` varchar(150) DEFAULT NULL, `orderby` double DEFAULT NULL, `isAdvice` double DEFAULT NULL, `showorder` double DEFAULT NULL, `publishUserId` double DEFAULT NULL, `workAddress` varchar(300) DEFAULT NULL, `generateTime` datetime DEFAULT NULL, `bornTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `isReward` double DEFAULT NULL, `rewardMoney` varchar(60) DEFAULT NULL, `isExpired` double DEFAULT NULL, `positionDetailPV` double DEFAULT NULL, `offlineTime` datetime DEFAULT NULL, `positionDetailPV_cnbeta` double DEFAULT NULL, `adviceTime` datetime DEFAULT NULL, `comeFrom` varchar(150) DEFAULT NULL, `receivedResumeCount` double DEFAULT NULL, `refuseResumeCount` double DEFAULT NULL, `markCanInterviewCount` double DEFAULT NULL, `haveNoticeInterCount` double DEFAULT NULL, `isForbidden` double DEFAULT NULL, `reason` varchar(768) DEFAULT NULL, `verifyTime` datetime DEFAULT NULL, `adWord` double DEFAULT NULL, `adRankAndTime` varchar(120) DEFAULT NULL, `adTimes` double DEFAULT NULL, `adStartTime` datetime DEFAULT NULL, `adEndTime` datetime DEFAULT NULL, `adBeforeDetailPV` double DEFAULT NULL, `adAfterDetailPV` double DEFAULT NULL, `adBeforeReceivedCount` double DEFAULT NULL, `adAfterReceivedCount` double DEFAULT NULL, `adjustScore` double DEFAULT NULL, `weightStartTime` datetime DEFAULT NULL, 
`weightEndTime` datetime DEFAULT NULL, `isForward` bit(1) DEFAULT NULL, `forwardEmail` varchar(300) DEFAULT NULL, `isSchoolJob` bit(1) DEFAULT NULL, `type` tinyint(4) DEFAULT NULL, `prolong_offline_time` datetime DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

二:pom.xml
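<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.lagou</groupId>
    <artifactId>lagou-es-project</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>lagou-es-project</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <elasticsearch.version>7.3.0</elasticsearch.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.3.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- 原文此处再次声明了 org.springframework.boot:spring-boot-devtools（带两个值为 true 的子元素，
             疑似 <optional>true</optional><scope>true</scope>），与上面的 devtools 依赖重复，且 scope=true 不是合法取值，故未重复列出。 -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>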

三:application.yml 文件
spring:
  devtools:
    restart:
      enabled: true                     #设置开启热部署
      additional-paths: src/main/java   #重启目录
      exclude: WEB-INF/**
  freemarker:
    cache: false                        #页面不加载缓存,修改即时生效
  elasticsearch:
    rest:
      uris: 192.168.211.136:9200,192.168.211.136:9201,192.168.211.136:9202
server:
  port: 8083
logging:
  level:
    root: info
    com.xdclass.search: debug

四:model
package com.es.model;

import com.alibaba.fastjson.annotation.JSONType;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;

/**
 * A single job posting.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString}, a no-arg
 * constructor and an all-args constructor whose parameters follow the field declaration order
 * below, so do not reorder the fields.
 */
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Position {

    /** Primary key. */
    private String id;

    /** Name of the hiring company. */
    private String companyName;

    /** Job title. */
    private String positionName;

    /** Selling points of the position. */
    private String positionAdvantage;

    /** Salary range as text. */
    private String salary;

    /** Lower bound of the salary range. */
    private int salaryMin;

    /** Upper bound of the salary range. */
    private int salaryMax;

    /** Required education level. */
    private String education;

    /** Required years of work experience. */
    private String workYear;

    /** Publish time, kept as free-form text. */
    private String publishTime;

    /** City in which the job is located. */
    private String city;

    /** Street address of the workplace. */
    private String workAddress;

    /** Creation timestamp; Jackson renders it as {@code yyyy-MM-dd HH:mm:ss}. */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    /** Employment type (working mode). */
    private String jobNature;
}

五:ES配置类
package com.es.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * Creates the Elasticsearch {@link RestHighLevelClient} from the
 * {@code spring.elasticsearch.rest.uris} property.
 */
@Configuration
public class EsConfig {

    /**
     * Comma-separated node list. Each entry may be {@code host:port} or
     * {@code scheme://host:port}; whitespace around entries is ignored.
     */
    @Value("${spring.elasticsearch.rest.uris}")
    private String hostlist;

    /**
     * Builds a client that talks to every node listed in {@link #hostlist}.
     *
     * @return the configured high-level REST client
     * @throws IllegalStateException if the property lists no hosts
     * @throws IllegalArgumentException if an entry is not a valid host specification
     */
    @Bean
    public RestHighLevelClient client() {
        List<HttpHost> hosts = new ArrayList<>();
        for (String entry : hostlist.split(",")) {
            String uri = entry.trim();
            if (uri.isEmpty()) {
                continue; // tolerate "a:9200,,b:9200" and trailing commas
            }
            // HttpHost.create understands both "host:port" and "scheme://host:port";
            // without an explicit scheme it falls back to http, as the old code did.
            hosts.add(HttpHost.create(uri));
        }
        if (hosts.isEmpty()) {
            throw new IllegalStateException(
                    "spring.elasticsearch.rest.uris does not contain any host: '" + hostlist + "'");
        }
        return new RestHighLevelClient(RestClient.builder(hosts.toArray(new HttpHost[0])));
    }
}

六:mysql连接工具
package com.es.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Minimal JDBC helper that opens connections to the {@code lagou_position} MySQL database.
 *
 * <p>NOTE(review): the credentials below are hard-coded; move them to external configuration
 * before using this anywhere but a local demo.
 */
public class DBHelper {

    private static final Logger LOG = Logger.getLogger(DBHelper.class.getName());

    /** JDBC URL; the query string must not contain whitespace or the options are misparsed. */
    public static final String url = "jdbc:mysql://192.168.211.136:3306/lagou_position?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    /** JDBC driver class name. */
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "123456";

    /**
     * Connection returned by the most recent {@link #getConn()} call, or {@code null} if that call
     * failed. Kept only for backward compatibility: it is shared mutable state, so callers should
     * use the return value of {@link #getConn()} instead.
     */
    public static Connection conn = null;

    /**
     * Opens a new connection to the database.
     *
     * @return a fresh connection that the caller owns and must close, or {@code null} if the driver
     *     class is missing or the connection attempt fails (the cause is logged). A previously
     *     returned connection is never handed out again.
     */
    public static Connection getConn() {
        Connection connection = null;
        try {
            Class.forName(name);
            connection = DriverManager.getConnection(url, user, password);
        } catch (ClassNotFoundException | SQLException e) {
            LOG.log(Level.SEVERE, "Could not open a JDBC connection to " + url, e);
        }
        conn = connection;
        return connection;
    }
}

七:接口和实现
/**
 * Search and import operations for job positions.
 */
public interface PositionService {

    /**
     * Runs a paged search for positions.
     *
     * @param keyword  search text matched against the position name
     * @param pageNo   1-based page number
     * @param pageSize number of hits per page
     * @return the source documents of the hits on the requested page, one map per hit
     * @throws IOException if communication with the search backend fails
     */
    List<Map<String, Object>> searchPos(String keyword, int pageNo, int pageSize) throws IOException;

    /**
     * Imports the position data into the search index.
     *
     * @throws IOException if communication with the search backend fails
     */
    void importAll() throws IOException;
}

package com.lagou.es.service.impl;

import com.es.config.EsConfig;
import com.es.service.PositionService;
import com.es.util.DBHelper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * Searches positions in Elasticsearch and bulk-imports them from the MySQL table of the same name.
 */
@Service
public class PositionServiceImpl implements PositionService {

    private static final Logger logger = LogManager.getLogger(PositionServiceImpl.class);

    /** Name of both the source MySQL table and the target Elasticsearch index. */
    private static final String POSITIOIN_INDEX = "position";

    /** Number of imported rows between two progress log lines. */
    private static final int PROGRESS_LOG_INTERVAL = 10000;

    @Autowired
    private RestHighLevelClient client;

    /**
     * Full-text searches {@code positionName} and returns one page of hits.
     *
     * @param keyword  text for an analyzed match query on {@code positionName}
     * @param pageNo   1-based page number; values below 1 are treated as 1
     * @param pageSize number of hits per page
     * @return the {@code _source} map of every hit on the page
     * @throws IOException if the search request fails
     */
    @Override
    public List<Map<String, Object>> searchPos(String keyword, int pageNo, int pageSize) throws IOException {
        int page = Math.max(pageNo, 1);
        QueryBuilder query = QueryBuilders.matchQuery("positionName", keyword);
        SearchSourceBuilder source = new SearchSourceBuilder()
                .from((page - 1) * pageSize) // offset = (page - 1) * page size
                .size(pageSize)
                .query(query)
                .timeout(new TimeValue(60, TimeUnit.SECONDS));
        SearchRequest searchRequest = new SearchRequest(POSITIOIN_INDEX).source(source);

        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        SearchHit[] hits = searchResponse.getHits().getHits();
        logger.debug("Search '{}' page {} returned {} hits", keyword, page, hits.length);

        List<Map<String, Object>> list = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            list.add(hit.getSourceAsMap());
        }
        return list;
    }

    /**
     * Copies every row of the MySQL {@code position} table into the Elasticsearch index.
     * Failures are logged, not thrown.
     */
    @Override
    public void importAll() throws IOException {
        writeMysqlDataToES(POSITIOIN_INDEX);
    }

    /** Streams all rows of {@code tableName} into Elasticsearch through a BulkProcessor. */
    private void writeMysqlDataToES(String tableName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        try {
            int total = copyRows(tableName, bulkProcessor);
            logger.info("-------------------------- Finally insert number total : " + total);
            // Sends whatever is buffered; indexing becomes visible after the index refresh.
            bulkProcessor.flush();
        } catch (Exception e) {
            logger.error("Import of table " + tableName + " failed", e);
        } finally {
            // Always runs, even if obtaining the connection or reading rows failed.
            awaitBulkClose(bulkProcessor);
        }
    }

    /** Reads every row of {@code tableName} and queues it as an index request; returns the row count. */
    private int copyRows(String tableName, BulkProcessor bulkProcessor) throws SQLException {
        try (Connection conn = DBHelper.getConn()) {
            if (conn == null) {
                throw new SQLException("Could not obtain a MySQL connection");
            }
            logger.info("Start handle data :" + tableName);
            // tableName is always the internal constant, never user input, so concatenation is safe.
            String sql = "SELECT * from " + tableName;
            try (PreparedStatement ps = conn.prepareStatement(
                    sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                // NOTE(review): MySQL Connector/J ignores this hint unless useCursorFetch=true
                // (or fetch size Integer.MIN_VALUE for streaming); tune to your needs.
                ps.setFetchSize(20);
                try (ResultSet rs = ps.executeQuery()) {
                    ResultSetMetaData meta = rs.getMetaData();
                    int columnCount = meta.getColumnCount();
                    int count = 0;
                    while (rs.next()) {
                        // BulkProcessor accepts map sources, so each row becomes a column->value map.
                        Map<String, Object> row = new HashMap<>(128);
                        for (int i = 1; i <= columnCount; i++) {
                            row.put(meta.getColumnName(i), rs.getString(i));
                        }
                        // BulkProcessor batches by itself (see getBulkProcessor); no staging list needed.
                        bulkProcessor.add(new IndexRequest(POSITIOIN_INDEX).source(row));
                        count++;
                        if (count % PROGRESS_LOG_INTERVAL == 0) {
                            logger.info("Mysql handle data number : " + count);
                        }
                    }
                    return count;
                }
            }
        }
    }

    /** Waits for in-flight bulk requests, preserving the thread's interrupt status. */
    private void awaitBulkClose(BulkProcessor bulkProcessor) {
        try {
            boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
            logger.info("BulkProcessor terminated: " + terminatedFlag);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for bulk requests to complete", e);
        }
    }

    /**
     * Builds a BulkProcessor that flushes every 5000 actions, 100 MB or 100 s (whichever comes
     * first), allows 10 concurrent bulk requests and retries rejected bulks 3 times, 1 s apart.
     */
    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("Try to insert data number : " + request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    // A bulk call can succeed at the HTTP level while individual documents fail.
                    logger.error("Bulk " + executionId + " finished with failures: "
                            + response.buildFailureMessage());
                } else {
                    logger.info("************** Success insert data number : "
                            + request.numberOfActions() + " , id: " + executionId);
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Bulk is unsuccess, executionId: " + executionId, failure);
            }
        };

        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        return BulkProcessor.builder(bulkConsumer, listener)
                .setBulkActions(5000)
                .setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB))
                .setConcurrentRequests(10)
                .setFlushInterval(TimeValue.timeValueSeconds(100L))
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
                .build();
    }
}

BulkProcessor 官网介绍
https://www.elastic.co/guide/en/elasticsearch/client/java-api/7.3/java-docs-bulk-processor.html
八:Controller
package com.lagou.es.controller;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.lagou.es.service.PositionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Web endpoints for the position search demo.
 */
@Controller
public class PositionController {

    @Autowired
    private PositionService service;

    /** Renders the {@code index} template for testing, at "/" and "/index". */
    @GetMapping({"/", "index"})
    public String indexPage() {
        return "index";
    }

    /**
     * Returns one page of search hits as JSON.
     *
     * @param keyword  search text
     * @param pageNo   1-based page number
     * @param pageSize hits per page
     * @return the source map of every hit on the page
     * @throws IOException if the search backend cannot be reached
     */
    @GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
    @ResponseBody
    public List<Map<String, Object>> searchPosition(@PathVariable("keyword") String keyword,
                                                    @PathVariable("pageNo") int pageNo,
                                                    @PathVariable("pageSize") int pageSize) throws IOException {
        return service.searchPos(keyword, pageNo, pageSize);
    }

    /**
     * Triggers a full import of the position data.
     *
     * @return {@code "success"} once the import call returns
     * @throws IOException propagated so the client receives an error response instead of a
     *     misleading "success" (same handling as {@link #searchPosition})
     */
    @RequestMapping("/importAll")
    @ResponseBody
    public String importAll() throws IOException {
        service.importAll();
        return "success";
    }
}

九：启动类
package com.lagou.es;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the position search demo.
 */
@SpringBootApplication
public class SearchApplication {

    /**
     * Bootstraps the application context with this class as the primary configuration source.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SearchApplication.class);
        application.run(args);
    }
}


    推荐阅读