summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bittorrent.cabal20
-rw-r--r--examples/dht.hs80
-rw-r--r--examples/dhtd.hs192
-rw-r--r--src/Network/SocketLike.hs104
-rw-r--r--src/Network/StreamServer.hs143
5 files changed, 539 insertions, 0 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index 0fddf5d0..fbd17397 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -83,6 +83,8 @@ library
83 Network.BitTorrent.DHT.Routing 83 Network.BitTorrent.DHT.Routing
84 Network.BitTorrent.DHT.Session 84 Network.BitTorrent.DHT.Session
85 Network.BitTorrent.DHT.Token 85 Network.BitTorrent.DHT.Token
86 Network.StreamServer
87 Network.SocketLike
86 other-modules: Paths_bittorrent 88 other-modules: Paths_bittorrent
87 if !flag(dht-only) 89 if !flag(dht-only)
88 exposed-modules: Network.BitTorrent 90 exposed-modules: Network.BitTorrent
@@ -336,6 +338,24 @@ benchmark bench
336 , criterion 338 , criterion
337 ghc-options: -O2 -fforce-recomp 339 ghc-options: -O2 -fforce-recomp
338 340
341executable dht
342 hs-source-dirs: examples
343 main-is: dht.hs
344 default-language: Haskell2010
345 build-depends: base, haskeline, network, bytestring
346
347executable dhtd
348 hs-source-dirs: examples
349 main-is: dhtd.hs
350 default-language: Haskell2010
351 build-depends: base, network, bytestring
352 , mtl
353 , lifted-base
354 , pretty
355 , data-default
356 , monad-logger
357 , bittorrent
358
339-- Utility to work with torrent files. 359-- Utility to work with torrent files.
340executable mktorrent 360executable mktorrent
341 if !flag(examples) 361 if !flag(examples)
diff --git a/examples/dht.hs b/examples/dht.hs
new file mode 100644
index 00000000..feeee9ff
--- /dev/null
+++ b/examples/dht.hs
@@ -0,0 +1,80 @@
1{-# LANGUAGE NondecreasingIndentation #-}
2import Control.Monad
3import Control.Monad.Fix
4import Control.Monad.IO.Class
5import Data.Char
6import Network.Socket as Socket
7import System.Console.Haskeline
8import System.Environment
9import System.Exit
10import System.IO
11import System.IO.Unsafe
12import qualified Data.ByteString as B
13
14-- | Reads one character. If it is not a digit,
15-- then it is discarded and 'Nothing' is returned.
16hReadDigit :: Handle -> IO (Maybe Char)
17hReadDigit h = do c <- hGetChar h
18 return $ guard (isDigit c) >> pure c
19
20-- | Expected input: "nnn:..."
21-- Here we read the digit sequence "nnn" and drop the colon
22-- as it is the first non-digit.
23hReadInt :: Handle -> IO Int
24hReadInt h = do
25 nstr <- fix $ \readDigits ->
26 maybe (return []) -- dropped non-digit character
27 (($ unsafeInterleaveIO readDigits) . fmap . (:))
28 =<< hReadDigit h
29 readIO nstr :: IO Int
30
31
32-- | Read a length prefixed string from a handle.
33-- The format is "nnn:..." where /nnn/ is an ascii-encoded character count
34-- and /.../ is the sequence of characters
35--
36-- Note: The first byte after the count is ignored and discarded.
37readResponse :: Handle -> IO String
38readResponse h = do
39 n <- hReadInt h
40 sequence $ replicate n (hGetChar h)
41
42-- | Send a command to the dhtd daemon and then print the response.
43sendCommand :: Handle -> String -> InputT IO ()
44sendCommand h cmd = do resp <- liftIO $ do hPutStrLn h cmd
45 readResponse h
46 outputStrLn resp
47
48-- | Get one line of input and send it to the daemon, then run the
49-- passed continuation if it wasn't "quit".
50interactiveMode :: Handle -> InputT IO () -> InputT IO ()
51interactiveMode h repl = do
52 minput <- getInputLine "dht> "
53 case minput of
54 Nothing -> return ()
55 Just "quit" -> sendCommand h "quit" >> return ()
56 Just cmd -> sendCommand h cmd >> repl
57
58main :: IO ()
59main = do
60 -- Open the control socket to the daemon.
61 h <- liftIO $ handle (\e -> do hPutStrLn stderr (show (e ::IOError))
62 exitFailure)
63 $ do sock <- socket AF_UNIX Stream defaultProtocol
64 connect sock (SockAddrUnix "dht.sock")
65 socketToHandle sock ReadWriteMode
66
67 -- Haskeline's default looks only at our stdin and not our stdout.
68 -- That's a bad idea because we can take input from the command line.
69 behavior <- do
70 useTerminal <- and <$> mapM hIsTerminalDevice [stdin,stdout]
71 return $ if useTerminal then preferTerm else useFileHandle stdin
72
73 runInputTBehaviorWithPrefs behavior defaultPrefs defaultSettings $ do
74
75 -- A command may be specified on the command line
76 -- or else we enter an interactive shell.
77 args <- dropWhile isSpace . unwords <$> liftIO getArgs
78 case args of
79 (_:_) -> sendCommand h args >> sendCommand h "quit"
80 _ -> fix $ interactiveMode h
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
new file mode 100644
index 00000000..6bf48595
--- /dev/null
+++ b/examples/dhtd.hs
@@ -0,0 +1,192 @@
1{-# LANGUAGE NondecreasingIndentation #-}
2{-# LANGUAGE FlexibleContexts #-}
3{-# LANGUAGE LambdaCase #-}
4{-# LANGUAGE OverloadedStrings #-}
5{-# LANGUAGE ScopedTypeVariables #-}
6{-# LANGUAGE TupleSections #-}
7
8import Control.Arrow;
9import Control.Concurrent
10import Control.Exception.Lifted as Lifted
11import Control.Monad
12import Control.Monad.Logger
13import Control.Monad.Reader
14import Data.Char
15import Data.Default
16import Data.List as L
17import Data.Maybe
18import qualified Data.ByteString as B (ByteString,writeFile,readFile)
19 ; import Data.ByteString (ByteString)
20import System.IO
21import System.IO.Error
22import Text.PrettyPrint.HughesPJClass
23import Text.Printf
24import Control.Monad.Reader.Class
25
26import Network.BitTorrent.Address
27import Network.BitTorrent.DHT
28import qualified Network.BitTorrent.DHT.Routing as R
29import Network.BitTorrent.DHT.Session
30import Network.SocketLike
31import Network.StreamServer
32
33mkNodeAddr :: SockAddr -> NodeAddr IPv4
34mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr)
35 (fromMaybe 0 $ sockAddrPort addr) -- FIXME
36
37btBindAddr :: String -> Bool -> IO (NodeAddr IPv4)
38btBindAddr s b = mkNodeAddr <$> getBindAddress s b
39
40printReport :: MonadIO m => [(String,String)] -> m ()
41printReport kvs = liftIO $ do
42 putStrLn (showReport kvs)
43 hFlush stdout
44
45showReport :: [(String,String)] -> String
46showReport kvs = do
47 let colwidth = maximum $ map (length . fst) kvs
48 (k,v) <- kvs
49 concat [ printf " %-*s" (colwidth+1) k, v, "\n" ]
50
51showEnry :: Show a => (NodeInfo a, t) -> [Char]
52showEnry (n,_) = intercalate " "
53 [ show $ pPrint (nodeId n)
54 , show $ nodeAddr n
55 ]
56
57printTable :: DHT IPv4 ()
58printTable = do
59 t <- showTable
60 liftIO $ do
61 putStrLn t
62 hFlush stdout
63
64showTable :: DHT IPv4 String
65showTable = do
66 nodes <- R.toList <$> getTable
67 return $ showReport
68 $ map (show *** showEnry)
69 $ concat $ zipWith map (map (,) [0::Int ..]) nodes
70
71bootstrapNodes :: IO [NodeAddr IPv4]
72bootstrapNodes = mapMaybe fromAddr
73 <$> mapM resolveHostName defaultBootstrapNodes
74
75-- ExtendedCaps (Map.singleton
76
77noDebugPrints :: LogSource -> LogLevel -> Bool
78noDebugPrints _ = \case LevelDebug -> False
79 _ -> True
80
81noLogging :: LogSource -> LogLevel -> Bool
82noLogging _ _ = False
83
84
85resume :: DHT IPv4 (Maybe B.ByteString)
86resume = do
87 restore_attempt <- liftIO $ tryIOError $ B.readFile "dht-nodes.dat"
88 saved_nodes <-
89 either (const $ do liftIO $ putStrLn "Error reading dht-nodes.dat"
90 return Nothing)
91 (return . Just)
92 restore_attempt
93 return saved_nodes
94
95godht :: forall b. (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b
96godht f = do
97 a <- btBindAddr "8008" False
98 dht def { optTimeout = 5 } a (const $ const True) $ do
99 me0 <- asks tentativeNodeId
100 printReport [("tentative node-id",show $ pPrint me0)
101 ,("listen-address", show a)
102 ]
103 f a me0
104
105marshalForClient :: String -> String
106marshalForClient s = show (length s) ++ ":" ++ s
107
108hPutClient :: Handle -> String -> IO ()
109hPutClient h s = hPutStr h (marshalForClient s)
110
111clientSession :: Node IPv4 -> MVar () -> RestrictedSocket -> Int -> Handle -> IO ()
112clientSession st signalQuit sock n h = do
113 line <- map toLower . dropWhile isSpace <$> hGetLine h
114 let cmd action = action >> clientSession st signalQuit sock n h
115 case line of
116
117 "quit" -> hPutClient h "goodbye." >> hClose h
118
119 "stop" -> do hPutClient h "Terminating DHT Daemon."
120 hClose h
121 putMVar signalQuit ()
122
123 "ls" -> cmd $ join $ runDHT st $ do
124 tbl <- getTable
125 t <- showTable
126 me <- myNodeIdAccordingTo (read "8.8.8.8:6881")
127 ip <- routableAddress
128 return $ do
129 hPutClient h $ unlines
130 [ t
131 , showReport
132 [ ("node-id", show $ pPrint me)
133 , ("internet address", show ip)
134 , ("buckets", show $ R.shape tbl)]
135 ]
136
137 _ -> cmd $ hPutClient h "error."
138
139main :: IO ()
140main = do
141 godht $ \a me0 -> do
142 printTable
143 bs <- liftIO bootstrapNodes
144 `onException`
145 (Lifted.ioError $ userError "unable to resolve bootstrap nodes")
146 saved_nodes <- resume
147
148 when (isJust saved_nodes) $ do
149 b <- isBootstrapped
150 tbl <- getTable
151 bc <- optBucketCount <$> asks options
152 printTable
153 me <- case concat $ R.toList tbl of
154 (n,_):_ -> myNodeIdAccordingTo (nodeAddr n)
155 _ -> return me0
156 printReport [("node-id",show $ pPrint me)
157 ,("listen-address", show a)
158 ,("bootstrapped", show b)
159 ,("buckets", show $ R.shape tbl)
160 ,("optBucketCount", show bc)
161 ,("dht-nodes.dat", "Running bootstrap...")
162 ]
163
164 st <- ask
165 waitForSignal <- liftIO $ do
166 signalQuit <- newEmptyMVar
167 srv <- streamServer (withSession $ clientSession st signalQuit) (SockAddrUnix "dht.sock")
168 return $ liftIO $ do
169 () <- takeMVar signalQuit
170 quitListening srv
171
172 bootstrap saved_nodes bs
173
174 b <- isBootstrapped
175 tbl <- getTable
176 bc <- optBucketCount <$> asks options
177 printTable
178 ip <- routableAddress
179 me <- case concat $ R.toList tbl of
180 (n,_):_ -> myNodeIdAccordingTo (nodeAddr n)
181 _ -> return me0
182 printReport [("node-id",show $ pPrint me)
183 ,("internet address", show ip)
184 ,("listen-address", show a)
185 ,("bootstrapped", show b)
186 ,("buckets", show $ R.shape tbl)
187 ,("optBucketCount", show bc)
188 ]
189 snapshot >>= liftIO . B.writeFile "dht-nodes.dat"
190
191 waitForSignal
192
diff --git a/src/Network/SocketLike.hs b/src/Network/SocketLike.hs
new file mode 100644
index 00000000..2aa78e3e
--- /dev/null
+++ b/src/Network/SocketLike.hs
@@ -0,0 +1,104 @@
1{-# LANGUAGE GeneralizedNewtypeDeriving #-}
2-- |
3--
4-- A socket could be used indirectly via a 'System.IO.Handle' or a conduit from
5-- Michael Snoyman's conduit package. But doing so presents an encapsulation
6-- problem. Do we allow access to the underlying socket and trust that it wont
7-- be used in an unsafe way? Or do we protect it at the higher level and deny
8-- access to various state information?
9--
10-- The 'SocketLike' class enables the approach that provides a safe wrapper to
11-- the underlying socket and gives access to various state information without
12-- enabling direct reads or writes.
13module Network.SocketLike
14 ( SocketLike(..)
15 , RestrictedSocket
16 , restrictSocket
17 , restrictHandleSocket
18 -- * Re-exports
19 --
20 -- | To make the 'SocketLike' methods less awkward to use, the types
21 -- 'CUInt', 'SockAddr', and 'PortNumber' are re-exported.
22 , CUInt
23 , PortNumber
24 , SockAddr(..)
25 ) where
26
27import Network.Socket
28 ( PortNumber
29 , SockAddr
30 )
31import Foreign.C.Types ( CUInt )
32
33import qualified Network.Socket as NS
34import System.IO (Handle,hClose,hIsOpen)
35
36-- | A safe (mostly read-only) interface to a 'NS.Socket'. Note that despite
37-- how this class is named, it provides no access to typical 'NS.Socket' uses
38-- like sending or receiving network packets.
39class SocketLike sock where
40 -- | See 'NS.getSocketName'
41 getSocketName :: sock -> IO SockAddr
42 -- | See 'NS.getPeerName'
43 getPeerName :: sock -> IO SockAddr
44 -- | See 'NS.getPeerCred'
45 getPeerCred :: sock -> IO (CUInt, CUInt, CUInt)
46 -- | See 'NS.socketPort'
47 socketPort :: sock -> IO PortNumber
48 -- | See 'NS.sIsConnected'
49 --
50 -- __Warning__: Don't rely on this method if it's possible the socket was
51 -- converted into a 'Handle'.
52 sIsConnected :: sock -> IO Bool
53 -- | See 'NS.sIsBound'
54 sIsBound :: sock -> IO Bool
55 -- | See 'NS.sIsListening'
56 sIsListening :: sock -> IO Bool
57 -- | See 'NS.sIsReadable'
58 sIsReadable :: sock -> IO Bool
59 -- | See 'NS.sIsWritable'
60 sIsWritable :: sock -> IO Bool
61
62 -- | This is the only exposed write-access method to the
63 -- underlying state. Usually implemented by 'NS.close'
64 sClose :: sock -> IO ()
65
66instance SocketLike NS.Socket where
67 getSocketName = NS.getSocketName
68 getPeerName = NS.getPeerName
69 getPeerCred = NS.getPeerCred
70 socketPort = NS.socketPort
71 sIsConnected = NS.sIsConnected -- warning: this is always False if the socket
72 -- was converted to a Handle
73 sIsBound = NS.sIsBound
74 sIsListening = NS.sIsListening
75 sIsReadable = NS.sIsReadable
76 sIsWritable = NS.sIsWritable
77
78 sClose = NS.sClose
79
80-- | An encapsulated socket. Data reads and writes are not possible.
81data RestrictedSocket = Restricted (Maybe Handle) NS.Socket deriving Show
82
83instance SocketLike RestrictedSocket where
84 getSocketName (Restricted mb sock) = NS.getSocketName sock
85 getPeerName (Restricted mb sock) = NS.getPeerName sock
86 getPeerCred (Restricted mb sock) = NS.getPeerCred sock
87 socketPort (Restricted mb sock) = NS.socketPort sock
88 sIsConnected (Restricted mb sock) = maybe (NS.sIsConnected sock) (hIsOpen) mb
89 sIsBound (Restricted mb sock) = NS.sIsBound sock
90 sIsListening (Restricted mb sock) = NS.sIsListening sock
91 sIsReadable (Restricted mb sock) = NS.sIsReadable sock
92 sIsWritable (Restricted mb sock) = NS.sIsWritable sock
93 sClose (Restricted mb sock) = maybe (NS.sClose sock) (\h -> hClose h >> NS.sClose sock) mb
94
95-- | Create a 'RestrictedSocket' that explicitly disallows sending or
96-- receiving data.
97restrictSocket :: NS.Socket -> RestrictedSocket
98restrictSocket socket = Restricted Nothing socket
99
100-- | Build a 'RestrictedSocket' for which 'sClose' will close the given
101-- 'Handle'. It is intended that this 'Handle' was obtained via
102-- 'NS.socketToHandle'.
103restrictHandleSocket :: Handle -> NS.Socket -> RestrictedSocket
104restrictHandleSocket h socket = Restricted (Just h) socket
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs
new file mode 100644
index 00000000..a6cead0e
--- /dev/null
+++ b/src/Network/StreamServer.hs
@@ -0,0 +1,143 @@
1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE TypeFamilies #-}
3{-# LANGUAGE TypeOperators #-}
4{-# LANGUAGE OverloadedStrings #-}
5{-# LANGUAGE RankNTypes #-}
6module Network.StreamServer
7 ( streamServer
8 , ServerHandle
9 , ServerConfig(..)
10 , withSession
11 , quitListening
12 , dummyServerHandle
13 ) where
14
15import Data.Monoid
16import Network.Socket as Socket
17import Data.ByteString.Char8
18 ( hGetNonBlocking
19 )
20import qualified Data.ByteString.Char8 as S
21 ( hPutStrLn
22 )
23import System.Directory (removeFile)
24import System.IO
25 ( IOMode(..)
26 , hSetBuffering
27 , BufferMode(..)
28 , hWaitForInput
29 , hClose
30 , hIsEOF
31 , hPutStrLn
32 , stderr
33 , hFlush
34 )
35import Control.Monad
36import Control.Monad.Fix (fix)
37import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId)
38import Control.Exception (catch,handle,try,finally)
39import System.IO.Error (tryIOError)
40import System.Mem.Weak
41import System.IO.Error
42
43-- import Data.Conduit
44import Control.Monad.IO.Class (MonadIO (liftIO))
45import qualified Data.ByteString as S (ByteString)
46import System.IO (Handle)
47import Control.Concurrent.MVar (newMVar)
48
49import Network.SocketLike
50
51data ServerHandle = ServerHandle Socket (Weak ThreadId)
52
53
54-- | Create a useless do-nothing 'ServerHandle'.
55dummyServerHandle :: IO ServerHandle
56dummyServerHandle = do
57 mvar <- newMVar Closed
58 let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar
59 thread <- mkWeakThreadId <=< forkIO $ return ()
60 return (ServerHandle sock thread)
61
62removeSocketFile :: SockAddr -> IO ()
63removeSocketFile (SockAddrUnix fname) = removeFile fname
64removeSocketFile _ = return ()
65
66-- | Terminate the server accept-loop. Call this to shut down the server.
67quitListening :: ServerHandle -> IO ()
68quitListening (ServerHandle socket _) =
69 finally (Socket.getSocketName socket >>= removeSocketFile)
70 (Socket.close socket)
71
72
73-- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString'
74-- variation. (This is not exported.)
75bshow :: Show a => a -> String
76bshow e = show e
77
78-- | Send a string to stderr. Not exported. Default 'serverWarn' when
79-- 'withSession' is used to configure the server.
80warnStderr :: String -> IO ()
81warnStderr str = hPutStrLn stderr str >> hFlush stderr
82
83data ServerConfig = ServerConfig
84 { serverWarn :: String -> IO ()
85 -- ^ Action to report warnings and errors.
86 , serverSession :: RestrictedSocket -> Int -> Handle -> IO ()
87 -- ^ Action to handle interaction with a client
88 }
89
90-- | Initialize a 'ServerConfig' using the provided session handler.
91withSession :: (RestrictedSocket -> Int -> Handle -> IO ()) -> ServerConfig
92withSession session = ServerConfig warnStderr session
93
94-- | Launch a thread to listen at the given bind address and dispatch
95-- to session handler threads on every incomming connection. Supports
96-- IPv4 and IPv6, TCP and unix sockets.
97--
98-- The returned handle can be used with 'quitListening' to terminate the
99-- thread and prevent any new sessions from starting. Currently active
100-- session threads will not be terminated or signaled in any way.
101streamServer :: ServerConfig -> SockAddr -> IO ServerHandle
102streamServer cfg addr = do
103 let warn = serverWarn cfg
104 family = case addr of
105 SockAddrInet {} -> AF_INET
106 SockAddrInet6 {} -> AF_INET6
107 SockAddrUnix {} -> AF_UNIX
108 sock <- socket family Stream 0
109 setSocketOption sock ReuseAddr 1
110 fix $ \loop ->
111 tryIOError (removeSocketFile addr) >> bind sock addr
112 `catchIOError` \e -> do warn $ "bind-error: " <> bshow addr <> " " <> bshow e
113 threadDelay 5000000
114 loop
115 listen sock maxListenQueue
116 thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0
117 return (ServerHandle sock thread)
118
119-- | Not exported. This, combined with 'acceptException' form a mutually recursive
120-- loop that handles incomming connections. To quit the loop, the socket must be
121-- closed by 'quitListening'.
122acceptLoop :: ServerConfig -> Socket -> Int -> IO ()
123acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
124 con <- accept sock
125 let conkey = n + 1
126 h <- socketToHandle (fst con) ReadWriteMode
127 forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h
128 acceptLoop cfg sock (n + 1)
129
130acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()
131acceptException cfg n sock ioerror = do
132 Socket.close sock
133 case show (ioeGetErrorType ioerror) of
134 "resource exhausted" -> do -- try again
135 serverWarn cfg $ ("acceptLoop: resource exhasted")
136 threadDelay 500000
137 acceptLoop cfg sock (n + 1)
138 "invalid argument" -> do -- quit on closed socket
139 return ()
140 message -> do -- unexpected exception
141 serverWarn cfg $ ("acceptLoop: "<>bshow message)
142 return ()
143