001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied.  See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024import java.util.Arrays;
025
026import org.apache.commons.compress.compressors.CompressorInputStream;
027import org.apache.commons.compress.utils.BoundedInputStream;
028import org.apache.commons.compress.utils.ByteUtils;
029import org.apache.commons.compress.utils.CountingInputStream;
030import org.apache.commons.compress.utils.IOUtils;
031import org.apache.commons.compress.utils.InputStreamStatistics;
032
033/**
034 * CompressorInputStream for the framing Snappy format.
035 *
036 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
037 *
038 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
039 * @since 1.7
040 */
041public class FramedSnappyCompressorInputStream extends CompressorInputStream
042    implements InputStreamStatistics {
043
044    /**
045     * package private for tests only.
046     */
047    static final long MASK_OFFSET = 0xa282ead8L;
048
049    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
050    static final int COMPRESSED_CHUNK_TYPE = 0;
051    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
052    private static final int PADDING_CHUNK_TYPE = 0xfe;
053    private static final int MIN_UNSKIPPABLE_TYPE = 2;
054    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
055    private static final int MAX_SKIPPABLE_TYPE = 0xfd;
056
057    // used by FramedSnappyCompressorOutputStream as well
058    static final byte[] SZ_SIGNATURE = new byte[] { //NOSONAR
059        (byte) STREAM_IDENTIFIER_TYPE, // tag
060        6, 0, 0, // length
061        's', 'N', 'a', 'P', 'p', 'Y'
062    };
063
064    private long unreadBytes;
065    private final CountingInputStream countingStream;
066
067    /** The underlying stream to read compressed data from */
068    private final PushbackInputStream inputStream;
069
070    /** The dialect to expect */
071    private final FramedSnappyDialect dialect;
072
073    private SnappyCompressorInputStream currentCompressedChunk;
074
075    // used in no-arg read method
076    private final byte[] oneByte = new byte[1];
077
078    private boolean endReached, inUncompressedChunk;
079
080    private int uncompressedBytesRemaining;
081    private long expectedChecksum = -1;
082    private final int blockSize;
083    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
084
085    private final ByteUtils.ByteSupplier supplier = this::readOneByte;
086
087    /**
088     * Constructs a new input stream that decompresses
089     * snappy-framed-compressed data from the specified input stream
090     * using the {@link FramedSnappyDialect#STANDARD} dialect.
091     * @param in  the InputStream from which to read the compressed data
092     * @throws IOException if reading fails
093     */
094    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
095        this(in, FramedSnappyDialect.STANDARD);
096    }
097
098    /**
099     * Constructs a new input stream that decompresses snappy-framed-compressed data
100     * from the specified input stream.
101     * @param in  the InputStream from which to read the compressed data
102     * @param dialect the dialect used by the compressed stream
103     * @throws IOException if reading fails
104     */
105    public FramedSnappyCompressorInputStream(final InputStream in,
106                                             final FramedSnappyDialect dialect)
107        throws IOException {
108        this(in, SnappyCompressorInputStream.DEFAULT_BLOCK_SIZE, dialect);
109    }
110
111    /**
112     * Constructs a new input stream that decompresses snappy-framed-compressed data
113     * from the specified input stream.
114     * @param in  the InputStream from which to read the compressed data
115     * @param blockSize the block size to use for the compressed stream
116     * @param dialect the dialect used by the compressed stream
117     * @throws IOException if reading fails
118     * @throws IllegalArgumentException if blockSize is not bigger than 0
119     * @since 1.14
120     */
121    public FramedSnappyCompressorInputStream(final InputStream in,
122                                             final int blockSize,
123                                             final FramedSnappyDialect dialect)
124        throws IOException {
125        if (blockSize <= 0) {
126            throw new IllegalArgumentException("blockSize must be bigger than 0");
127        }
128        countingStream = new CountingInputStream(in);
129        this.inputStream = new PushbackInputStream(countingStream, 1);
130        this.blockSize = blockSize;
131        this.dialect = dialect;
132        if (dialect.hasStreamIdentifier()) {
133            readStreamIdentifier();
134        }
135    }
136
137    /** {@inheritDoc} */
138    @Override
139    public int read() throws IOException {
140        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
141    }
142
143    /** {@inheritDoc} */
144    @Override
145    public void close() throws IOException {
146        try {
147            if (currentCompressedChunk != null) {
148                currentCompressedChunk.close();
149                currentCompressedChunk = null;
150            }
151        } finally {
152            inputStream.close();
153        }
154    }
155
156    /** {@inheritDoc} */
157    @Override
158    public int read(final byte[] b, final int off, final int len) throws IOException {
159        if (len == 0) {
160            return 0;
161        }
162        int read = readOnce(b, off, len);
163        if (read == -1) {
164            readNextBlock();
165            if (endReached) {
166                return -1;
167            }
168            read = readOnce(b, off, len);
169        }
170        return read;
171    }
172
173    /** {@inheritDoc} */
174    @Override
175    public int available() throws IOException {
176        if (inUncompressedChunk) {
177            return Math.min(uncompressedBytesRemaining,
178                            inputStream.available());
179        }
180        if (currentCompressedChunk != null) {
181            return currentCompressedChunk.available();
182        }
183        return 0;
184    }
185
186    /**
187     * @since 1.17
188     */
189    @Override
190    public long getCompressedCount() {
191        return countingStream.getBytesRead() - unreadBytes;
192    }
193
194    /**
195     * Read from the current chunk into the given array.
196     *
197     * @return -1 if there is no current chunk or the number of bytes
198     * read from the current chunk (which may be -1 if the end of the
199     * chunk is reached).
200     */
201    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
202        int read = -1;
203        if (inUncompressedChunk) {
204            final int amount = Math.min(uncompressedBytesRemaining, len);
205            if (amount == 0) {
206                return -1;
207            }
208            read = inputStream.read(b, off, amount);
209            if (read != -1) {
210                uncompressedBytesRemaining -= read;
211                count(read);
212            }
213        } else if (currentCompressedChunk != null) {
214            final long before = currentCompressedChunk.getBytesRead();
215            read = currentCompressedChunk.read(b, off, len);
216            if (read == -1) {
217                currentCompressedChunk.close();
218                currentCompressedChunk = null;
219            } else {
220                count(currentCompressedChunk.getBytesRead() - before);
221            }
222        }
223        if (read > 0) {
224            checksum.update(b, off, read);
225        }
226        return read;
227    }
228
229    private void readNextBlock() throws IOException {
230        verifyLastChecksumAndReset();
231        inUncompressedChunk = false;
232        final int type = readOneByte();
233        if (type == -1) {
234            endReached = true;
235        } else if (type == STREAM_IDENTIFIER_TYPE) {
236            inputStream.unread(type);
237            unreadBytes++;
238            pushedBackBytes(1);
239            readStreamIdentifier();
240            readNextBlock();
241        } else if (type == PADDING_CHUNK_TYPE
242                   || (type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE)) {
243            skipBlock();
244            readNextBlock();
245        } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
246            throw new IOException("Unskippable chunk with type " + type
247                                  + " (hex " + Integer.toHexString(type) + ")"
248                                  + " detected.");
249        } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
250            inUncompressedChunk = true;
251            uncompressedBytesRemaining = readSize() - 4 /* CRC */;
252            if (uncompressedBytesRemaining < 0) {
253                throw new IOException("Found illegal chunk with negative size");
254            }
255            expectedChecksum = unmask(readCrc());
256        } else if (type == COMPRESSED_CHUNK_TYPE) {
257            final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
258            final long size = readSize() - (expectChecksum ? 4L : 0L);
259            if (size < 0) {
260                throw new IOException("Found illegal chunk with negative size");
261            }
262            if (expectChecksum) {
263                expectedChecksum = unmask(readCrc());
264            } else {
265                expectedChecksum = -1;
266            }
267            currentCompressedChunk =
268                new SnappyCompressorInputStream(new BoundedInputStream(inputStream, size), blockSize);
269            // constructor reads uncompressed size
270            count(currentCompressedChunk.getBytesRead());
271        } else {
272            // impossible as all potential byte values have been covered
273            throw new IOException("Unknown chunk type " + type
274                                  + " detected.");
275        }
276    }
277
278    private long readCrc() throws IOException {
279        final byte[] b = new byte[4];
280        final int read = IOUtils.readFully(inputStream, b);
281        count(read);
282        if (read != 4) {
283            throw new IOException("Premature end of stream");
284        }
285        return ByteUtils.fromLittleEndian(b);
286    }
287
288    static long unmask(long x) {
289        // ugly, maybe we should just have used ints and deal with the
290        // overflow
291        x -= MASK_OFFSET;
292        x &= 0xffffFFFFL;
293        return ((x >> 17) | (x << 15)) & 0xffffFFFFL;
294    }
295
296    private int readSize() throws IOException {
297        return (int) ByteUtils.fromLittleEndian(supplier, 3);
298    }
299
300    private void skipBlock() throws IOException {
301        final int size = readSize();
302        if (size < 0) {
303            throw new IOException("Found illegal chunk with negative size");
304        }
305        final long read = IOUtils.skip(inputStream, size);
306        count(read);
307        if (read != size) {
308            throw new IOException("Premature end of stream");
309        }
310    }
311
312    private void readStreamIdentifier() throws IOException {
313        final byte[] b = new byte[10];
314        final int read = IOUtils.readFully(inputStream, b);
315        count(read);
316        if (10 != read || !matches(b, 10)) {
317            throw new IOException("Not a framed Snappy stream");
318        }
319    }
320
321    private int readOneByte() throws IOException {
322        final int b = inputStream.read();
323        if (b != -1) {
324            count(1);
325            return b & 0xFF;
326        }
327        return -1;
328    }
329
330    private void verifyLastChecksumAndReset() throws IOException {
331        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
332            throw new IOException("Checksum verification failed");
333        }
334        expectedChecksum = -1;
335        checksum.reset();
336    }
337
338    /**
339     * Checks if the signature matches what is expected for a .sz file.
340     *
341     * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p>
342     *
343     * @param signature the bytes to check
344     * @param length    the number of bytes to check
345     * @return          true if this is a .sz stream, false otherwise
346     */
347    public static boolean matches(final byte[] signature, final int length) {
348
349        if (length < SZ_SIGNATURE.length) {
350            return false;
351        }
352
353        byte[] shortenedSig = signature;
354        if (signature.length > SZ_SIGNATURE.length) {
355            shortenedSig = new byte[SZ_SIGNATURE.length];
356            System.arraycopy(signature, 0, shortenedSig, 0, SZ_SIGNATURE.length);
357        }
358
359        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
360    }
361
362}