summaryrefslogtreecommitdiff
path: root/src/Network/Kademlia.hs
blob: 0ab26e809ca02fcf48f5fe549196135a702b64f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
-- {-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PatternSynonyms #-}
module Network.Kademlia where

import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock       (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.Kademlia.Routing   as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc                  (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
import Data.IP
import Data.Monoid
import Data.Serialize                 (Serialize)
import Data.Time.Clock.POSIX          (POSIXTime)
import Network.Address                (bucketRange,genBucketSample)
import Network.Kademlia.Search
import System.Entropy
import System.Timeout
import Text.PrettyPrint               as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO
import Control.Concurrent.Tasks

-- | The status of a given node with respect to a given routint table.
data RoutingStatus
    = Stranger  -- ^ The node is unknown to the Kademlia routing table.
    | Applicant -- ^ The node may be inserted pending a ping timeout.
    | Accepted  -- ^ The node has a slot in one of the Kademlia buckets.
    deriving (Eq,Ord,Enum,Show,Read)

-- | A change occured in the kademlia routing table.
data RoutingTransition ni = RoutingTransition
    { transitioningNode :: ni
    , transitionedTo    :: !RoutingStatus
    }
    deriving (Eq,Ord,Show,Read)

data InsertionReporter ni = InsertionReporter
    { -- | Called on every inbound packet. Accepts:
      --
      --   * Origin of packet.
      --
      --   * List of nodes to be pinged as a result.
      reportArrival :: POSIXTime
                       -> ni
                       -> [ni]
                       -> IO ()
      -- | Called on every ping probe. Accepts:
      --
      --    * Who was pinged.
      --
      --    * True Bool value if they ponged.
    , reportPingResult :: POSIXTime
                          -> ni
                          -> Bool
                          -> IO ()
    }

quietInsertions :: InsertionReporter ni
quietInsertions = InsertionReporter
    { reportArrival = \_ _ _ -> return ()
    , reportPingResult = \_ _ _ -> return ()
    }

contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
contramapIR f ir = InsertionReporter
    { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
    , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
    }

-- | All the IO operations neccessary to maintain a Kademlia routing table.
data TableStateIO ni = TableStateIO
    { -- | Write the routing table.  Typically 'writeTVar'.
      tblWrite :: R.BucketList ni -> STM ()

      -- | Read the routing table.  Typically 'readTVar'.
    , tblRead :: STM (R.BucketList ni)

      -- | Issue a ping to a remote node and report 'True' if the node
      -- responded within an acceptable time and 'False' otherwise.
    , tblPing :: ni -> IO Bool

      -- | Convenience method provided to assist in maintaining state
      -- consistent with the routing table.  It will be invoked in the same
      -- transaction that 'tblRead'\/'tblWrite' occured but only when there was
      -- an interesting change.  The returned IO action will be triggered soon
      -- afterward.
      --
      -- It is not necessary to do anything interesting here.  The following
      -- trivial implementation is fine:
      --
      -- > tblTransition = const $ return $ return ()
    , tblTransition :: RoutingTransition ni -> STM (IO ())
    }

vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
vanillaIO var ping = TableStateIO
    { tblRead    = readTVar var
    , tblWrite   = writeTVar var
    , tblPing    = ping
    , tblTransition = const $ return $ return ()
    }

-- | Everything neccessary to maintain a routing table of /ni/ (node
-- information) entries.
data Kademlia nid ni = Kademlia (InsertionReporter ni)
                                (KademliaSpace nid ni)
                                (TableStateIO ni)


-- Helper to 'insertNode'.
--
-- Adapt return value from 'updateForPingResult' into a
-- more easily groked list of transitions.
transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
transition (x,m) =
    -- Just _  <- m  =  Node transition:  Accepted  --> Stranger
    -- Nothing <- m  =  Node transition:  Applicant --> Stranger
    RoutingTransition x Stranger
       : maybeToList (accepted <$> m)

-- Helper to 'transition'
--
-- Node transition: Applicant --> Accepted
accepted :: (t,ni) -> RoutingTransition ni
accepted (_,y) = RoutingTransition y Accepted


insertNode :: Kademlia nid ni -> ni -> IO ()
insertNode (Kademlia reporter space io) node = do

  tm <- utcTimeToPOSIXSeconds <$> getCurrentTime

  (ps,reaction) <- atomically $ do
    tbl <- tblRead io
    let (inserted, ps,t') = R.updateForInbound space tm node tbl
    tblWrite io t'
    reaction <- case ps of
        _ | inserted -> -- Node transition: Stranger --> Accepted
                        tblTransition io $ RoutingTransition node Accepted
        (_:_)        -> -- Node transition: Stranger --> Applicant
                        tblTransition io $ RoutingTransition node Applicant
        _            -> return $ return ()
    return (ps, reaction)

  reportArrival reporter tm node ps
  reaction

  _ <- fork $ do
    myThreadId >>= flip labelThread "pingResults"
    forM_ ps $ \n -> do
        b <- tblPing io n
        reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result
        join $ atomically $ do
            tbl <- tblRead io
            let (replacements, t') = R.updateForPingResult space n b tbl
            tblWrite io t'
            ios <- sequence $ concatMap
                                (map (tblTransition io) . transition)
                                replacements
            return $ sequence_ ios

  return ()