summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Search.hs
blob: 91a1079ba2faff78c081c987e8f774c80400f8e7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
{-# LANGUAGE CPP #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
module Network.BitTorrent.DHT.Search where

import Control.Concurrent
import Control.Concurrent.Async.Pool
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Bool
import Data.Function
import Data.List
import qualified Data.Map.Strict     as Map
         ;import Data.Map.Strict     (Map)
import Data.Maybe
import qualified Data.Set            as Set
         ;import Data.Set            (Set)
import System.IO

import qualified Data.MinMaxPSQ   as MM
         ;import Data.MinMaxPSQ   (MinMaxPSQ)
import qualified Data.Wrapper.PSQ as PSQ
         ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey)
import Network.Address hiding (NodeId)
import Network.DatagramServer.Types
import Network.DHT.Routing as R

data IterativeSearch nid addr ni r = IterativeSearch
    { searchTarget       :: nid
    , searchSpace        :: KademliaSpace nid ni
    , searchNodeAddress  :: ni -> addr
    , searchQuery        :: ni -> IO ([ni], [r])
    , searchPendingCount :: TVar Int
    , searchQueued       :: TVar (MinMaxPSQ ni nid)
    , searchInformant    :: TVar (MinMaxPSQ ni nid)
    , searchVisited      :: TVar (Set addr)
    , searchResults      :: TVar (Set r)
    }

newSearch :: ( Ord addr
             , PSQKey nid
             , PSQKey ni
             ) =>
             KademliaSpace nid ni
             -> (ni -> addr)
             -> (ni -> IO ([ni], [r]))
             -> nid -> [ni] -> IO (IterativeSearch nid addr ni r)
newSearch space nAddr qry target ns = atomically $ do
    c <- newTVar 0
    q <- newTVar $ MM.fromList
                 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
                 $ ns
    i <- newTVar MM.empty
    v <- newTVar Set.empty
    r <- newTVar Set.empty
    return $ IterativeSearch target space nAddr qry c q i v r

searchAlpha :: Int
searchAlpha = 3

searchK :: Int
searchK = 8

sendQuery :: forall addr nid ni r.
            ( Ord addr
            , Ord r
            , PSQKey nid
            , PSQKey ni
            ) =>
            IterativeSearch nid addr ni r
            -> Binding ni nid
            -> IO ()
sendQuery IterativeSearch{..} (ni :-> d) = do
    (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
                      (searchQuery ni)
    atomically $ do
        modifyTVar searchPendingCount pred
        vs <- readTVar searchVisited
        -- We only queue a node if it is not yet visited
        let insertFoundNode :: ni
                           -> MinMaxPSQ ni nid
                           -> MinMaxPSQ ni nid
            insertFoundNode n q
             | searchNodeAddress n `Set.member` vs
                         = q
             | otherwise = MM.insertTake searchK n ( kademliaXor searchSpace searchTarget
                                                     $ kademliaLocation searchSpace n )
                                                   q
        modifyTVar searchQueued    $ \q -> foldr insertFoundNode q ns
        modifyTVar searchInformant $ MM.insertTake searchK ni d
        modifyTVar searchResults   $ \s -> foldr Set.insert s rs


searchIsFinished :: ( Ord addr
                    , PSQKey nid
                    , PSQKey ni
                    ) => IterativeSearch nid addr ni r -> STM Bool
searchIsFinished IterativeSearch{..} = do
    q <- readTVar searchQueued
    cnt <- readTVar searchPendingCount
    informants <- readTVar searchInformant
    return $ cnt == 0
             && ( MM.null q
                  || ( MM.size informants >= searchK
                       && ( PSQ.prio (fromJust $ MM.findMax informants)
                            <= PSQ.prio (fromJust $ MM.findMin q))))

searchCancel :: IterativeSearch nid addr ni r -> IO ()
searchCancel IterativeSearch{..} = atomically $ do
    writeTVar searchPendingCount 0
    writeTVar searchQueued MM.empty

search ::
    ( Ord r
    , Ord addr
    , PSQKey nid
    , PSQKey ni
    ) => IterativeSearch nid addr ni r -> IO ()
search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
    fix $ \again -> do
        join $ atomically $ do
            cnt <- readTVar $ searchPendingCount
            informants <- readTVar searchInformant
            found <- MM.minView <$> readTVar searchQueued
            case found of
                Just (ni :-> d, q)
                  | (MM.size informants < searchK) && (cnt > 0 || not (MM.null q))
                    || (PSQ.prio (fromJust $ MM.findMax informants) > d)
                  -> do writeTVar searchQueued q
                        modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
                        modifyTVar searchPendingCount succ
                        return $ withAsync g (sendQuery s (ni :-> d)) (const again)
                _ -> do check (cnt == 0)
                        return $ return ()