public void put(int index, byte[] src, int offset, int length) { // check the byte array offset and length if ((offset | length | (offset + length) | (src.length - (offset + length))) < 0) { throw new IndexOutOfBoundsException(); }
//address 为byte arrary begining地址 //启始指针 address 为byte对象头的长度 16 + index启位置就是pos final long pos = address + index;
// index > 0 且 没有超出 limit 调用UNSAFE copyMemory0 进行拷贝分配 if (index >= 0 && pos <= addressLimit - length) { final long arrayAddress = BYTE_ARRAY_BASE_OFFSET + offset; UNSAFE.copyMemory(src, arrayAddress, heapMemory, pos, length); } else if (address > addressLimit) { throw new IllegalStateException("segment has been freed"); } else { // index is in fact invalid throw new IndexOutOfBoundsException(); } }
public NetworkBufferPool( int numberOfSegmentsToAllocate, int segmentSize, Duration requestSegmentsTimeout) { // 要分配的 内存段个数 this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate; // 单个内存段的大小 this.memorySegmentSize = segmentSize;
Preconditions.checkNotNull(requestSegmentsTimeout); checkArgument( requestSegmentsTimeout.toMillis() > 0, "The timeout for requesting exclusive buffers should be positive."); this.requestSegmentsTimeout = requestSegmentsTimeout;
final long sizeInLong = (long) segmentSize;
try { // 初始化 内存段双端队列 this.availableMemorySegments = new ArrayDeque<>(numberOfSegmentsToAllocate); } catch (OutOfMemoryError err) { throw new OutOfMemoryError( "Could not allocate buffer queue of length " + numberOfSegmentsToAllocate + " - " + err.getMessage()); }
try { //分配 堆外的非池化的segment for (int i = 0; i < numberOfSegmentsToAllocate; i++) { availableMemorySegments.add( MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null)); } } catch (OutOfMemoryError err) { int allocated = availableMemorySegments.size();
// free some memory availableMemorySegments.clear();
long requiredMb = (sizeInLong * numberOfSegmentsToAllocate) >> 20; long allocatedMb = (sizeInLong * allocated) >> 20; long missingMb = requiredMb - allocatedMb;