summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-02-21 22:24:47 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-02-21 22:24:47 +0400
commit7be32f085a25f847b6eb07a085b4db8b3d3f6da3 (patch)
tree325e260709885b15dc153298a92b31084eebfb10 /src/Network
parent3c66c7129a9194d3c8bb3b65595d760d431ee5a2 (diff)
Add UDP tracker listener
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/BitTorrent/Tracker/RPC/UDP.hs138
1 files changed, 97 insertions, 41 deletions
diff --git a/src/Network/BitTorrent/Tracker/RPC/UDP.hs b/src/Network/BitTorrent/Tracker/RPC/UDP.hs
index bc4f9dd0..499b6d37 100644
--- a/src/Network/BitTorrent/Tracker/RPC/UDP.hs
+++ b/src/Network/BitTorrent/Tracker/RPC/UDP.hs
@@ -29,9 +29,9 @@ module Network.BitTorrent.Tracker.RPC.UDP
29 ) where 29 ) where
30 30
31import Control.Applicative 31import Control.Applicative
32import Control.Concurrent
32import Control.Exception 33import Control.Exception
33import Control.Monad 34import Control.Monad
34import Data.ByteString (ByteString)
35import Data.Default 35import Data.Default
36import Data.IORef 36import Data.IORef
37import Data.List as L 37import Data.List as L
@@ -46,7 +46,7 @@ import Data.Time.Clock.POSIX
46import Data.Typeable 46import Data.Typeable
47import Data.Word 47import Data.Word
48import Text.Read (readMaybe) 48import Text.Read (readMaybe)
49import Network.Socket hiding (Connected, connect) 49import Network.Socket hiding (Connected, connect, listen)
50import Network.Socket.ByteString as BS 50import Network.Socket.ByteString as BS
51import Network.URI 51import Network.URI
52import System.Entropy 52import System.Entropy
@@ -56,7 +56,7 @@ import Numeric
56import Network.BitTorrent.Tracker.Message 56import Network.BitTorrent.Tracker.Message
57 57
58{----------------------------------------------------------------------- 58{-----------------------------------------------------------------------
59-- Manager 59-- Options
60-----------------------------------------------------------------------} 60-----------------------------------------------------------------------}
61 61
62sec :: Int 62sec :: Int
@@ -89,21 +89,54 @@ instance Default Options where
89 , optMaxTimeout = defMaxTimeout 89 , optMaxTimeout = defMaxTimeout
90 } 90 }
91 91
92{-----------------------------------------------------------------------
93-- Manager state
94-----------------------------------------------------------------------}
95
96type PendingTransactions = Map TransactionId (MVar Response)
97
92data Manager = Manager 98data Manager = Manager
93 { options :: !Options 99 { options :: !Options
94 , sock :: !Socket 100 , sock :: !Socket
95-- , dnsCache :: !(IORef (Map URI SockAddr)) 101-- , dnsCache :: !(IORef (Map URI SockAddr))
96 , connectionCache :: !(IORef (Map SockAddr Connection)) 102 , connectionCache :: !(IORef (Map SockAddr Connection))
97-- , pendingResps :: !(IORef (Map Connection [MessageId])) 103 , pendingResps :: !(MVar (Map SockAddr PendingTransactions))
104 , listenerThread :: !(MVar ThreadId)
98 } 105 }
99 106
100newManager :: Options -> IO Manager 107initManager :: Options -> IO Manager
101newManager opts = Manager opts 108initManager opts = Manager opts
102 <$> socket AF_INET Datagram defaultProtocol 109 <$> socket AF_INET Datagram defaultProtocol
103 <*> newIORef M.empty 110 <*> newIORef M.empty
111 <*> newMVar M.empty
112 <*> newEmptyMVar
113
114resetState :: Manager -> IO ()
115resetState Manager {..} = do
116 writeIORef connectionCache err
117 _ <-swapMVar pendingResps err
118 m <- tryTakeMVar listenerThread
119 case m of
120 Nothing -> return () -- thread killed by 'closeManager'
121 Just _ -> return () -- thread killed by exception from 'listen'
122 return ()
123 where
124 err = error "UDP tracker manager closed"
125
126newManager :: Options -> IO Manager
127newManager opts = do
128 mgr <- initManager opts
129 tid <- forkIO (listen mgr `finally` resetState mgr)
130 putMVar (listenerThread mgr) tid
131 return mgr
104 132
105closeManager :: Manager -> IO () 133closeManager :: Manager -> IO ()
106closeManager Manager {..} = close sock 134closeManager Manager {..} = do
135 close sock
136 mtid <- tryTakeMVar listenerThread
137 case mtid of
138 Nothing -> return ()
139 Just tid -> killThread tid
107 140
108withManager :: Options -> (Manager -> IO a) -> IO a 141withManager :: Options -> (Manager -> IO a) -> IO a
109withManager opts = bracket (newManager opts) closeManager 142withManager opts = bracket (newManager opts) closeManager
@@ -122,21 +155,6 @@ data RpcException
122 -- | Tracker exists but not responding for specific number of seconds. 155 -- | Tracker exists but not responding for specific number of seconds.
123 | TrackerNotResponding Int 156 | TrackerNotResponding Int
124 157
125 -- | Source\/destination socket address mismatch.
126 --
127 -- WARNING: This is a BUG and will be fixed!
128 --
129 | UnexpectedSource
130
131 -- | Source\/destination transaction id mismatch.
132 --
133 -- WARNING: This is a BUG and will be fixed!
134 --
135 | TransactionFailed
136
137 -- | Unable to decode tracker response;
138 | ParserFailure String
139
140 -- | Tracker respond with unexpected message type. 158 -- | Tracker respond with unexpected message type.
141 | UnexpectedResponse 159 | UnexpectedResponse
142 { expectedMsg :: String 160 { expectedMsg :: String
@@ -192,10 +210,9 @@ instance Show ConnectionId where
192initialConnectionId :: ConnectionId 210initialConnectionId :: ConnectionId
193initialConnectionId = ConnectionId 0x41727101980 211initialConnectionId = ConnectionId 0x41727101980
194 212
195-- TODO rename
196-- | Transaction Id is used within a UDP RPC. 213-- | Transaction Id is used within a UDP RPC.
197newtype TransactionId = TransactionId Word32 214newtype TransactionId = TransactionId Word32
198 deriving (Eq, Serialize) 215 deriving (Eq, Ord, Serialize)
199 216
200instance Show TransactionId where 217instance Show TransactionId where
201 showsPrec _ (TransactionId tid) = showString "0x" <> showHex tid 218 showsPrec _ (TransactionId tid) = showString "0x" <> showHex tid
@@ -343,27 +360,66 @@ isExpired Connection {..} = do
343 return $ timeDiff > connectionLifetime 360 return $ timeDiff > connectionLifetime
344 361
345{----------------------------------------------------------------------- 362{-----------------------------------------------------------------------
346-- Basic transaction 363-- Transactions
347-----------------------------------------------------------------------} 364-----------------------------------------------------------------------}
348 365
349call :: Manager -> SockAddr -> ByteString -> IO ByteString 366allocTransaction :: Manager -> SockAddr -> MVar Response -> IO TransactionId
350call Manager {..} addr arg = do 367allocTransaction Manager {..} addr ares = modifyMVar pendingResps bindId
351 BS.sendAllTo sock arg addr 368 where
352 (res, addr') <- BS.recvFrom sock (optMaxPacketSize options) 369 bindId m = do
353 unless (addr' == addr) $ do 370 tid <- genTransactionId
354 throwIO $ UnexpectedSource 371 case M.lookup tid =<< M.lookup addr m of
355 return res 372 Just _ -> bindId m -- already used, retry
373 Nothing -> return (M.alter (insertId tid) addr m, tid)
374
375 insertId tid Nothing = Just (M.singleton tid ares)
376 insertId tid (Just m) = Just (M.insert tid ares m)
377
378commitTransaction :: Manager -> SockAddr -> TransactionId -> Response -> IO ()
379commitTransaction Manager {..} addr tid resp =
380 modifyMVar_ pendingResps $ \ m -> do
381 case M.lookup tid =<< M.lookup addr m of
382 Nothing -> return m -- tracker responded after 'cancelTransaction' fired
383 Just ares -> do
384 putMVar ares resp
385 return $ M.update deleteId addr m
386 where
387 deleteId m
388 | M.null m' = Nothing
389 | otherwise = Just m'
390 where
391 m' = M.delete tid m
392
393cancelTransaction :: Manager -> SockAddr -> TransactionId -> IO ()
394cancelTransaction Manager {..} addr tid =
395 modifyMVar_ pendingResps $ \m ->
396 return $ M.update deleteId addr m
397 where
398 deleteId m
399 | M.null m' = Nothing
400 | otherwise = Just m'
401 where
402 m' = M.delete tid m
403
404listen :: Manager -> IO ()
405listen mgr @ Manager {..} = do
406 forever $ do
407 (bs, addr) <- BS.recvFrom sock (optMaxPacketSize options)
408 case decode bs of
409 Left _ -> return ()
410 Right (TransactionR {..}) -> commitTransaction mgr addr transIdR response
356 411
357transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response 412transaction :: Manager -> SockAddr -> Connection -> Request -> IO Response
358transaction m addr conn request = do 413transaction mgr @ Manager {..} addr conn request = do
359 tid <- genTransactionId 414 ares <- newEmptyMVar
360 let trans = TransactionQ (connectionId conn) tid request 415 tid <- allocTransaction mgr addr ares
361 res <- call m addr (encode trans) 416 performTransaction tid ares
362 case decode res of 417 `onException` cancelTransaction mgr addr tid
363 Right (TransactionR {..}) 418 where
364 | tid == transIdR -> return response 419 performTransaction tid ares = do
365 | otherwise -> throwIO $ TransactionFailed 420 let trans = TransactionQ (connectionId conn) tid request
366 Left msg -> throwIO $ ParserFailure msg 421 BS.sendAllTo sock (encode trans) addr
422 takeMVar ares
367 423
368{----------------------------------------------------------------------- 424{-----------------------------------------------------------------------
369-- Connection cache 425-- Connection cache