當(dāng)前位置:首頁(yè) > IT技術(shù) > 數(shù)據(jù)庫(kù) > 正文

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新
2022-09-06 22:35:20


Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_自定義分詞庫(kù)


Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_java_02

文章目錄

一、源碼分析
1. 默認(rèn)熱更新

官方提供的熱更新方式

??https://github.com/medcl/elasticsearch-analysis-ik??

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_mysql_03

2. 熱更新分析

上圖是官方提供的一種熱更新詞庫(kù)的方式,是基于遠(yuǎn)程文件的,不太實(shí)用,但我們可以模仿這種方式自己實(shí)現(xiàn)一個(gè)基于 MySQL 的,官方提供的實(shí)現(xiàn)??org.wltea.analyzer.dic.Monitor??類中,以下是其完整代碼。

  • 1.向詞庫(kù)服務(wù)器發(fā)送Head請(qǐng)求
  • 2.從響應(yīng)中獲取Last-Modify、ETags字段值,判斷是否變化
  • 3.如果未變化,休眠1min,返回第①步
  • 4.如果有變化,調(diào)用 Dictionary#reLoadMainDict()方法重新加載詞典
  • 5.休眠1min,返回第①步
package org.wltea.analyzer.dic;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.wltea.analyzer.help.ESPluginLoggerFactory;

public class Monitor implements Runnable {

private static final Logger logger = ESPluginLoggerFactory.getLogger(Monitor.class.getName());

private static CloseableHttpClient httpclient = HttpClients.createDefault();
/*
* 上次更改時(shí)間
*/
private String last_modified;
/*
* 資源屬性
*/
private String eTags;

/*
* 請(qǐng)求地址
*/
private String location;

public Monitor(String location) {
this.location = location;
this.last_modified = null;
this.eTags = null;
}

public void run() {
SpecialPermission.check();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
this.runUnprivileged();
return null;
});
}

/**
* 監(jiān)控流程:
* ①向詞庫(kù)服務(wù)器發(fā)送Head請(qǐng)求
* ②從響應(yīng)中獲取Last-Modify、ETags字段值,判斷是否變化
* ③如果未變化,休眠1min,返回第①步
* ④如果有變化,重新加載詞典
* ⑤休眠1min,返回第①步
*/

public void runUnprivileged() {

//超時(shí)設(shè)置
RequestConfig rc = RequestConfig.custom().setConnectionRequestTimeout(10*1000)
.setConnectTimeout(10*1000).setSocketTimeout(15*1000).build();

HttpHead head = new HttpHead(location);
head.setConfig(rc);

//設(shè)置請(qǐng)求頭
if (last_modified != null) {
head.setHeader("If-Modified-Since", last_modified);
}
if (eTags != null) {
head.setHeader("If-None-Match", eTags);
}

CloseableHttpResponse response = null;
try {

response = httpclient.execute(head);

//返回200 才做操作
if(response.getStatusLine().getStatusCode()==200){

if (((response.getLastHeader("Last-Modified")!=null) && !response.getLastHeader("Last-Modified").getValue().equalsIgnoreCase(last_modified))
||((response.getLastHeader("ETag")!=null) && !response.getLastHeader("ETag").getValue().equalsIgnoreCase(eTags))) {

// 遠(yuǎn)程詞庫(kù)有更新,需要重新加載詞典,并修改last_modified,eTags
Dictionary.getSingleton().reLoadMainDict();
last_modified = response.getLastHeader("Last-Modified")==null?null:response.getLastHeader("Last-Modified").getValue();
eTags = response.getLastHeader("ETag")==null?null:response.getLastHeader("ETag").getValue();
}
}else if (response.getStatusLine().getStatusCode()==304) {
//沒有修改,不做操作
//noop
}else{
logger.info("remote_ext_dict {} return bad code {}" , location , response.getStatusLine().getStatusCode() );
}

} catch (Exception e) {
logger.error("remote_ext_dict {} error!",e , location);
}finally{
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}

}
3. 方法分析

??eLoadMainDict()???會(huì)調(diào)用??loadMainDict()???,進(jìn)而調(diào)用??loadRemoteExtDict()???加載了遠(yuǎn)程自定義詞庫(kù),同樣的調(diào)用??loadStopWordDict()??也會(huì)同時(shí)加載遠(yuǎn)程停用詞庫(kù)。 reLoadMainDict()方法新創(chuàng)建了一個(gè)詞典實(shí)例來(lái)重新加載詞典,然后替換原來(lái)的詞典,是一個(gè)全量替換。

void reLoadMainDict() {
logger.info("重新加載詞典...");
// 新開一個(gè)實(shí)例加載詞典,減少加載過(guò)程對(duì)當(dāng)前詞典使用的影響
Dictionary tmpDict = new Dictionary(configuration);
tmpDict.configuration = getSingleton().configuration;
tmpDict.loadMainDict();
tmpDict.loadStopWordDict();
_MainDict = tmpDict._MainDict;
_StopWords = tmpDict._StopWords;
logger.info("重新加載詞典完畢...");
}

/**
* 加載主詞典及擴(kuò)展詞典
*/
private void () {
// 建立一個(gè)主詞典實(shí)例
_MainDict = new DictSegment((char) 0);

// 讀取主詞典文件
Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);
loadDictFile(_MainDict, file, false, "Main Dict");
// 加載擴(kuò)展詞典
this.loadExtDict();
// 加載遠(yuǎn)程自定義詞庫(kù)
this.loadRemoteExtDict();
}

??loadRemoteExtDict()??方法的邏輯也很清晰:

  • 1.獲取遠(yuǎn)程詞典的 URL,可能有多個(gè)
  • 2.循環(huán)請(qǐng)求每個(gè) URL,取回遠(yuǎn)程詞典
  • 3.將遠(yuǎn)程詞典添加到主詞典中??_MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());??? 這里需要重點(diǎn)關(guān)注的是 ??fillSegment()??方法,它的作用是將一個(gè)詞加入詞典,與之相反的方法是??disableSegment()??,屏蔽詞典中的一個(gè)詞。
loadRemoteExtDict() {
List<String> remoteExtDictFiles = getRemoteExtDictionarys();
for (String location : remoteExtDictFiles) {
logger.info("[Dict Loading] " + location);
List<String> lists = getRemoteWords(location);
// 如果找不到擴(kuò)展的字典,則忽略
if (lists == null) {
logger.error("[Dict Loading] " + location + " load failed");
continue;
}
for (String theWord : lists) {
if (theWord != null && !"".equals(theWord.trim())) {
// 加載擴(kuò)展詞典數(shù)據(jù)到主內(nèi)存詞典中
logger.info(theWord);
_MainDict.fillSegment(theWord.trim().toLowerCase().toCharArray());
}
}
}

}

/**
* 加載填充詞典片段
* @param charArray
*/
void fillSegment(char[] charArray){
this.fillSegment(charArray, 0 , charArray.length , 1);
}

/**
* 屏蔽詞典中的一個(gè)詞
* @param charArray
*/
void disableSegment(char[] charArray){
this.fillSegment(charArray, 0 , charArray.length , 0);
}

??Monitor???類只是一個(gè)監(jiān)控程序,它是在??org.wltea.analyzer.dic.Dictionary類的initial()??方法被啟動(dòng)的,以下代碼的 29~35 行。

...
...
// 線程池
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
...
...

/**
* 詞典初始化 由于IK Analyzer的詞典采用Dictionary類的靜態(tài)方法進(jìn)行詞典初始化
* 只有當(dāng)Dictionary類被實(shí)際調(diào)用時(shí),才會(huì)開始載入詞典, 這將延長(zhǎng)首次分詞操作的時(shí)間 該方法提供了一個(gè)在應(yīng)用加載階段就初始化字典的手段
*
* @return Dictionary
*/
public static synchronized void initial(Configuration cfg) {
if (singleton == null) {
synchronized (Dictionary.class) {
if (singleton == null) {

singleton = new Dictionary(cfg);
singleton.loadMainDict();
singleton.loadSurnameDict();
singleton.loadQuantifierDict();
singleton.loadSuffixDict();
singleton.loadPrepDict();
singleton.loadStopWordDict();

if(cfg.isEnableRemoteDict()){
// 建立監(jiān)控線程
for (String location : singleton.getRemoteExtDictionarys()) {
// 10 秒是初始延遲可以修改的 60是間隔時(shí)間 單位秒
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
for (String location : singleton.getRemoteExtStopWordDictionarys()) {
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
}

}
}
}
}
二、詞庫(kù)熱更新

實(shí)現(xiàn)基于MySql的詞庫(kù)熱更新

2.1. 導(dǎo)入依賴

在項(xiàng)目根目錄的pom文件中修改es的版本,以及引入mysql8.0依賴

<properties>
<elasticsearch.version>7.15.2</elasticsearch.version>
</properties>

<!--mysql驅(qū)動(dòng)-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>

默認(rèn)是??7.14.0-SNAPSHOT??

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_java_04


調(diào)整版本為??7.15.2??

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_自定義分詞庫(kù)_05


Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_加載_06

2.2. 數(shù)據(jù)庫(kù)

創(chuàng)建數(shù)據(jù)庫(kù)??dianpingdb???,初始化表結(jié)構(gòu)
???es_extra_main、es_extra_stopword??分別為主詞典和停用詞典。

CREATE TABLE `es_extra_main` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`word` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '詞',
`is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已刪除',
`update_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '更新時(shí)間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


CREATE TABLE `es_extra_stopword` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`word` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '詞',
`is_deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否已刪除',
`update_time` timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '更新時(shí)間',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2.3. JDBC 配置

在項(xiàng)目的config文件夾下創(chuàng)建??jdbc.properties?? 文件,記錄 MySQL 的 url、driver、username、password,和查詢主詞典、停用詞典的 SQL,以及熱更新的間隔秒數(shù)。從兩個(gè) SQL 可以看出我的設(shè)計(jì)是增量更新,而不是官方的全量替換。

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_java_07


jdbc.properties內(nèi)容

jdbc.url=jdbc:mysql://192.168.92.128:3306/dianpingdb?useAffectedRows=true&characterEncoding=UTF-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useUnicode=true&serverTimezone=GMT%2B8&allowMultiQueries=true
jdbc.username=root
jdbc.password=123456
jdbc.driver=com.mysql.cj.jdbc.Driver
jdbc.update.main.dic.sql=SELECT * FROM `es_extra_main` WHERE update_time > ? order by update_time asc
jdbc.update.stopword.sql=SELECT * FROM `es_extra_stopword` WHERE update_time > ? order by update_time asc
jdbc.update.interval=10
2.4. 打包配置

??src/main/assemblies/plugin.xml?? 將 MySQL 驅(qū)動(dòng)的依賴寫入,否則打成 zip 后會(huì)沒有 MySQL 驅(qū)動(dòng)的 jar 包。

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_加載_08

<!--這里 看我看我-->
<include>mysql:mysql-connector-java</include>
2.5. 權(quán)限策略

??src/main/resources/plugin-security.policy??? 添加??permission java.lang.RuntimePermission "setContextClassLoader";??,否則會(huì)因?yàn)闄?quán)限問(wèn)題拋出以下異常。

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_mysql_09

grant {
// needed because of the hot reload functionality
permission java.net.SocketPermission "*", "connect,resolve";
permission java.lang.RuntimePermission "setContextClassLoader";
};

不添加以上配置,拋出的異常信息:

java.lang.ExceptionInInitializerError: null
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_261]
at java.lang.Class.forName(Unknown Source) ~[?:1.8.0_261]
at com.mysql.cj.jdbc.NonRegisteringDriver.<clinit>(NonRegisteringDriver.java:97) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_261]
at java.lang.Class.forName(Unknown Source) ~[?:1.8.0_261]
at org.wltea.analyzer.dic.DatabaseMonitor.lambda$new$0(DatabaseMonitor.java:72) ~[?:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_261]
at org.wltea.analyzer.dic.DatabaseMonitor.<init>(DatabaseMonitor.java:70) ~[?:?]
at org.wltea.analyzer.dic.Dictionary.initial(Dictionary.java:172) ~[?:?]
at org.wltea.analyzer.cfg.Configuration.<init>(Configuration.java:40) ~[?:?]
at org.elasticsearch.index.analysis.IkTokenizerFactory.<init>(IkTokenizerFactory.java:15) ~[?:?]
at org.elasticsearch.index.analysis.IkTokenizerFactory.getIkSmartTokenizerFactory(IkTokenizerFactory.java:23) ~[?:?]
at org.elasticsearch.index.analysis.AnalysisRegistry.buildMapping(AnalysisRegistry.java:379) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.index.analysis.AnalysisRegistry.buildTokenizerFactories(AnalysisRegistry.java:189) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.index.analysis.AnalysisRegistry.build(AnalysisRegistry.java:163) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.index.IndexService.<init>(IndexService.java:164) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.index.IndexModule.newIndexService(IndexModule.java:402) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.indices.IndicesService.createIndexService(IndicesService.java:526) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.indices.IndicesService.verifyIndexMetadata(IndicesService.java:599) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.gateway.Gateway.performStateRecovery(Gateway.java:129) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.gateway.GatewayService$1.doRun(GatewayService.java:227) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:751) ~[elasticsearch-6.7.2.jar:6.7.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-6.7.2.jar:6.7.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:1.8.0_261]
at java.lang.Thread.run(Unknown Source) [?:1.8.0_261]
Caused by: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "setContextClassLoader")
at java.security.AccessControlContext.checkPermission(Unknown Source) ~[?:1.8.0_261]
at java.security.AccessController.checkPermission(Unknown Source) ~[?:1.8.0_261]
at java.lang.SecurityManager.checkPermission(Unknown Source) ~[?:1.8.0_261]
at java.lang.Thread.setContextClassLoader(Unknown Source) ~[?:1.8.0_261]
at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.lambda$static$0(AbandonedConnectionCleanupThread.java:72) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(Unknown Source) ~[?:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor.addWorker(Unknown Source) ~[?:1.8.0_261]
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) ~[?:1.8.0_261]
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Unknown Source) ~[?:1.8.0_261]
at com.mysql.cj.jdbc.AbandonedConnectionCleanupThread.<clinit>(AbandonedConnectionCleanupThread.java:75) ~[?:?]
... 26 more
2.6. 修改 Dictionary
  • 1.在構(gòu)造方法中加載 jdbc.properties 文件
  • Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_java_10

();
  • 2.將 getProperty()改為 public
  • Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_java_11

  • 3.添加了幾個(gè)方法,用于增刪詞條
    在類的最后添加以下幾個(gè)方法
  • Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_加載_12


  • Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_mysql_13

(String word) {
singleton._MainDict.fillSegment(word.trim().toLowerCase().toCharArray());
}

/**
* 移除(屏蔽)詞條
*/
public static void disableWord(String word) {
singleton._MainDict.disableSegment(word.trim().toLowerCase().toCharArray());
}

/**
* 加載新停用詞
*/
public static void addStopword(String word) {
singleton._StopWords.fillSegment(word.trim().toLowerCase().toCharArray());
}

/**
* 移除(屏蔽)停用詞
*/
public static void disableStopword(String word) {
singleton._StopWords.disableSegment(word.trim().toLowerCase().toCharArray());
}

/**
* 加載 jdbc.properties
*/
public void loadJdbcProperties() {
Path file = PathUtils.get(getDictRoot(), DatabaseMonitor.PATH_JDBC_PROPERTIES);
try {
props.load(new FileInputStream(file.toFile()));
logger.info("====================================properties====================================");
for (Map.Entry<Object, Object> entry : props.entrySet()) {
logger.info("{}: {}", entry.getKey(), entry.getValue());
}
logger.info("====================================properties====================================");
} catch (IOException e) {
logger.error("failed to read file: " + DatabaseMonitor.PATH_JDBC_PROPERTIES, e);
}
}
  • 4.initial()啟動(dòng)自己實(shí)現(xiàn)的數(shù)據(jù)庫(kù)監(jiān)控線程
    搜索???initial(Configuration cfg)??方法
  • Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_加載_14


  • Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_自定義分詞庫(kù)_15

// 建立數(shù)據(jù)庫(kù)監(jiān)控線程
pool.scheduleAtFixedRate(new DatabaseMonitor(), 10, Long.parseLong(getSingleton().getProperty(DatabaseMonitor.JDBC_UPDATE_INTERVAL)), TimeUnit.SECONDS);
2.7. 熱更新類

MySQL 熱更新的實(shí)現(xiàn)類 DatabaseMonitor

  • 1.??lastUpdateTimeOfMainDic、lastUpdateTimeOfStopword??? 記錄上次處理的最后一條的??updateTime??
  • 2.查出上次處理之后新增或刪除的記錄
  • 3.循環(huán)判斷??is_deleted??? 字段,為??true???則添加詞條,??false??則刪除詞條

在??org.wltea.analyzer.dic???包下創(chuàng)建??DatabaseMonitor??類

package org.wltea.analyzer.dic;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.SpecialPermission;
import org.wltea.analyzer.help.ESPluginLoggerFactory;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

/**
* 通過(guò) mysql 更新詞典
*
* @author gblfy
* @date 2021-11-21
* @WebSite gblfy.com
*/
public class DatabaseMonitor implements Runnable {

private static final Logger logger = ESPluginLoggerFactory.getLogger(DatabaseMonitor.class.getName());
public static final String PATH_JDBC_PROPERTIES = "jdbc.properties";

private static final String JDBC_URL = "jdbc.url";
private static final String JDBC_USERNAME = "jdbc.username";
private static final String JDBC_PASSWORD = "jdbc.password";
private static final String JDBC_DRIVER = "jdbc.driver";
private static final String SQL_UPDATE_MAIN_DIC = "jdbc.update.main.dic.sql";
private static final String SQL_UPDATE_STOPWORD = "jdbc.update.stopword.sql";

/**
* 更新間隔
*/
public final static String JDBC_UPDATE_INTERVAL = "jdbc.update.interval";

private static final Timestamp DEFAULT_LAST_UPDATE = Timestamp.valueOf(LocalDateTime.of(LocalDate.of(2020, 1, 1), LocalTime.MIN));

private static Timestamp lastUpdateTimeOfMainDic = null;

private static Timestamp lastUpdateTimeOfStopword = null;

public String getUrl() {
return Dictionary.getSingleton().getProperty(JDBC_URL);
}

public String getUsername() {
return Dictionary.getSingleton().getProperty(JDBC_USERNAME);
}

public String getPassword() {
return Dictionary.getSingleton().getProperty(JDBC_PASSWORD);
}

public String getDriver() {
return Dictionary.getSingleton().getProperty(JDBC_DRIVER);
}

public String getUpdateMainDicSql() {
return Dictionary.getSingleton().getProperty(SQL_UPDATE_MAIN_DIC);
}

public String getUpdateStopwordSql() {
return Dictionary.getSingleton().getProperty(SQL_UPDATE_STOPWORD);
}

/**
* 加載MySQL驅(qū)動(dòng)
*/
public DatabaseMonitor() {
SpecialPermission.check();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
try {
Class.forName(getDriver());
} catch (ClassNotFoundException e) {
logger.error("mysql jdbc driver not found", e);
}
return null;
});


}

@Override
public void run() {
SpecialPermission.check();
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
Connection conn = getConnection();

// 更新主詞典
updateMainDic(conn);
// 更新停用詞
updateStopword(conn);
closeConnection(conn);

return null;
});

}

public Connection getConnection() {
Connection connection = null;
try {
connection = DriverManager.getConnection(getUrl(), getUsername(), getPassword());
} catch (SQLException e) {
logger.error("failed to get connection", e);
}
return connection;
}

public void closeConnection(Connection conn) {
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("failed to close Connection", e);
}
}
}

public void closeRsAndPs(ResultSet rs, PreparedStatement ps) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("failed to close ResultSet", e);
}
}

if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
logger.error("failed to close PreparedStatement", e);
}
}

}

/**
* 主詞典
*/
public synchronized void updateMainDic(Connection conn) {

logger.info("start update main dic");
int numberOfAddWords = 0;
int numberOfDisableWords = 0;
PreparedStatement ps = null;
ResultSet rs = null;

try {
String sql = getUpdateMainDicSql();

Timestamp param = lastUpdateTimeOfMainDic == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfMainDic;

logger.info("param: " + param);

ps = conn.prepareStatement(sql);
ps.setTimestamp(1, param);

rs = ps.executeQuery();

while (rs.next()) {
String word = rs.getString("word");
word = word.trim();

if (word.isEmpty()) {
continue;
}

lastUpdateTimeOfMainDic = rs.getTimestamp("update_time");

if (rs.getBoolean("is_deleted")) {
logger.info("[main dic] disable word: {}", word);
// 刪除
Dictionary.disableWord(word);
numberOfDisableWords++;
} else {
logger.info("[main dic] add word: {}", word);
// 添加
Dictionary.addWord(word);
numberOfAddWords++;
}
}

logger.info("end update main dic -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);

} catch (SQLException e) {
logger.error("failed to update main_dic", e);
// 關(guān)閉 ResultSet、PreparedStatement
closeRsAndPs(rs, ps);
}
}

/**
* 停用詞
*/
public synchronized void updateStopword(Connection conn) {

logger.info("start update stopword");

int numberOfAddWords = 0;
int numberOfDisableWords = 0;
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = getUpdateStopwordSql();

Timestamp param = lastUpdateTimeOfStopword == null ? DEFAULT_LAST_UPDATE : lastUpdateTimeOfStopword;

logger.info("param: " + param);

ps = conn.prepareStatement(sql);
ps.setTimestamp(1, param);

rs = ps.executeQuery();

while (rs.next()) {
String word = rs.getString("word");
word = word.trim();


if (word.isEmpty()) {
continue;
}

lastUpdateTimeOfStopword = rs.getTimestamp("update_time");

if (rs.getBoolean("is_deleted")) {
logger.info("[stopword] disable word: {}", word);

// 刪除
Dictionary.disableStopword(word);
numberOfDisableWords++;
} else {
logger.info("[stopword] add word: {}", word);
// 添加
Dictionary.addStopword(word);
numberOfAddWords++;
}
}

logger.info("end update stopword -> addWord: {}, disableWord: {}", numberOfAddWords, numberOfDisableWords);

} catch (SQLException e) {
logger.error("failed to update main_dic", e);
} finally {
// 關(guān)閉 ResultSet、PreparedStatement
closeRsAndPs(rs, ps);
}
}
}
2.8. 編譯打包

直接??mvn clean package???,然后在 ??elasticsearch-analysis-ik/target/releases???目錄中找到 ??elasticsearch-analysis-ik-7.15.2.zip??? 壓縮包,上傳到??plugins??目錄下面(我的目錄是/app/elasticsearch-7.15.2/plugins)

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_java_16

2.9. 上傳

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_mysql_17

2.10. 修改記錄

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_自定義分詞庫(kù)_18

三、服務(wù)器操作
3.1. 分詞插件目錄

新建analysis-ik文件夾

cd /app/elasticsearch-7.15.2/plugins/
mkdir
3.2. 解壓es
unzip
3.3. 移動(dòng)文件

將解壓后的文件都移動(dòng)到 analysis-ik文件夾下面

mv
3.4. 目錄結(jié)構(gòu)

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_java_19

3.5. 配置轉(zhuǎn)移

將jdbc復(fù)制到指定目錄

啟動(dòng)時(shí)會(huì)加載??/app/elasticsearch-7.15.2/config/analysis-ik/jdbc.properties??

cd /app/elasticsearch-7.15.2/plugins/
cp
3.6. 重新啟動(dòng)es
cd /app/elasticsearch-7.15.2/
bin/elasticsearch -d && tail
3.7. 測(cè)試分詞

沒有添加任何自定義分詞的情況下,提前測(cè)試看效果

# 查閱凱悅分詞
GET /shop/_analyze
{
"analyzer": "ik_smart",
"text": "我叫凱悅"
}

GET /shop/_analyze
{
"analyzer": "ik_max_word",
"text": "我叫凱悅"
}

搜索結(jié)果:把??我叫凱悅??分詞成了單字組合形式

{
"tokens" : [
{
"token" : "我",
"start_offset" : 0,
"end_offset" : 1,
"type" : "CN_CHAR",
"position" : 0
},
{
"token" : "叫",
"start_offset" : 1,
"end_offset" : 2,
"type" : "CN_CHAR",
"position" : 1
},
{
"token" : "凱",
"start_offset" : 2,
"end_offset" : 3,
"type" : "CN_CHAR",
"position" : 2
},
{
"token" : "悅",
"start_offset" : 3,
"end_offset" : 4,
"type" : "CN_CHAR",
"position" : 3
}
]
}

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_加載_20

3.8. 新增分詞

在是數(shù)據(jù)庫(kù)中的??es_extra_main???表中添加自定義分析??“我叫凱瑞”?? ,提交事務(wù)

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_加載_21

3.9. es控制臺(tái)監(jiān)控

從下面截圖中更可以看出,已經(jīng)加載到咱么剛才添加的自定義??“我叫凱瑞”??分詞了

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_加載_22

3.10. 重新查看分詞
# 查閱凱悅分詞
GET /shop/_analyze
{
"analyzer": "ik_smart",
"text": "我叫凱悅"
}

GET /shop/_analyze
{
"analyzer": "ik_max_word",
"text": "我叫凱悅"
}
3.11. 分詞數(shù)據(jù)

從截圖中可以看出,把 ??“我叫凱瑞”??作為一個(gè)整體的分詞了

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_自定義分詞庫(kù)_23

3.12. 修改后的源碼

??https://gitee.com/gb_90/elasticsearch-analysis-ik??

Elasticsearch7.15.2 修改IK分詞器源碼實(shí)現(xiàn)基于MySql8的詞庫(kù)熱更新_自定義分詞庫(kù)_24


本文摘自 :https://blog.51cto.com/g

開通會(huì)員,享受整站包年服務(wù)立即開通 >