Skip to content

Commit

Permalink
Added ChunkHasher class
Browse files Browse the repository at this point in the history
Wraps CallableChunkHasher removed from Torrent class
  • Loading branch information
zanella committed Nov 25, 2016
1 parent 0acd62d commit fab42c2
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 42 deletions.
127 changes: 127 additions & 0 deletions core/src/main/java/com/turn/ttorrent/common/ChunkHasher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.turn.ttorrent.common;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.*;

/**
 * Hashes the torrent's file pieces on a fixed-size worker pool, recycling
 * digest instances and piece buffers to bound memory use during ingestion.
 *
 * @author rzanella
 */
public class ChunkHasher {

	private static final Logger logger = LoggerFactory.getLogger(ChunkHasher.class);

	/** Workers that execute the hashing tasks. */
	private final ExecutorService executor;

	/**
	 * One SHA-1 digest per worker thread. A task checks a digest out, uses
	 * it, resets it and returns it, so a digest is never shared concurrently.
	 */
	private final ArrayBlockingQueue<MessageDigest> mdQueue;

	/**
	 * A limited pool of buffers (threads + 1, so the producer can fill the
	 * next piece while every worker is busy), so that:
	 *
	 * - We don't thrash the memory with a lot of short-lived objects
	 * - We don't use a lot of memory when we're ingesting a huge amount of data
	 *
	 * The ByteBuffers are array backed, so the APIs they get sent to have no need to instantiate one
	 */
	private final ArrayBlockingQueue<ByteBuffer> bbQueue;

	/**
	 * Creates the resources needed to hash the enqueued pieces.
	 *
	 * @param threads number of workers to create; must be positive
	 * @param pieceLength size of the pieces that will be received, has to be informed upon creation since
	 *                    the user will get the buffer from here
	 * @throws NoSuchAlgorithmException if SHA-1 is unavailable on this JVM
	 * @throws InterruptedException declared for API compatibility; not thrown here
	 */
	public ChunkHasher(int threads, int pieceLength) throws InterruptedException, NoSuchAlgorithmException {
		if (threads <= 0) {
			throw new IllegalArgumentException("threads must be positive: " + threads);
		}
		if (pieceLength <= 0) {
			throw new IllegalArgumentException("pieceLength must be positive: " + pieceLength);
		}

		mdQueue = new ArrayBlockingQueue<MessageDigest>(threads);

		for (int i = 0; i < threads; i++) {
			mdQueue.add(MessageDigest.getInstance("SHA-1"));
		}

		bbQueue = new ArrayBlockingQueue<ByteBuffer>(threads + 1);

		for (int i = 0; i < threads + 1; i++) {
			bbQueue.add(ByteBuffer.allocate(pieceLength));
		}

		executor = Executors.newFixedThreadPool(threads);
	}

	/**
	 * Submits a piece buffer (previously obtained from {@link #getBuffer()})
	 * for asynchronous hashing. The buffer is hashed from its current
	 * position to its limit and automatically returned to the pool when the
	 * task completes.
	 *
	 * @param buffer the piece data to hash
	 * @return a Future so that the user can order the results on its side
	 * @throws NoSuchAlgorithmException never thrown; kept for backward compatibility
	 */
	public Future<String> enqueueChunk(ByteBuffer buffer) throws NoSuchAlgorithmException {
		return executor.submit(new CallableChunkHasher(buffer));
	}

	/**
	 * Borrows a piece buffer from the pool, blocking until one is available.
	 *
	 * @return an array-backed ByteBuffer of pieceLength size
	 * @throws InterruptedException if interrupted while waiting for a buffer
	 */
	public ByteBuffer getBuffer() throws InterruptedException {
		return bbQueue.take();
	}

	/**
	 * Clears the internal resources.
	 *
	 * Requests an orderly executor shutdown and blocks until all pending
	 * hashing tasks have completed.
	 *
	 * @throws InterruptedException if interrupted while waiting for termination
	 */
	public void shutdown() throws InterruptedException {
		executor.shutdown();
		// Block on awaitTermination instead of busy-polling isTerminated()
		// with Thread.sleep(); interruption propagates to the caller.
		while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
			// Still draining queued hashing tasks; keep waiting.
		}
	}

	/**
	 * A {@link Callable} to hash a data chunk.
	 *
	 * Non-static by design: it returns its borrowed buffer and digest to the
	 * enclosing hasher's pools when it completes.
	 *
	 * @author mpetazzoni
	 */
	private class CallableChunkHasher implements Callable<String> {

		/** Pooled buffer holding the piece data, hashed position-to-limit. */
		private final ByteBuffer data;

		CallableChunkHasher(ByteBuffer rentedBuffer) {
			this.data = rentedBuffer;
		}

		@Override
		public String call() throws UnsupportedEncodingException, InterruptedException {
			// One digest is pooled per worker thread, so remove() cannot
			// find the queue empty here.
			final MessageDigest md = mdQueue.remove();

			// Consumes the buffer from its current position to its limit.
			// (The original mark()/reset() pair here was a no-op and has
			// been removed.)
			md.update(this.data);

			final String hash = new String(md.digest(), Torrent.BYTE_ENCODING);

			// Recycle the buffer and the digest for the next piece.
			this.data.clear();
			bbQueue.add(this.data);

			md.reset();
			mdQueue.add(md);

			return hash;
		}
	}
}
57 changes: 15 additions & 42 deletions core/src/main/java/com/turn/ttorrent/common/Torrent.java
Original file line number Diff line number Diff line change
Expand Up @@ -667,33 +667,7 @@ private static Torrent create(File parent, List<File> files, int pieceLength,
return new Torrent(baos.toByteArray(), true);
}

/**
* A {@link Callable} to hash a data chunk.
*
* @author mpetazzoni
*/
private static class CallableChunkHasher implements Callable<String> {

private final MessageDigest md;
private final ByteBuffer data;

CallableChunkHasher(ByteBuffer buffer) throws NoSuchAlgorithmException {
this.md = MessageDigest.getInstance("SHA-1");

this.data = ByteBuffer.allocate(buffer.remaining());
buffer.mark();
this.data.put(buffer);
this.data.clear();
buffer.reset();
}

@Override
public String call() throws UnsupportedEncodingException {
this.md.reset();
this.md.update(this.data.array());
return new String(md.digest(), Torrent.BYTE_ENCODING);
}
}

/**
* Return the concatenation of the SHA-1 hashes of a file's pieces.
Expand All @@ -710,30 +684,30 @@ public String call() throws UnsupportedEncodingException {
*
* @param file The file to hash.
*/
private static String hashFile(File file, int pieceLenght)
private static String hashFile(File file, int pieceLength)
throws InterruptedException, IOException, NoSuchAlgorithmException {
return Torrent.hashFiles(Arrays.asList(new File[] { file }), pieceLenght);
return Torrent.hashFiles(Arrays.asList(new File[] { file }), pieceLength);
}

private static String hashFiles(List<File> files, int pieceLenght)
private static String hashFiles(List<File> files, int pieceLength)
throws InterruptedException, IOException, NoSuchAlgorithmException {
int threads = getHashingThreadsCount();
ExecutorService executor = Executors.newFixedThreadPool(threads);
ByteBuffer buffer = ByteBuffer.allocate(pieceLenght);
List<Future<String>> results = new LinkedList<Future<String>>();
StringBuilder hashes = new StringBuilder();
final ChunkHasher chunkHasher = new ChunkHasher(threads, pieceLength);

long length = 0L;
int pieces = 0;

ByteBuffer buffer = null;
long start = System.nanoTime();
for (File file : files) {
logger.info("Hashing data from {} with {} threads ({} pieces)...",
new Object[] {
file.getName(),
threads,
(int) (Math.ceil(
(double)file.length() / pieceLenght))
(double)file.length() / pieceLength))
});

length += file.length();
Expand All @@ -743,10 +717,14 @@ private static String hashFiles(List<File> files, int pieceLenght)
int step = 10;

try {
buffer = chunkHasher.getBuffer();

while (channel.read(buffer) > 0) {
if (buffer.remaining() == 0) {
buffer.clear();
results.add(executor.submit(new CallableChunkHasher(buffer)));
results.add(chunkHasher.enqueueChunk(buffer));

buffer = chunkHasher.getBuffer();
}

if (results.size() >= threads) {
Expand All @@ -765,24 +743,19 @@ private static String hashFiles(List<File> files, int pieceLenght)
}

// Hash the last bit, if any
if (buffer.position() > 0) {
if ((buffer != null) && (buffer.position() > 0)) {
buffer.limit(buffer.position());
buffer.position(0);
results.add(executor.submit(new CallableChunkHasher(buffer)));
results.add(chunkHasher.enqueueChunk(buffer));
}

pieces += accumulateHashes(hashes, results);

// Request orderly executor shutdown and wait for hashing tasks to
// complete.
executor.shutdown();
while (!executor.isTerminated()) {
Thread.sleep(10);
}
chunkHasher.shutdown();
long elapsed = System.nanoTime() - start;

int expectedPieces = (int) (Math.ceil(
(double)length / pieceLenght));
(double)length / pieceLength));
logger.info("Hashed {} file(s) ({} bytes) in {} pieces ({} expected) in {}ms.",
new Object[] {
files.size(),
Expand Down

0 comments on commit fab42c2

Please sign in to comment.