python/dht.md at master · f034d21a/python · GitHub
Skip to content

Latest commit

 

History

History
253 lines (212 loc) · 7.11 KB

File metadata and controls

253 lines (212 loc) · 7.11 KB
#!/usr/bin/env python
# encoding: utf-8


import os
import signal
import asyncio
import binascii
from struct import unpack
from socket import inet_ntoa

import bencoder


BOOTSTRAP_NODES = (
    ("router.bittorrent.com", 6881),
    ("router.utorrent.com", 6881),
    ("router.bitcomet.com", 6881),
    ("dht.transmissionbt.com", 6881)
)


class DHT(asyncio.DatagramProtocol):
    def __init__(self, loop=None, bootstrap_nodes=BOOTSTRAP_NODES, interval=1):
        self.node_id = self.random_node_id()
        self.transport = None
        self.loop = loop or asyncio.get_event_loop()
        self.bootstrap_nodes = bootstrap_nodes
        self.is_running = False
        self.interval = interval

    def stop(self):
        self.is_running = False
        self.loop.call_later(self.interval, self.loop.stop)

    def run(self, port):
        datagram_endpoint = self.loop.create_datagram_endpoint(
            lambda: self, local_addr=('0.0.0.0', port)
        )
        transport, _ = self.loop.run_until_complete(datagram_endpoint)

        for signal_name in ('SIGINT', 'SIGTERM'):
            self.loop.add_signal_handler(getattr(signal, signal_name), self.stop)

        for node in self.bootstrap_nodes:
            # Bootstrap
            self.find_node(address=node, node_id=self.node_id)

        asyncio.ensure_future(self.auto_find_nodes(), loop=self.loop)
        self.loop.run_forever()
        self.loop.close()

    def connection_made(self, transport):
        self.transport = transport

    def connection_lost(self, exc):
        self.is_running = False
        self.transport.close()

    def datagram_received(self, data, address):
        try:
            msg = bencoder.bdecode(data)
        except Exception as e:
            _ = e
            return

        try:
            self.handle_message(msg, address)
        except Exception as e:
            self.send_message(
                data={
                    't': msg['t'],
                    'y': 'e',
                    'e': [202, 'Server Error']
                },
                address=address
            )
            raise e

    def handle_message(self, msg, address):
        msg_type = msg.get(b'y', b'e')

        return {
            b'e': None,
            b'r': self.handle_response(msg, address=address),
            b'q': asyncio.ensure_future(self.handle_query(msg, address=address), loop=self.loop)
        }.get(msg_type, None)

    def handle_response(self, msg, address):
        _ = address

        args = msg[b'r']
        if b'nodes' in args:
            for node_id, ip, port in self.split_nodes(args[b'nodes']):
                self.ping(address=(ip, port))

    async def handle_get_peers(self, info_hash, address):
        await self.handler(info_hash, address)

    async def handle_announce_peer(self, info_hash, address):
        await self.handler(info_hash, address)

    async def handler(self, info_hash, address):
        raise NotImplementedError()

    async def handle_query(self, msg, address):
        args = msg[b'a']
        node_id = args[b'id']
        query_type = msg[b'q']
        if query_type == b'get_peers':
            info_hash = args[b'info_hash']
            info_hash = self.decode_info_hash(info_hash)
            token = info_hash[:2]
            self.send_message(
                data={
                    't': msg[b't'],
                    'y': 'r',
                    'r': {
                        'id': self.fake_node_id(node_id),
                        'nodes': '',
                        'token': token
                    }
                },
                address=address
            )
            await self.handle_get_peers(info_hash, address)
        elif query_type == b'announce_peer':
            info_hash = args[b'info_hash']
            tid = msg[b't']
            self.send_message(
                data={
                    't': tid,
                    'y': 'r',
                    'r': {
                        'id': self.fake_node_id(node_id)
                    }
                },
                address=address
            )
            await self.handle_announce_peer(self.decode_info_hash(info_hash), address)
        elif query_type == b'find_node':
            tid = msg[b't']
            self.send_message(
                data={
                    't': tid,
                    'y': 'r',
                    'r': {
                        'id': self.fake_node_id(node_id),
                        'nodes': ''
                    }
                },
                address=address
            )
        elif query_type == b'ping':
            self.send_message(
                data={
                    't': b'tt',
                    'y': 'r',
                    'r': {
                        'id': self.fake_node_id(node_id)
                    }
                },
                address=address
            )
        self.find_node(address=address, node_id=node_id)

    async def auto_find_nodes(self):
        self.is_running = True
        while self.is_running:
            await asyncio.sleep(self.interval)
            for node in self.bootstrap_nodes:
                self.find_node(address=node)

    def ping(self, address, node_id=None):
        self.send_message(
            data={
                'y': 'q',
                't': 'pg',
                'q': 'ping',
                'a': {
                    'id': self.fake_node_id(node_id)
                }
            },
            address=address
        )

    def send_message(self, data, address):
        data.setdefault('t', b'tt')
        self.transport.sendto(bencoder.bencode(data), address)

    def fake_node_id(self, node_id=None):
        if node_id:
            return node_id[:-1] + self.node_id[-1:]
        return self.node_id

    def find_node(self, address, node_id=None, target=None):
        if not target:
            target = self.random_node_id()
        self.send_message(
            data={
                't': b'fn',
                'y': 'q',
                'q': 'find_node',
                'a': {
                    'id': self.fake_node_id(node_id),
                    'target': target
                }
            },
            address=address
        )

    @staticmethod
    def decode_info_hash(info_hash):
        if isinstance(info_hash, bytes):
            # Convert bytes to hex
            info_hash = binascii.hexlify(info_hash).decode('utf-8')

        return info_hash.lower()

    @staticmethod
    def random_node_id(size=20):
        return os.urandom(size)

    @staticmethod
    def split_nodes(nodes):
        length = len(nodes)
        if (length % 26) != 0:
            return

        for i in range(0, length, 26):
            nid = nodes[i:i + 20]
            ip = inet_ntoa(nodes[i + 20:i + 24])
            port = unpack('!H', nodes[i + 24:i + 26])[0]
            yield nid, ip, port
            

class MagnetCrawler(DHT):
    def __init__(self, *args, **kwargs):
        super(MagnetCrawler, self).__init__(*args, **kwargs)

    async def handler(self, info_hash, address):
        print(info_hash)


def get_magnets(port):
    crawler = MagnetCrawler()
    crawler.run(port)


if __name__ == '__main__':
    get_magnets(port=6881)