summaryrefslogtreecommitdiff
path: root/src/Network/DHT.hs
blob: 285cf9ffb489c74af5b4404e4893c65f7d857c14 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
module Network.DHT
    ( -- makeTableKeeper
    -- , TableKeeper(..)
      module Network.DHT -- for now
    , module Network.DHT.Types
    ) where

import Data.Bits
import Data.Maybe
import Data.Monoid
import Network.Address
import Network.DHT.Types
import Network.DatagramServer.Types
import Network.DHT.Routing
import Control.Concurrent.STM
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import GHC.Conc (labelThread)
import Control.Concurrent.Lifted
#endif
import Text.PrettyPrint as PP hiding ((<>), ($$))
import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))

import Control.Monad
import Data.Time.Clock       (getCurrentTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)

data TableKeeper msg ip u = TableKeeper
    { routingInfo  :: TVar    (Maybe (Info msg ip u))
    , grokNode :: NodeInfo msg ip u -> Maybe (ReflectedIP) -> IO ()
    , grokAddress :: Maybe SockAddr -> ReflectedIP -> IO ()
    }

makeTableKeeper :: forall msg ip u.
                ( Address ip
                , Show u
                , Show (NodeId msg)
                , Ord (NodeId msg)
                , FiniteBits (NodeId msg)
                ) => TableParameters msg ip u -> IO (TableKeeper msg ip u)
makeTableKeeper param@TableParameters{..} = do
    error "TODO makeTableKeeper" -- kick off table-updating thread
    ri <- atomically (newTVar Nothing)
    let tk = TableKeeper{ routingInfo  = ri
                        , grokNode    = insertNode param tk
                        , grokAddress = error "todo"
                        }
    return tk

atomicInsert :: ( Eq ip, Address ip, Ord (NodeId msg), FiniteBits (NodeId msg)
                ) => TableParameters msg ip u -> TableKeeper msg ip u -> Timestamp -> Event msg ip u -> Maybe ReflectedIP -> STM (IO [CheckPing msg ip u])
atomicInsert param@TableParameters{..} state tm arrival witnessed_ip = do
    minfo <- readTVar (routingInfo state)
    case minfo of
        Just inf -> do
          (ps,t') <- insert tm arrival $ myBuckets inf
          writeTVar (routingInfo state) $ Just $ inf { myBuckets = t' }
          return $ do
            case witnessed_ip of
                Just (ReflectedIP ip)
                  | ip /= myAddress inf
                    -> logMessage 'I' $ unwords
                          $ [ "Possible NAT?"
                            , show (toSockAddr $ nodeAddr $ foreignNode arrival)
                            , "reports my address:"
                            , show ip ]
                        -- TODO: Let routing table vote on my IP/NodeId.
                _   -> return ()
            return ps
        Nothing ->
            let dropped = return $ do
                  -- Ignore non-witnessing nodes until somebody tells
                  -- us our ip address.
                  logMessage 'W' ("Dropped " ++ show (toSockAddr $ nodeAddr $ foreignNode arrival))
                  return []
            in fromMaybe dropped $ do
                  ReflectedIP ip <- witnessed_ip
                  let nil = nullTable (adjustID ip arrival) maxBuckets
                  return $ do
                    (ps,t') <- insert tm arrival nil
                    let new_info = Info t' (adjustID ip arrival) ip
                    writeTVar (routingInfo state) $ Just new_info
                    return $ do
                       logMessage 'I' $ unwords
                            [ "External IP address:"
                            , show ip
                            , "(reported by"
                            , show (toSockAddr $ nodeAddr $ foreignNode arrival)
                                ++ ")"
                            ]
                       return ps

-- | This operation do not block but acquire exclusive access to
--   routing table.
insertNode :: forall msg ip u.
                ( Address ip
                , Show u
                , Show (NodeId msg)
                , Ord (NodeId msg)
                , FiniteBits (NodeId msg)
                ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
insertNode param@TableParameters{..} state info witnessed_ip0 = do
  tm <- utcTimeToPOSIXSeconds <$> getCurrentTime -- Network.DHT.Routing.TimeStamp = POSIXTime
  let showTable = do
        t <- atomically $ fmap myBuckets <$> readTVar (routingInfo state)
        let logMsg = "Routing table: " <> pPrint t
        logMessage 'D' (render logMsg)
  let arrival = TryInsert info
  logMessage 'D' $ show ( TryInsert (mapAddress fromAddr info) :: Event _ (Maybe IPv4) _ )
  ps <- join $ atomically $ atomicInsert param state tm arrival witnessed_ip0
  showTable
  _ <- fork $ do
    myThreadId >>= flip labelThread "DHT.insertNode.pingResults"
    forM_ ps $ \(CheckPing ns)-> do
      forM_ ns $ \n -> do
        (b,mip) <- pingProbe n
        let alive = PingResult n b
        logMessage 'D' $ "PingResult "++show (nodeId n,b)
        _ <- join $ atomically $ atomicInsert param state tm alive mip
        showTable
        return ()
  return ()