上一节我们了解了 JanusGraph 的关系存储,主要是在 EdgeSerializer 中的序列化和反序列化,我们还要这次看看 IndexSerializer 的相关类。
基础类
IndexSerializer
用来序列化,反序列化
IndexProvider
IndexProvider 继承自 IndexInformation ,IndexInformation 主要判断是否支持某个 KeyInformation, IndexProvider 最主要的是 mutate 方法,该方法就是用来保存数据到底层存储系统。
KeyInformation
保存key的信息,有三个内部接口
StoreRetriever
能够根据key得到 KeyInformation
IndexRetriever
能够根据key 和 store得到 KeyInformation 根据store 得到 StoreRetriever
Retriever
根据 index 得到IndexRetriever
这几个比较混乱。主要实现在 IndexInfoRetriever 中。
IndexSerializer.IndexInfoRetriever
IndexInfoRetriever 继承自 KeyInformation.Retriever,只有一个 get 方法:
public static class IndexInfoRetriever implements KeyInformation.Retriever {
private final StandardJanusGraphTx transaction;
private IndexInfoRetriever(StandardJanusGraphTx tx) {
Preconditions.checkNotNull(tx);
transaction=tx;
}
@Override
public KeyInformation.IndexRetriever get(final String index) {
return new KeyInformation.IndexRetriever() {
final Map<String,KeyInformation.StoreRetriever> indexes = new ConcurrentHashMap<>();
@Override
public KeyInformation get(String store, String key) {
return get(store).get(key);
}
@Override
public KeyInformation.StoreRetriever get(final String store) {
if (indexes.get(store)==null) {
Preconditions.checkState(transaction!=null,"Retriever has not been initialized");
final MixedIndexType extIndex = getMixedIndex(store, transaction);
assert extIndex.getBackingIndexName().equals(index);
final ImmutableMap.Builder<String,KeyInformation> b = ImmutableMap.builder();
for (final ParameterIndexField field : extIndex.getFieldKeys()) b.put(key2Field(field),getKeyInformation(field));
final ImmutableMap<String,KeyInformation> infoMap = b.build();
final KeyInformation.StoreRetriever storeRetriever = infoMap::get;
indexes.put(store,storeRetriever);
}
return indexes.get(store);
}
};
}
}
通过看代码,我们发现其实都是几个map。
1. 首先 IndexInfoRetriever 里面有个 final Map<String,KeyInformation.StoreRetriever> indexes = new ConcurrentHashMap<>();
2. KeyInformation.StoreRetriever 实际上也就是一个 final ImmutableMap<String,KeyInformation> infoMap = b.build();
3. 调用 IndexInfoRetriever 的 get 方法,会调用getMixedIndex(store, transaction); 也就是说这个得到的只是 MixedIndexType。
RecordEntry
这个类 有三个属性,分别是 long relationId, Object value, PropertyKey key,这应该就代表了待建索引的一个记录。
IndexRecords
public static class IndexRecords extends ArrayList
看上去像一个二维数组,记录索引的更新。
IndexUpdate
这个类主要是提供一些计算索引更新的工具方法。
索引回顾
我们先回顾一下相关知识,主要是我们建索引的时候发生了什么。JanusGraphIndex nameIndex = management.buildIndex("name", Vertex.class).addKey(name).buildCompositeIndex();
private JanusGraphIndex createCompositeIndex(String indexName, ElementCategory elementCategory, boolean unique, JanusGraphSchemaType constraint, PropertyKey... keys) {
//
Preconditions.checkArgument(!unique || elementCategory == ElementCategory.VERTEX, "Unique indexes can only be created on vertices [%s]", indexName);
boolean allSingleKeys = true;
boolean oneNewKey = false;
for (PropertyKey key : keys) {
if (key.cardinality() != Cardinality.SINGLE) allSingleKeys = false;
if (key.isNew()) oneNewKey = true;
else updatedTypes.add((PropertyKeyVertex) key);
}
Cardinality indexCardinality;
if (unique) indexCardinality = Cardinality.SINGLE;
else indexCardinality = (allSingleKeys ? Cardinality.SET : Cardinality.LIST);
boolean canIndexBeEnabled = oneNewKey || (constraint != null && constraint.isNew());
TypeDefinitionMap def = new TypeDefinitionMap();
def.setValue(TypeDefinitionCategory.INTERNAL_INDEX, true);
def.setValue(TypeDefinitionCategory.ELEMENT_CATEGORY, elementCategory);
def.setValue(TypeDefinitionCategory.BACKING_INDEX, Token.INTERNAL_INDEX_NAME);
def.setValue(TypeDefinitionCategory.INDEXSTORE_NAME, indexName);
def.setValue(TypeDefinitionCategory.INDEX_CARDINALITY, indexCardinality);
def.setValue(TypeDefinitionCategory.STATUS, canIndexBeEnabled ? SchemaStatus.ENABLED : SchemaStatus.INSTALLED);
// 新建一个顶点。
JanusGraphSchemaVertex indexVertex = transaction.makeSchemaVertex(JanusGraphSchemaCategory.GRAPHINDEX, indexName, def);
for (int i = 0; i < keys.length; i++) {
Parameter[] paras = {ParameterType.INDEX_POSITION.getParameter(i)};
// 添加边,顶点分别是两个 index 和 propertykey
addSchemaEdge(indexVertex, keys[i], TypeDefinitionCategory.INDEX_FIELD, paras);
}
Preconditions.checkArgument(constraint == null || (elementCategory.isValidConstraint(constraint) && constraint instanceof JanusGraphSchemaVertex));
if (constraint != null) {
// 如果加了限制 ,在添加一条边。
addSchemaEdge(indexVertex, (JanusGraphSchemaVertex) constraint, TypeDefinitionCategory.INDEX_SCHEMA_CONSTRAINT, null);
}
updateSchemaVertex(indexVertex);
JanusGraphIndexWrapper index = new JanusGraphIndexWrapper(indexVertex.asIndexType());
if (!oneNewKey) updateIndex(index, SchemaAction.REGISTER_INDEX);
return index;
}
addSchemaEdge 方法
public JanusGraphEdge addSchemaEdge(JanusGraphVertex out, JanusGraphVertex in, TypeDefinitionCategory def, Object modifier) {
assert def.isEdge();
// 加一条边,边的 label 是 SchemaDefinitionEdge
JanusGraphEdge edge = addEdge(out, in, BaseLabel.SchemaDefinitionEdge);
TypeDefinitionDescription desc = new TypeDefinitionDescription(def, modifier);
edge.property(BaseKey.SchemaDefinitionDesc.name(), desc);
return edge;
}
可以看出其实就是新建一个 顶点,添加属性,然后添加边。
indexSerializer.getIndexUpdates(del)
我们现在就看看 index 如何序列化。
public Collection<IndexUpdate> getIndexUpdates(InternalRelation relation) {
assert relation.isNew() || relation.isRemoved();
final Set<IndexUpdate> updates = Sets.newHashSet();
final IndexUpdate.Type updateType = getUpdateType(relation);
final int ttl = updateType==IndexUpdate.Type.ADD?StandardJanusGraph.getTTL(relation):0;
for (final RelationType type : relation.getPropertyKeysDirect()) {
if (!(type instanceof PropertyKey)) continue;
final PropertyKey key = (PropertyKey)type;
for (final IndexType index : ((InternalRelationType)key).getKeyIndexes()) {
if (!indexAppliesTo(index,relation)) continue;
IndexUpdate update;
if (index instanceof CompositeIndexType) {
final CompositeIndexType iIndex= (CompositeIndexType) index;
final RecordEntry[] record = indexMatch(relation, iIndex);
if (record==null) continue;
update = new IndexUpdate<>(iIndex, updateType, getIndexKey(iIndex, record), getIndexEntry(iIndex, record, relation), relation);
} else {
assert relation.valueOrNull(key)!=null;
if (((MixedIndexType)index).getField(key).getStatus()== SchemaStatus.DISABLED) continue;
update = getMixedIndexUpdate(relation, key, relation.valueOrNull(key), (MixedIndexType) index, updateType);
}
if (ttl>0) update.setTTL(ttl);
updates.add(update);
}
}
return updates;
}
CompositeIndexType
我们先看 CompositeIndexType 部分,我们发现主要就是 indexMatch 方法 和 new IndexUpdate,得到某个 relation 相关的index:
public static RecordEntry[] indexMatch(JanusGraphRelation relation, CompositeIndexType index) {
// 得到所有的key。
final IndexField[] fields = index.getFieldKeys();
// 新建一个对应的数组
final RecordEntry[] match = new RecordEntry[fields.length];
for (int i = 0; i <fields.length; i++) {
final IndexField f = fields[i];
final Object value = relation.valueOrNull(f.getFieldKey());
if (value==null) return null; //No match
match[i] = new RecordEntry(relation.longId(),value,f.getFieldKey());
}
return match;
}
总的来说还是很简单的,得到所有的索引字段的值即可,但是假如一个索引有两个字段,我们每次更新其中一个字段,都会更新一次索引,这岂不是会很麻烦。
private StaticBuffer getIndexKey(CompositeIndexType index, Object[] values) {
final DataOutput out = serializer.getDataOutput(8*DEFAULT_OBJECT_BYTELEN + 8);
// 写入 indexType 的 ID
VariableLong.writePositive(out, index.getID());
final IndexField[] fields = index.getFieldKeys();
Preconditions.checkArgument(fields.length>0 && fields.length==values.length);
for (int i = 0; i < fields.length; i++) {
final IndexField f = fields[i];
final Object value = values[i];
Preconditions.checkNotNull(value);
// 写入 index 的值。
if (AttributeUtil.hasGenericDataType(f.getFieldKey())) {
out.writeClassAndObject(value);
} else {
assert value.getClass().equals(f.getFieldKey().dataType()) : value.getClass() + " - " + f.getFieldKey().dataType();
out.writeObjectNotNull(value);
}
}
StaticBuffer key = out.getStaticBuffer();
if (hashKeys) key = HashingUtil.hashPrefixKey(hashLength,key);
return key;
}
可以看出 compositeindex 数据的key结构,indexId+value(所有的)。
private Entry getIndexEntry(CompositeIndexType index, RecordEntry[] record, JanusGraphElement element) {
final DataOutput out = serializer.getDataOutput(1+8+8*record.length+4*8);
out.putByte(FIRST_INDEX_COLUMN_BYTE);
if (index.getCardinality()!=Cardinality.SINGLE) { // 代表是 SET 或者 LIST
// 写出 value 的 id
VariableLong.writePositive(out,element.longId());
if (index.getCardinality()!=Cardinality.SET) { // 如果是LIST
// 循环写出 relationId
for (final RecordEntry re : record) {
VariableLong.writePositive(out,re.relationId);
}
}
}
// column 和 value 的分界点。
final int valuePosition=out.getPosition();
if (element instanceof JanusGraphVertex) { // 如果是顶点
VariableLong.writePositive(out,element.longId());
} else {
assert element instanceof JanusGraphRelation;
final RelationIdentifier rid = (RelationIdentifier)element.id();
final long[] longs = rid.getLongRepresentation();
Preconditions.checkArgument(longs.length == 3 || longs.length == 4);
for (final long aLong : longs) VariableLong.writePositive(out, aLong);
}
return new StaticArrayEntry(out.getStaticBuffer(),valuePosition);
}
这两个都比较类似的,按照固定的格式,生成看key 和value。
getMixedIndexUpdate
上面看的是 compositeIndex 的序列化过程,还有 MixedIndex。return new IndexUpdate<>(index, updateType, element2String(element), new IndexEntry(key2Field(index.getField(key)), value), element);
private static String element2String(Object elementId) {
if (elementId instanceof Long) return longID2Name((Long)elementId);
else return ((RelationIdentifier) elementId).toString();
}
private static String key2Field(ParameterIndexField field) {
assert field!=null;
return ParameterType.MAPPED_NAME.findParameter(field.getParameters(),keyID2Name(field.getFieldKey()));
}
这两个部分有很多其他的类,比较乱,后续可以自己整理一下,但是整体意思就是得到一个 key value 的类。。
反序列化查询
查询过程稍微有点复杂,一般会通过读索引。后续进行分析。