#!/usr/bin/python3.1 -O
''' Library for making torrent files '''

from functools import reduce
from hashlib import sha1
from operator import add
from os import stat, listdir
from os.path import isdir, join as pathjoin, basename, normpath
from time import time

__version__ = '0.4'
__author__ = 'Steve Howard'
__all__ = ['make', 'bencode', 'bdecode', 'save', 'load', 'PIECE_LENGTH']

# As recommended by BEP 003
PIECE_LENGTH = 2 ** 18


###
# File hashing helper functions and classes
###

class ChunkedReader(object):
    ''' Reads a list of files as one continuous stream of fixed-size pieces.

    Iterating over the reader yields bytes objects of exactly piece_size
    bytes, except for the final piece which may be shorter.
    '''

    def __init__(self, fnames, piece_size=PIECE_LENGTH):
        ''' Takes a list of file names to read from and the size of each
        piece. Opens the first file (if any) for an eventual call to
        read(); errors for missing files propagate to the caller.
        '''
        self.fnames = list(fnames)
        self.piece_size = piece_size
        self.f = None
        if self.fnames:
            self._open_new()

    def _open_new(self):
        ''' Closes the current file, if any, and opens the next file in the
        list for reading, allowing exceptions for missing files to
        propagate. Raises IndexError when no files are left.
        '''
        self.close()
        fname = self.fnames[0]
        self.fnames = self.fnames[1:]
        self.f = open(fname, 'rb')

    def close(self):
        ''' Closes the file currently being read, if any. '''
        if self.f is not None:
            self.f.close()
            self.f = None

    def __iter__(self):
        ''' Yields successive pieces by calling read() until it returns
        an empty result.
        '''
        while True:
            data = self.read()
            if not data:
                return
            yield data

    def read(self):
        ''' Reads piece_size bytes from the file list, crossing file
        boundaries as needed; will always return exactly piece_size bytes
        unless it is at the very end of the last file. Returns b'' once
        every file has been exhausted.
        '''
        piece = bytearray()
        while len(piece) < self.piece_size:
            if self.f is None:
                if not self.fnames:
                    # Nothing left to read: hand back the short final piece
                    break
                self._open_new()
            chunk = self.f.read(self.piece_size - len(piece))
            if chunk:
                piece.extend(chunk)
            else:
                # End of this file; the next pass moves on to the next one
                self.close()
        return bytes(piece)


# When uploading a directory, BitTorrent requires a file list consisting of a
# list of dictionaries with length=(length) and path=(list, of, dirnames).
def build_filelist(target, path=()):
    ''' Recursively lists every file below the directory target.

    path is the tuple of directory names, relative to target, to list.
    Returns a list of dictionaries with 'length' (size in bytes) and
    'path' (tuple of path components relative to target).
    '''
    entries = []
    for name in listdir(pathjoin(target, *path)):
        this = path + (name,)
        full = pathjoin(target, *this)
        if isdir(full):
            entries += build_filelist(target, this)
        else:
            entries.append({
                'length': stat(full).st_size,
                'path': this
            })
    return entries


###
# BitTorrent encoding/decoding functions
###

def _bencode_bytes(raw):
    ''' bencodes an already-encoded byte string as <length>:<data> '''
    return b''.join((str(len(raw)).encode('ascii'), b':', raw))


def bencode(s, encoding="utf-8"):
    ''' bencodes a set of Python objects. Behavior only defined for list,
    dict, tuple, str, int, bytes and bytearray (and subclasses of these).
    Text strings are encoded with the given encoding before their length
    is measured. Dictionary keys must be strings and are written in
    sorted order of their encoded form, as BEP 003 requires. Returns a
    bytes object that can be written directly into a .torrent file.

    Raises TypeError for unsupported types or non-string dictionary keys.
    '''
    if isinstance(s, str):
        return _bencode_bytes(s.encode(encoding))
    if isinstance(s, (bytes, bytearray)):
        return _bencode_bytes(bytes(s))
    if isinstance(s, int):
        # int() normalizes bool (True -> 1) and other int subclasses
        return b''.join((b'i', str(int(s)).encode('ascii'), b'e'))
    if isinstance(s, (list, tuple)):
        return b''.join(
            [b'l'] + [bencode(item, encoding) for item in s] + [b'e'])
    if isinstance(s, dict):
        items = []
        for key, value in s.items():
            if isinstance(key, str):
                raw_key = key.encode(encoding)
            elif isinstance(key, (bytes, bytearray)):
                raw_key = bytes(key)
            else:
                raise TypeError('bencode dictionary keys must be strings, '
                                'not %s' % type(key).__name__)
            items.append((raw_key, value))
        # The encoded keys are unique in any sane input, so sort on them
        # alone and never compare the (possibly unorderable) values
        items.sort(key=lambda item: item[0])
        parts = [b'd']
        for raw_key, value in items:
            parts.append(_bencode_bytes(raw_key))
            parts.append(bencode(value, encoding))
        parts.append(b'e')
        return b''.join(parts)
    raise TypeError('Cannot bencode objects of type %s' % type(s).__name__)


def _bdecode_int(data):
    ''' Decodes a leading i<digits>e; returns (value, remaining data) '''
    digits, terminator, rest = data[1:].partition(b'e')
    if not terminator:
        raise ValueError("Unterminated integer in bencoded data")
    return (int(digits), rest)


def _bdecode_list(data, encoding="utf-8"):
    ''' Decodes a leading l...e; returns (list, remaining data) '''
    data = data[1:]
    values = []
    while not data.startswith(b'e'):
        if not data:
            raise ValueError("Unterminated list or dictionary in "
                             "bencoded data")
        value, data = _bdecode(data, encoding)
        values.append(value)
    return values, data[1:]


def _bdecode_dict(data, encoding="utf-8"):
    ''' Decodes a leading d...e; returns (dict, remaining data) '''
    # A dictionary is laid out like a list of alternating keys and values
    flat, data = _bdecode_list(data, encoding)
    if len(flat) % 2:
        raise ValueError("Dictionary key without a value in bencoded data")
    return dict(zip(flat[::2], flat[1::2])), data


def _bdecode(data, encoding="utf-8"):
    ''' Decodes the first bencoded value in data; returns a tuple of
    (value, remaining data). Strings that cannot be decoded with the
    given encoding (or all strings, if encoding is None) are returned as
    bytes.
    '''
    if data.startswith(b'l'):
        return _bdecode_list(data, encoding)
    elif data.startswith(b'd'):
        return _bdecode_dict(data, encoding)
    elif data.startswith(b'i'):
        return _bdecode_int(data)
    # String data
    length, separator, rest = data.partition(b':')
    if not separator:
        raise ValueError("Malformed string in bencoded data")
    length = int(length)
    if length < 0 or length > len(rest):
        raise ValueError("Truncated string in bencoded data")
    raw, rest = rest[:length], rest[length:]
    if encoding is None:
        return raw, rest
    try:
        return str(raw, encoding=encoding), rest
    except UnicodeDecodeError:
        # Binary payload (e.g. piece hashes): hand back the raw bytes
        return raw, rest


def bdecode(data, encoding="utf-8", ignore_errors=False):
    ''' Decodes a bencoded bytes object into Python objects.

    Strings are decoded with the given encoding where possible and left
    as bytes otherwise. Raises ValueError for malformed input, and for
    data left over after the first value unless ignore_errors is set.
    '''
    data_struct, data = _bdecode(data, encoding)
    if len(data) and not ignore_errors:
        raise ValueError("Junk data at the end of input")
    return data_struct


def save(data, filename):
    ''' Save the given data to the given filename. Will bencode data if
    required.
    '''
    if not isinstance(data, bytes):
        data = bencode(data)
    with open(filename, 'wb') as f:
        f.write(data)


def load(filename, encoding="utf-8"):
    ''' Load the given filename, using the given codec to decode strings '''
    with open(filename, 'rb') as f:
        return bdecode(f.read(), encoding)


def make(target, announce, **kwargs):
    ''' Create a torrent file from the given directory target (will recurse
    subdirectories if necessary), using the given announce URL.

    target: Directory or file to make a torrent metafile for
    announce: bittorrent announce URL for this torrent

    optional keyword args:
    piece_length: size of one piece, per bittorrent protocol
    private: whether to mark the torrent as private (default: yes)
    filename: filename to save the torrent to (will not return a value,
        only write the file)
    encode: whether to bencode the torrent file before returning (will
        return a dictionary if False)

    By default, the torrent will be returned as a bencoded string.
    Raises ValueError if piece_length is not positive.
    '''
    piece_length = kwargs.get('piece_length', PIECE_LENGTH)
    if piece_length <= 0:
        raise ValueError("piece_length must be positive")

    metafile = {
        'announce': announce,
        'encoding': 'UTF-8',
        'created by': 'Steve\'s torrent maker ' + __version__,
        'creation date': int(time()),
        'info': {
            'piece length': piece_length,
            'private': kwargs.get('private', 1),
            # normpath drops any trailing separator so the name is not ''
            'name': basename(normpath(target)),
        }
    }

    if isdir(target):
        files = build_filelist(target)
        metafile['info']['files'] = files
        fnames = [pathjoin(target, *i['path']) for i in files]
    else:
        metafile['info']['length'] = stat(target).st_size
        fnames = [target]

    # Hash with the same piece length that is advertised in the metafile
    reader = ChunkedReader(fnames, piece_length)
    try:
        metafile['info']['pieces'] = bytearray().join(
            sha1(piece).digest() for piece in reader)
    finally:
        reader.close()

    if 'filename' in kwargs:
        save(metafile, kwargs['filename'])
    elif kwargs.get('encode', 1):
        return bencode(metafile)
    else:
        return metafile