summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-29 10:37:07 -0400
committerjoe <joe@jerkface.net>2017-06-29 13:00:16 -0400
commit3195c0877b443e5ccd4d489f03944fc059d4d7aa (patch)
tree2a05c35a9b43d8f0725c52fc860b30ae191f3871 /src
parent05e70386c2248d87a61a8e8267e0211597f2fa88 (diff)
WIP: Generalizing DHT monad.
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/DHT.hs42
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs391
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs52
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs114
-rw-r--r--src/Network/DHT/Mainline.hs89
-rw-r--r--src/Network/DHT/Types.hs51
-rw-r--r--src/Network/DatagramServer.hs36
7 files changed, 537 insertions, 238 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index d9328cea..8bc423a3 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -15,9 +15,11 @@
15-- <http://www.bittorrent.org/beps/bep_0005.html> 15-- <http://www.bittorrent.org/beps/bep_0005.html>
16-- 16--
17{-# LANGUAGE FlexibleInstances #-} 17{-# LANGUAGE FlexibleInstances #-}
18{-# LANGUAGE FlexibleContexts #-}
18{-# LANGUAGE TemplateHaskell #-} 19{-# LANGUAGE TemplateHaskell #-}
19{-# LANGUAGE TypeOperators #-} 20{-# LANGUAGE TypeOperators #-}
20{-# LANGUAGE ScopedTypeVariables #-} 21{-# LANGUAGE ScopedTypeVariables #-}
22{-# LANGUAGE CPP #-}
21module Network.BitTorrent.DHT 23module Network.BitTorrent.DHT
22 ( -- * Distributed Hash Table 24 ( -- * Distributed Hash Table
23 DHT 25 DHT
@@ -37,7 +39,7 @@ module Network.BitTorrent.DHT
37 , snapshot 39 , snapshot
38 40
39 -- * Operations 41 -- * Operations
40 , Network.BitTorrent.DHT.lookup 42 -- , Network.BitTorrent.DHT.lookup
41 , Network.BitTorrent.DHT.insert 43 , Network.BitTorrent.DHT.insert
42 , Network.BitTorrent.DHT.delete 44 , Network.BitTorrent.DHT.delete
43 45
@@ -50,7 +52,7 @@ module Network.BitTorrent.DHT
50 , closeNode 52 , closeNode
51 53
52 -- ** Monad 54 -- ** Monad
53 , MonadDHT (..) 55 -- , MonadDHT (..)
54 , runDHT 56 , runDHT
55 ) where 57 ) where
56 58
@@ -81,11 +83,13 @@ import qualified Network.DatagramServer as KRPC (listen, Protocol(..))
81-- DHT types 83-- DHT types
82-----------------------------------------------------------------------} 84-----------------------------------------------------------------------}
83 85
86#if 0
84class MonadDHT m where 87class MonadDHT m where
85 liftDHT :: DHT IPv4 a -> m a 88 liftDHT :: DHT raw dht u IPv4 a -> m a
86 89
87instance MonadDHT (DHT IPv4) where 90instance MonadDHT (DHT raw dht u IPv4) where
88 liftDHT = id 91 liftDHT = id
92#endif
89 93
90-- | Convenience method. Pass this to 'dht' to enable full logging. 94-- | Convenience method. Pass this to 'dht' to enable full logging.
91fullLogging :: LogSource -> LogLevel -> Bool 95fullLogging :: LogSource -> LogLevel -> Bool
@@ -96,7 +100,7 @@ dht :: (Ord ip, Address ip)
96 => Options -- ^ normally you need to use 'Data.Default.def'; 100 => Options -- ^ normally you need to use 'Data.Default.def';
97 -> NodeAddr ip -- ^ address to bind this node; 101 -> NodeAddr ip -- ^ address to bind this node;
98 -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default 102 -> (LogSource -> LogLevel -> Bool) -- ^ use 'fullLogging' as a noisy default
99 -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; 103 -> DHT raw dht u ip a -- ^ actions to run: 'bootstrap', 'lookup', etc;
100 -> IO a -- ^ result. 104 -> IO a -- ^ result.
101dht opts addr logfilter action = do 105dht opts addr logfilter action = do
102 runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do 106 runStderrLoggingT $ filterLogger logfilter $ LoggingT $ \ logger -> do
@@ -175,7 +179,7 @@ resolveHostName NodeAddr {..} = do
175-- 179--
176-- This operation do block, use 180-- This operation do block, use
177-- 'Control.Concurrent.Async.Lifted.async' if needed. 181-- 'Control.Concurrent.Async.Lifted.async' if needed.
178bootstrap :: forall ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT ip () 182bootstrap :: forall raw dht u ip. Address ip => Maybe BS.ByteString -> [NodeAddr ip] -> DHT raw dht u ip ()
179bootstrap mbs startNodes = do 183bootstrap mbs startNodes = do
180 restored <- 184 restored <-
181 case decode <$> mbs of 185 case decode <$> mbs of
@@ -187,8 +191,8 @@ bootstrap mbs startNodes = do
187 $(logInfoS) "bootstrap" "Start node bootstrapping" 191 $(logInfoS) "bootstrap" "Start node bootstrapping"
188 let searchAll aliveNodes = do 192 let searchAll aliveNodes = do
189 nid <- myNodeIdAccordingTo (error "FIXME") 193 nid <- myNodeIdAccordingTo (error "FIXME")
190 nss <- C.sourceList [aliveNodes] $= search nid (findNodeQ nid) $$ C.consume 194 ns <- bgsearch ioFindNodes nid
191 return ( nss :: [[NodeInfo KMessageOf ip ()]] ) 195 return ( ns :: [NodeInfo KMessageOf ip ()] )
192 input_nodes <- (restored ++) . T.toList <$> getTable 196 input_nodes <- (restored ++) . T.toList <$> getTable
193 -- Step 1: Use iterative searches to flesh out the table.. 197 -- Step 1: Use iterative searches to flesh out the table..
194 do let knowns = map (map $ nodeAddr . fst) input_nodes 198 do let knowns = map (map $ nodeAddr . fst) input_nodes
@@ -200,10 +204,10 @@ bootstrap mbs startNodes = do
200 -- If our cached nodes are alive and our IP address did not change, it's possible 204 -- If our cached nodes are alive and our IP address did not change, it's possible
201 -- we are already bootsrapped, so no need to do any searches. 205 -- we are already bootsrapped, so no need to do any searches.
202 when (not b) $ do 206 when (not b) $ do
203 nss <- searchAll $ take 2 alive_knowns 207 ns <- searchAll $ take 2 alive_knowns
204 -- We only use the supplied bootstrap nodes when we don't know of any 208 -- We only use the supplied bootstrap nodes when we don't know of any
205 -- others to try. 209 -- others to try.
206 when (null nss) $ do 210 when (null ns) $ do
207 -- TODO filter duplicated in startNodes list 211 -- TODO filter duplicated in startNodes list
208 -- TODO retransmissions for startNodes 212 -- TODO retransmissions for startNodes
209 (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes) 213 (aliveNodes,_) <- unzip <$> queryParallel (pingQ <$> startNodes)
@@ -243,7 +247,7 @@ bootstrap mbs startNodes = do
243-- 247--
244-- This operation do not block. 248-- This operation do not block.
245-- 249--
246isBootstrapped :: Eq ip => DHT ip Bool 250isBootstrapped :: Eq ip => DHT raw dht u ip Bool
247isBootstrapped = T.full <$> getTable 251isBootstrapped = T.full <$> getTable
248 252
249{----------------------------------------------------------------------- 253{-----------------------------------------------------------------------
@@ -254,7 +258,11 @@ isBootstrapped = T.full <$> getTable
254-- 258--
255-- This is blocking operation, use 259-- This is blocking operation, use
256-- 'Control.Concurrent.Async.Lifted.async' if needed. 260-- 'Control.Concurrent.Async.Lifted.async' if needed.
257snapshot :: Address ip => DHT ip BS.ByteString 261snapshot :: ( Address ip
262 , Ord (NodeId dht)
263 , Serialize u
264 , Serialize (NodeId dht)
265 ) => DHT raw dht u ip BS.ByteString
258snapshot = do 266snapshot = do
259 tbl <- getTable 267 tbl <- getTable
260 return $ encode tbl 268 return $ encode tbl
@@ -263,15 +271,19 @@ snapshot = do
263-- Operations 271-- Operations
264-----------------------------------------------------------------------} 272-----------------------------------------------------------------------}
265 273
274#if 0
275
266-- | Get list of peers which downloading this torrent. 276-- | Get list of peers which downloading this torrent.
267-- 277--
268-- This operation is incremental and do block. 278-- This operation is incremental and do block.
269-- 279--
270lookup :: Address ip => InfoHash -> DHT ip `C.Source` [PeerAddr ip] 280lookup :: Address ip => InfoHash -> DHT raw dht u ip `C.Source` [PeerAddr ip]
271lookup topic = do -- TODO retry getClosest if bucket is empty 281lookup topic = do -- TODO retry getClosest if bucket is empty
272 closest <- lift $ getClosest topic 282 closest <- lift $ getClosest topic
273 C.sourceList [closest] $= search topic (getPeersQ topic) 283 C.sourceList [closest] $= search topic (getPeersQ topic)
274 284
285#endif
286
275-- TODO do not republish if the topic is already in announceSet 287-- TODO do not republish if the topic is already in announceSet
276 288
277-- | Announce that /this/ peer may have some pieces of the specified 289-- | Announce that /this/ peer may have some pieces of the specified
@@ -281,7 +293,7 @@ lookup topic = do -- TODO retry getClosest if bucket is empty
281-- This operation is synchronous and do block, use 293-- This operation is synchronous and do block, use
282-- 'Control.Concurrent.Async.Lifted.async' if needed. 294-- 'Control.Concurrent.Async.Lifted.async' if needed.
283-- 295--
284insert :: Address ip => InfoHash -> PortNumber -> DHT ip () 296insert :: Address ip => InfoHash -> PortNumber -> DHT raw dht u ip ()
285insert ih p = do 297insert ih p = do
286 publish ih p 298 publish ih p
287 insertTopic ih p 299 insertTopic ih p
@@ -290,6 +302,6 @@ insert ih p = do
290-- 302--
291-- This operation is atomic and may block for a while. 303-- This operation is atomic and may block for a while.
292-- 304--
293delete :: InfoHash -> PortNumber -> DHT ip () 305delete :: InfoHash -> PortNumber -> DHT raw dht u ip ()
294delete = deleteTopic 306delete = deleteTopic
295{-# INLINE delete #-} 307{-# INLINE delete #-}
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 254b347c..e5d9bd5f 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -41,11 +41,13 @@ module Network.BitTorrent.DHT.Query
41 -- concatenate its responses, optionally yielding result and 41 -- concatenate its responses, optionally yielding result and
42 -- continue to the next iteration. 42 -- continue to the next iteration.
43 , Search 43 , Search
44 , search 44 -- , search
45 , publish 45 , publish
46 , ioFindNode 46 , ioFindNode
47 , ioFindNodes
47 , ioGetPeers 48 , ioGetPeers
48 , isearch 49 , isearch
50 , bgsearch
49 51
50 -- ** Routing table 52 -- ** Routing table
51 , insertNode 53 , insertNode
@@ -57,6 +59,8 @@ module Network.BitTorrent.DHT.Query
57 , (<@>) 59 , (<@>)
58 ) where 60 ) where
59 61
62import Data.Bits
63import Data.Default
60#ifdef THREAD_DEBUG 64#ifdef THREAD_DEBUG
61import Control.Concurrent.Lifted.Instrument hiding (yield) 65import Control.Concurrent.Lifted.Instrument hiding (yield)
62#else 66#else
@@ -102,30 +106,43 @@ import Network.DatagramServer.Tox
102#endif 106#endif
103import Network.Address hiding (NodeId) 107import Network.Address hiding (NodeId)
104import Network.DatagramServer.Types as RPC hiding (Query,Response) 108import Network.DatagramServer.Types as RPC hiding (Query,Response)
109import Network.DHT.Types
105import Control.Monad.Trans.Control 110import Control.Monad.Trans.Control
111import Data.Typeable
112import Data.Serialize
113import System.IO.Unsafe (unsafeInterleaveIO)
114import Data.String
106 115
107{----------------------------------------------------------------------- 116{-----------------------------------------------------------------------
108-- Handlers 117-- Handlers
109-----------------------------------------------------------------------} 118-----------------------------------------------------------------------}
110 119
120{-
111nodeHandler :: ( Address ip 121nodeHandler :: ( Address ip
112 , KRPC (Query a) (Response b) 122 , KRPC (Query KMessageOf a) (Response KMessageOf b)
113 ) 123 )
114 => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler 124 => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
125-}
126nodeHandler ::
127 (Address addr, WireFormat raw msg, Pretty (NodeInfo dht addr u),
128 Default u,
129 IsString t, Functor msg,
130 SerializableTo raw (Response dht r),
131 SerializableTo raw (Query dht q)) =>
132 (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ())
133 -> (NodeAddr addr -> IO (NodeId dht))
134 -> (Char -> t -> Text -> IO ())
135 -> QueryMethod msg
136 -> (NodeAddr addr -> q -> IO r)
137 -> Handler IO msg raw
115nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do 138nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $ \ sockAddr qry -> do
116#ifdef VERSION_bencoding
117 let remoteId = queringNodeId qry 139 let remoteId = queringNodeId qry
118 read_only = queryIsReadOnly qry 140 read_only = queryIsReadOnly qry
119 q = queryParams qry 141 q = queryParams qry
120#else
121 let remoteId = msgClient qry
122 read_only = False
123 q = msgPayload qry
124#endif
125 case fromSockAddr sockAddr of 142 case fromSockAddr sockAddr of
126 Nothing -> throwIO BadAddress 143 Nothing -> throwIO BadAddress
127 Just naddr -> do 144 Just naddr -> do
128 let ni = NodeInfo remoteId naddr () 145 let ni = NodeInfo remoteId naddr def
129 -- Do not route read-only nodes. (bep 43) 146 -- Do not route read-only nodes. (bep 43)
130 if read_only 147 if read_only
131 then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) 148 then logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
@@ -135,13 +152,13 @@ nodeHandler insertNode myNodeIdAccordingTo logm method action = handler method $
135 <*> action naddr q 152 <*> action naddr q
136 153
137-- | Default 'Ping' handler. 154-- | Default 'Ping' handler.
138pingH :: NodeAddr ip -> Ping -> IO Ping 155pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht)
139pingH _ Ping = return Ping 156pingH dht _ _ = return (DHT.pongMessage dht)
140-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } 157-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
141 158
142-- | Default 'FindNode' handler. 159-- | Default 'FindNode' handler.
143findNodeH :: (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) -> NodeAddr ip -> FindNode ip -> IO (NodeFound ip) 160findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip)
144findNodeH getclosest _ (FindNode nid) = NodeFound <$> getclosest nid 161findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg)
145 162
146-- | Default 'GetPeers' handler. 163-- | Default 'GetPeers' handler.
147getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) 164getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip)
@@ -162,51 +179,100 @@ announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do
162 insertPeer peers topic announcedName peerAddr 179 insertPeer peers topic announcedName peerAddr
163 return Announced 180 return Announced
164 181
182-- | Includes all Kademlia-related handlers.
183kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip
184 , Ord (TransactionID dht)
185 , Ord (NodeId dht)
186 , Show u
187 , SerializableTo raw (Response dht (Ping dht))
188 , SerializableTo raw (Query dht (Ping dht))
189 , Show (QueryMethod dht)
190 , Show (NodeId dht)
191 , FiniteBits (NodeId dht)
192 , Default u
193 , Serialize (TransactionID dht)
194 , WireFormat raw dht
195 , Kademlia dht
196 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
197 , Functor dht
198 , Pretty (NodeInfo dht ip u)
199 , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
200 , SerializableTo raw (Response dht (NodeFound dht ip))
201 , SerializableTo raw (Query dht (FindNode dht ip))
202 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
203-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
204kademliaHandlers logger = do
205 groknode <- insertNode1
206 mynid <- myNodeIdAccordingTo1
207 let handler :: ( KRPC (Query dht a) (Response dht b)
208 , SerializableTo raw (Response dht b)
209 , SerializableTo raw (Query dht a)
210 ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw
211 handler = nodeHandler groknode mynid (logt logger)
212 dht = Proxy :: Proxy dht
213 getclosest <- getClosest1
214 return [ handler (namePing dht) $ pingH dht
215 , handler (nameFindNodes dht) $ findNodeH getclosest
216 ]
217
218
165-- | Includes all default query handlers. 219-- | Includes all default query handlers.
166defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT ip [NodeHandler] 220defaultHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
167defaultHandlers logger = do 221defaultHandlers logger = do
168 groknode <- insertNode1 222 groknode <- insertNode1
169 toks <- asks sessionTokens
170 getclosest <- getClosest1
171 mynid <- myNodeIdAccordingTo1 223 mynid <- myNodeIdAccordingTo1
224 let handler :: KRPC (Query KMessageOf a) (Response KMessageOf b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
225 handler = nodeHandler groknode mynid (logt logger)
226 toks <- asks sessionTokens
172 peers <- asks contactInfo 227 peers <- asks contactInfo
173 getpeers <- getPeerList1 228 getpeers <- getPeerList1
174 let handler :: KRPC (Query a) (Response b) => QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler 229 hs <- kademliaHandlers logger
175 handler = nodeHandler groknode mynid (logt logger) 230 return $ hs ++ [ handler "get_peers" $ getPeersH getpeers toks
176 return [ handler "ping" $ pingH 231 , handler "announce_peer" $ announceH peers toks ]
177 , handler "find-nodes" $ findNodeH getclosest
178 , handler "get_peers" $ getPeersH getpeers toks
179 , handler "announce_peer" $ announceH peers toks ]
180 232
181{----------------------------------------------------------------------- 233{-----------------------------------------------------------------------
182-- Basic queries 234-- Basic queries
183-----------------------------------------------------------------------} 235-----------------------------------------------------------------------}
184 236
185type Iteration ip o = NodeInfo KMessageOf ip () -> DHT ip (Either [NodeInfo KMessageOf ip ()] [o ip]) 237type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip])
186 238
187-- | The most basic query. May be used to check if the given node is 239-- | The most basic query. May be used to check if the given node is
188-- alive or get its 'NodeId'. 240-- alive or get its 'NodeId'.
189pingQ :: Address ip => NodeAddr ip -> DHT ip (NodeInfo KMessageOf ip (), Maybe ReflectedIP) 241pingQ :: forall raw dht u ip.
242 ( DHT.Kademlia dht
243 , Address ip
244 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
245 , Default u
246 , Show u
247 , Ord (TransactionID dht)
248 , Serialize (TransactionID dht)
249 , WireFormat raw dht
250 , SerializableTo raw (Response dht (Ping dht))
251 , SerializableTo raw (Query dht (Ping dht))
252 , Ord (NodeId dht)
253 , FiniteBits (NodeId dht)
254 , Show (NodeId dht)
255 , Show (QueryMethod dht)
256 ) => NodeAddr ip -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
190pingQ addr = do 257pingQ addr = do
191#ifdef VERSION_bencoding 258 let ping = DHT.pingMessage (Proxy :: Proxy dht)
192 (nid, Ping, mip) <- queryNode' addr Ping 259 (nid, pong, mip) <- queryNode' addr ping
193#else 260 let _ = pong `asTypeOf` ping
194 (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} 261 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
195#endif 262 return (NodeInfo nid addr def, mip)
196 return (NodeInfo nid addr (), mip)
197 263
198-- TODO [robustness] match range of returned node ids with the 264-- TODO [robustness] match range of returned node ids with the
199-- expected range and either filter bad nodes or discard response at 265-- expected range and either filter bad nodes or discard response at
200-- all throwing an exception 266-- all throwing an exception
201-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo 267-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
202findNodeQ key NodeInfo {..} = do 268findNodeQ proxy key NodeInfo {..} = do
203 NodeFound closest <- FindNode (toNodeId key) <@> nodeAddr 269 closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> nodeAddr
204 $(logInfoS) "findNodeQ" $ "NodeFound\n" 270 $(logInfoS) "findNodeQ" $ "NodeFound\n"
205 <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) 271 <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest)
206 return $ Right closest 272 return $ Right closest
207 273
208#ifdef VERSION_bencoding 274#ifdef VERSION_bencoding
209getPeersQ :: Address ip => InfoHash -> Iteration ip PeerAddr 275getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr
210getPeersQ topic NodeInfo {..} = do 276getPeersQ topic NodeInfo {..} = do
211 GotPeers {..} <- GetPeers topic <@> nodeAddr 277 GotPeers {..} <- GetPeers topic <@> nodeAddr
212 let dist = distance (toNodeId topic) nodeId 278 let dist = distance (toNodeId topic) nodeId
@@ -215,7 +281,7 @@ getPeersQ topic NodeInfo {..} = do
215 <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } 281 <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
216 return peers 282 return peers
217 283
218announceQ :: Address ip => InfoHash -> PortNumber -> Iteration ip NodeAddr 284announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr
219announceQ ih p NodeInfo {..} = do 285announceQ ih p NodeInfo {..} = do
220 GotPeers {..} <- GetPeers ih <@> nodeAddr 286 GotPeers {..} <- GetPeers ih <@> nodeAddr
221 case peers of 287 case peers of
@@ -232,7 +298,7 @@ announceQ ih p NodeInfo {..} = do
232-----------------------------------------------------------------------} 298-----------------------------------------------------------------------}
233 299
234 300
235ioGetPeers :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) 301ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
236ioGetPeers ih = do 302ioGetPeers ih = do
237 session <- ask 303 session <- ask
238 return $ \ni -> runDHT session $ do 304 return $ \ni -> runDHT session $ do
@@ -241,17 +307,71 @@ ioGetPeers ih = do
241 Right e -> return $ either (,[]) ([],) e 307 Right e -> return $ either (,[]) ([],) e
242 Left e -> let _ = e :: QueryFailure in return ([],[]) 308 Left e -> let _ = e :: QueryFailure in return ([],[])
243 309
244ioFindNode :: Address ip => InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [NodeInfo KMessageOf ip ()])) 310ioFindNode :: ( DHT.Kademlia dht
311 , WireFormat raw dht
312 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
313 , Address ip
314 , Default u
315 , Show u
316 , Show (QueryMethod dht)
317 , TableKey dht infohash
318 , Eq (NodeId dht)
319 , Ord (NodeId dht)
320 , FiniteBits (NodeId dht)
321 , Show (NodeId dht)
322 , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
323 , Ord (TransactionID dht)
324 , Serialize (TransactionID dht)
325 , SerializableTo raw (Response dht (NodeFound dht ip))
326 , SerializableTo raw (Query dht (FindNode dht ip))
327 , SerializableTo raw (Response dht (Ping dht))
328 , SerializableTo raw (Query dht (Ping dht))
329 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
245ioFindNode ih = do 330ioFindNode ih = do
246 session <- ask 331 session <- ask
247 return $ \ni -> runDHT session $ do 332 return $ \ni -> runDHT session $ do
248 NodeFound ns <- FindNode (toNodeId ih) <@> nodeAddr ni 333 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni
249 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns 334 let ns' = L.map (fmap (const def)) ns
250 335 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns'
251isearch :: (Ord r, Ord ip) => 336
252 (InfoHash -> DHT ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [r]))) 337
253 -> InfoHash 338-- | Like ioFindNode, but considers all found nodes to be 'Right' results.
254 -> DHT ip (ThreadId, Search.IterativeSearch ip r) 339ioFindNodes :: ( DHT.Kademlia dht
340 , WireFormat raw dht
341 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
342 , Address ip
343 , Default u
344 , Show u
345 , Show (QueryMethod dht)
346 , TableKey dht infohash
347 , Eq (NodeId dht)
348 , Ord (NodeId dht)
349 , FiniteBits (NodeId dht)
350 , Show (NodeId dht)
351 , KRPC (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
352 , Ord (TransactionID dht)
353 , Serialize (TransactionID dht)
354 , SerializableTo raw (Response dht (NodeFound dht ip))
355 , SerializableTo raw (Query dht (FindNode dht ip))
356 , SerializableTo raw (Response dht (Ping dht))
357 , SerializableTo raw (Query dht (Ping dht))
358 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
359ioFindNodes ih = do
360 session <- ask
361 return $ \ni -> runDHT session $ do
362 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> nodeAddr ni
363 let ns' = L.map (fmap (const def)) ns
364 return ([], ns')
365
366isearch :: ( Ord r
367 , Ord ip
368 , Ord (NodeId dht)
369 , FiniteBits (NodeId dht)
370 , TableKey dht ih
371 , Show ih) =>
372 (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])))
373 -> ih
374 -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r)
255isearch f ih = do 375isearch f ih = do
256 qry <- f ih 376 qry <- f ih
257 ns <- kclosest 8 ih <$> getTable 377 ns <- kclosest 8 ih <$> getTable
@@ -263,8 +383,25 @@ isearch f ih = do
263 -- atomically \$ readTVar (Search.searchResults s) 383 -- atomically \$ readTVar (Search.searchResults s)
264 return (a, s) 384 return (a, s)
265 385
266 386-- | Background search: fill a lazy list using a background thread.
267type Search ip o = Conduit [NodeInfo KMessageOf ip ()] (DHT ip) [o KMessageOf ip ()] 387bgsearch f ih = do
388 (tid, s) <- isearch f ih
389 let again shown = do
390 (chk,fin) <- atomically $ do
391 r <- (Set.\\ shown) <$> readTVar (Search.searchResults s)
392 if not $ Set.null r
393 then (,) r <$> Search.searchIsFinished s
394 else Search.searchIsFinished s >>= check >> return (Set.empty,True)
395 let ps = Set.toList chk
396 if fin then return ps
397 else do
398 xs <- unsafeInterleaveIO $ again (shown `Set.union` chk)
399 return $ ps ++ xs
400 liftIO $ again Set.empty
401
402type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u]
403
404#if 0
268 405
269-- TODO: use reorder and filter (Traversal option) leftovers 406-- TODO: use reorder and filter (Traversal option) leftovers
270-- search :: k -> IterationI ip o -> Search ip o 407-- search :: k -> IterationI ip o -> Search ip o
@@ -275,17 +412,36 @@ search _ action = do
275 let (nodes, results) = partitionEithers responses 412 let (nodes, results) = partitionEithers responses
276 $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) 413 $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
277 leftover $ L.concat nodes 414 leftover $ L.concat nodes
278 mapM_ yield results 415 let r = mapM_ yield results
279 416 _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ())
280publish :: Address ip => InfoHash -> PortNumber -> DHT ip () 417 r
281publish ih p = do
282 nodes <- getClosest ih
283 r <- asks (optReplication . options)
284 _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
285 return ()
286 418
419#endif
287 420
288probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) 421publish = error "todo"
422-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip ()
423-- publish ih p = do
424 -- nodes <- getClosest ih
425 -- r <- asks (optReplication . options)
426 -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
427 -- return ()
428
429
430probeNode :: ( Default u
431 , Show u
432 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
433 , DHT.Kademlia dht
434 , Address ip
435 , Ord (TransactionID dht)
436 , Serialize (TransactionID dht)
437 , WireFormat raw dht
438 , SerializableTo raw (Response dht (Ping dht))
439 , SerializableTo raw (Query dht (Ping dht))
440 , Ord (NodeId dht)
441 , FiniteBits (NodeId dht)
442 , Show (NodeId dht)
443 , Show (QueryMethod dht)
444 ) => NodeAddr ip -> DHT raw dht u ip (Bool , Maybe ReflectedIP)
289probeNode addr = do 445probeNode addr = do
290 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr))) 446 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint addr)))
291 result <- try $ pingQ addr 447 result <- try $ pingQ addr
@@ -293,8 +449,16 @@ probeNode addr = do
293 return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result 449 return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result
294 450
295 451
452refreshNodes :: forall raw dht u ip.
453 ( Address ip
454 , Ord (NodeId dht)
455 , Default u
456 , FiniteBits (NodeId dht)
457 , Pretty (NodeId dht)
458 , DHT.Kademlia dht ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()]
459refreshNodes _ = return () -- TODO
460#if 0
296-- FIXME do not use getClosest sinse we should /refresh/ them 461-- FIXME do not use getClosest sinse we should /refresh/ them
297refreshNodes :: Address ip => NodeId KMessageOf -> DHT ip () -- [NodeInfo KMessageOf ip ()]
298refreshNodes nid = do 462refreshNodes nid = do
299 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) 463 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
300 nodes <- getClosest nid 464 nodes <- getClosest nid
@@ -304,7 +468,7 @@ refreshNodes nid = do
304 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () 468 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) ()
305 -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () 469 -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) ()
306 -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume 470 -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume
307 nss <- sourceList [nodes] $= search nid (findNodeQ nid) $$ C.consume 471 nss <- sourceList [nodes] $= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume
308 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes." 472 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length (L.concat nss))) <> " nodes."
309 _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do 473 _ <- queryParallel $ flip L.map (L.concat nss) $ \n -> do
310 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) 474 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
@@ -312,8 +476,9 @@ refreshNodes nid = do
312 -- pingQ takes care of inserting the node. 476 -- pingQ takes care of inserting the node.
313 return () 477 return ()
314 return () -- \$ L.concat nss 478 return () -- \$ L.concat nss
479#endif
315 480
316logc :: Char -> String -> DHT ip () 481logc :: Char -> String -> DHT raw dht u ip ()
317logc 'D' = $(logDebugS) "insertNode" . T.pack 482logc 'D' = $(logDebugS) "insertNode" . T.pack
318logc 'W' = $(logWarnS) "insertNode" . T.pack 483logc 'W' = $(logWarnS) "insertNode" . T.pack
319logc 'I' = $(logInfoS) "insertNode" . T.pack 484logc 'I' = $(logInfoS) "insertNode" . T.pack
@@ -321,12 +486,46 @@ logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :)
321 486
322-- | This operation do not block but acquire exclusive access to 487-- | This operation do not block but acquire exclusive access to
323-- routing table. 488-- routing table.
324insertNode :: forall ip. Address ip => NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> DHT ip () 489insertNode :: forall raw dht u ip.
490 ( Address ip
491 , Ord (NodeId dht)
492 , FiniteBits (NodeId dht)
493 , Show (NodeId dht)
494 , Default u
495 , Show u
496 , DHT.Kademlia dht
497 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
498 , Ord (TransactionID dht)
499 , WireFormat raw dht
500 , Serialize (TransactionID dht)
501 , SerializableTo raw (Response dht (Ping dht))
502 , SerializableTo raw (Query dht (Ping dht))
503 , Ord (NodeId dht)
504 , Show (NodeId dht)
505 , Show (QueryMethod dht)
506 ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip ()
325insertNode info witnessed_ip0 = do 507insertNode info witnessed_ip0 = do
326 f <- insertNode1 508 f <- insertNode1
327 liftIO $ f info witnessed_ip0 509 liftIO $ f info witnessed_ip0
328 510
329insertNode1 :: forall ip. Address ip => DHT ip (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) 511insertNode1 :: forall raw dht u ip.
512 ( Address ip
513 , Default u
514 , Show u
515 , Ord (NodeId dht)
516 , FiniteBits (NodeId dht)
517 , Show (NodeId dht)
518 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
519 , DHT.Kademlia dht
520 , Ord (TransactionID dht)
521 , WireFormat raw dht
522 , Serialize (TransactionID dht)
523 , SerializableTo raw (Response dht (Ping dht))
524 , SerializableTo raw (Query dht (Ping dht))
525 , Ord (NodeId dht)
526 , Show (NodeId dht)
527 , Show (QueryMethod dht)
528 ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
330insertNode1 = do 529insertNode1 = do
331 bc <- optBucketCount <$> asks options 530 bc <- optBucketCount <$> asks options
332 nid <- asks tentativeNodeId 531 nid <- asks tentativeNodeId
@@ -335,15 +534,17 @@ insertNode1 = do
335 dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. 534 dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state.
336 probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) 535 probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
337 let probe n = probe0 n >>= runDHT dht_node_state . restoreM 536 let probe n = probe0 n >>= runDHT dht_node_state . restoreM
537 {-
338 changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive 538 changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive
339 ip <- fromSockAddr ip0 :: Maybe ip 539 ip <- fromSockAddr ip0 :: Maybe ip
340 listToMaybe 540 listToMaybe
341 $ rank id (nodeId $ foreignNode arrival) 541 $ rank id (nodeId $ foreignNode arrival)
342 $ bep42s ip (DHT.fallbackID params) -- warning: recursive 542 $ bep42s ip (DHT.fallbackID params) -- warning: recursive
543 -}
343 params = DHT.TableParameters 544 params = DHT.TableParameters
344 { maxBuckets = bc :: Int 545 { maxBuckets = bc :: Int
345 , fallbackID = nid :: NodeId KMessageOf 546 , fallbackID = nid :: NodeId dht
346 , adjustID = changeip :: SockAddr -> Event KMessageOf ip () -> NodeId KMessageOf 547 , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
347 , logMessage = logm :: Char -> String -> IO () 548 , logMessage = logm :: Char -> String -> IO ()
348 , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP) 549 , pingProbe = probe :: NodeAddr ip -> IO (Bool, Maybe ReflectedIP)
349 } 550 }
@@ -356,25 +557,75 @@ insertNode1 = do
356 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 557 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
357 558
358-- | Throws exception if node is not responding. 559-- | Throws exception if node is not responding.
359queryNode :: forall a b ip. Address ip => KRPC (Query a) (Response b) 560queryNode :: forall raw dht u a b ip.
360 => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b) 561 ( Address ip
562 , KRPC (Query dht a) (Response dht b)
563 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
564 , Default u
565 , Show u
566 , DHT.Kademlia dht
567 , Ord (TransactionID dht)
568 , Serialize (TransactionID dht)
569 , WireFormat raw dht
570 , SerializableTo raw (Response dht b)
571 , SerializableTo raw (Query dht a)
572 , Ord (NodeId dht)
573 , FiniteBits (NodeId dht)
574 , Show (NodeId dht)
575 , Show (QueryMethod dht)
576 , SerializableTo raw (Response dht (Ping dht))
577 , SerializableTo raw (Query dht (Ping dht))
578 ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b)
361queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q 579queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
362 580
363queryNode' :: forall a b ip. Address ip => KRPC (Query a) (Response b) 581queryNode' :: forall raw dht u a b ip.
364 => NodeAddr ip -> a -> DHT ip (NodeId KMessageOf, b, Maybe ReflectedIP) 582 ( Address ip
583 , Default u
584 , Show u
585 , DHT.Kademlia dht
586 , KRPC (Query dht a) (Response dht b)
587 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
588 , Ord (TransactionID dht)
589 , Serialize (TransactionID dht)
590 , WireFormat raw dht
591 , SerializableTo raw (Response dht b)
592 , SerializableTo raw (Query dht a)
593 , Ord (NodeId dht)
594 , FiniteBits (NodeId dht)
595 , Show (NodeId dht)
596 , Show (QueryMethod dht)
597 , SerializableTo raw (Response dht (Ping dht))
598 , SerializableTo raw (Query dht (Ping dht))
599 ) => NodeAddr ip -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
365queryNode' addr q = do 600queryNode' addr q = do
366 nid <- myNodeIdAccordingTo addr 601 nid <- myNodeIdAccordingTo addr
367 let read_only = False -- TODO: check for NAT issues. (BEP 43) 602 let read_only = False -- TODO: check for NAT issues. (BEP 43)
368 let KRPC.Method name = KRPC.method :: KRPC.Method (Query a) (Response b) 603 let KRPC.Method name = KRPC.method :: KRPC.Method (Query dht a) (Response dht b)
369 mgr <- asks manager 604 mgr <- asks manager
370 (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr name (toSockAddr addr) (Query nid read_only q) 605 (Response remoteId r, witnessed_ip) <- liftIO $ query' mgr (error "TODO: name") (toSockAddr addr) (Query nid read_only q)
371 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) 606 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
372 -- <> " by " <> T.pack (show (toSockAddr addr)) 607 -- <> " by " <> T.pack (show (toSockAddr addr))
373 _ <- insertNode (NodeInfo remoteId addr ()) witnessed_ip 608 _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip
374 return (remoteId, r, witnessed_ip) 609 return (remoteId, r, witnessed_ip)
375 610
376-- | Infix version of 'queryNode' function. 611-- | Infix version of 'queryNode' function.
377(<@>) :: Address ip => KRPC (Query a) (Response b) 612(<@>) :: ( Address ip
378 => a -> NodeAddr ip -> DHT ip b 613 , KRPC (Query dht a) (Response dht b)
614 , KRPC (Query dht (Ping dht)) (Response dht (Ping dht))
615 , Default u
616 , Show u
617 , Show (QueryMethod dht)
618 , Ord (NodeId dht)
619 , FiniteBits (NodeId dht)
620 , Show (NodeId dht)
621 , Ord (TransactionID dht)
622 , Serialize (TransactionID dht)
623 , SerializableTo raw (Response dht b)
624 , SerializableTo raw (Query dht a)
625 , SerializableTo raw (Response dht (Ping dht))
626 , SerializableTo raw (Query dht (Ping dht))
627 , WireFormat raw dht
628 , Kademlia dht
629 ) => a -> NodeAddr ip -> DHT raw dht u ip b
379q <@> addr = snd <$> queryNode addr q 630q <@> addr = snd <$> queryNode addr q
380{-# INLINE (<@>) #-} 631{-# INLINE (<@>) #-}
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
index f5cd7834..356f6fd9 100644
--- a/src/Network/BitTorrent/DHT/Search.hs
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -2,6 +2,7 @@
2{-# LANGUAGE PatternSynonyms #-} 2{-# LANGUAGE PatternSynonyms #-}
3{-# LANGUAGE RecordWildCards #-} 3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-} 4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-}
5module Network.BitTorrent.DHT.Search where 6module Network.BitTorrent.DHT.Search where
6 7
7import Control.Concurrent 8import Control.Concurrent
@@ -25,27 +26,23 @@ import qualified Data.Wrapper.PSQ as PSQ
25 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) 26 ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ)
26import Network.Address hiding (NodeId) 27import Network.Address hiding (NodeId)
27import Network.DatagramServer.Types 28import Network.DatagramServer.Types
28#ifdef VERSION_bencoding 29import Data.Bits
29import Network.DatagramServer.Mainline (KMessageOf)
30type Ann = ()
31#else
32import Network.DatagramServer.Tox as Tox
33type KMessageOf = Tox.Message
34type Ann = Bool
35#endif
36 30
37data IterativeSearch ip r = IterativeSearch 31data IterativeSearch dht u ip r = IterativeSearch
38 { searchTarget :: NodeId KMessageOf 32 { searchTarget :: NodeId dht
39 , searchQuery :: NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r]) 33 , searchQuery :: NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])
40 , searchPendingCount :: TVar Int 34 , searchPendingCount :: TVar Int
41 , searchQueued :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) 35 , searchQueued :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)))
42 , searchInformant :: TVar (MinMaxPSQ (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf))) 36 , searchInformant :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)))
43 , searchVisited :: TVar (Set (NodeAddr ip)) 37 , searchVisited :: TVar (Set (NodeAddr ip))
44 , searchResults :: TVar (Set r) 38 , searchResults :: TVar (Set r)
45 } 39 }
46 40
47newSearch :: Eq ip => (NodeInfo KMessageOf ip Ann -> IO ([NodeInfo KMessageOf ip Ann], [r])) 41newSearch :: ( Eq ip
48 -> NodeId KMessageOf -> [NodeInfo KMessageOf ip Ann] -> IO (IterativeSearch ip r) 42 , Ord (NodeId dht)
43 , FiniteBits (NodeId dht)
44 ) => (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))
45 -> NodeId dht -> [NodeInfo dht ip u] -> IO (IterativeSearch dht u ip r)
49newSearch qry target ns = atomically $ do 46newSearch qry target ns = atomically $ do
50 c <- newTVar 0 47 c <- newTVar 0
51 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns 48 q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns
@@ -60,9 +57,14 @@ searchAlpha = 3
60searchK :: Int 57searchK :: Int
61searchK = 8 58searchK = 8
62 59
63sendQuery :: forall a ip. (Ord a, Ord ip) => 60sendQuery :: forall a ip dht u.
64 IterativeSearch ip a 61 ( Ord a
65 -> Binding (NodeInfo KMessageOf ip Ann) (NodeDistance (NodeId KMessageOf)) 62 , Ord ip
63 , Ord (NodeId dht)
64 , FiniteBits (NodeId dht)
65 ) =>
66 IterativeSearch dht u ip a
67 -> Binding (NodeInfo dht ip u) (NodeDistance (NodeId dht))
66 -> IO () 68 -> IO ()
67sendQuery IterativeSearch{..} (ni :-> d) = do 69sendQuery IterativeSearch{..} (ni :-> d) = do
68 (ns,rs) <- handle (\(SomeException e) -> return ([],[])) 70 (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
@@ -71,9 +73,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do
71 modifyTVar searchPendingCount pred 73 modifyTVar searchPendingCount pred
72 vs <- readTVar searchVisited 74 vs <- readTVar searchVisited
73 -- We only queue a node if it is not yet visited 75 -- We only queue a node if it is not yet visited
74 let insertFoundNode :: NodeInfo KMessageOf ip u 76 let insertFoundNode :: NodeInfo dht ip u
75 -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) 77 -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))
76 -> MinMaxPSQ (NodeInfo KMessageOf ip u) (NodeDistance (NodeId KMessageOf)) 78 -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))
77 insertFoundNode n q 79 insertFoundNode n q
78 | nodeAddr n `Set.member` vs = q 80 | nodeAddr n `Set.member` vs = q
79 | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q 81 | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q
@@ -82,7 +84,9 @@ sendQuery IterativeSearch{..} (ni :-> d) = do
82 modifyTVar searchResults $ \s -> foldr Set.insert s rs 84 modifyTVar searchResults $ \s -> foldr Set.insert s rs
83 85
84 86
85searchIsFinished :: Ord ip => IterativeSearch ip r -> STM Bool 87searchIsFinished :: ( Ord ip
88 , Ord (NodeId dht)
89 ) => IterativeSearch dht u ip r -> STM Bool
86searchIsFinished IterativeSearch{..} = do 90searchIsFinished IterativeSearch{..} = do
87 q <- readTVar searchQueued 91 q <- readTVar searchQueued
88 cnt <- readTVar searchPendingCount 92 cnt <- readTVar searchPendingCount
@@ -94,8 +98,8 @@ searchIsFinished IterativeSearch{..} = do
94 <= PSQ.prio (fromJust $ MM.findMin q)))) 98 <= PSQ.prio (fromJust $ MM.findMin q))))
95 99
96search :: 100search ::
97 (Ord r, Ord ip) => 101 (Ord r, Ord ip, Ord (NodeId dht), FiniteBits (NodeId dht)) =>
98 IterativeSearch ip r -> IO () 102 IterativeSearch dht u ip r -> IO ()
99search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do 103search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
100 fix $ \again -> do 104 fix $ \again -> do
101 join $ atomically $ do 105 join $ atomically $ do
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index d4794038..f96ba707 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -96,6 +96,7 @@ import Control.Monad.Trans.Control
96import Control.Monad.Trans.Resource 96import Control.Monad.Trans.Resource
97import Data.Typeable 97import Data.Typeable
98import Data.String 98import Data.String
99import Data.Bits
99import Data.ByteString 100import Data.ByteString
100import Data.Conduit.Lazy 101import Data.Conduit.Lazy
101import Data.Default 102import Data.Default
@@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber)
265type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () 266type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
266 267
267-- | DHT session keep track state of /this/ node. 268-- | DHT session keep track state of /this/ node.
268data Node ip = Node 269data Node raw dht u ip = Node
269 { -- | Session configuration; 270 { -- | Session configuration;
270 options :: !Options 271 options :: !Options
271 272
272 -- | Pseudo-unique self-assigned session identifier. This value is 273 -- | Pseudo-unique self-assigned session identifier. This value is
273 -- constant during DHT session and (optionally) between sessions. 274 -- constant during DHT session and (optionally) between sessions.
274#ifdef VERSION_bencoding 275 , tentativeNodeId :: !(NodeId dht)
275 , tentativeNodeId :: !(NodeId KMessageOf)
276#else
277 , tentativeNodeId :: !(NodeId Tox.Message)
278#endif
279 276
280 , resources :: !InternalState 277 , resources :: !InternalState
281#ifdef VERSION_bencoding 278 , manager :: !(Manager raw dht) -- ^ RPC manager;
282 , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; 279 , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table;
283 , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table;
284#else
285 , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager;
286 , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table;
287#endif
288 , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; 280 , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes;
289 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; 281 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node;
290 , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. 282 , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs.
@@ -293,23 +285,23 @@ data Node ip = Node
293 285
294-- | DHT keep track current session and proper resource allocation for 286-- | DHT keep track current session and proper resource allocation for
295-- safe multithreading. 287-- safe multithreading.
296newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } 288newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a }
297 deriving ( Functor, Applicative, Monad, MonadIO 289 deriving ( Functor, Applicative, Monad, MonadIO
298 , MonadBase IO, MonadReader (Node ip), MonadThrow 290 , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow
299 ) 291 )
300 292
301#if MIN_VERSION_monad_control(1,0,0) 293#if MIN_VERSION_monad_control(1,0,0)
302newtype DHTStM ip a = StM { 294newtype DHTStM raw dht u ip a = StM {
303 unSt :: StM (ReaderT (Node ip) IO) a 295 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
304 } 296 }
305#endif 297#endif
306 298
307instance MonadBaseControl IO (DHT ip) where 299instance MonadBaseControl IO (DHT raw dht u ip) where
308#if MIN_VERSION_monad_control(1,0,0) 300#if MIN_VERSION_monad_control(1,0,0)
309 type StM (DHT ip) a = DHTStM ip a 301 type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a
310#else 302#else
311 newtype StM (DHT ip) a = StM { 303 newtype StM (DHT raw dht u ip) a = StM {
312 unSt :: StM (ReaderT (Node ip) IO) a 304 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
313 } 305 }
314#endif 306#endif
315 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> 307 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
@@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where
321 313
322-- | Check is it is possible to run 'queryNode' or handle pending 314-- | Check is it is possible to run 'queryNode' or handle pending
323-- query from remote node. 315-- query from remote node.
324instance MonadActive (DHT ip) where 316instance MonadActive (DHT raw dht u ip) where
325 monadActive = getManager >>= liftIO . isActive 317 monadActive = getManager >>= liftIO . isActive
326 {-# INLINE monadActive #-} 318 {-# INLINE monadActive #-}
327 319
328-- | All allocated resources will be closed at 'closeNode'. 320-- | All allocated resources will be closed at 'closeNode'.
329instance MonadResource (DHT ip) where 321instance MonadResource (DHT raw dht u ip) where
330 liftResourceT m = do 322 liftResourceT m = do
331 s <- asks resources 323 s <- asks resources
332 liftIO $ runInternalState m s 324 liftIO $ runInternalState m s
333 325
334-- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where 326-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where
335 327
336getManager :: DHT ip (Manager IO BValue KMessageOf) 328getManager :: DHT raw dht u ip (Manager raw dht)
337getManager = asks manager 329getManager = asks manager
338 330
339instance MonadLogger (DHT ip) where 331instance MonadLogger (DHT raw dht u ip) where
340 monadLoggerLog loc src lvl msg = do 332 monadLoggerLog loc src lvl msg = do
341 logger <- asks loggerFun 333 logger <- asks loggerFun
342 liftIO $ logger loc src lvl (toLogStr msg) 334 liftIO $ logger loc src lvl (toLogStr msg)
@@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where
344#ifdef VERSION_bencoding 336#ifdef VERSION_bencoding
345type NodeHandler = Handler IO KMessageOf BValue 337type NodeHandler = Handler IO KMessageOf BValue
346#else 338#else
347type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString 339type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString
348#endif 340#endif
349 341
350logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () 342logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO ()
@@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of
376-- | Run DHT session. You /must/ properly close session using 368-- | Run DHT session. You /must/ properly close session using
377-- 'closeNode' function, otherwise socket or other scarce resources may 369-- 'closeNode' function, otherwise socket or other scarce resources may
378-- leak. 370-- leak.
379newNode :: Address ip 371newNode :: ( Address ip
372 , FiniteBits (NodeId dht)
373 , Serialize (NodeId dht)
374 )
380 => -- [NodeHandler] -- ^ handlers to run on accepted queries; 375 => -- [NodeHandler] -- ^ handlers to run on accepted queries;
381 Options -- ^ various dht options; 376 Options -- ^ various dht options;
382 -> NodeAddr ip -- ^ node address to bind; 377 -> NodeAddr ip -- ^ node address to bind;
383 -> LogFun -- ^ invoked on log messages; 378 -> LogFun -- ^ invoked on log messages;
384#ifdef VERSION_bencoding 379 -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated.
385 -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. 380 -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address.
386#else
387 -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated.
388#endif
389 -> IO (Node ip) -- ^ a new DHT node running at given address.
390newNode opts naddr logger mbid = do 381newNode opts naddr logger mbid = do
391 s <- createInternalState 382 s <- createInternalState
392 runInternalState initNode s 383 runInternalState initNode s
@@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do
409 400
410-- | Some resources like listener thread may live for 401-- | Some resources like listener thread may live for
411-- some short period of time right after this DHT session closed. 402-- some short period of time right after this DHT session closed.
412closeNode :: Node ip -> IO () 403closeNode :: Node raw dht u ip -> IO ()
413closeNode Node {..} = closeInternalState resources 404closeNode Node {..} = closeInternalState resources
414 405
415-- | Run DHT operation on the given session. 406-- | Run DHT operation on the given session.
416runDHT :: Node ip -> DHT ip a -> IO a 407runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a
417runDHT node action = runReaderT (unDHT action) node 408runDHT node action = runReaderT (unDHT action) node
418{-# INLINE runDHT #-} 409{-# INLINE runDHT #-}
419 410
@@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do
453-----------------------------------------------------------------------} 444-----------------------------------------------------------------------}
454 445
455-- | This nodes externally routable address reported by remote peers. 446-- | This nodes externally routable address reported by remote peers.
456routableAddress :: DHT ip (Maybe SockAddr) 447routableAddress :: DHT raw dht u ip (Maybe SockAddr)
457routableAddress = do 448routableAddress = do
458 info <- asks routingInfo >>= liftIO . atomically . readTVar 449 info <- asks routingInfo >>= liftIO . atomically . readTVar
459 return $ myAddress <$> info 450 return $ myAddress <$> info
460 451
461-- | The current NodeId that the given remote node should know us by. 452-- | The current NodeId that the given remote node should know us by.
462#ifdef VERSION_bencoding 453myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht)
463myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf)
464#else
465myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message)
466#endif
467myNodeIdAccordingTo _ = do 454myNodeIdAccordingTo _ = do
468 info <- asks routingInfo >>= liftIO . atomically . readTVar 455 info <- asks routingInfo >>= liftIO . atomically . readTVar
469 maybe (asks tentativeNodeId) 456 maybe (asks tentativeNodeId)
470 (return . myNodeId) 457 (return . myNodeId)
471 info 458 info
472 459
473myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) 460myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) )
474myNodeIdAccordingTo1 = do 461myNodeIdAccordingTo1 = do
475 var <- asks routingInfo 462 var <- asks routingInfo
476 tid <- asks tentativeNodeId 463 tid <- asks tentativeNodeId
@@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do
480 467
481-- | Get current routing table. Normally you don't need to use this 468-- | Get current routing table. Normally you don't need to use this
482-- function, but it can be usefull for debugging and profiling purposes. 469-- function, but it can be usefull for debugging and profiling purposes.
483#ifdef VERSION_bencoding 470getTable :: Eq ip => DHT raw dht u ip (Table dht ip u)
484getTable :: Eq ip => DHT ip (Table KMessageOf ip ())
485#else
486getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool)
487#endif
488getTable = do 471getTable = do
489 Node { tentativeNodeId = myId 472 Node { tentativeNodeId = myId
490 , routingInfo = var 473 , routingInfo = var
@@ -492,18 +475,18 @@ getTable = do
492 let nil = nullTable myId (optBucketCount opts) 475 let nil = nullTable myId (optBucketCount opts)
493 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) 476 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var))
494 477
495getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] 478getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ]
496getSwarms = do 479getSwarms = do
497 store <- asks contactInfo >>= liftIO . atomically . readTVar 480 store <- asks contactInfo >>= liftIO . atomically . readTVar
498 return $ P.knownSwarms store 481 return $ P.knownSwarms store
499 482
500savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString 483savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString
501savePeerStore = do 484savePeerStore = do
502 var <- asks contactInfo 485 var <- asks contactInfo
503 peers <- liftIO $ atomically $ readTVar var 486 peers <- liftIO $ atomically $ readTVar var
504 return $ S.encode peers 487 return $ S.encode peers
505 488
506mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () 489mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip ()
507mergeSavedPeers bs = do 490mergeSavedPeers bs = do
508 var <- asks contactInfo 491 var <- asks contactInfo
509 case S.decode bs of 492 case S.decode bs of
@@ -511,7 +494,7 @@ mergeSavedPeers bs = do
511 Left _ -> return () 494 Left _ -> return ()
512 495
513 496
514allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] 497allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ]
515allPeers ih = do 498allPeers ih = do
516 store <- asks contactInfo >>= liftIO . atomically . readTVar 499 store <- asks contactInfo >>= liftIO . atomically . readTVar
517 return $ P.lookup ih store 500 return $ P.lookup ih store
@@ -521,18 +504,20 @@ allPeers ih = do
521-- 504--
522-- This operation used for 'find_nodes' query. 505-- This operation used for 'find_nodes' query.
523-- 506--
524#ifdef VERSION_bencoding 507getClosest :: ( Eq ip
525getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] 508 , Ord (NodeId dht)
526#else 509 , FiniteBits (NodeId dht)
527getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] 510 , TableKey dht k ) =>
528#endif 511 k -> DHT raw dht u ip [NodeInfo dht ip u]
529getClosest node = do 512getClosest node = do
530 k <- asks (optK . options) 513 k <- asks (optK . options)
531 kclosest k node <$> getTable 514 kclosest k node <$> getTable
532 515
533getClosest1 :: ( Eq ip 516getClosest1 :: ( Eq ip
534 , TableKey KMessageOf k 517 , Ord (NodeId dht)
535 ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) 518 , FiniteBits (NodeId dht)
519 , TableKey dht k
520 ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u])
536getClosest1 = do 521getClosest1 = do
537 k <- asks (optK . options) 522 k <- asks (optK . options)
538 nobkts <- asks (optBucketCount . options) 523 nobkts <- asks (optBucketCount . options)
@@ -574,13 +559,12 @@ getTimestamp = do
574 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) 559 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime)))
575 return $ utcTimeToPOSIXSeconds utcTime 560 return $ utcTimeToPOSIXSeconds utcTime
576 561
577
578#ifdef VERSION_bencoding 562#ifdef VERSION_bencoding
579-- | Prepare result for 'get_peers' query. 563-- | Prepare result for 'get_peers' query.
580-- 564--
581-- This operation use 'getClosest' as failback so it may block. 565-- This operation use 'getClosest' as failback so it may block.
582-- 566--
583getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) 567getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip)
584getPeerList ih = do 568getPeerList ih = do
585 var <- asks contactInfo 569 var <- asks contactInfo
586 ps <- liftIO $ lookupPeers var ih 570 ps <- liftIO $ lookupPeers var ih
@@ -588,7 +572,7 @@ getPeerList ih = do
588 then Left <$> getClosest ih 572 then Left <$> getClosest ih
589 else return (Right ps) 573 else return (Right ps)
590 574
591getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) 575getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip))
592getPeerList1 = do 576getPeerList1 = do
593 var <- asks contactInfo 577 var <- asks contactInfo
594 getclosest <- getClosest1 578 getclosest <- getClosest1
@@ -599,12 +583,12 @@ getPeerList1 = do
599 else return (Right ps) 583 else return (Right ps)
600 584
601 585
602insertTopic :: InfoHash -> PortNumber -> DHT ip () 586insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
603insertTopic ih p = do 587insertTopic ih p = do
604 var <- asks announceInfo 588 var <- asks announceInfo
605 liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) 589 liftIO $ atomically $ modifyTVar' var (S.insert (ih, p))
606 590
607deleteTopic :: InfoHash -> PortNumber -> DHT ip () 591deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
608deleteTopic ih p = do 592deleteTopic ih p = do
609 var <- asks announceInfo 593 var <- asks announceInfo
610 liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) 594 liftIO $ atomically $ modifyTVar' var (S.delete (ih, p))
@@ -616,7 +600,7 @@ deleteTopic ih p = do
616-----------------------------------------------------------------------} 600-----------------------------------------------------------------------}
617 601
618-- | Failed queries are ignored. 602-- | Failed queries are ignored.
619queryParallel :: [DHT ip a] -> DHT ip [a] 603queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a]
620queryParallel queries = do 604queryParallel queries = do
621 -- TODO: use alpha 605 -- TODO: use alpha
622 -- alpha <- asks (optAlpha . options) 606 -- alpha <- asks (optAlpha . options)
diff --git a/src/Network/DHT/Mainline.hs b/src/Network/DHT/Mainline.hs
index d118ceb0..29d4231d 100644
--- a/src/Network/DHT/Mainline.hs
+++ b/src/Network/DHT/Mainline.hs
@@ -123,6 +123,8 @@ import Network.BitTorrent.DHT.Token
123import Network.DatagramServer () 123import Network.DatagramServer ()
124#endif 124#endif
125import Network.DatagramServer.Types hiding (Query,Response) 125import Network.DatagramServer.Types hiding (Query,Response)
126import Network.DHT.Types
127import Network.DHT.Routing
126 128
127{----------------------------------------------------------------------- 129{-----------------------------------------------------------------------
128-- envelopes 130-- envelopes
@@ -140,15 +142,7 @@ read_only_key = "ro"
140 142
141 143
142#ifdef VERSION_bencoding 144#ifdef VERSION_bencoding
143-- | All queries have an \"id\" key and value containing the node ID 145instance BEncode a => BEncode (Query KMessageOf a) where
144-- of the querying node.
145data Query a = Query
146 { queringNodeId :: NodeId KMessageOf -- ^ node id of /quering/ node;
147 , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43
148 , queryParams :: a -- ^ query parameters.
149 } deriving (Show, Eq, Typeable)
150
151instance BEncode a => BEncode (Query a) where
152 toBEncode Query {..} = toDict $ 146 toBEncode Query {..} = toDict $
153 BDict.union ( node_id_key .=! queringNodeId 147 BDict.union ( node_id_key .=! queringNodeId
154 .: read_only_key .=? bool Nothing (Just (1 :: Integer)) queryIsReadOnly 148 .: read_only_key .=? bool Nothing (Just (1 :: Integer)) queryIsReadOnly
@@ -167,14 +161,7 @@ data Query a = Query a
167#endif 161#endif
168 162
169#ifdef VERSION_bencoding 163#ifdef VERSION_bencoding
170-- | All responses have an \"id\" key and value containing the node ID 164instance BEncode a => BEncode (Response KMessageOf a) where
171-- of the responding node.
172data Response a = Response
173 { queredNodeId :: NodeId KMessageOf -- ^ node id of /quered/ node;
174 , responseVals :: a -- ^ query result.
175 } deriving (Show, Eq, Typeable)
176
177instance BEncode a => BEncode (Response a) where
178 toBEncode = toBEncode . toQuery 165 toBEncode = toBEncode . toQuery
179 where 166 where
180 toQuery (Response nid a) = Query nid False a 167 toQuery (Response nid a) = Query nid False a
@@ -183,28 +170,23 @@ instance BEncode a => BEncode (Response a) where
183 where 170 where
184 fromQuery (Query nid _ a) = Response nid a 171 fromQuery (Query nid _ a) = Response nid a
185#else 172#else
186data Response a = Response a 173data Response KMessageOf a = Response KMessageOf a
187#endif 174#endif
188 175
189{----------------------------------------------------------------------- 176{-----------------------------------------------------------------------
190-- ping method 177-- ping method
191-----------------------------------------------------------------------} 178-----------------------------------------------------------------------}
192 179
193-- | The most basic query is a ping. Ping query is used to check if a 180-- / The most basic query is a ping. Ping query is used to check if a
194-- quered node is still alive. 181-- quered node is still alive.
195#ifdef VERSION_bencoding 182-- data Ping = Ping Tox.Nonce8 deriving (Show, Eq, Typeable)
196data Ping = Ping
197#else
198data Ping = Ping Tox.Nonce8
199#endif
200 deriving (Show, Eq, Typeable)
201 183
202#ifdef VERSION_bencoding 184#ifdef VERSION_bencoding
203instance BEncode Ping where 185instance BEncode (Ping KMessageOf) where
204 toBEncode Ping = toDict endDict 186 toBEncode Ping = toDict endDict
205 fromBEncode _ = pure Ping 187 fromBEncode _ = pure Ping
206#else 188#else
207instance Serialize (Query Ping) where 189instance Serialize (Query (Ping KMessageOf)) where
208 get = do 190 get = do
209 b <- get 191 b <- get
210 when ( (b::Word8) /= 0) $ fail "Bad ping request" 192 when ( (b::Word8) /= 0) $ fail "Bad ping request"
@@ -225,7 +207,7 @@ instance Serialize (Response Ping) where
225#endif 207#endif
226 208
227-- | \"q\" = \"ping\" 209-- | \"q\" = \"ping\"
228instance KRPC (Query Ping) (Response Ping) where 210instance KRPC (Query KMessageOf (Ping KMessageOf)) (Response KMessageOf (Ping KMessageOf)) where
229#ifdef VERSION_bencoding 211#ifdef VERSION_bencoding
230 method = "ping" 212 method = "ping"
231#else 213#else
@@ -236,24 +218,20 @@ instance KRPC (Query Ping) (Response Ping) where
236-- find_node method 218-- find_node method
237-----------------------------------------------------------------------} 219-----------------------------------------------------------------------}
238 220
239-- | Find node is used to find the contact information for a node 221-- / Find node is used to find the contact information for a node
240-- given its ID. 222-- given its ID.
241#ifdef VERSION_bencoding 223-- data FindNode KMessageOf ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes
242newtype FindNode ip = FindNode (NodeId KMessageOf) 224 -- deriving (Show, Eq, Typeable)
243#else
244data FindNode ip = FindNode (NodeId Tox.Message) Tox.Nonce8 -- Tox: Get Nodes
245#endif
246 deriving (Show, Eq, Typeable)
247 225
248target_key :: BKey 226target_key :: BKey
249target_key = "target" 227target_key = "target"
250 228
251#ifdef VERSION_bencoding 229#ifdef VERSION_bencoding
252instance Typeable ip => BEncode (FindNode ip) where 230instance Typeable ip => BEncode (FindNode KMessageOf ip) where
253 toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict 231 toBEncode (FindNode nid) = toDict $ target_key .=! nid .: endDict
254 fromBEncode = fromDict $ FindNode <$>! target_key 232 fromBEncode = fromDict $ FindNode <$>! target_key
255#else 233#else
256instance Serialize (Query (FindNode ip)) where 234instance Serialize (Query KMessageOf (FindNode KMessageOf ip)) where
257 get = do 235 get = do
258 nid <- get 236 nid <- get
259 nonce <- get 237 nonce <- get
@@ -268,12 +246,11 @@ instance Serialize (Query (FindNode ip)) where
268-- nodes in its own routing table. 246-- nodes in its own routing table.
269-- 247--
270#ifdef VERSION_bencoding 248#ifdef VERSION_bencoding
271newtype NodeFound ip = NodeFound [NodeInfo KMessageOf ip ()] 249-- newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()] deriving (Show, Eq, Typeable)
272#else 250#else
273data NodeFound ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 251data NodeFound KMessageOf ip = NodeFound [Tox.NodeFormat] Tox.Nonce8 deriving (Show, Eq, Typeable)
274#endif 252#endif
275-- Tox: send_nodes 253-- Tox: send_nodes
276 deriving (Show, Eq, Typeable)
277 254
278nodes_key :: BKey 255nodes_key :: BKey
279nodes_key = "nodes" 256nodes_key = "nodes"
@@ -290,7 +267,7 @@ binary k = field (req k) >>= either (fail . format) return .
290 where 267 where
291 format str = "fail to deserialize " ++ show k ++ " field: " ++ str 268 format str = "fail to deserialize " ++ show k ++ " field: " ++ str
292 269
293instance Address ip => BEncode (NodeFound ip) where 270instance Address ip => BEncode (NodeFound KMessageOf ip) where
294 toBEncode (NodeFound ns) = toDict $ 271 toBEncode (NodeFound ns) = toDict $
295 nodes_key .=! runPut (mapM_ put ns) 272 nodes_key .=! runPut (mapM_ put ns)
296 .: endDict 273 .: endDict
@@ -298,7 +275,7 @@ instance Address ip => BEncode (NodeFound ip) where
298 -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32) 275 -- TODO: handle IPv6 by reading the "nodes6" key (see bep 32)
299 fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval) 276 fromBEncode bval = NodeFound <$> (traverse from4 =<< fromDict (binary nodes_key) bval)
300#else 277#else
301instance Serialize (Response (NodeFound ip)) where 278instance Serialize (Response KMessageOf (NodeFound KMessageOf ip)) where
302 get = do 279 get = do
303 count <- get :: Get Word8 280 count <- get :: Get Word8
304 nodes <- sequence $ replicate (fromIntegral count) get 281 nodes <- sequence $ replicate (fromIntegral count) get
@@ -314,7 +291,7 @@ instance Serialize (Response (NodeFound ip)) where
314 291
315-- | \"q\" == \"find_node\" 292-- | \"q\" == \"find_node\"
316instance (Address ip, Typeable ip) 293instance (Address ip, Typeable ip)
317 => KRPC (Query (FindNode ip)) (Response (NodeFound ip)) where 294 => KRPC (Query KMessageOf (FindNode KMessageOf ip)) (Response KMessageOf (NodeFound KMessageOf ip)) where
318#ifdef VERSION_bencoding 295#ifdef VERSION_bencoding
319 method = "find_node" 296 method = "find_node"
320#else 297#else
@@ -383,7 +360,7 @@ instance (Typeable ip, Serialize ip) => BEncode (GotPeers ip) where
383 360
384-- | \"q" = \"get_peers\" 361-- | \"q" = \"get_peers\"
385instance (Typeable ip, Serialize ip) => 362instance (Typeable ip, Serialize ip) =>
386 KRPC (Query (GetPeers ip)) (Response (GotPeers ip)) where 363 KRPC (Query KMessageOf (GetPeers ip)) (Response KMessageOf (GotPeers ip)) where
387 method = "get_peers" 364 method = "get_peers"
388 365
389{----------------------------------------------------------------------- 366{-----------------------------------------------------------------------
@@ -455,7 +432,7 @@ instance BEncode Announced where
455 fromBEncode _ = pure Announced 432 fromBEncode _ = pure Announced
456 433
457-- | \"q" = \"announce\" 434-- | \"q" = \"announce\"
458instance KRPC (Query Announce) (Response Announced) where 435instance KRPC (Query KMessageOf Announce) (Response KMessageOf Announced) where
459 method = "announce_peer" 436 method = "announce_peer"
460 437
461-- endif VERSION_bencoding 438-- endif VERSION_bencoding
@@ -495,3 +472,25 @@ bep42 addr (NodeId r)
495 where msk | BS.length ip == 4 = ip4mask 472 where msk | BS.length ip == 4 = ip4mask
496 | otherwise = ip6mask 473 | otherwise = ip6mask
497 474
475instance Kademlia KMessageOf where
476 data Ping KMessageOf = Ping
477 deriving (Show, Eq, Typeable)
478 newtype FindNode KMessageOf ip = FindNode (NodeId KMessageOf)
479 deriving (Show, Eq, Typeable)
480 newtype NodeFound KMessageOf ip = NodeFound [NodeInfo KMessageOf ip ()]
481 deriving (Show, Eq, Typeable)
482 pingMessage _ = Ping
483 pongMessage _ = Ping
484 findNodeMessage _ k = FindNode (toNodeId k)
485 foundNodes (NodeFound ns) = ns
486
487 dhtAdjustID _ fallback ip0 arrival
488 = fromMaybe fallback $ do
489 ip <- fromSockAddr ip0 -- :: Maybe ip
490 let _ = ip `asTypeOf` nodeAddr (foreignNode arrival)
491 listToMaybe
492 $ rank id (nodeId $ foreignNode arrival)
493 $ bep42s ip fallback
494
495 namePing _ = "ping"
496 nameFindNodes _ = "find-nodes"
diff --git a/src/Network/DHT/Types.hs b/src/Network/DHT/Types.hs
index ed2dc175..79f9e1d3 100644
--- a/src/Network/DHT/Types.hs
+++ b/src/Network/DHT/Types.hs
@@ -1,8 +1,17 @@
1module Network.DHT.Types where 1{-# LANGUAGE TypeFamilies #-}
2{-# LANGUAGE ScopedTypeVariables #-}
3{-# LANGUAGE StandaloneDeriving #-}
4{-# LANGUAGE FlexibleContexts #-}
5module Network.DHT.Types
6 ( module Network.DHT.Types
7 , TableKey
8 , toNodeId
9 ) where
2 10
3import Network.Socket (SockAddr) 11import Network.Socket (SockAddr)
4import Network.DatagramServer.Types 12import Network.DatagramServer.Types
5import Network.DHT.Routing 13import Network.DHT.Routing
14import Data.Typeable
6 15
7data TableParameters msg ip u = TableParameters 16data TableParameters msg ip u = TableParameters
8 { maxBuckets :: Int 17 { maxBuckets :: Int
@@ -11,3 +20,43 @@ data TableParameters msg ip u = TableParameters
11 , logMessage :: Char -> String -> IO () 20 , logMessage :: Char -> String -> IO ()
12 , adjustID :: SockAddr -> Event msg ip u -> NodeId msg 21 , adjustID :: SockAddr -> Event msg ip u -> NodeId msg
13 } 22 }
23
24-- | All queries have an \"id\" key and value containing the node ID
25-- of the querying node.
26data Query dht a = Query
27 { queringNodeId :: NodeId dht -- ^ node id of /quering/ node;
28 , queryIsReadOnly :: Bool -- ^ node is read-only as per BEP 43
29 , queryParams :: a -- ^ query parameters.
30 } deriving (Typeable)
31
32deriving instance (Eq (NodeId dht), Eq a ) => Eq (Query dht a)
33deriving instance (Show (NodeId dht), Show a ) => Show (Query dht a)
34
35-- | All responses have an \"id\" key and value containing the node ID
36-- of the responding node.
37data Response dht a = Response
38 { queredNodeId :: NodeId dht -- ^ node id of /quered/ node;
39 , responseVals :: a -- ^ query result.
40 } deriving (Typeable)
41
42deriving instance (Eq (NodeId dht), Eq a ) => Eq (Response dht a)
43deriving instance (Show (NodeId dht), Show a ) => Show (Response dht a)
44
45
46class Kademlia dht where
47 -- | The most basic query is a ping. Ping query is used to check if a
48 -- quered node is still alive.
49 data Ping dht
50 -- | Find node is used to find the contact information for a node
51 -- given its ID.
52 data FindNode dht ip
53 data NodeFound dht ip
54 pingMessage :: Proxy dht -> Ping dht
55 pongMessage :: Proxy dht -> Ping dht
56 findNodeMessage :: TableKey dht k => Proxy dht -> k -> FindNode dht ip
57 foundNodesMessage :: [NodeInfo dht ip ()] -> NodeFound dht ip
58 foundNodes :: NodeFound dht ip -> [NodeInfo dht ip ()]
59 findWho :: FindNode dht ip -> NodeId dht
60 dhtAdjustID :: Address ip => Proxy dht -> NodeId dht -> SockAddr -> Event dht ip u -> NodeId dht
61 namePing :: Proxy dht -> QueryMethod dht
62 nameFindNodes :: Proxy dht -> QueryMethod dht
diff --git a/src/Network/DatagramServer.hs b/src/Network/DatagramServer.hs
index 2140e2cd..91efa443 100644
--- a/src/Network/DatagramServer.hs
+++ b/src/Network/DatagramServer.hs
@@ -190,13 +190,13 @@ type Handler h msg v = (QueryMethod msg, HandlerBody h msg v)
190 190
191-- | Keep track pending queries made by /this/ node and handle queries 191-- | Keep track pending queries made by /this/ node and handle queries
192-- made by /remote/ nodes. 192-- made by /remote/ nodes.
193data Manager h raw msg = Manager 193data Manager raw msg = Manager
194 { sock :: !Socket 194 { sock :: !Socket
195 , options :: !Options 195 , options :: !Options
196 , listenerThread :: !(MVar ThreadId) 196 , listenerThread :: !(MVar ThreadId)
197 , transactionCounter :: {-# UNPACK #-} !TransactionCounter 197 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
198 , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw) 198 , pendingCalls :: {-# UNPACK #-} !(PendingCalls msg raw)
199 , handlers :: [Handler h msg raw] -- TODO delete this, it's not used 199 -- , handlers :: [Handler h msg raw] -- TODO delete this, it's not used
200 , logMsg :: Char -> String -> T.Text -> IO () 200 , logMsg :: Char -> String -> T.Text -> IO ()
201 } 201 }
202 202
@@ -212,14 +212,14 @@ newManager :: Options -- ^ various protocol options;
212 -> (Char -> String -> T.Text -> IO ()) -- ^ loging function 212 -> (Char -> String -> T.Text -> IO ()) -- ^ loging function
213 -> SockAddr -- ^ address to listen on; 213 -> SockAddr -- ^ address to listen on;
214 -> [Handler h msg raw] -- ^ handlers to run on incoming queries. 214 -> [Handler h msg raw] -- ^ handlers to run on incoming queries.
215 -> IO (Manager h raw msg) -- ^ new rpc manager. 215 -> IO (Manager raw msg) -- ^ new rpc manager.
216newManager opts @ Options {..} logmsg servAddr handlers = do 216newManager opts @ Options {..} logmsg servAddr handlers = do
217 validateOptions opts 217 validateOptions opts
218 sock <- bindServ 218 sock <- bindServ
219 tref <- newEmptyMVar 219 tref <- newEmptyMVar
220 tran <- newIORef optSeedTransaction 220 tran <- newIORef optSeedTransaction
221 calls <- newIORef M.empty 221 calls <- newIORef M.empty
222 return $ Manager sock opts tref tran calls handlers logmsg 222 return $ Manager sock opts tref tran calls logmsg
223 where 223 where
224 bindServ = do 224 bindServ = do
225 let family = sockAddrFamily servAddr 225 let family = sockAddrFamily servAddr
@@ -230,7 +230,7 @@ newManager opts @ Options {..} logmsg servAddr handlers = do
230 return sock 230 return sock
231 231
232-- | Unblock all pending calls and close socket. 232-- | Unblock all pending calls and close socket.
233closeManager :: Manager m raw msg -> IO () 233closeManager :: Manager raw msg -> IO ()
234closeManager Manager {..} = do 234closeManager Manager {..} = do
235 maybe (return ()) killThread =<< tryTakeMVar listenerThread 235 maybe (return ()) killThread =<< tryTakeMVar listenerThread
236 -- TODO unblock calls 236 -- TODO unblock calls
@@ -238,7 +238,7 @@ closeManager Manager {..} = do
238 238
239-- | Check if the manager is still active. Manager becomes active 239-- | Check if the manager is still active. Manager becomes active
240-- until 'closeManager' called. 240-- until 'closeManager' called.
241isActive :: Manager m raw msg -> IO Bool 241isActive :: Manager raw msg -> IO Bool
242isActive Manager {..} = liftIO $ isBound sock 242isActive Manager {..} = liftIO $ isBound sock
243{-# INLINE isActive #-} 243{-# INLINE isActive #-}
244 244
@@ -246,7 +246,7 @@ isActive Manager {..} = liftIO $ isBound sock
246-- | Normally you should use Control.Monad.Trans.Resource.allocate 246-- | Normally you should use Control.Monad.Trans.Resource.allocate
247-- function. 247-- function.
248withManager :: Options -> SockAddr -> [Handler h msg raw] 248withManager :: Options -> SockAddr -> [Handler h msg raw]
249 -> (Manager h raw msg -> IO a) -> IO a 249 -> (Manager raw msg -> IO a) -> IO a
250withManager opts addr hs = bracket (newManager opts addr hs) closeManager 250withManager opts addr hs = bracket (newManager opts addr hs) closeManager
251#endif 251#endif
252 252
@@ -289,7 +289,7 @@ genTransactionId ref = do
289 uniqueTransactionId cur 289 uniqueTransactionId cur
290 290
291-- | How many times 'query' call have been performed. 291-- | How many times 'query' call have been performed.
292getQueryCount :: Manager h raw msg -> IO Int 292getQueryCount :: Manager raw msg -> IO Int
293getQueryCount mgr@Manager{..} = do 293getQueryCount mgr@Manager{..} = do
294 curTrans <- readIORef transactionCounter 294 curTrans <- readIORef transactionCounter
295 return $ curTrans - optSeedTransaction options 295 return $ curTrans - optSeedTransaction options
@@ -320,21 +320,21 @@ sendQuery sock addr q = handle sockError $ sendMessage sock addr q
320-- This function should throw 'QueryFailure' exception if quered node 320-- This function should throw 'QueryFailure' exception if quered node
321-- respond with @error@ message or the query timeout expires. 321-- respond with @error@ message or the query timeout expires.
322-- 322--
323query :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO b 323query :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO b
324query mgr meth addr params = queryK mgr meth addr params (\_ x _ -> x) 324query mgr meth addr params = queryK mgr meth addr params (\_ x _ -> x)
325 325
326-- | Like 'query' but possibly returns your externally routable IP address. 326-- | Like 'query' but possibly returns your externally routable IP address.
327query' :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, Maybe ReflectedIP) 327query' :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, Maybe ReflectedIP)
328query' mgr meth addr params = queryK mgr meth addr params (const (,)) 328query' mgr meth addr params = queryK mgr meth addr params (const (,))
329 329
330-- | Enqueue a query, but give us the complete BEncoded content sent by the 330-- | Enqueue a query, but give us the complete BEncoded content sent by the
331-- remote Node. This is useful for handling extensions that this library does 331-- remote Node. This is useful for handling extensions that this library does
332-- not otherwise support. 332-- not otherwise support.
333queryRaw :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, raw) 333queryRaw :: forall h a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => Manager raw msg -> QueryMethod msg -> SockAddr -> a -> IO (b, raw)
334queryRaw mgr meth addr params = queryK mgr meth addr params (\raw x _ -> (x,raw)) 334queryRaw mgr meth addr params = queryK mgr meth addr params (\raw x _ -> (x,raw))
335 335
336queryK :: forall h a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) => 336queryK :: forall h a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, WireFormat raw msg) =>
337 Manager h raw msg -> QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> IO x 337 Manager raw msg -> QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> IO x
338queryK mgr@Manager{..} meth addr params kont = do 338queryK mgr@Manager{..} meth addr params kont = do
339 tid <- liftIO $ genTransactionId transactionCounter 339 tid <- liftIO $ genTransactionId transactionCounter
340 -- let queryMethod = method :: Method a b 340 -- let queryMethod = method :: Method a b
@@ -424,7 +424,7 @@ handler name body = (name, wrapper)
424runHandler :: ( Envelope msg 424runHandler :: ( Envelope msg
425 , Show (QueryMethod msg) 425 , Show (QueryMethod msg)
426 , Serialize (TransactionID msg)) 426 , Serialize (TransactionID msg))
427 => Manager IO raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw) 427 => Manager raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw)
428runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks 428runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks
429 where 429 where
430 signature = querySignature meth (envelopeTransaction m) addr 430 signature = querySignature meth (envelopeTransaction m) addr
@@ -462,7 +462,7 @@ dispatchHandler :: ( Eq (QueryMethod msg)
462 , Show (QueryMethod msg) 462 , Show (QueryMethod msg)
463 , Serialize (TransactionID msg) 463 , Serialize (TransactionID msg)
464 , Envelope msg 464 , Envelope msg
465 ) => Manager IO raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw) 465 ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw)
466dispatchHandler mgr handlers meth q addr = do 466dispatchHandler mgr handlers meth q addr = do
467 case L.lookup meth handlers of 467 case L.lookup meth handlers of
468 Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q) 468 Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q)
@@ -483,7 +483,7 @@ handleQuery :: ( WireFormat raw msg
483 , Eq (QueryMethod msg) 483 , Eq (QueryMethod msg)
484 , Show (QueryMethod msg) 484 , Show (QueryMethod msg)
485 , Serialize (TransactionID msg) 485 , Serialize (TransactionID msg)
486 ) => Manager IO raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO () 486 ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO ()
487handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do 487handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do
488 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" 488 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
489 res <- dispatchHandler mgr hs meth q addr 489 res <- dispatchHandler mgr hs meth q addr
@@ -501,7 +501,7 @@ handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do
501 501
502handleResponse :: ( Ord (TransactionID msg) 502handleResponse :: ( Ord (TransactionID msg)
503 , Envelope msg 503 , Envelope msg
504 ) => Manager IO raw msg -> raw -> KResult msg raw -> SockAddr -> IO () 504 ) => Manager raw msg -> raw -> KResult msg raw -> SockAddr -> IO ()
505handleResponse mgr@Manager{..} raw result addr = do 505handleResponse mgr@Manager{..} raw result addr = do
506 liftIO $ do 506 liftIO $ do
507 let resultId = either errorId envelopeTransaction result 507 let resultId = either errorId envelopeTransaction result
@@ -520,7 +520,7 @@ listener :: forall raw msg.
520 , Eq (QueryMethod msg) 520 , Eq (QueryMethod msg)
521 , Show (QueryMethod msg) 521 , Show (QueryMethod msg)
522 , Serialize (TransactionID msg) 522 , Serialize (TransactionID msg)
523 ) => Manager IO raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () 523 ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO ()
524listener mgr@Manager{..} hs p = do 524listener mgr@Manager{..} hs p = do
525 fix $ \again -> do 525 fix $ \again -> do
526 let ctx = error "TODO TOX ToxCipherContext or () for Mainline" 526 let ctx = error "TODO TOX ToxCipherContext or () for Mainline"
@@ -551,7 +551,7 @@ listen :: ( WireFormat raw msg
551 , Eq (QueryMethod msg) 551 , Eq (QueryMethod msg)
552 , Show (QueryMethod msg) 552 , Show (QueryMethod msg)
553 , Serialize (TransactionID msg) 553 , Serialize (TransactionID msg)
554 ) => Manager IO raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO () 554 ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO ()
555listen mgr@Manager{..} hs p = do 555listen mgr@Manager{..} hs p = do
556 tid <- fork $ do 556 tid <- fork $ do
557 myThreadId >>= liftIO . flip labelThread "KRPC.listen" 557 myThreadId >>= liftIO . flip labelThread "KRPC.listen"