vearch源码阅读——http重要接口一览
基本名词解释
Vearch 是对大规模深度学习向量进行高性能相似搜索的弹性分布式系统。可以做一个和mysql类比的抽象理解,vearch就是一个分布式数据库,只不过存的数据的某些属性可能是向量。下面对vearch里数据相关一些名词做解释:
db :一个库,类似mysql的一个数据库
space :一个表空间,对应mysql的一个
table
partition :一个
space
上的数据分布在多个partition上,每个partition只能存在一台机器上,一个机器可以有多个partitiondoc :一个doc可以理解成mysql存的一条数据
field :doc的属性,一个field可以理解成mysql table中的一列。
field有自己的
type
,如果type是vector
,表明这个field是一个向量
在verach运行过程中,机器分为三种角色:
router :接收对数据(doc)增删改查的请求,然后把请求发往
ps
master :接收对集群的操作,比如对
db、space、partition
的增删,还有对集群的维护操作ps :ps上存着一个或多个
partition
,ps的工作就是接收router
的请求并调用c++ faiss库
负责具体的向量运算了,是真正的worker
主要接口与问题
向量召回的步骤简单理解为这样:
模型同学训练好自己的模型
把所有物料经过计算算出向量,存在某个地方
线上服务机器把上一步算好的所有物料向量都拉到本地存着
一条请求过来,用户特征经过模型计算出一个用户向量
用户向量在步骤[3]中存的物料向量里搜索,取出最相近的topn个物料向量
召回完成
如果有了vearch,上述步骤[3]和步骤[5]都可以变成一条rpc请求
对于步骤[3], 离线任务可以将自己训练的物料向量通过vearch的
插入&批量插入
接口存在vearch里对于步骤[5], 线上服务可以将本地搜索过程也变成一个rpc或http请求,通过vearch的
查询(search)
接口找到topn相近的向量
这么做的好处是:
避免线上服务定时拉取更新物料向量
线上服务机器不需要存储物料向量,省内存
但也存在一些担忧:
通过rpc请求进行向量搜索时间大概率是没有本地搜索快
正常召回只需要返回topN物料的ID以及用户向量与物料向量的
内积
即可,但是如果有的业务需要返回物料的向量,比如我要top3000
物料的向量,那体积过大,明显不现实,只有把物料向量存在本地才能这么玩
在上述过程中,业务方主要需要使用的这几个接口:
单条插入&批量插入
查询
下面对vearch接口工作流程进行一个梳理
vearch工作步骤
Vearch对用户http请求调用步骤大致都是:
router收到指令,开始构建
RouterRequest
解析请求,填充
RouterRequest
物料相关的信息根据缓存和ETCD,找到对应的partition的具体机器信息
发送
RouterRequest
给partition所在的ps机器partition所在的ps机器接收到请求,调用gamma引擎执行c++代码进行向量运算
这里提出几个问题,最后回答
创建新的doc,怎么选择对应存储的partition
router的缓存都有什么
怎么根据doc的id确认对应的partition
单条插入
单条插入的请求如下
curl -XPOST -H "content-type: application/json" -d' { "field1": "value1", "field2": "value2", "field3": { "feature": [0.1, 0.2] } } ' http://router_server/$db_name/$space_name复制代码
请求中的field
与space的field
一一对应,如果field类型是向量,通过feature:[xxx, xxx]
写入,并且维数要与space中该field对应
对应调用handleUpdateDoc()
方法,这个方法通过传入的http请求初始化一个UpdateRequest
,这里需要注意的是为UpdateRequest
设置PKey这里
URLParamID = "_id" args.Doc.PKey = params.ByName(URLParamID)复制代码
如果传入参数有 _id
,那么PKey就等于传入的_id
,否则为""(空)。我们这里是插入,不需要传入_id
,所以_id
为空。而当调用查询之类接口时,会传入_id
。插入新物料时,后续的SetDocsField(docs)
方法中会为插入的物料自动生成一个id,方法大概就是自增,这里不深究,只要知道router会为新插入的物料生成唯一id就行了。
完成后,调用updateDoc()
函数处理初始化的UpdateRequest
。
updateDoc()
updateDoc
函数通过传入的 pb UpdateRequest
构建一个RouterRequest
并发送给partition
,步骤分为:
装填
RouterRequest
发送请求给partition
RouterRequest的结构
RouterRequest
结构如下
head
里是请求的基本信息:包括用户名,密码,目标dbname以及spacenamemd
是一个map,记录了请求的方法和idkey为
HandlerType
时,value表示该请求对应的方法(增删改查)key为
MessageID
时,value表示本条请求的唯一iddocs
是本条请求的物料信息space
是本条请求对应的space的信息sendMap
的key是partitionID,value是要发给这个partition的信息,其中items
包含了doc信息,其他别的借口时候再补充
updateDoc
关键的步骤代码如下:
request := client.NewRouterRequest(ctx, docService.client) request.SetMsgID() .SetMethod(client.ReplaceDocHandler) .SetHead(args.Head) .SetSpace() .SetDocs(docs) .SetDocsField() .PartitionDocs() items := request.Execute()复制代码
装填
首先通过一串函数装填RouterRequest
SetMsgID()
为本条request生成唯一id,填入md[MessageID]
。SetMethod()
填写md[HandlerType]
,表明本request是一条更新请求SetSpace()
填写request的space
字段,获取方式是先从router本地缓存找,找不到就去etcd里拿SetDocs()
填写request的docs
数组SetDocsField()
为docs
数组里每一个doc
填写PKey
和Fields
字段,创建时,PKey为空,
generateUUID
为doc自动生成一个id这个函数在Fields里加了一个PKey
PartitionDocs()
填充sendMap
字段,就是把docs
字段里的doc都加进sendmap[id]
的items
数组里id是根据doc的PKey做哈希算出来的,这决定了该doc存在哪个partition上
RouterRequest
装填完毕,下一步就是发送了
router发送给partition
发送是通过RouterRequest
的Execute()
函数
所有要给partition
发送请求的接口最后都会落在这个函数上,这个函数分为两步
构建
rpcClient
正式发送请求
先来看构建rpcClient
上一步中,我们已经确认了要发往每个partition
的数据,存在RouterRequest
的sendMap
成员里
首先通过partitionID
获取对应partition的信息,包括机器地址等信息。获取的方式就是先从本地缓存中取,如果没有就从ETCD
里拿,这里介绍一下router的本地缓存,router本地缓存如下图所示:
缓存相关都在router.client.masterclient.cliCache
下面
router.client.masterclient.cliCache
本身继承了sync.map
,存储了nodeID
对应的rpcClient
,避免多次创建partitionCache
里存放了partitionID
对应的partition相关信息,包括机器节点ID(没有地址)serverCache
里存放了NodeID
对应的机器信息,包括IP、端口等如果在缓存里没有找到,router会访问
etcd
获取相关数据,router.client.masterclient.store
就是etcd相关
继续回到构建rpcClient
,构建它的关键就是填写ip和端口。从缓存和etcd拿到nodeID
后,调用GetOrCreateRPCClient(ctx, nodeID)
,同样从缓存和etcd拿到nodeID对应的具体机器信息(地址、端口),并构造一个rpcClient
,完毕
构建好以后,发送就完事了,远程调用的方法是UnaryHandler
partition处理请求
方法入口是UnaryHandler.Execute
,根据请求是插入,调用update(ctx, store, req.Items)
这里贴一段代码,后面进入gamma引擎了,这里不做研究
func update(ctx context.Context, store PartitionStore, items []*vearchpb.Item) { item := items[0] docGamma := &gamma.Doc{Fields: item.Doc.Fields} docBytes := docGamma.Serialize() docCmd := &vearchpb.DocCmd{Type: vearchpb.OpType_REPLACE, Doc: docBytes} if err := store.Write(ctx, docCmd, nil, nil); err != nil { log.Error("Add doc failed, err: [%s]", err.Error()) item.Err = vearchpb.NewError(vearchpb.ErrorEnum_INTERNAL_ERROR, err).GetError() } else { item.Doc.Fields = nil } }复制代码
批量插入
批量插入的请求如下,每一个插入的物料要两行
第一行固定 {"index":{"_id":"xxx"}} \n,_id可以为空,router会自动生成唯一id
第二行里是物料每一列的值
每一行用结尾需要‘\n’分开
curl -H "content-type: application/json" -XPOST -d' {"index": {"_id": "v1"}}\n {"field1": "value", "field2": {"feature": []}}\n {"index": {"_id": "v2"}}\n {"field1": "value", "field2": {"feature": []}}\n ' http://router_server/$db_name/$space_name/_bulk复制代码
router中对应的处理方法是handleBulk()
,该方法通过http请求初始化一个BulkRequest
,主要就是解析请求中每一个doc
,把他们填入BulkRequest.docs
里,BulkRequest
结构如下:
message BulkRequest{ RequestHead head = 1; repeated Document docs = 4; }复制代码
填充完后,调用bulk()
方法填充一个RouterRequest
并发送,步骤和单条插入里的updateDoc()
方法类似。
reply := handler.docService.bulk(ctx, args)复制代码
与单条插入不同的是,批量插入的rpc请求中call的方法是BatchHandler
,ps接到router批量插入请求,调用对应的处理方法是bulk()
查询
查询接口示例如下
curl -H "content-type: application/json" -XPOST -d' { "query": { "sum": [{ "field": "field_name", "feature": [0.1, 0.2, 0.3, 0.4, 0.5], "min_score": 0.9, "boost": 0.5 }], "filter": [{ "range": { "field_name": { "gte": 160, "lte": 180 } } }, { "term": { "field_name": ["100", "200", "300"], "operator": "or" } }] }, "retrieval_param": { "nprobe": 20 }, "fields": ["field1", "field2"], "is_brute_search": 0, "online_log_level": "debug", "quick": false, "vector_value": false, "client_type": "leader", "l2_sqrt": false, "sort": [{"field1":{"order": "asc"}}], "size": 10 } ' http://router_server/$db_name/$space_name/_search复制代码
工作的方法与前面大同小异,无非是构造请求然后发送,这里mark一下重要的参数
sum:
跟需要查询的特征,下面又有几个参数:field 指定创建表时特征字段的名称。类似
select col from table
中的col
feature 传递特征,维数和定义表结构时维数必须相同。
min_score 指定返回结果中分值必须大于等于0.9,两个向量计算结果相似度在0-1之间,min_score可以指定返回结果分值最小值,max_score可以指定最大值。如设置: “min_score”: 0.8,“max_score”: 0.95 代表过滤0.8<= 分值<= 0.95 的结果。同时另外一种分值过滤的方式是使用: “symbol”:”>=”,”value”:0.9 这种组合方式,symbol支持的值类型包含: > 、 >= 、 <、 <= 4种,value及min_score、max_score值在0到1之间。
boost指定相似度的权重,比如两个向量相似度分值是0.7,boost设置成0.5之后,返回的结果中会将分值0.7乘以0.5即0.35。
size
指定最多返回的结果数量,通过这个参数指定topN
。若请求url中设置了size值http://router_server/$db_name/$space_name/_search?size=20优先使用url中指定的size值。quick
搜索结果默认将PQ召回向量进行计算和精排,为了加快服务端处理速度设置成true可以指定只召回,不做计算和精排。(这个不是很理解)
问题回答
创建新的doc,怎么选择对应存储的partition
A: router为doc生成新的唯一id,然后通过hash函数计算partition
router的缓存都有什么
A: 缓存相关都在
router.client.masterclient.cliCache
下面router.client.masterclient.cliCache
本身继承了sync.map
,存储了nodeID
对应的rpcClient
,避免多次创建partitionCache
里存放了partitionID
对应的partition相关信息,包括机器节点ID(没有地址)serverCache
里存放了NodeID
对应的机器信息,包括IP、端口等如果在缓存里没有找到,router会访问
etcd
获取相关数据,router.client.masterclient.store
就是etcd相关
怎么根据doc的id确认对应的partition
A:hash函数,代码对应这句:
partitionID := r.space.PartitionId(murmur3.Sum32WithSeed(cbbytes.StringToByte(doc.PKey), 0))
作者:后场村卷王
链接:https://juejin.cn/post/7031835366550142989