summaryrefslogtreecommitdiff
path: root/Kademlia.hs
blob: d29a324019644b6af63b9c44f0b3654b8a70324c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
-- {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE GADTs #-}
module Kademlia where

import Data.Maybe
import Network.DHT.Routing as R
import Data.Time.Clock       (getCurrentTime)
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import GHC.Conc (labelThread)
import Control.Concurrent.Lifted
#endif

import Text.PrettyPrint as PP hiding ((<>), ($$))
import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
import Data.IP
import Control.Concurrent.STM
import Control.Monad
import Data.Monoid
import Data.Time.Clock.POSIX (POSIXTime)

{-
insertNode1 :: forall raw dht u ip.
               ( Address ip
               , Default u
               , Show u
               , Ord (NodeId dht)
               , FiniteBits (NodeId dht)
               , Show (NodeId dht)
               , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
               , DHT.Kademlia dht
               , Ord (TransactionID dht)
               , WireFormat raw dht
               , Serialize (TransactionID dht)
               , SerializableTo raw (Response dht (Ping dht))
               , SerializableTo raw (Query dht (Ping dht))
               , Ord (NodeId dht)
               , Show (NodeId dht)
               , Show (QueryMethod dht)
               ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
-}
{-
insertNode1 = do
      params = DHT.TableParameters
        { maxBuckets = R.defaultBucketCount :: Int
        , fallbackID = myid
        , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
        , logMessage = (\ _ _ -> return ()) {- TODO -} :: Char -> String -> IO ()
        , pingProbe = error "probe" :: ni -> NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP)
        }
  let state = DHT.TableKeeper
        { routingInfo = tbl
        , grokNode    = DHT.insertNode params state
        , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
        }
  return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0

-}

{-
insertNode :: forall msg ip u.
                ( Address ip
                , Show u
                , Show (NodeId msg)
                , Ord (NodeId msg)
                , FiniteBits (NodeId msg)
                ) => TableParameters msg ip u -> TableKeeper msg ip u -> NodeInfo msg ip u -> Maybe ReflectedIP -> IO ()
-}
  {-
  let showTable = do
        t <- atomically $ fmap myBuckets <$> readTVar routingInfo
        let logMsg = "Routing table: " <> pPrint t
        logMessage 'D' (render logMsg)
      reportPingResult tm n b  = showTable
      reportArrival tm info ps = showTable
  -}

-- | A change occurred in the kademlia routing table.
--
-- 'insertNode' emits this with 'nodeReplaced' set to 'Nothing' when the
-- inbound node was inserted directly, and with @Just old@ for every
-- replacement that results from a ping probe.
--
-- NOTE(review): 'nodeInserted' is the only field without a strictness
-- annotation; confirm whether that is deliberate.
data RoutingTableChanged ni = RoutingTableChanged
    { nodeReplaced :: !(Maybe ni) -- ^ Deleted entry, if one was displaced.
    , nodeInserted :: ni          -- ^ New routing table entry.
    , nodeTimestamp :: !POSIXTime -- ^ Last-seen time for the new node.
    }
 deriving (Eq,Ord,Show,Functor,Foldable,Traversable)

-- | Callbacks through which 'insertNode' reports what it is doing.
--
-- In 'insertNode' both callbacks receive the same timestamp: the clock
-- reading taken at the start of that call.
data InsertionReporter ni = InsertionReporter
    { -- | Called on every inbound packet.
      reportArrival :: POSIXTime
                       -> ni   -- ^ Origin of packet.
                       -> [ni] -- ^ These will be pinged as a result.
                       -> IO ()
      -- | Called on every ping probe.
    , reportPingResult :: POSIXTime
                          -> ni   -- ^ Who was pinged.
                          -> Bool -- ^ True if they ponged.
                          -> IO ()
    }

-- | Adapt an 'InsertionReporter' to a different node representation.
--
-- Every node handed to the resulting reporter is converted with @f@ and then
-- forwarded to the matching callback of @ir@.  The timestamp and the ping
-- outcome are passed through unchanged.
contramapIR :: (b -> a) -> InsertionReporter a -> InsertionReporter b
contramapIR f ir = InsertionReporter
    { reportArrival    = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
    , reportPingResult = \tm ni b   -> reportPingResult ir tm (f ni) b
    }

-- | The operations 'insertNode' needs to relate nodes of type @ni@ to
-- identifiers of type @nid@.  'insertNode' passes both fields straight to
-- 'R.updateForInbound' and 'R.updateForPingResult'.
data KademliaSpace nid ni = KademliaSpace
    { kademliaLocation :: ni -> nid
      -- ^ Projects a node to a value of the identifier type.
    , kademliaTestBit :: nid -> Word -> Bool
      -- ^ NOTE(review): presumably tests the bit of an identifier at the
      -- given index; the bit numbering is not visible in this module, so
      -- confirm against "Network.DHT.Routing".
    }

-- | Adapt a 'KademliaSpace' to a different node representation.
--
-- Nodes are converted with @f@ before their location is looked up in @ks@.
contramapKS :: (b -> a) -> KademliaSpace nid a -> KademliaSpace nid b
contramapKS f ks = KademliaSpace
    { kademliaLocation = kademliaLocation ks . f
      -- The bit test only inspects identifiers, so it carries over as-is.
      -- It must be set explicitly: a record built without it holds a
      -- "missing field" bottom that blows up on first use.
    , kademliaTestBit  = kademliaTestBit ks
    }

-- insertNode param@TableParameters{..} state info witnessed_ip0 = do
-- | Record the arrival of a packet from @info@ in the routing table stored
-- in @var@, then probe whichever nodes the table update asks to have pinged.
--
-- The work happens in three steps:
--
-- 1. In a single STM transaction the table is read (falling back to @nil@
--    when @var@ holds 'Nothing'), updated with 'R.updateForInbound', and
--    written back.  If the update reports an insertion, @changed@ is invoked
--    inside the same transaction with @RoutingTableChanged Nothing info tm@.
--
-- 2. 'reportArrival' is called, followed by the IO action that @changed@
--    returned (if any).
--
-- 3. If there are nodes to ping, a thread labelled @"pingResults"@ is forked.
--    It probes them one after another, calls 'reportPingResult' for each, and
--    feeds each outcome to 'R.updateForPingResult' in its own transaction.
--    @changed@ is invoked for every replacement that produces.
--
-- The function returns as soon as the thread has been forked; it does not
-- wait for the pings to complete.
insertNode ::
    forall ni nid.
    (Ord ni) =>
    InsertionReporter ni
        -- ^ reporter: told about the arrival and about each ping result.
    -> R.Info ni nid
        -- ^ nil: table to start from when @var@ holds 'Nothing'.
    -> KademliaSpace nid ni
        -- ^ k: supplies the bit test and location functions for the table.
    -> (RoutingTableChanged ni -> STM (IO ()))
        -- ^ changed: run inside the updating transaction for every table
        -- change; the IO action it returns is executed after the commit.
    -> (ni -> IO Bool)
        -- ^ pingProbe: its result is forwarded as the ping outcome.
    -> ni
        -- ^ info: the node the inbound packet came from.
    -> TVar (Maybe (R.Info ni nid))
        -- ^ var: the shared routing table.
    -> IO ()
insertNode reporter nil k changed pingProbe info var = do

  tm <- utcTimeToPOSIXSeconds <$> getCurrentTime

  (ps,reaction) <- atomically $ do
    tbl <- fromMaybe nil <$> readTVar var
    let (inserted, ps,t') = R.updateForInbound (kademliaTestBit k) (kademliaLocation k) tm info $ myBuckets tbl
    reaction <- if inserted
                    then changed $ RoutingTableChanged Nothing info tm
                    else return $ return ()
    writeTVar var (Just $ tbl { myBuckets = t' })
    return (ps, reaction)

  reportArrival reporter tm info ps
  reaction

  -- Only pay for a thread when there is actually something to ping.
  unless (null ps) $ void $ fork $ do
    myThreadId >>= flip labelThread "pingResults"
    forM_ ps $ \n -> do
        b <- pingProbe n
        -- NOTE(review): tm is the packet arrival time sampled above, not the
        -- time at which this probe finished -- confirm that is intended.
        reportPingResult reporter tm n b
        join $ atomically $ do
            tbl <- fromMaybe nil <$> readTVar var
            let (replacements, t') = R.updateForPingResult (kademliaTestBit k) (kademliaLocation k) n b $ myBuckets tbl
            writeTVar var (Just $ tbl { myBuckets = t' })
            -- Collect the follow-up actions inside the transaction and run
            -- them after it commits; their unit results are not needed.
            sequence_ <$> mapM (\(x,(t,y)) -> changed $ RoutingTableChanged (Just x) y t)
                               replacements