{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}
module Striot.Orchestration ( Cost
, CostedPlan(..)
, distributeProgram
, chopAndChange
, viableRewrites
, deriveRewritesAndPartitionings
, makePlans
, planCost
, simpleStream
, partitionGraph
, GenerateOpts(..)
, defaultOpts
, htf_thisModulesTests
) where
import Algebra.Graph
import Data.List (nub, sortOn, sort)
import Data.Maybe (fromJust, isJust)
import Test.Framework
import Data.Function ((&))
import Control.Arrow ((>>>))
import Striot.CompileIoT
import Striot.CompileIoT.Compose (generateDockerCompose)
import Striot.Jackson
import Striot.LogicalOptimiser
import Striot.Partition
import Striot.StreamGraph
import Striot.VizGraph
import Striot.Bandwidth
-- | Cost attached to a 'Plan'. 'planCost' returns 'Nothing' for a plan it
-- rejects and @'Just' n@ otherwise; 'chopAndChange' selects the plan with the
-- smallest cost.
type Cost = Maybe Int
-- | Select the best plan for the stream graph with 'chopAndChange', pass its
-- graph and partition map to 'partitionGraph', and write the output of
-- 'generateDockerCompose' for the resulting partitions to @compose.yml@.
--
-- Calls 'error' (via 'chopAndChange') when no viable plan exists.
distributeProgram :: GenerateOpts -> StreamGraph -> IO ()
distributeProgram opts sg = do
    partitionGraph best partMap opts
    writeFile "compose.yml"
        $ generateDockerCompose
        $ createPartitions best partMap
  where
    -- The chosen rewritten graph and the partition map that goes with it.
    Plan best partMap = chopAndChange opts sg
-- | Pick the lowest-cost 'Plan' among the candidates returned by
-- 'viableRewrites' for the given options and stream graph.
--
-- Calls 'error' when there are no viable candidates.
chopAndChange :: GenerateOpts -> StreamGraph -> Plan
chopAndChange opts sg =
    -- 'sortOn' is stable, so among equally cheap candidates the earliest one
    -- wins. Matching on the sorted list avoids a partial 'head'.
    case sortOn costedPlanCost (viableRewrites opts sg) of
        []         -> error "chopAndChange: no viable programs"
        (best : _) -> costedPlanPlan best
-- | A 'Plan' paired with a 'Cost'.
data CostedPlan = CostedPlan
  { costedPlanPlan :: Plan  -- ^ the candidate plan
  , costedPlanCost :: Cost  -- ^ the cost associated with that plan
  }
-- | All candidate plans for a stream graph that 'planCost' accepts, each
-- paired with its cost.
--
-- Candidates come from 'deriveRewritesAndPartitionings' using the rewrite
-- rules in the options; plans whose cost is 'Nothing' are dropped. The order
-- of the surviving candidates is preserved.
viableRewrites :: GenerateOpts -> StreamGraph -> [CostedPlan]
viableRewrites opts sg =
    [ CostedPlan plan cost
    | plan <- deriveRewritesAndPartitionings (rules opts) sg
    , let cost = planCost opts plan
    , isJust cost
    ]
-- The example 'graph' must yield at least one viable costed plan.
test_viableRewrites_graph = assertNotEmpty $ viableRewrites defaultOpts graph
-- The 'tooMuch' graph must yield no viable costed plan at all.
test_viableRewrites_tooMuch = assertEmpty $ viableRewrites defaultOpts tooMuch
-- | Enumerate every candidate 'Plan' for a stream graph: apply the rewrite
-- rules with 'rewriteGraph', take the graph of each variant, remove duplicate
-- graphs, and expand each remaining graph into its plans with 'makePlans'.
deriveRewritesAndPartitionings :: [LabelledRewriteRule] -> StreamGraph -> [Plan]
deriveRewritesAndPartitionings rs sg = concatMap makePlans distinctGraphs
  where
    -- 'nub' keeps the first occurrence of each graph, preserving order.
    distinctGraphs = nub (map variantGraph (rewriteGraph rs sg))
-- | Pair a stream graph with each partition map returned by 'allPartitions',
-- producing one 'Plan' per partition map.
makePlans :: StreamGraph -> [Plan]
makePlans sg = [ Plan sg pm | pm <- allPartitions sg ]
-- | Cost of a 'Plan' under the given options.
--
-- Returns 'Nothing' when any of the following holds:
--
-- * 'isOverUtilised' is true for the operator information of the graph;
-- * some entry of 'totalNodeUtilisations' exceeds @maxNodeUtil opts@;
-- * 'overBandwidthLimit' is true for the plan and @maxBandwidth opts@.
--
-- Otherwise the cost is the length of the plan's partition map.
planCost :: GenerateOpts -> Plan -> Cost
planCost opts plan@(Plan sg pm)
    | rejected  = Nothing
    | otherwise = Just (length pm)
  where
    -- Operator information for the whole graph, shared by two of the checks.
    oi = calcAllSg sg

    rejected =
           isOverUtilised oi
        || any (> maxNodeUtil opts) (totalNodeUtilisations oi pm)
        || overBandwidthLimit plan (maxBandwidth opts)
-- Test data: 'defaultOpts' with @System.Random@ appended to its imports.
opts :: GenerateOpts
opts = defaultOpts { imports = imports defaultOpts ++ [ "System.Random" ] }
-- Test data: quoted action used as the parameter of the source vertices.
-- It draws a random Int between 1 and 10, delays, logs the value and
-- returns it.
--
-- No explicit signature is given: the type of the quotation is inferred, and
-- the Template Haskell type names are not brought into scope by the imports
-- visible in this module.
source = [| do
    i <- getStdRandom (randomR (1,10)) :: IO Int
    threadDelay 1000000
    putStrLn $ "client sending " ++ (show i)
    return i
    |]
-- Test data: first @Source 1@ vertex, parameterised by the 'source' quotation.
v1 :: StreamVertex
v1 = StreamVertex 0 (Source 1) [source] "Int" "Int" 0
-- Test data: second @Source 1@ vertex, parameterised by the 'source' quotation.
v2 :: StreamVertex
v2 = StreamVertex 1 (Source 1) [source] "Int" "Int" 0
-- Test data: @Merge@ vertex with no parameters.
v3 :: StreamVertex
v3 = StreamVertex 2 Merge [] "Int" "Int" 0
-- Test data: @Filter 0.5@ vertex whose predicate is @(>3)@.
v4 :: StreamVertex
v4 = StreamVertex 3 (Filter 0.5) [[| (>3) |]] "Int" "Int" 1
-- Test data: @Sink@ vertex that prints every element.
v5 :: StreamVertex
v5 = StreamVertex 4 Sink [[| mapM_ print |]] "Int" "Int" 0
-- Test data: two sources ('v1' and 'v2') both feeding the merge 'v3', which
-- continues through the filter 'v4' to the sink 'v5'.
graph :: StreamGraph
graph = overlay (path [v1, v3, v4, v5]) (path [v2, v3])
-- The un-rewritten 'graph' is reported as over-utilised.
test_graph_notviable = assertBool (isOverUtilised (calcAllSg graph))
-- Test data: a source/map/sink pipeline for which the tests expect no viable
-- plan.
tooMuch :: StreamGraph
tooMuch = simpleStream
    [ (Source 2, [[| return 1 |]],    "Int", 0)
    , (Map,      [[| (+1) |]],        "Int", 1)
    , (Sink,     [[| mapM_ print |]], "Int", 0)
    ]
-- The 'tooMuch' graph is reported as over-utilised.
test_tooMuch_notviable = assertBool (isOverUtilised (calcAllSg tooMuch))
-- Run every HTF test collected for this module.
main :: IO ()
main = htfMain htf_thisModulesTests
-- | Total utilisation of each partition in a partition map: one 'Double' per
-- partition, in the same order as the partitions appear in the map.
totalNodeUtilisations :: [OperatorInfo] -> PartitionMap -> [Double]
totalNodeUtilisations oi pm = map (sumPartitionUtilisation indexed) pm
  where
    -- Association list from operator id to its info, for lookup by id.
    indexed = [ (opId info, info) | info <- oi ]
-- | Pair a value with the result of applying a function to it; the result of
-- the function comes first.
toFst :: (a -> b) -> a -> (b, a)
toFst f x = (f x, x)
-- | Sum the utilisation of the operators whose ids are listed, looking each
-- id up in the supplied association list.
--
-- Calls 'error', naming the offending id, when an id has no entry in the
-- association list.
sumPartitionUtilisation :: [(Int, OperatorInfo)] -> [Partition] -> Double
sumPartitionUtilisation opInfo = sum . map utilOf
  where
    utilOf opid = maybe (missing opid) util (lookup opid opInfo)

    -- A descriptive failure instead of the anonymous one 'fromJust' gives.
    missing opid = error $
        "sumPartitionUtilisation: no OperatorInfo for operator " ++ show opid
-- Test data: a nine-operator linear pipeline (source, alternating maps and
-- filters, sink) in which every operator is given the value 1 in the last
-- tuple position.
partUtilGraph :: StreamGraph
partUtilGraph = simpleStream
    [ ( Source 1, [[| tempSensor |]],  "Int",   1 )
    , ( Map,      [[| farToCels |]],   "Int",   1 )
    , ( Filter 1, [[| over100 |]],     "Int",   1 )
    , ( Map,      [[| farToCels |]],   "Int",   1 )
    , ( Filter 1, [[| over100 |]],     "Int",   1 )
    , ( Map,      [[| farToCels |]],   "Int",   1 )
    , ( Filter 1, [[| over100 |]],     "Int",   1 )
    , ( Map,      [[| farToCels |]],   "Int",   1 )
    , ( Sink,     [[| mapM_ print |]], "IO ()", 1 )
    ]
-- Every viable plan for 'partUtilGraph' must use at least three partitions.
test_overUtilisedPartition_minThreePartitions = assertBool $
    all ((>= 3) . length . planPartitionMap . costedPlanPlan)
        (viableRewrites defaultOpts partUtilGraph)
-- A two-partition split of 'partUtilGraph' must be rejected by 'planCost'.
test_overUtilisedPartition_rejected =
    assertNothing (planCost defaultOpts (Plan partUtilGraph [[1,2],[3,4,5,6,7,8,9]]))
-- The even three-way split of 'partUtilGraph' must be among the viable plans.
-- Partition maps are sorted (inside each partition and across partitions)
-- before comparison so that ordering does not matter.
test_overUtilisedPartition_acceptable =
    assertElem [[1,2,3],[4,5,6],[7,8,9]] normalisedMaps
  where
    normalisedMaps =
        [ sort (map sort (planPartitionMap (costedPlanPlan cp)))
        | cp <- viableRewrites defaultOpts partUtilGraph
        ]