summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-02-08 07:19:31 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-02-08 07:19:31 +0400
commit7edacaaedd432c71169bbd59c9c0948e9a83da26 (patch)
tree1ca88d119c3575c502e184bfcc15572ee6a406f9 /src/Network/BitTorrent
parentf1f28f1a128caa3df5cdab2eb4c22ec07633af06 (diff)
Add multitracker session
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r--src/Network/BitTorrent/Tracker/Cache.hs164
-rw-r--r--src/Network/BitTorrent/Tracker/Session.hs206
2 files changed, 370 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Tracker/Cache.hs b/src/Network/BitTorrent/Tracker/Cache.hs
new file mode 100644
index 00000000..28a4adcb
--- /dev/null
+++ b/src/Network/BitTorrent/Tracker/Cache.hs
@@ -0,0 +1,164 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2014
3-- License : BSD
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Cached data for tracker responses.
9--
10module Network.BitTorrent.Tracker.Cache
11 ( -- * Cache
12 Cached
13 , lastUpdated
14 , updateInterval
15 , minUpdateInterval
16
17 -- * Construction
18 , newCached
19 , newCached_
20
21 -- * Query
22 , isAlive
23 , isStalled
24 , isExpired
25 , canUpdate
26 , shouldUpdate
27
28 -- * Cached data
29 , tryTakeData
30 , takeData
31 ) where
32
33import Control.Applicative
34import Data.Monoid
35import Data.Default
36import Data.Time
37import Data.Time.Clock.POSIX
38
39
40data Cached a = Cached
41 { -- | Time of resource creation.
42 lastUpdated :: !POSIXTime
43
44 -- | Minimum invalidation timeout.
45 , minUpdateInterval :: !NominalDiffTime
46
47 -- | Resource lifetime.
48 , updateInterval :: !NominalDiffTime
49
50 -- | Resource data.
51 , cachedData :: a
52 } deriving (Show, Eq)
53
54-- INVARIANT: minUpdateInterval <= updateInterval
55
56instance Default (Cached a) where
57 def = mempty
58
59instance Functor Cached where
60 fmap f (Cached t i m a) = Cached t i m (f a)
61
62posixEpoch :: NominalDiffTime
63posixEpoch = 1000000000000000000000000000000000000000000000000000000
64
65instance Applicative Cached where
66 pure = Cached 0 posixEpoch posixEpoch
67 f <*> c = Cached
68 { lastUpdated = undefined
69 , minUpdateInterval = undefined
70 , updateInterval = undefined
71 , cachedData = cachedData f (cachedData c)
72 }
73
74instance Alternative Cached where
75 empty = mempty
76 (<|>) = error "cached alternative instance: not implemented"
77
78instance Monad Cached where
79 return = pure
80 Cached {..} >>= f = Cached
81 { lastUpdated = undefined
82 , updateInterval = undefined
83 , minUpdateInterval = undefined
84 , cachedData = undefined
85 }
86
87instance Monoid (Cached a) where
88 mempty = Cached
89 { lastUpdated = 0
90 , minUpdateInterval = 0
91 , updateInterval = 0
92 , cachedData = error "cached mempty: impossible happen"
93 }
94
95 mappend a b
96 | expirationTime a > expirationTime b = a
97 | otherwise = b
98
99normalize :: NominalDiffTime -> NominalDiffTime
100 -> (NominalDiffTime, NominalDiffTime)
101normalize a b
102 | a < b = (a, b)
103 | otherwise = (b, a)
104{-# INLINE normalize #-}
105
106newCached :: NominalDiffTime -> NominalDiffTime -> a -> IO (Cached a)
107newCached minInterval interval x = do
108 t <- getPOSIXTime
109 let (mui, ui) = normalize minInterval interval
110 return Cached
111 { lastUpdated = t
112 , minUpdateInterval = mui
113 , updateInterval = ui
114 , cachedData = x
115 }
116
117newCached_ :: NominalDiffTime -> a -> IO (Cached a)
118newCached_ interval x = newCached interval interval x
119{-# INLINE newCached_ #-}
120
121expirationTime :: Cached a -> POSIXTime
122expirationTime Cached {..} = undefined
123
124isAlive :: Cached a -> IO Bool
125isAlive Cached {..} = do
126 currentTime <- getPOSIXTime
127 return $ lastUpdated + updateInterval > currentTime
128
129isExpired :: Cached a -> IO Bool
130isExpired Cached {..} = undefined
131
132isStalled :: Cached a -> IO Bool
133isStalled Cached {..} = undefined
134
135canUpdate :: Cached a -> IO (Maybe NominalDiffTime)
136canUpdate = undefined --isStaled
137
138shouldUpdate :: Cached a -> IO (Maybe NominalDiffTime)
139shouldUpdate = undefined -- isExpired
140
141tryTakeData :: Cached a -> IO (Maybe a)
142tryTakeData c = do
143 alive <- isAlive c
144 return $ if alive then Just (cachedData c) else Nothing
145
146invalidateData :: Cached a -> IO a -> IO (Cached a)
147invalidateData Cached {..} action = do
148 t <- getPOSIXTime
149 x <- action
150 return Cached
151 { lastUpdated = t
152 , updateInterval = updateInterval
153 , minUpdateInterval = minUpdateInterval
154 , cachedData = x
155 }
156
157takeData :: Cached a -> IO a -> IO a
158takeData c action = do
159 mdata <- tryTakeData c
160 case mdata of
161 Just a -> return a
162 Nothing -> do
163 c' <- invalidateData c action
164 takeData c' action
diff --git a/src/Network/BitTorrent/Tracker/Session.hs b/src/Network/BitTorrent/Tracker/Session.hs
new file mode 100644
index 00000000..7be16fd6
--- /dev/null
+++ b/src/Network/BitTorrent/Tracker/Session.hs
@@ -0,0 +1,206 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2014
3-- License : BSD
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Multitracker sessions.
9--
10module Network.BitTorrent.Tracker.Session
11 ( -- * Session
12 Session
13 , newSession
14 , closeSession
15
16 -- * Events
17 , Event (..)
18 , notify
19
20 -- * Query
21 , askPeers
22 ) where
23
24import Control.Applicative
25import Control.Concurrent
26import Control.Concurrent.STM
27import Control.Exception
28import Control.Monad
29import Data.Default
30import Data.Fixed
31import Data.Foldable
32import Data.List as L
33import Data.Maybe
34import Data.IORef
35import Data.Text as T
36import Data.Time
37import Data.Traversable
38import Network
39import Network.URI
40
41import Data.Torrent
42import Data.Torrent.InfoHash
43import Network.BitTorrent.Core
44import Network.BitTorrent.Tracker.Cache
45import Network.BitTorrent.Tracker.List
46import Network.BitTorrent.Tracker.Message
47import Network.BitTorrent.Tracker.RPC as RPC
48
49{-----------------------------------------------------------------------
50-- Tracker entry
51-----------------------------------------------------------------------}
52
53data Scrape = Scrape
54 { leechersCount :: Maybe Int
55 , seedersCount :: Maybe Int
56 } deriving (Show, Eq)
57
58instance Default Scrape where
59 def = Scrape Nothing Nothing
60
61
62data Status
63 = Running
64 | Paused
65 deriving (Show, Eq)
66
67instance Default Status where
68 def = Paused
69
70nextStatus :: Maybe Event -> Status
71nextStatus Nothing = Running
72nextStatus (Just Started ) = Running
73nextStatus (Just Stopped ) = Paused
74nextStatus (Just Completed) = Running
75
76needNotify :: Maybe Event -> Maybe Status -> Bool
77-- we always send _regular_ announce requests (for e.g. to get more peers);
78needNotify Nothing _ = True
79needNotify (Just Started) Nothing = True
80needNotify (Just Stopped) Nothing = False
81needNotify (Just Completed) Nothing = False
82needNotify Nothing (Just Running) = True
83needNotify Nothing (Just Paused ) = True
84
85-- | Do we need to sent this event to a first working tracker or to
86-- the all known good trackers?
87allNotify :: Maybe Event -> Bool
88allNotify Nothing = False
89allNotify (Just Started) = False
90allNotify (Just Stopped) = True
91allNotify (Just Completed) = True
92
93-- | Single tracker session.
94data TrackerEntry = TrackerEntry
95 { -- | Tracker announce URI.
96 trackerURI :: !URI
97
98 -- | Used to notify 'Stopped' and 'Completed' events.
99 , statusSent :: !(Maybe Status)
100
101 -- |
102 , peersCache :: Cached [PeerAddr IP]
103
104 -- | May be used to show brief swarm stats in client GUI.
105 , scrapeCache :: Cached Scrape
106 }
107
108nullEntry :: URI -> TrackerEntry
109nullEntry uri = TrackerEntry uri Nothing def def
110
111{-----------------------------------------------------------------------
112-- Multitracker Session
113-----------------------------------------------------------------------}
114
115-- | Multitracker session.
116data Session = Session
117 { infohash :: !InfoHash
118 , currentStatus :: !(MVar Status)
119 , trackers :: !(MVar (TrackerList TrackerEntry))
120 }
121
122-- Just Started
123newSession :: InfoHash -> TrackerList URI -> IO Session
124newSession ih origUris = do
125 uris <- shuffleTiers origUris
126 status <- newMVar def
127 entries <- newMVar (fmap nullEntry uris)
128 return (Session ih status entries)
129
130-- Just Stopped
131closeSession :: Session -> IO ()
132closeSession _ = return ()
133
134seconds :: Int -> NominalDiffTime
135seconds n = realToFrac (toEnum n :: Uni)
136
137cachePeers :: AnnounceInfo -> IO (Cached [PeerAddr IP])
138cachePeers AnnounceInfo {..} =
139 newCached (seconds respInterval)
140 (seconds (fromMaybe respInterval respMinInterval))
141 (getPeerList respPeers)
142
143cacheScrape :: AnnounceInfo -> IO (Cached Scrape)
144cacheScrape AnnounceInfo {..} =
145 newCached (seconds respInterval)
146 (seconds (fromMaybe respInterval respMinInterval))
147 Scrape
148 { seedersCount = respComplete
149 , leechersCount = respIncomplete
150 }
151
152announceAll :: Manager -> Session -> Maybe Event -> IO ()
153announceAll mgr Session {..} mevent = do
154 modifyMVar_ trackers (traversal announceTo)
155 where
156 traversal
157 | allNotify mevent = traverseAll
158 | otherwise = traverseTiers
159
160 announceTo entry @ TrackerEntry {..}
161 | mevent `needNotify` statusSent = do
162 let q = SAnnounceQuery infohash def Nothing mevent
163 res <- RPC.announce mgr trackerURI q
164 TrackerEntry trackerURI (Just (nextStatus mevent))
165 <$> cachePeers res <*> cacheScrape res
166 | otherwise = return entry
167
168-- TODO send notifications to tracker periodically.
169-- |
170--
171-- This function /may/ block until tracker query proceed.
172notify :: Manager -> Session -> Event -> IO ()
173notify mgr ses event = announceAll mgr ses (Just event)
174
175-- TODO fork thread for reannounces
176-- |
177announce :: Manager -> Session -> IO ()
178announce mgr ses = announceAll mgr ses Nothing
179
180-- TODO run announce if sesion have no peers
181-- | This function /may/ block. Use async if needed.
182askPeers :: Manager -> Session -> IO [PeerAddr IP]
183askPeers mgr ses = do
184 list <- readMVar (trackers ses)
185 L.concat <$> collect (tryTakeData . peersCache) list
186
187collect :: (a -> IO (Maybe b)) -> TrackerList a -> IO [b]
188collect f lst =(catMaybes . toList) <$> traverse f lst
189
190--sourcePeers :: Session -> Source (PeerAddr IP)
191--sourcePeers
192
193{-----------------------------------------------------------------------
194-- State query
195-----------------------------------------------------------------------}
196
197data TrackerInfo = TrackerInfo
198 {
199 }
200
201--instance ToJSON TrackerInfo where
202-- toJSON = undefined
203
204-- |
205--getSessionState :: Session -> IO (TrackerList TrackerInfo)
206--getSessionState = undefined