GH-3530: Optimize PLAIN encoding and decoding with direct ByteBuffer I/O#3565
GH-3530: Optimize PLAIN encoding and decoding with direct ByteBuffer I/O#3565iemejia wants to merge 3 commits into
Conversation
…uffer I/O Replace ByteBufferInputStream and LittleEndianDataInputStream wrappers with direct ByteBuffer access for all PLAIN value readers and writers. Readers (PlainValuesReader, BooleanPlainValuesReader, BinaryPlainValuesReader, FixedLenByteArrayPlainValuesReader) now hold a little-endian ByteBuffer obtained from initFromPage() and call getInt/getLong/getFloat/getDouble directly, eliminating per-value stream overhead. Writers (PlainValuesWriter, BooleanPlainValuesWriter, FixedLenByteArrayPlainValuesWriter) write through CapacityByteArrayOutputStream's new writeInt/writeLong methods, which put values directly into the NIO slab buffer in little-endian order, avoiding temporary byte-array allocation. Supporting changes: - CapacityByteArrayOutputStream: allocate slabs with ByteOrder.LITTLE_ENDIAN, add writeInt(int) and writeLong(long) for single-value NIO writes. - BytesInput: add zero-copy writeTo(ByteBuffer) and toByteArray() using bulk ByteBuffer.get() instead of stream copy. - LittleEndianDataOutputStream: batch single-byte writes into single write(buf, 0, N) calls for writeShort/writeInt. Includes JMH benchmarks (PlainEncodingBenchmark, PlainDecodingBenchmark) covering all 7 primitive types for both encoding and decoding.
| int length = BytesUtils.readIntLittleEndian(in); | ||
| return Binary.fromConstantByteBuffer(in.slice(length)); | ||
| } catch (IOException | RuntimeException e) { | ||
| throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); |
There was a problem hiding this comment.
Should we keep the ParquetDecodingException? Otherwise we're throwing the raw {IOException,RuntimeException} which is a behavioral change.
There was a problem hiding this comment.
Done. Added try/catch wrapping RuntimeException (which covers BufferUnderflowException, IllegalArgumentException, etc.) into ParquetDecodingException in both readBytes() and skip().
| if (available > 0) { | ||
| this.buffer = stream.slice(available).order(ByteOrder.LITTLE_ENDIAN); | ||
| } else { | ||
| this.buffer = ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN); |
There was a problem hiding this comment.
Should we create a constant for the ByteBuffer.allocate(0).order(ByteOrder.LITTLE_ENDIAN);?
There was a problem hiding this comment.
Done. Extracted EMPTY_LE_BUFFER as a private static final read-only constant (ByteBuffer.allocate(0).order(LITTLE_ENDIAN).asReadOnlyBuffer()), used via .duplicate() in initFromPage. Same pattern applied in PlainValuesReader.
| try { | ||
| return Binary.fromConstantByteBuffer(in.slice(length)); | ||
| } catch (IOException | RuntimeException e) { | ||
| throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e); |
There was a problem hiding this comment.
Same as above, should we keep the wrapped ParquetDecodingException?
There was a problem hiding this comment.
Done. Added try/catch wrapping in readBytes(), skip(), and skip(int n). Also using Math.multiplyExact(n, length) in skip(int n) to detect overflow.
| try { | ||
| skipBytesFully(n * 8); | ||
| } catch (IOException e) { | ||
| throw new ParquetDecodingException("could not skip " + n + " double values", e); |
There was a problem hiding this comment.
Same here, do we want to keep the ParquetDecodingException?
There was a problem hiding this comment.
Done. All readXxx() and skip(int n) methods across DoublePlainValuesReader, FloatPlainValuesReader, IntegerPlainValuesReader, and LongPlainValuesReader now wrap RuntimeException in ParquetDecodingException with descriptive messages matching the original error contract.
| public abstract class PlainValuesReader extends ValuesReader { | ||
| private static final Logger LOG = LoggerFactory.getLogger(PlainValuesReader.class); | ||
|
|
||
| protected LittleEndianDataInputStream in; |
There was a problem hiding this comment.
We should go through the deprecation cycle here, but is anything using this outside of the project itself?
There was a problem hiding this comment.
Good point. The old protected LittleEndianDataInputStream in field is now protected ByteBuffer buffer — the type change is binary-incompatible regardless, so a deprecation cycle wouldn't help external subclasses (they'd get a compile error either way). I searched the project and only internal subclasses (the 4 inner classes) access this field. I think this is acceptable given this class was never annotated @Public and the field type change makes deprecation impractical. WDYT?
| * mutable {@code BAOS.getBuf()}. | ||
| */ | ||
| @Override | ||
| public byte[] toByteArray() { |
There was a problem hiding this comment.
This overrides a deprecated API, as a follow-up we probably should move the internal calls to the new API:
@deprecated Use {@link #toByteBuffer(ByteBufferAllocator, Consumer)}There was a problem hiding this comment.
Addressed in f0bdac6. The base-class toInputStream() now tries getInternalByteBuffer() first (zero-copy fast path) before falling back to the deprecated toByteBuffer(). Also added getInternalByteBuffer() and toInputStream() overrides to ByteArrayBytesInput so the byte-array-backed path is zero-copy too. Added @SuppressWarnings("deprecation") on the intentional overrides.
| public int getNextOffset() { | ||
| return in.getNextOffset(); | ||
| public void skip(int n) { | ||
| bitIndex += n; |
There was a problem hiding this comment.
Should we check for bounds, and throw a ParquetDecodingException in case of out of bounds?
There was a problem hiding this comment.
Done. Added a bitCount field (set to length * 8 in initFromPage) and an explicit bounds check in readBoolean() that throws ParquetDecodingException with a descriptive message when attempting to read beyond the page boundary.
| } catch (IOException e) { | ||
| throw new ParquetDecodingException("could not skip " + n + " double values", e); | ||
| } | ||
| buffer.position(buffer.position() + n * 8); |
There was a problem hiding this comment.
Should we use Math.multiplyExact here and below?
There was a problem hiding this comment.
Done. Applied Math.multiplyExact in all skip(int n) methods: Math.multiplyExact(n, 8) for double/long, Math.multiplyExact(n, 4) for float/int, and Math.multiplyExact(n, length) in FixedLenByteArrayPlainValuesReader. Overflow now produces an ArithmeticException which gets caught and wrapped in ParquetDecodingException.
- Wrap RuntimeException in ParquetDecodingException in all read/skip methods to preserve existing error contract - Extract EMPTY_LE_BUFFER constant for empty page initialization - Add bounds check in BooleanPlainValuesReader.readBoolean() - Use Math.multiplyExact in skip(int n) to detect overflow
… toByteArray/toByteBuffer - Base-class toInputStream() now tries getInternalByteBuffer() first for zero-copy path before falling back to deprecated toByteBuffer() - ByteArrayBytesInput: add getInternalByteBuffer() and toInputStream() overrides to avoid unnecessary copy through BAOS - Add @SuppressWarnings("deprecation") on intentional deprecated overrides
Part of #3530 — Apache Parquet Java Performance Improvements
Summary
Replace
ByteBufferInputStreamandLittleEndianDataInputStreamwrappers with directByteBufferaccess for all PLAIN value readers and writers.Readers (
PlainValuesReader,BooleanPlainValuesReader,BinaryPlainValuesReader,FixedLenByteArrayPlainValuesReader): hold a little-endianByteBufferfrominitFromPage()and callgetInt/getLong/getFloat/getDoubledirectly, eliminating per-value stream overhead.Writers (
PlainValuesWriter,BooleanPlainValuesWriter,FixedLenByteArrayPlainValuesWriter): write throughCapacityByteArrayOutputStream's newwriteInt/writeLongmethods which put values directly into the NIO slab buffer in little-endian order, avoiding temporary byte-array allocation.Supporting changes:
CapacityByteArrayOutputStream: allocate slabs withByteOrder.LITTLE_ENDIAN, addwriteInt(int)andwriteLong(long)for single-value NIO writes.BytesInput: add zero-copywriteTo(ByteBuffer)andtoByteArray()using bulkByteBuffer.get()instead of stream copy.LittleEndianDataOutputStream: batch single-byte writes into singlewrite(buf, 0, N)calls forwriteShort/writeInt.Includes JMH benchmarks (
PlainEncodingBenchmark,PlainDecodingBenchmark) covering all 7 primitive types for both encoding and decoding.Benchmark results
Environment: JDK 25.0.3 (Temurin), OpenJDK 64-Bit Server VM, JMH 1.37, Linux x86_64.
Decoding (100K values/iteration, 3 forks x 5 iterations, throughput mode):
Encoding:
(*) decodeLong/Double show JIT variance across forks (error bars >20%); true steady-state likely ~13x consistent with INT32/FLOAT.