/*
Copyright 2017 The Pony MessagePack Developers
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
class ZeroCopyReader
  """
  A reader optimized for zero-copy extraction of byte ranges.
  Returns `val` views into the internal buffer instead of
  copying into fresh `iso` arrays.

  Zero-copy succeeds when the requested range falls within a
  single internal chunk. When data spans chunk boundaries, the
  reader falls back to copying — no worse than `buffered.Reader`.

  This reader is specific to the msgpack package and is not
  interoperable with APIs that expect `buffered.Reader`. Use it
  with `MessagePackZeroCopyDecoder` for zero-copy decoding, or
  use `MessagePackStreamingDecoder` which uses it internally.

  Decoded values returned via `block()` may hold references to
  the reader's internal chunks. As long as a decoded value is
  live, its source chunk stays in memory. For most use cases
  (decode, process, discard) this is not a concern. Callers who
  cache decoded values should be aware of this lifetime
  relationship.

  Internal invariant: every entry in `_chunks` is a pair
  `(data, offset)` with `offset < data.size()`, i.e. each stored
  chunk has at least one unread byte, and `_available` is the sum
  of the unread bytes over all chunks.
  """
  // Pending chunks, oldest first. The second tuple element is the
  // read cursor inside that chunk.
  embed _chunks: Array[(Array[U8] val, USize)]
  // Total number of unread bytes across all chunks.
  var _available: USize = 0

  new create() =>
    """
    Create an empty reader.
    """
    _chunks = Array[(Array[U8] val, USize)]

  fun size(): USize =>
    """
    Return the number of available bytes.
    """
    _available

  fun ref append(data: ByteSeq) =>
    """
    Add a chunk of data. The reader holds a `val` reference to
    the chunk's underlying array. No copy is performed.

    Empty chunks are ignored: they contribute no bytes, and storing
    one would break the invariant that the head chunk always has an
    unread byte, which the single-byte read path relies on.
    """
    let data_array =
      match data
      | let data': Array[U8] val => data'
      | let data': String => data'.array()
      end

    if data_array.size() > 0 then
      _available = _available + data_array.size()
      _chunks.push((data_array, 0))
    end

  fun ref block(len: USize): Array[U8] val ? =>
    """
    Return `len` bytes. If the range falls within a single
    chunk, returns a `trim`'d view (zero-copy). If it spans
    chunks, copies into a new array (fallback).

    Errors, without consuming anything, if fewer than `len` bytes
    are available.
    """
    if _available < len then error end

    if len == 0 then
      return recover val Array[U8] end
    end

    _available = _available - len
    (let data, let offset) = _chunks(0)?
    let avail = data.size() - offset

    if avail >= len then
      // Fast path: single chunk, zero-copy
      let result = data.trim(offset, offset + len)
      _advance_head(data, offset + len)?
      result
    else
      // Slow path: spans chunks, must copy
      _block_copy(len)?
    end

  fun ref skip(n: USize) ? =>
    """
    Skip `n` bytes. Errors if fewer than `n` bytes are
    available.
    """
    if _available < n then error end

    _available = _available - n
    var rem = n
    while rem > 0 do
      (let data, let offset) = _chunks(0)?
      let avail = data.size() - offset
      if avail > rem then
        // The skip ends inside this chunk: move its cursor and stop.
        _chunks(0)? = (data, offset + rem)
        break
      end
      // The whole remainder of this chunk is skipped.
      rem = rem - avail
      _chunks.shift()?
    end

  fun ref u8(): U8 ? =>
    """
    Get a U8. Errors if there isn't enough data.
    """
    if _available < 1 then error end
    _byte()?

  fun ref i8(): I8 ? =>
    """
    Get an I8. Errors if there isn't enough data.
    """
    u8()?.i8()

  fun ref u16_be(): U16 ? =>
    """
    Get a big-endian U16. Errors if fewer than 2 bytes are available.
    """
    let num_bytes = USize(2)
    if _available < num_bytes then error end

    (let data, let offset) = _chunks(0)?
    if (data.size() - offset) >= num_bytes then
      // Fast path: all bytes are in the head chunk. `read_u16` reads in
      // native byte order, so swap on little-endian targets.
      let r =
        ifdef bigendian then
          data.read_u16(offset)?
        else
          data.read_u16(offset)?.bswap()
        end
      _available = _available - num_bytes
      _advance_head(data, offset + num_bytes)?
      r
    else
      // Slow path: the value straddles chunks, assemble it byte by byte.
      (u8()?.u16() << 8) or u8()?.u16()
    end

  fun ref i16_be(): I16 ? =>
    """
    Get a big-endian I16. Errors if fewer than 2 bytes are available.
    """
    u16_be()?.i16()

  fun ref u32_be(): U32 ? =>
    """
    Get a big-endian U32. Errors if fewer than 4 bytes are available.
    """
    let num_bytes = USize(4)
    if _available < num_bytes then error end

    (let data, let offset) = _chunks(0)?
    if (data.size() - offset) >= num_bytes then
      // Fast path: all bytes are in the head chunk.
      let r =
        ifdef bigendian then
          data.read_u32(offset)?
        else
          data.read_u32(offset)?.bswap()
        end
      _available = _available - num_bytes
      _advance_head(data, offset + num_bytes)?
      r
    else
      // Slow path: the value straddles chunks, assemble it byte by byte.
      (u8()?.u32() << 24) or (u8()?.u32() << 16) or
        (u8()?.u32() << 8) or u8()?.u32()
    end

  fun ref i32_be(): I32 ? =>
    """
    Get a big-endian I32. Errors if fewer than 4 bytes are available.
    """
    u32_be()?.i32()

  fun ref u64_be(): U64 ? =>
    """
    Get a big-endian U64. Errors if fewer than 8 bytes are available.
    """
    let num_bytes = USize(8)
    if _available < num_bytes then error end

    (let data, let offset) = _chunks(0)?
    if (data.size() - offset) >= num_bytes then
      // Fast path: all bytes are in the head chunk.
      let r =
        ifdef bigendian then
          data.read_u64(offset)?
        else
          data.read_u64(offset)?.bswap()
        end
      _available = _available - num_bytes
      _advance_head(data, offset + num_bytes)?
      r
    else
      // Slow path: the value straddles chunks, assemble it byte by byte.
      (u8()?.u64() << 56) or (u8()?.u64() << 48) or
        (u8()?.u64() << 40) or (u8()?.u64() << 32) or
        (u8()?.u64() << 24) or (u8()?.u64() << 16) or
        (u8()?.u64() << 8) or u8()?.u64()
    end

  fun ref i64_be(): I64 ? =>
    """
    Get a big-endian I64. Errors if fewer than 8 bytes are available.
    """
    u64_be()?.i64()

  fun ref f32_be(): F32 ? =>
    """
    Get a big-endian F32. Errors if fewer than 4 bytes are available.
    """
    F32.from_bits(u32_be()?)

  fun ref f64_be(): F64 ? =>
    """
    Get a big-endian F64. Errors if fewer than 8 bytes are available.
    """
    F64.from_bits(u64_be()?)

  fun peek_u8(offset: USize = 0): U8 ? =>
    """
    Peek at a U8 at the given offset without consuming it.
    Errors if there isn't enough data.
    """
    _peek_byte(offset)?

  fun peek_u16_be(offset: USize = 0): U16 ? =>
    """
    Peek at a big-endian U16 at the given offset without
    consuming it. Errors if there isn't enough data.
    """
    (peek_u8(offset)?.u16() << 8) or
      peek_u8(offset + 1)?.u16()

  fun peek_u32_be(offset: USize = 0): U32 ? =>
    """
    Peek at a big-endian U32 at the given offset without
    consuming it. Errors if there isn't enough data.
    """
    (peek_u16_be(offset)?.u32() << 16) or
      peek_u16_be(offset + 2)?.u32()

  fun ref _advance_head(data: Array[U8] val, offset: USize) ? =>
    """
    Record that the head chunk `data` has been read up to `offset`:
    store the new cursor, or drop the chunk once it is exhausted.
    Does not touch `_available`; callers adjust it themselves.
    """
    if offset < data.size() then
      _chunks(0)? = (data, offset)
    else
      _chunks.shift()?
    end

  fun ref _byte(): U8 ? =>
    """
    Get a single byte from the head chunk. Callers must have checked
    that at least one byte is available.
    """
    (let data, let offset) = _chunks(0)?
    let r = data(offset)?
    _available = _available - 1
    _advance_head(data, offset + 1)?
    r

  fun _peek_byte(offset: USize = 0): U8 ? =>
    """
    Get the byte at the given offset without moving the cursor.
    Errors if the offset is beyond available data.
    """
    // Reject out-of-range offsets up front. Besides being a shortcut,
    // this keeps `offset' + node_offset` below from wrapping around for
    // huge offsets, which could otherwise address already-consumed bytes.
    if offset >= _available then error end

    var offset' = offset
    var i: USize = 0
    while i < _chunks.size() do
      (let data, let node_offset) = _chunks(i)?
      // Translate the logical offset into an index within this chunk.
      offset' = offset' + node_offset
      let data_size = data.size()
      if offset' >= data_size then
        // The target lies past this chunk; carry the remainder over.
        offset' = offset' - data_size
      else
        return data(offset')?
      end
      i = i + 1
    end
    error

  fun ref _block_copy(len: USize): Array[U8] val ? =>
    """
    Copy `len` bytes from multiple chunks into a new array.
    Consumes fully-exhausted chunks. Does not adjust `_available`;
    `block` has already done so before calling this.
    """
    var out = recover iso Array[U8] .> undefined(len) end
    var i = USize(0)
    while i < len do
      (let data, let offset) = _chunks(0)?
      let avail = data.size() - offset
      let need = len - i
      let copy_len = need.min(avail)
      // Temporarily view the isolated buffer as `ref` so `copy_to` can
      // write into it, then recover it back to `iso`.
      out = recover iso
        let r = consume ref out
        data.copy_to(r, offset, i, copy_len)
        consume r
      end
      if avail > need then
        // The last chunk still has unread bytes: move its cursor and stop.
        _chunks(0)? = (data, offset + need)
        break
      end
      i = i + copy_len
      _chunks.shift()?
    end
    consume out