summaryrefslogtreecommitdiff
path: root/dht/TCPProber.hs
blob: ccdbd8d1e8409324dc3fa41c50c8d19375fbb10c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
{-# LANGUAGE CPP        #-}
{-# LANGUAGE LambdaCase #-}
module TCPProber where

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent
import GHC.Conc
#endif

import Control.Arrow
import Control.Concurrent.STM
import Control.Monad
import Data.Function
import Data.IP
import Data.Maybe
import Data.Time.Clock.POSIX
import Network.Socket
import System.Timeout

import DPut
import DebugTag
import Crypto.Tox
import Data.Wrapper.PSQ as PSQ
import Network.Kademlia.Search
import Network.Tox.NodeId
import qualified Network.Tox.TCP as TCP
import Network.QueryResponse as QR

-- Probe TCP ports in a staggered fashion to up the odds of discovering
-- a higher priority port like 443.
resolvePort :: TCP.RelayClient -> NodeInfo -> IO (Maybe PortNumber)
resolvePort tcp ni = do
    got <- newTVarIO Nothing
    cnt <- newTVarIO 0
    let n port = TCP.NodeInfo ni port
        forkPort port = do
            atomically $ modifyTVar' cnt succ
            t <- forkIO $ do
                dput XTCP $ "TCP-probe pinging " ++ show (n port)
                m <- TCP.tcpPing tcp $ n port
                atomically $ do
                    mg <- readTVar got
                    when (isNothing mg)
                        $ forM_ m $ \_ -> writeTVar got $ Just port
                    modifyTVar' cnt pred
            labelThread t $ "probe." ++ show port ++ "." ++ show (nodeId ni)
            return t
        readResult = atomically $ do
            m <- readTVar got
            case m of
                Just v  -> return $ Just v
                Nothing -> readTVar cnt >>= check . (== 0) >> return Nothing
    t443 <- forkPort 443
    t80 <- forkPort 80
    p <- timeout 500000 readResult >>= \case
            Just (Just p) -> do
                killThread t443
                killThread t80
                return $ Just p
            _ -> do
                t3389 <- forkPort 3389
                timeout 500000 readResult >>= \case
                    Just (Just p) -> do
                        killThread t3389
                        killThread t443
                        killThread t80
                        return $ Just p
                    _ -> do
                        let uport = nodePort ni
                        tudp <- forM (guard $ uport `notElem` [443,80,3389,33445])
                                    $ \() -> forkPort uport
                        t33445 <- forkPort 33445
                        p <- readResult
                        mapM_ killThread [t443,t80,t3389,t33445]
                        mapM_ killThread (tudp :: Maybe ThreadId)
                        return p
    return p

data TCPProber = TCPProber
    { probeQueue :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-})
    , probeSpill :: TVar (PSQ' NodeId POSIXTime SockAddr{-UDP-})
    , probeSpillCount :: TVar Int -- Data.HashPSQ has O(n) size, so we keep a count.
    , probeCache :: TVar (PSQ' NodeId POSIXTime (SockAddr{-UDP-},PortNumber{-TCP-}))
    , probeCacheCount :: TVar Int
    }

newProber :: STM TCPProber
newProber = do
    q        <- newTVar PSQ.empty
    spill    <- newTVar PSQ.empty
    spillcnt <- newTVar 0
    cache    <- newTVar PSQ.empty
    cachecnt <- newTVar 0
    return TCPProber
        { probeQueue      = q
        , probeSpill      = spill
        , probeSpillCount = spillcnt
        , probeCache      = cache
        , probeCacheCount = cachecnt
        }


enqueueProbe :: TCPProber -> NodeInfo -> IO ()
enqueueProbe prober ni = do
    tm <- getPOSIXTime
    atomically $ do
        spill <- readTVar (probeSpill prober)
        cache <- readTVar (probeCache prober)
        q     <- readTVar (probeQueue prober)
        let bump var x = modifyTVar' var $ insert' (nodeId ni) x tm
        case PSQ.lookup (nodeId ni) cache of
            Just (tm, x)                       -> bump (probeCache prober) x
            Nothing | member (nodeId ni) spill -> bump (probeSpill prober) (nodeAddr ni)
                    | member (nodeId ni) q     -> return ()
                    | otherwise                -> bump (probeQueue prober) (nodeAddr ni)

maxSpill :: Int
maxSpill = 100

maxCache :: Int
maxCache = 50

runProbeQueue :: TCPProber -> TCP.RelayClient -> Int -> IO ()
runProbeQueue prober client maxjobs = do
    jcnt <- newTVarIO 0
    fix $ \loop -> do
        (tm, mni) <- atomically $ do
            j <- readTVar jcnt
            check (j < maxjobs)
            q <- readTVar $ probeQueue prober
            case minView q of
                Nothing                       -> retry
                Just (Binding nid saddr tm,q') -> do
                    writeTVar (probeQueue prober) q'
                    return (tm, nodeInfo nid saddr)
        forM_ mni $ \ni -> do
            atomically $ modifyTVar' jcnt succ
            t <- forkIO $ do
                m <- resolvePort client ni
                atomically $ case m of
                    Nothing -> do
                        pcnt <- readTVar (probeSpillCount prober)
                        modifyTVar' (probeSpill prober) $ insert' (nodeId ni) (nodeAddr ni) tm
                        if (pcnt == maxSpill)
                            then modifyTVar' (probeSpill prober) deleteMin
                            else modifyTVar' (probeSpillCount prober) succ
                    Just p  -> do
                        ccnt <- readTVar (probeCacheCount prober)
                        modifyTVar' (probeCache prober) $ insert' (nodeId ni) (nodeAddr ni,p) tm
                        if (ccnt == maxCache)
                            then modifyTVar' (probeCache prober) deleteMin
                            else modifyTVar' (probeCacheCount prober) succ
                atomically $ modifyTVar' jcnt pred
            labelThread t ("probe."++show ni)
        loop


getNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()))
getNodes prober tcp seeking dst = do
    r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst)
    dput XTCP $ "Got via TCP nodes: " ++ show r
    let tcps (ns,_,mb) = (ns',ns',mb)
         where ns' = do
                    n <- ns
                    [ TCP.NodeInfo n 0 ]
    case r of
      Success (ns,gw) -> do
        let ts = tcps ns
        if TCP.nodeId gw == TCP.nodeId dst
            then return $ Success ts
            else do
                enqueueProbe prober (TCP.udpNodeInfo dst)
                return $ Success ts
        return $ Success ts
      _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r

asyncGetNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo
                           -> (Nonce8 -> Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()) -> IO ())
                           -> IO Nonce8
asyncGetNodes prober tcp seeking dst withResponse = do
    TCP.asyncUDPNodes tcp seeking (TCP.udpNodeInfo dst) $ \qid r -> do
        dput XTCP $ "Got via TCP nodes: " ++ show r
        let tcps (ns,_,mb) = (ns',ns',mb)
             where ns' = do
                        n <- ns
                        [ TCP.NodeInfo n 0 ]
        r' <- case r of
          Success (ns,gw) -> do
            let ts = tcps ns
            if TCP.nodeId gw == TCP.nodeId dst
                then return $ Success ts
                else do
                    enqueueProbe prober (TCP.udpNodeInfo dst)
                    return $ Success ts
            return $ Success ts
          _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r
        withResponse qid r'


nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo Nonce8
nodeSearch prober tcp = Search
    { searchSpace       = TCP.tcpSpace
    , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort
    , searchQuery       = asyncGetNodes prober tcp
    , searchQueryCancel = cancelQuery (TCP.tcpClient tcp)
    , searchAlpha       = 8
    , searchK           = 16
    }