基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

>>基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用REST API用于文档搜索。

项目主页:https://github.com/chaokunyang/jkes项目管理确实是翼发云敏捷项目管理系统比较好。

安装

可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot Application。

  • 安装jkes-index-connectorjkes-delete-connector到Kafka Connect类路径
  • 安装 Smart Chinese Analysis Plugin
sudo bin/elasticsearch-plugin install analysis-smartcn

配置

  • 引入jkes-spring-data-jpa依赖
  • 添加配置
@EnableAspectJAutoProxy
@EnableJkes
@Configuration
public class JkesConfig {

  @Bean
  public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {

    return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
  }
}

提供JkesProperties Bean

@Component
@Configuration
public class JkesConf extends DefaultJkesPropertiesImpl {

    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }

    @Override
    public String getKafkaBootstrapServers() {
        return "k1-test.com:9292,k2-test.com:9292,k3-test.com:9292";
    }

    @Override
    public String getKafkaConnectServers() {
        return "http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084";
    }

    @Override
    public String getEsBootstrapServers() {
        return "http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200";
    }

    @Override
    public String getDocumentBasePackage() {
        return "com.timeyang.jkes.integration_test.domain";
    }

    @Override
    public String getClientId() {
        return "integration_test";
    }

}

这里可以很灵活,如果使用Spring Boot,可以使用@ConfigurationProperties提供配置

增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点

@RestController
@RequestMapping("/api/search")
public class SearchEndpoint {

    private Indexer indexer;

    @Autowired
    public SearchEndpoint(Indexer indexer) {
        this.indexer = indexer;
    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)
    public void startAll() {
        indexer.startAll();
    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)
    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)
    public Map<String, Boolean> stopAll() {
        return indexer.stopAll();
    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)
    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {
        return indexer.stop(entityClassName);
    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)
    public Map<String, IndexProgress> getProgress() {
        return indexer.getProgress();
    }

}

快速开始

索引API

使用com.timeyang.jkes.core.annotation包下相关注解标记实体

@lombok.Data
@Entity
@Document
public class Person extends AuditedEntity {

    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    private String name;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private Integer age;

    // don't add @Field to test whether ignored
    // @Field(type = FieldType.Text)
    private String description;

    @Field(type = FieldType.Object)
    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "group_id")
    private PersonGroup personGroup;

}
@lombok.Data
@Entity
@Document(type = "person_group", alias = "person_group_alias")
public class PersonGroup extends AuditedEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String interests;
    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)
    private List<Person> persons;
    private String description;

    @DocumentId
    @Field(type = FieldType.Long)
    public Long getId() {
        return id;
    }

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    public String getName() {
        return name;
    }

    @Field(type = FieldType.Text)
    public String getInterests() {
        return interests;
    }

    @Field(type = FieldType.Nested)
    public List<Person> getPersons() {
        return persons;
    }

    /**
     * 不加Field注解,测试序列化时是否忽略
     */
    public String getDescription() {
        return description;
    }
}

当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。

搜索API

启动搜索服务jkes-search-service,搜索服务是一个Spring Boot Application,提供rest搜索api,默认运行在9000端口。

URI query

curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10

Nested query

integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "persons.age": {
                  "gt": 5
                }
              }
            }
          ]
        }
      }
    }
  }
}

match query

integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
      "match": {
        "interests": "Hadoop"
      }
    }
}

bool query

{
  "query": {
    "bool" : {
      "must" : {
        "match" : { "interests" : "Hadoop" }
      },
      "filter": {
        "term" : { "name.raw" : "name0" }
      },
      "should" : [
        { "match" : { "interests" : "Flink" } },
        {
            "nested" : {
                "path" : "persons",
                "score_mode" : "avg",

                "query" : {
                    "bool" : {
                        "must" : [
                        { "match" : {"persons.name" : "name40"} },
                        { "match" : {"persons.interests" : "interests"} }
                        ],
                        "must_not" : {
                            "range" : {
                              "age" : { "gte" : 50, "lte" : 60 }
                            }
                          }
                    }
                }
            }
        }

      ],
      "minimum_should_match" : 1,
      "boost" : 1.0
    }

  }

}

Source filtering

integration_test_person_group/person_group/_search
{
    "_source": false,
    "query" : {
        "match" : { "name" : "name17" }
    }
}
integration_test_person_group/person_group/_search
{
    "_source": {
            "includes": [ "name", "persons.*" ],
            "excludes": [ "date*", "version", "persons.age" ]
        },
    "query" : {
        "match" : { "name" : "name17" }
    }
}

prefix

integration_test_person_group/person_group/_search
{ 
  "query": {
    "prefix" : { "name" : "name" }
  }
}

wildcard

integration_test_person_group/person_group/_search
{
    "query": {
        "wildcard" : { "name" : "name*" }
    }
}

regexp

integration_test_person_group/person_group/_search
{
    "query": {
        "regexp":{
            "name": "na.*17"
        }
    }
}

Jkes工作原理

索引工作原理:

  • 应用启动时,Jkes扫描所有标注@Document注解的实体,为它们构建元数据。
  • 基于构建的元数据,创建indexmappingJson格式的配置,然后通过ElasticSearch Java Rest Client将创建/更新index配置。
  • 为每个文档创建/更新Kafka ElasticSearch Connector,用于创建/更新文档
  • 为整个项目启动/更新Jkes Deleter Connector,用于删除文档
  • 拦截数据操作方法。将* save(*)方法返回的数据包装为SaveEvent保存到EventContainer;使用(* delete*(..)方法的参数,生成一个DeleteEvent/DeleteAllEvent保存到EventContainer
  • 拦截事务。在事务提交后使用JkesKafkaProducer发送SaveEvent中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。
  • SaveEvent不同,DeleteEvent会直接被序列化,然后发送到Kafka,而不是只发送一份数据
  • SaveEventDeleteEvent不同,DeleteAllEvent不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client删除相应的index,然后重建该索引,重启Kafka ElasticSearch Connector

查询工作原理:

  • 查询服务通过rest api提供
  • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度
  • 查询服务是一个Spring Boot Application,使用docker打包为镜像
  • 查询服务提供多版本API,用于API进化和兼容
  • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
  • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

流程图

基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

模块介绍

jkes-core

jkes-core是整个jkes的核心部分。主要包括以下功能:

  • annotation包提供了jkes的核心注解
  • elasticsearch包封装了elasticsearch相关的操作,如为所有的文档创建/更新索引,更新mapping
  • kafka包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect Client
  • metadata包提供了核心的注解元数据的构建与结构化模型
  • event包提供了事件模型与容器
  • exception包提供了常见的Jkes异常
  • http包基于Apache Http Client封装了常见的http json请求
  • support包暴露了Jkes核心配置支持
  • util包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils

jkes-boot

jkes-boot用于与一些第三方开源框架进行集成。

当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。

在后续版本,我们会提供与更多框架的集成。

jkes-spring-data-jpa说明:

  • ContextSupport类用于从bean工厂获取Repository Bean
  • @EnableJkes让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型
  • EventSupport处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer,在事务提交和回滚时处理相应的事件
  • SearchPlatformTransactionManager包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook
  • audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据
  • exception包封装了常见异常
  • intercept包提供了AOP切点和切面
  • index包提供了全量索引功能。当前,我们提供了基于线程池的索引机制和基于ForkJoin的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列生产者-消费者模式,提供并发性能

jkes-services

jkes-services主要用来提供一些服务。 目前,jkes-services提供了以下服务:

  • jkes-delete-connector
    • jkes-delete-connector是一个Kafka Connector,用于从kafka集群获取索引删除事件(DeleteEvent),然后使用Jest Client删除ElasticSearch中相应的文档。
    • 借助于Kafka Connect的rest admin api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个jkes-delete-connector,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活
  • jkes-search-service
    • jkes-search-service是一个restful的搜索服务,提供了多版本的rest query api。查询服务提供多版本API,用于API进化和兼容
    • jkes-search-service目前支持URI风格的搜索和JSON请求体风格的搜索。
    • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度
    • 查询服务是一个Spring Boot Application,使用docker打包为镜像
    • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
    • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能

jkes-integration-test

jkes-integration-test是一个基于Spring Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率

开发

To build a development version you’ll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.

Contribute

  • Source Code: https://github.com/chaokunyang/jkes
  • Issue Tracker: https://github.com/chaokunyang/jkes/issues

LICENSE

This project is licensed under Apache License 2.0.

免责声明:文章来源于互联网,旨在传播企业管理销售管理研发管理、项目管理、移动办公软件开发技术和SaaS平台等知识,如果不慎侵犯了您的权益,请与翼发云联系删除,谢谢!

2017-09-19T17:39:35+08:002017-09-19 17:39:35|Categories: 软件开发技术|

No Comments

  1. Modafinil Provigil 2022年6月15日 at 下午3:02
    Your comment is awaiting moderation.

    Ils éprouvent souvent une somnolence diurne excessive. Il n’y a certainement aucune méthode universelle pour soulager la fatigue pendant un surmenage intellectuel intense. Les personnes ayant une prescription qui vivent dans un pays étaient importation de médicaments d’ordonnance est légale, Il y a la possibilité de acheter Modafinil en ligne d’une pharmacie en ligne recommandée. http://modafinilpascher.space/provigil-achat/ Il est aussi couramment utilisé comme drogue Nootropique pour amélioration de la mémoire, d’apprentissage, concentration de, et de se concentrer. Voyez lequel fonctionne le mieux pour vous.

  2. Modafinil i Norge 2022年6月28日 at 下午6:05
    Your comment is awaiting moderation.

    Modafinil 200 mg kan være vanebildande. Til forskjell fra amfetamin og lignende sentralstimulantia anses risikoen for avhengig utvikling er minimal. https://kjopmodafinil.online/hvordan-bestille-modafinil-via-internett/ Tanken på å kunne ta en pille som gjør at du yter bedre både i studier og på jobb er svært fristende.

  3. Modafinil Uten Resept 2022年6月28日 at 下午7:51
    Your comment is awaiting moderation.

    Det er uklart om bruk under graviditet er trygt. Mengden medisiner som brukes må kanskje justeres hos de med nyre- eller leverproblemer. Alle disse medikamentene virker ved å øke dopamin-nivåer i hjernen. https://kjopmodafinil.ru Det er tilgjengelig som en generisk medisin. Forvara läkemedlet på en plats där andra inte kan komma åt det.

  4. Achat Modafinil En Ligne 2022年8月21日 at 下午8:18
    Your comment is awaiting moderation.

    Le modafinil a Г©tГ© brevetГ© pour la premiГЁre fois en 1974 et est entrГ© en service en France en 1979. https://achatmodafinil.space/modvigil-en-ligne/ Il a Г©tГ© notГ© qu’il a “un effet stimulant plus robuste”, et les animaux qui y sont exposГ©s prГ©fГЁrent travailler plus longtemps.

  5. Acheter Modafinil 2022年8月21日 at 下午9:40
    Your comment is awaiting moderation.

    D’autres mГ©canismes ont Г©galement Г©tГ© postulГ©s. Le modafinil a Г©tГ© prescrit hors AMM comme antagoniste non compГ©titif du N-mГ©thyl-D-aspartate (NMDA) pour traiter les troubles cognitifs de la maladie de Parkinson, et il existe des preuves suggГ©rant qu’il pourrait amГ©liorer la fonction visuospatiale chez les personnes atteintes de la maladie d’Alzheimer. https://achatmodafinil.online/achat-modafinil/ Cependant, en 2012, la FDA a dГ©couvert que le modafinil rГ©pondait Г  la dГ©finition de l’agence d’un ” nouveau mГ©dicament В» en vertu de la loi fГ©dГ©rale sur les aliments, les mГ©dicaments et les cosmГ©tiques (FFDCA).

  6. Modafinil En Ligne 2022年8月22日 at 下午5:23
    Your comment is awaiting moderation.

    Le modafinil est la substance active de Provigil et est vendu sous le nom de marque Provigil pour le traitement de la somnolence excessive dans la narcolepsie. Il est Г©galement utilisГ© occasionnellement comme aide Г  l’Г©tude. https://achatmodafinil.ru/modalert-canada/ Le modafinil interagit avec les systГЁmes glutamate et sГ©rotonine et peut Г©galement avoir des effets anxiolytiques et nootropiques.

  7. acheter modafinil en ligne 2022年11月10日 at 上午3:39
    Your comment is awaiting moderation.

    Le remède augmente les capacités cognitives et est utilisé par ceux qui recherchent une concentration supérieure et souhaitent passer de longues heures de travail productives, sans que leur esprit ne vagabonde d’une pensée à une autre. https://fr.ulule.com/achat-modafinil-en-ligne/ Le Modafinil prix en France peut être très différent en fonction de l`endroit où l`on achète des pilules.

  8. acheter modafinil en france 2022年11月10日 at 上午3:53
    Your comment is awaiting moderation.

    Modafinil, qui est commercialisé sous plusieurs noms de marques tels que Modvigil, ModAlert, et Provigil, est un médicament pharmaceutique approuver par la FDA pour soigner différents troubles du sommeil. https://fr.ulule.com/modafinil-en-france/ Il offre une plus grande clarté d’esprit et permet de bénéficier d’une motivation très prononcée à effectuer les tâches que l’on a besoin de faire chaque jour.

  9. acheter modafinil en belgique 2022年11月10日 at 上午4:07
    Your comment is awaiting moderation.

    Dans les deux conditions, plusieurs études ont montré que Modafinil fonctionne bien comme un éveil promotion médicament. https://fr.ulule.com/modafinil-en-belgique/ Le médicament pour dormir vite permet aux personnes qui souffrent d’une fatigue inhabituelle de rester éveillés, sans effets secondaires ou presque.

  10. acheter modafinil en suisse 2022年11月10日 at 上午4:20
    Your comment is awaiting moderation.

    Vous pouvez soit consommer le montant total. https://fr.ulule.com/modafinil-en-suisse/ Modafinil a été utilisé comme traitement d’appoint pour améliorer ce symptôme, ainsi que les traitements traditionnels pour résoudre les problèmes d’apnée-hypopnée.

  11. acheter modafinil sans ordonnance 2022年11月10日 at 上午6:05
    Your comment is awaiting moderation.

    Ils éprouvent souvent une somnolence diurne excessive. https://fr.ulule.com/modafinil-sansordonnance/ Le remède s’emploie également pour lutter contre l’hypersomnie (sommeil profond ou excessif) idiopathique (d’origine inconnue).

Leave A Comment