From 7627d4dda31ddbececa4c80b340026da9e06c80e Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 25 Feb 2014 14:52:40 +0400 Subject: Implement getConnectionConfig --- src/Network/BitTorrent/Exchange/Session.hs | 79 ++++++++++++++++----------- src/Network/BitTorrent/Exchange/Wire.hs | 35 ++++++------ tests/Network/BitTorrent/Exchange/WireSpec.hs | 4 +- 3 files changed, 66 insertions(+), 52 deletions(-) diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 1537efe1..8cbce4e3 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs @@ -77,17 +77,23 @@ cache :: BEncode a => a -> Cached a cache s = Cached s (BE.encode s) data Session = Session - { tpeerId :: PeerId - , infohash :: InfoHash - , metadata :: MVar Metadata.Status - , storage :: Storage - , status :: MVar SessionStatus - , unchoked :: [PeerAddr IP] - , pendingConnections :: TVar (Set (PeerAddr IP)) - , establishedConnections :: TVar (Map (PeerAddr IP) (Connection Session)) - , broadcast :: Chan Message - , logger :: LogFun - , infodict :: MVar (Cached InfoDict) + { sessionPeerId :: !(PeerId) + , sessionTopic :: !(InfoHash) + + , metadata :: !(MVar Metadata.Status) + , infodict :: !(MVar (Cached InfoDict)) + + , status :: !(MVar SessionStatus) + , storage :: !(Storage) + + , broadcast :: !(Chan Message) + + , unchoked :: [PeerAddr IP] + , connectionsPrefs :: !ConnectionPrefs + , connectionsPending :: !(TVar (Set (PeerAddr IP))) + , connectionsEstablished :: !(TVar (Map (PeerAddr IP) (Connection Session))) + + , logger :: !(LogFun) } instance Ord IP @@ -101,6 +107,7 @@ newSession :: LogFun -> InfoDict -- ^ torrent info dictionary; -> IO Session -- ^ newSession logFun addr rootPath dict = do + pid <- maybe genPeerId return (peerId addr) pconnVar <- newTVarIO S.empty econnVar <- newTVarIO M.empty store <- openInfoDict ReadWriteEx rootPath dict @@ -108,16 +115,18 @@ newSession logFun addr rootPath dict = do (piPieceLength (idPieceInfo dict)) chan <- newChan return Session - { tpeerId = fromMaybe (error "newSession: impossible") - (peerId addr) - , infohash = idInfoHash dict + { sessionPeerId = pid + , sessionTopic = idInfoHash dict , status = statusVar , storage = store , unchoked = [] - , pendingConnections = pconnVar - , establishedConnections = econnVar - , broadcast = chan - , logger = logFun + , connectionsPrefs = def + , connectionsPending = pconnVar + , connectionsEstablished = econnVar + , broadcast = chan + , logger = logFun + , metadata = undefined + , infodict = undefined } closeSession :: Session -> IO () @@ -152,12 +161,12 @@ logEvent = logInfoN pendingConnection :: PeerAddr IP -> Session -> IO Bool pendingConnection addr Session {..} = atomically $ do - pSet <- readTVar pendingConnections - eSet <- readTVar establishedConnections + pSet <- readTVar connectionsPending + eSet <- readTVar connectionsEstablished if (addr `S.member` pSet) || (addr `M.member` eSet) then return False else do - modifyTVar' pendingConnections (S.insert addr) + modifyTVar' connectionsPending (S.insert addr) return True establishedConnection :: Connected Session () @@ -172,8 +181,8 @@ finishedConnection = return () -- | There are no state for this connection, remove it. closedConnection :: PeerAddr IP -> Session -> IO () closedConnection addr Session {..} = atomically $ do - modifyTVar pendingConnections $ S.delete addr - modifyTVar establishedConnections $ M.delete addr + modifyTVar connectionsPending $ S.delete addr + modifyTVar connectionsEstablished $ M.delete addr {----------------------------------------------------------------------- -- Connections @@ -190,16 +199,20 @@ mainWire = do lift finishedConnection getConnectionConfig :: Session -> IO (ConnectionConfig Session) -getConnectionConfig s @ Session {..} = undefined --ConnectionConfig --- let caps = def --- let ecaps = def --- let hs = Handshake def caps infohash tpeerId --- chan <- dupChan broadcast - --- { cfgPrefs = undefined --- , cfgSession = ConnectionSession undefined undefined s --- , cfgWire = mainWire --- } +getConnectionConfig s @ Session {..} = do + chan <- dupChan broadcast + let sessionLink = SessionLink { + linkTopic = sessionTopic + , linkPeerId = sessionPeerId + , linkMetadataSize = Nothing + , linkOutputChan = Just chan + , linkSession = s + } + return ConnectionConfig + { cfgPrefs = connectionsPrefs + , cfgSession = sessionLink + , cfgWire = mainWire + } insert :: PeerAddr IP -> Session -> IO () insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 4ddade66..53c9afb2 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs @@ -50,9 +50,9 @@ module Network.BitTorrent.Exchange.Wire , connStats -- * Setup - , ConnectionPrefs (..) - , ConnectionSession (..) - , ConnectionConfig (..) + , ConnectionPrefs (..) + , SessionLink (..) + , ConnectionConfig (..) -- ** Initiate , connectWire @@ -753,17 +753,18 @@ instance Default ConnectionPrefs where normalize :: ConnectionPrefs -> ConnectionPrefs normalize = undefined -data ConnectionSession s = ConnectionSession - { sessionTopic :: !(InfoHash) - , sessionPeerId :: !(PeerId) - , metadataSize :: !(Maybe Int) - , outputChan :: !(Maybe (Chan Message)) - , connectionSession :: !(s) +-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'. +data SessionLink s = SessionLink + { linkTopic :: !(InfoHash) + , linkPeerId :: !(PeerId) + , linkMetadataSize :: !(Maybe Int) + , linkOutputChan :: !(Maybe (Chan Message)) + , linkSession :: !(s) } data ConnectionConfig s = ConnectionConfig { cfgPrefs :: !(ConnectionPrefs) - , cfgSession :: !(ConnectionSession s) + , cfgSession :: !(SessionLink s) , cfgWire :: !(Wire s ()) } @@ -771,8 +772,8 @@ configHandshake :: ConnectionConfig s -> Handshake configHandshake ConnectionConfig {..} = Handshake { hsProtocol = prefProtocol cfgPrefs , hsReserved = prefCaps cfgPrefs - , hsInfoHash = sessionTopic cfgSession - , hsPeerId = sessionPeerId cfgSession + , hsInfoHash = linkTopic cfgSession + , hsPeerId = linkPeerId cfgSession } {----------------------------------------------------------------------- @@ -841,13 +842,13 @@ afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair afterHandshaking initiator addr sock hpair @ (HandshakePair hs hs') (ConnectionConfig - { cfgPrefs = ConnectionPrefs {..} - , cfgSession = ConnectionSession {..} + { cfgPrefs = ConnectionPrefs {..} + , cfgSession = SessionLink {..} , cfgWire = wire }) = do let caps = hsReserved hs <> hsReserved hs' cstate <- newIORef def { _connStats = establishedStats hpair } - chan <- maybe newChan return outputChan + chan <- maybe newChan return linkOutputChan let conn = Connection { connInitiatedBy = initiator , connRemoteAddr = addr @@ -858,7 +859,7 @@ afterHandshaking initiator addr sock , connThisPeerId = hsPeerId hs , connOptions = def , connState = cstate - , connSession = connectionSession + , connSession = linkSession , connChan = chan } @@ -897,7 +898,7 @@ connectWire addr cfg = do acceptWire :: PendingConnection -> ConnectionConfig s -> IO () acceptWire pc @ PendingConnection {..} cfg = do bracket (return pendingSock) close $ \ _ -> do - unless (sessionTopic (cfgSession cfg) == pendingTopic) $ do + unless (linkTopic (cfgSession cfg) == pendingTopic) $ do throwIO (ProtocolError (UnexpectedTopic pendingTopic)) let hs = configHandshake cfg diff --git a/tests/Network/BitTorrent/Exchange/WireSpec.hs b/tests/Network/BitTorrent/Exchange/WireSpec.hs index 550c20f9..293e1bd6 100644 --- a/tests/Network/BitTorrent/Exchange/WireSpec.hs +++ b/tests/Network/BitTorrent/Exchange/WireSpec.hs @@ -16,8 +16,8 @@ import Network.BitTorrent.Exchange.Wire import Config import Network.BitTorrent.Exchange.MessageSpec () -nullSession :: InfoHash -> PeerId -> ConnectionSession () -nullSession ih pid = ConnectionSession ih pid Nothing Nothing () +nullSession :: InfoHash -> PeerId -> SessionLink () +nullSession ih pid = SessionLink ih pid Nothing Nothing () instance Arbitrary Options where arbitrary = return def -- cgit v1.2.3