EncodingIdMapper put 方法:


long eId = encode( inputId );

dataCache.set( nodeId, eId );

groupCache.set( nodeId, group.id() );

candidateHighestSetIndex.offer( nodeId );

dataCache.set( nodeId, eId );

DynamicLongArray.set(long index, long value )

at( index ).set( index, value );

1. at
{DynamicNumberArray<N extends NumberArray<N>>

    @Override
    public N at( long index )
    {
        if ( index >= length() )
        {
            synchronizedAddChunk( index );
            // 扩容 new OffHeapLongArray( length, defaultValue, base );
        }

        int chunkIndex = chunkIndex( index );
        return chunks[chunkIndex];
    }
}

at 方法先看情况给 chunks 扩容,实际上就是新建一个 OffHeapLongArray ,放到 chunks 数租,然后返回一个。实际上就是返回当前的顶点应该所在的 LongArray。

2. set
{OffHeapLongArray
    
    @Override
    public void set( long index, long value )
    {
        UnsafeUtil.putLong( addressOf( index ), value );
    }
    
    addressOf(index)
    {
        index = rebase( index ); // index - base;
        if ( index < 0 || index >= length )
        {
            throw new ArrayIndexOutOfBoundsException( "Requested index " + index + ", but length is " + length );
        }
        // 
        return address + (index << shift); // 在当前位置左移三位,也就是乘以8,因为保存一个 long 要 8位?
    }
    putLong:
    unsafe.putLong( address, value );
    
}


dataCache 的set逻辑比较清楚了,dataCache 是一个 DynamicLongArray ,里面有一个 chunks = OffHeapLongArray[] , 每个 OffHeapLongArray 长度是一百万,数据增多了就新建 OffHeapLongArray。 然后通过 Unsafe 的方法赋值,因为 ne4j 的 id 是连续的,所以直接移动八位就是下一个数据的存储地址。然后 putLong 放进去。 这里我们其实有一些疑问,现在其实是将顶点的值 value 放到了它的 id 对应的位置上,我们要查一个 id 对应的值只需要找到对应地址即可, 我们需要查询一个 value 对应的 id 还是没法查,所以我们接下来要研究一下怎么查。

看 get 方法,看名字就知道这是一个二分查找,二分查找有个前提是数据是排好序的,然后不断得到中间的值和现有的值对比。

private long binarySearch( Object inputId, int groupId )
{
    long low = 0;
    long high = highestSetIndex;
    // 得到加密后的 value 
    long x = encode( inputId );
    // 这个方法得到 x 的基数
    int rIndex = radixOf( x );
    
    // for 循环中,rIndex 和 sortBuckets 二维数组的 第一行的值作比较,如果满足了条件,就取对应的第二行和下一个第二行的值作为 low 和 high。
    // 根据这个逻辑我们大概能判断出来,sortBuckets 中放的是分位点,第一行放的是 rIndex 分位点,第二行放的是 index 的分位点,而且 index 对应的值一个个是排好序的,不然没法二分查找。
    // 
    for ( int k = 0; k < sortBuckets.length; k++ )
    {
        if ( rIndex <= sortBuckets[k][0] )//bucketRange[k] > rIndex )
        {
            low = sortBuckets[k][1];
            high = (k == sortBuckets.length - 1) ? highestSetIndex : sortBuckets[k + 1][1];
            break;
        }
    }

    long returnVal = binarySearch( x, inputId, low, high, groupId );
    if ( returnVal == ID_NOT_FOUND )
    {
        low = 0;
        high = highestSetIndex;
        returnVal = binarySearch( x, inputId, low, high, groupId );
    }
    return returnVal;
}


private long binarySearch( long x, Object inputId, long low, long high, int groupId )
{
    while ( low <= high )
    {
        // 中值点 low 和 high 都是 index
        long mid = low + (high - low) / 2;//(low + high) / 2;
        
        // trackerCache 能根据值 index 得到 真正的 index ?
        
        long dataIndex = trackerCache.get( mid );
        if ( dataIndex == ID_NOT_FOUND )
        {
            return ID_NOT_FOUND;
        }
        // 查找 value,dataCache 是我们上面放数据的 DynamicLongArray ,根据 index 能查到 值。
        long midValue = dataCache.get( dataIndex );
        switch ( unsignedDifference( clearCollision( midValue ), x ) )
        {
        case EQ:
            // We found the value we were looking for. Question now is whether or not it's the only
            // of its kind. Not all values that there are duplicates of are considered collisions,
            // read more in detectAndMarkCollisions(). So regardless we need to check previous/next
            // if they are the same value.
            boolean leftEq = mid > 0 && unsignedCompare( x, dataValue( mid - 1 ), CompareType.EQ );
            boolean rightEq = mid < highestSetIndex && unsignedCompare( x, dataValue( mid + 1 ), CompareType.EQ );
            if ( leftEq || rightEq )
            {   // OK so there are actually multiple equal data values here, we need to go through them all
                // to be sure we find the correct one.
                return findFromEIdRange( leftEq ? mid - 1 : mid, rightEq ? mid + 1 : mid, midValue, inputId, x, groupId );
            }
            // This is the only value here, let's do a simple comparison with correct group id and return
            return groupOf( dataIndex ) == groupId ? dataIndex : ID_NOT_FOUND;
        case LT:
            low = mid + 1;
            break;
        default:
            high = mid - 1;
            break;
        }
    }
    return ID_NOT_FOUND;
}

整个查询到这里感觉很乱,我们先理一下,我们要在 dataCache 中查找某个 x,首先通过 radixOf 方法得到一个 rIndex,再通过 rIndex 在 sortBuckets 数组中找到 high 和 low, 在第二个方法中,high 和 low,可以通过 trackerCache 得到 dataIndex,然后 dataCache.get( dataIndex ) 得到了 value。

其实熟悉排序算法的话,第二个方法并不难明白。在有的排序算法中,排序算法涉及到交换,而我们保存在内存的数据进行交换是很麻烦的,所以我们再加一个数组记录数据下标,每次不交换数据只交换下标。 这种方式本身不复杂,只是多了一层,可能比较绕,我们用一个例子梳理一下。 假如:要排序的三个数 data[2,4,1], 我们先记下 index[0,1,2],然后进行排序,以冒泡排序为例: 1. 比较 data[index[0]], data[index[1]],无需交换,然后比较 data[index[1]], data[index[2]],发现 4比1大,然后交换index为[0,2,1], 2. 然后继续比较 data[index[0]], data[index[1]],发现2大于1,交换 index 为 [2,0,1] , 3. 得到的排序结果是 data[2,3,1], index[2,0,1] 。 如果是别的排序方法类似,例如快速排序,我们先记下 data[2,5,1,3,0],index[0,1,2,3,4],首先所有的数据和 data[index[0]] 比较,得到index[2,4,0,1,3],然后继续得到 index[4,2,0,3,1].

排序好了以后,我们需要进行查找操作,而二分查找也需要先根据index找到对应位置的数据, 例如上面5个数据我要查找 3 ,首先 left=0,right=4, mid =2,而 data[index[2]]=2,小于3,所以 left=2+1,mid=3 ,data[index[3]]=3,查找到了对应的索引是 3,

所以我们可以猜想这里也是这个道理 dataCahce 排序的时候并没有交换数据二是通过 trackerCache 交换了对应的索引。 而要查找 dataCahce 的数据,首先在得到 mid ,这个 mid 实际上是数据按照大小的排名,然后从 trackerCache 得到排名为 mid 的索引, 然后调用 dataCache 得到排名为 mid 的数据,和已有的数据进行比较。

我们看看 trackerCache 注释:

// Ordering information about values in dataCache; the ordering of values in dataCache remains unchanged.
// in prepare() this array is populated and changed along with how dataCache items "move around" so that
// they end up sorted. Again, dataCache remains unchanged, only the ordering information is kept here.
// Each index in trackerCache points to a dataCache index, where the value in dataCache contains the
// encoded input id, used to match against the input id that is looked up during binary search.

注释说的很清楚,trackerCache 是 dataCache 的排序信息,dataCache 的数据是不变的, prepare() 执行的时候 trackerCache 生成并且随着 dataCache 排序而改变,记录 dataCache 的顺序, trackerCache 的每个索引指向 dataCache 的一个位置,where 放置了 加密的 inputId ,在 binarysearch 用来匹配输入的 inputId。那重点就是 prepare() 方法。

trackerCache 的问题解决了,sortBuckets 大概是什么意思?dataCache 的长度可能有几十亿,即便是二分查找,也要很久,所以我们是否可以大概确定一个上下限呢?其实就是桶排序,或者基数排序。 例如:data[1,2,3,4,5,6,7,8,9,10] 我们要查找 9的位置,是否可以先把它分成 [1,2,3,4,5],[6,7,8,9,10] 两个,记下他们的上下限 [0,5],[1,10]。来一个数据先缩小一下它的范围,再进行二分查找?

而对于存在内存中的数据,需要进行基数排序,然后进行快速排序是有一定要求的。

我们看一下需要看一下 encode 和 radixOf 的代码,

encode:Encodes String into a long with very small chance of collision

@Override
public long encode( Object s )
{
    int[] val = encodeInt( (String) s ); 
    return (long) val[0] << 32 | val[1] & UPPER_INT_MASK; // 连接 val[0] 和 val[1]
}

private int[] encodeInt( String s )
{
    // 将 String 变成bytes
    int inputLength = s.length();
    byte[] bytes = new byte[inputLength];
    for ( int i = 0; i < inputLength; i++ )
    {
        bytes[i] = (byte) ((s.charAt( i )) % 127);
    }
    
    // 调用 remap, 将 bytes[i] 转化为 reMap[bytes[i]]
    reMap( bytes, inputLength );
    
    // encode 长度小于 7 的 简单编码
    if ( inputLength <= encodingThreshold )
    {
        
        return simplestCode( bytes, inputLength );
    }
    int[] codes = new int[numCodes];
    for ( int i = 0; i < numCodes; )
    {
        codes[i] = getCode( bytes, inputLength, 1 );
        codes[i + 1] = getCode( bytes, inputLength, inputLength - 1 );
        i += 2;
    }
    int carryOver = lengthEncoder( inputLength ) << 1;
    int temp = 0;
    for ( int i = 0; i < numCodes; i++ )
    {
        temp = codes[i] & FOURTH_BYTE;
        codes[i] = codes[i] >>> 8 | carryOver << 24;
        carryOver = temp;
    }
    return codes;
}
private int lengthEncoder( int length )
{
    if ( length < 32 )
    {
        return length;
    }
    else if ( length <= 96 )
    {
        return length >> 1;
    }
    else if ( length <= 324 )
    {
        return length >> 2;
    }
    else if ( length <= 580 )
    {
        return length >> 3;
    }
    else if ( length <= 836 )
    {
        return length >> 4;
    }
    else
    {
        return 127;
    }
}

// reMap 方法就是给 bytes 做一个一一隐射,将 bytes[i] 转化为 reMap[bytes[i]],0<=bytes[i]<=255
private void reMap( byte[] bytes, int inputLength )
{
    for ( int i = 0; i < inputLength; i++ )
    {
        if ( reMap[bytes[i]] == -1 )
        {
            synchronized ( this )
            {
                if ( reMap[bytes[i]] == -1 )
                {
                    reMap[bytes[i]] = (byte) (numChars++ % 256);
                }
            }
        }
        bytes[i] = reMap[bytes[i]];
    }
}

// 
private int[] simplestCode( byte[] bytes, int inputLength )
{
    int[] codes = new int[]{0, 0};
    codes[0] = max( inputLength, 1 ) << 25; 长度左移 25位,inputLength_0000000000
    codes[1] = 0;
    for ( int i = 0; i < 3 && i < inputLength; i++ )
    {
        codes[0] = codes[0] | bytes[i] << ((2 - i) * 8);  bytes[i] 左移 16,8,0 位后和已有的 codes[0] 取或,
    }
    for ( int i = 3; i < 7 && i < inputLength; i++ )
    {
        codes[1] = codes[1] | (bytes[i]) << ((6 - i) * 8);bytes[i] 左移 16,8,0 位后和已有的 codes[1]取或,
    }
    return codes;
}

private int getCode( byte[] bytes, int inputLength, int order )
{
    long code = 0;
    int size = inputLength;
    for ( int i = 0; i < size; i++ )
    {
        //code += (((long)bytes[(i*order) % size]) << (i % 7)*8);
        long val = bytes[(i * order) % size];
        for ( int k = 1; k <= i; k++ )
        {
            long prev = val;
            val = (val << 4) + prev;//% Integer.MAX_VALUE;
        }
        code += val;
    }
    return (int) code;
}

encode 太复杂了,我们知道是尽量保证不重复的long即可,编码的前 8位 是长度编码,后面的是数据编码,中间留了一个0.

radixOf : Calculates and keeps radix counts. Uses a {@link RadixCalculator} to calculate an integer radix value from a long value.

protected static final int RADIX_BITS = 24; // 24 位
protected static final long LENGTH_BITS = 0xFE000000_00000000L; // 1111_1110_00000000000000000 (56个0)
protected static final int LENGTH_MASK = (int) (LENGTH_BITS >>> (64 - RADIX_BITS)); // 右移40位 还剩24位  1111_111_00000000000000 (17 个零)
protected static final int HASHCODE_MASK = (int) (0x00FFFF00_00000000L >>> (64 - RADIX_BITS)); 右移40位 还剩24位  0000_0000_1111_1111_1111_1111
/**
  * Radix optimized for strings encoded into long by {@link StringEncoder}.
  */
 public static class String extends RadixCalculator
 {
     @Override
     public int radixOf( long value )
     {
         int index = (int) (value >>> (64 - RADIX_BITS));  RADIX_BITS = 24 右移40位 还剩高位24个位,
         index = ((index & LENGTH_MASK) >>> 1) | (index & HASHCODE_MASK);  //  (FE00 & index) | (00FFFF & index) 其实就是取 index 的前7位和后16位。也就是去掉第17位
         return index; // 整个逻辑就是先去掉低40位变成24位 index,然后去掉第17位。
     }
 }

编码最后去掉第17位是因为第17位是 0 没有意义。

这里大概知道用了什么原理,接下来我们 prepare 方法,看看 sort 的过程。我们大概已经知道使用了 基数排序和快速排序。

ParallelSort 的 run 方法,首先是 sortRadix

// 实际上就是将数据按照 thread 数量分为几个桶,
private long[][] sortRadix() throws InterruptedException
{
    long[][] rangeParams = new long[threads][2];
    int[] bucketRange = new int[threads];
    // 初始化的工作,初始化应该主要就是得到几个分桶。
    Workers<TrackerInitializer> initializers = new Workers<>( "TrackerInitializer" );
    // sortBuckets 保存分桶信息,一共两行,第一行是捅的 index ,第二行是数据的 index
    sortBuckets = new long[threads][2];
    
    long dataSize = highestSetIndex + 1;
    long bucketSize = dataSize / threads;
    long count = 0;
    long fullCount = 0;
    progress.started( "SPLIT" );
    //遍历所有数据 radixIndex, 每个基数对应的数据的个数,基数 = 先去掉低40位变成24位 index,然后第17位变为0。
    for ( int i = 0, threadIndex = 0; i < radixIndexCount.length && threadIndex < threads; i++ )
    {
        // 已找到的的数据 ,也就是基数 > bucketSize
        if ( (count + radixIndexCount[i]) > bucketSize )
        {
            // 现在要根据线程数将整个基数分成几个桶, bucketRange 保存分位点位置,
            bucketRange[threadIndex] = count == 0 ? i : i - 1; 
            // rangeParams 的第一行是截至目前数据总量
            rangeParams[threadIndex][0] = fullCount;
            if ( count != 0 )
            {
                // rangeParams 的第二行是当前数据量
                rangeParams[threadIndex][1] = count;
                fullCount += count;
                progress.add( count );
                count = radixIndexCount[i];
            }
            else
            {
                rangeParams[threadIndex][1] = radixIndexCount[i];
                fullCount += radixIndexCount[i];
                progress.add( radixIndexCount[i] );
            }
            initializers.start( new TrackerInitializer( threadIndex, rangeParams[threadIndex],
                    threadIndex > 0 ? bucketRange[threadIndex - 1] : -1, bucketRange[threadIndex],
                    sortBuckets[threadIndex] ) );
            threadIndex++;
        }
        else
        {
            count += radixIndexCount[i];
        }
        if ( threadIndex == threads - 1 || i == radixIndexCount.length - 1 )
        {
            bucketRange[threadIndex] = radixIndexCount.length;
            rangeParams[threadIndex][0] = fullCount;
            rangeParams[threadIndex][1] = dataSize - fullCount;
            initializers.start( new TrackerInitializer( threadIndex, rangeParams[threadIndex],
                    threadIndex > 0 ? bucketRange[threadIndex - 1] : -1, bucketRange[threadIndex],
                    sortBuckets[threadIndex] ) );
            break;
        }
    }
    progress.done();

    // In the loop above where we split up radixes into buckets, we start one thread per bucket whose
    // job is to populate trackerCache and sortBuckets where each thread will not touch the same
    // data indexes as any other thread. Here we wait for them all to finish.
    Throwable error = initializers.await();
    long[] bucketIndex = new long[threads];
    int i = 0;
    for ( TrackerInitializer initializer : initializers )
    {
        bucketIndex[i++] = initializer.bucketIndex;
    }
    if ( error != null )
    {
        throw new AssertionError( error.getMessage() + "\n" + dumpBuckets( rangeParams, bucketRange, bucketIndex ),
                error );
    }
    return rangeParams;
}

TrackerInitializer 的 start 启动一个线程,我们看看 run 方法。

public void run()
{
    for ( long i = 0; i <= highestSetIndex; i++ )
    {
        // 计算基数,也就是在 整个基数桶 中的位置
        int rIndex = radixCalculator.radixOf( comparator.dataValue( dataCache.get( i ) ) );
        
        // 看看这个数是否属于这个桶。
        if ( rIndex > lowRadixRange && rIndex <= highRadixRange )
        {
            // rangeParams 保存了当前线程在桶中的上下限。
            long trackerIndex = rangeParams[0] + bucketIndex++;
            assert tracker.get( trackerIndex ) == -1 :
                    "Overlapping buckets i:" + i + ", k:" + threadIndex + ", index:" + trackerIndex;
            
            // 这个设置到 tracker 中。每个 trackerIndex 对应一个 i,每个 i 也对应一个 trackerIndex。
            tracker.set( trackerIndex, i );
            
            // 如果到达了上界。
            if ( bucketIndex == rangeParams[1] )
            {
                result[0] = highRadixRange;
                result[1] = rangeParams[0];
            }
        }
    }
}

TrackerInitializer 的 run 方法作用就是在 tracker 设置 trackerIndex 和 i 的对应关系, trackerIndex 是 i 的基数在桶中的位置 + 当前 bucket 的 Index bucketIndex 是递增的。

然后我们看 SortWorker 排序。关键逻辑在 partition 中。

private long partition( long leftIndex, long rightIndex, long pivotIndex )
{
    // left 和 right ,right 为倒数第二个
    long li = leftIndex;
    long ri = rightIndex - 2;
    long pi = pivotIndex;
    long pivot = clearCollision( dataCache.get( tracker.get( pi ) ) );
    
    // save pivot in last index,先把 pivot 放到最后
    tracker.swap( pi, rightIndex - 1 );
    long left = clearCollision( dataCache.get( tracker.get( li ) ) );
    long right = clearCollision( dataCache.get( tracker.get( ri ) ) );
    while ( li < ri )
    {
        // 左边和 pivot 比较
        if ( comparator.lt( left, pivot ) )
        {   
            // this value is on the correct side of the pivot, moving on。 左边的比 pivot 小,不用管,直接看下一个。
            left = clearCollision( dataCache.get( tracker.get( ++li ) ) );
        }
        else if ( comparator.ge( right, pivot ) )
        {   // this value is on the correct side of the pivot, moving on 。右边的比 pivot 大,不用管,下一个
            right = clearCollision( dataCache.get( tracker.get( --ri ) ) );
        }
        else
        {   // this value is on the wrong side of the pivot, swapping, 右边的比 pivot 小,左边的比 pivot 大,交换一下。
            tracker.swap( li, ri );
            long temp = left;
            left = right;
            right = temp;
        }
    }
    long partingIndex = ri;
    if ( comparator.lt( right, pivot ) )
    {
        partingIndex++;
    }
    // restore pivot
    tracker.swap( rightIndex - 1, partingIndex );
    return partingIndex;
}

这是快速排序的实现方法。 截至目前,其实还有一点没解决,就是 tracker ,tracker 中存放了 trackerIndex 和 index 的键值对,里面是怎么存储的其实值得我们研究。