{-# OPTIONS_GHC -F -pgmF htfpp #-}
{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE TemplateHaskell #-}
module Striot.CompileIoT ( createPartitions
, partitionGraph
, GenerateOpts(..)
, defaultOpts
, Partition
, PartitionMap
, Plan(..)
, generateCode
, writePart
, genDockerfile
, generateCodeFromStreamGraph
, nodeFn
, nodeType
, generateNodeSrc
, connectNodeId
, htf_thisModulesTests
) where
import Data.List (intercalate, nub)
import Algebra.Graph
import Algebra.Graph.ToGraph (reachable)
import Test.Framework
import System.FilePath ((</>))
import System.Directory (createDirectoryIfMissing)
import Data.Function ((&))
import Data.Maybe (catMaybes)
import Data.List (nub,sort)
import Data.List.Match (compareLength)
import Language.Haskell.TH
import Striot.StreamGraph
import Striot.LogicalOptimiser
import Striot.Partition
type Partition = Int
type PartitionMap = [[Int]]
data Plan = Plan { Plan -> StreamGraph
planStreamGraph :: StreamGraph
, Plan -> PartitionMap
planPartitionMap :: PartitionMap }
deriving (Plan -> Plan -> Bool
(Plan -> Plan -> Bool) -> (Plan -> Plan -> Bool) -> Eq Plan
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Plan -> Plan -> Bool
$c/= :: Plan -> Plan -> Bool
== :: Plan -> Plan -> Bool
$c== :: Plan -> Plan -> Bool
Eq)
createPartitions :: StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions :: StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
_ [] = ([],StreamGraph
forall a. Graph a
empty)
createPartitions StreamGraph
g ([Int]
p:PartitionMap
ps) = (StreamGraph
thisGraphStreamGraph -> [StreamGraph] -> [StreamGraph]
forall a. a -> [a] -> [a]
:[StreamGraph]
tailParts, StreamGraph
edgesOut StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
`overlay` StreamGraph
tailCuts) where
fv :: StreamVertex -> Bool
fv StreamVertex
v = (StreamVertex -> Int
vertexId StreamVertex
v) Int -> [Int] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Int]
p
thisGraph :: StreamGraph
thisGraph = (StreamVertex -> Bool) -> StreamGraph -> StreamGraph
forall a. (a -> Bool) -> Graph a -> Graph a
induce StreamVertex -> Bool
fv StreamGraph
g
edgesOut :: StreamGraph
edgesOut = [(StreamVertex, StreamVertex)] -> StreamGraph
forall a. [(a, a)] -> Graph a
edges ([(StreamVertex, StreamVertex)] -> StreamGraph)
-> [(StreamVertex, StreamVertex)] -> StreamGraph
forall a b. (a -> b) -> a -> b
$ ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(StreamVertex
v1,StreamVertex
v2) -> (StreamVertex -> Bool
fv StreamVertex
v1) Bool -> Bool -> Bool
&& (Bool -> Bool
not(StreamVertex -> Bool
fv StreamVertex
v2))) (StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList StreamGraph
g)
([StreamGraph]
tailParts, StreamGraph
tailCuts) = StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
g PartitionMap
ps
unPartition :: PartitionedGraph -> Graph StreamVertex
unPartition :: PartitionedGraph -> StreamGraph
unPartition ([StreamGraph]
a,StreamGraph
b) = [StreamGraph] -> StreamGraph
forall a. [Graph a] -> Graph a
overlays (StreamGraph
bStreamGraph -> [StreamGraph] -> [StreamGraph]
forall a. a -> [a] -> [a]
:[StreamGraph]
a)
data GenerateOpts = GenerateOpts
{ GenerateOpts -> [String]
imports :: [String]
, GenerateOpts -> [String]
packages :: [String]
, GenerateOpts -> Maybe String
preSource :: Maybe String
, GenerateOpts -> [LabelledRewriteRule]
rules :: [LabelledRewriteRule]
, GenerateOpts -> Double
maxNodeUtil :: Double
, GenerateOpts -> Double
maxBandwidth :: Double
}
defaultOpts :: GenerateOpts
defaultOpts = GenerateOpts :: [String]
-> [String]
-> Maybe String
-> [LabelledRewriteRule]
-> Double
-> Double
-> GenerateOpts
GenerateOpts
{ imports :: [String]
imports = [ String
"Striot.FunctionalIoTtypes"
, String
"Striot.FunctionalProcessing"
, String
"Striot.Nodes"
, String
"Control.Concurrent"
, String
"Control.Category ((>>>))"
]
, packages :: [String]
packages = []
, preSource :: Maybe String
preSource = Maybe String
forall a. Maybe a
Nothing
, rules :: [LabelledRewriteRule]
rules = [LabelledRewriteRule]
defaultRewriteRules
, maxNodeUtil :: Double
maxNodeUtil = Double
3.0
, maxBandwidth :: Double
maxBandwidth= Double
200
}
generateCode :: GenerateOpts -> StreamGraph -> PartitionMap -> [String]
generateCode :: GenerateOpts -> StreamGraph -> PartitionMap -> [String]
generateCode GenerateOpts
opts StreamGraph
sg PartitionMap
pm = let
([StreamGraph]
sgs,StreamGraph
cuts) = StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
sg (PartitionMap -> PartitionMap
forall a. Ord a => [a] -> [a]
sort (([Int] -> [Int]) -> PartitionMap -> PartitionMap
forall a b. (a -> b) -> [a] -> [b]
map [Int] -> [Int]
forall a. Ord a => [a] -> [a]
sort PartitionMap
pm))
enumeratedParts :: [(Integer, StreamGraph)]
enumeratedParts = [Integer] -> [StreamGraph] -> [(Integer, StreamGraph)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1..] [StreamGraph]
sgs
in ((Integer, StreamGraph) -> String)
-> [(Integer, StreamGraph)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (GenerateOpts
-> [(Integer, StreamGraph)]
-> StreamGraph
-> (Integer, StreamGraph)
-> String
generateCodeFromStreamGraph GenerateOpts
opts [(Integer, StreamGraph)]
enumeratedParts StreamGraph
cuts) [(Integer, StreamGraph)]
enumeratedParts
data NodeType = NodeSource | NodeSink | NodeLink deriving (Int -> NodeType -> ShowS
[NodeType] -> ShowS
NodeType -> String
(Int -> NodeType -> ShowS)
-> (NodeType -> String) -> ([NodeType] -> ShowS) -> Show NodeType
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [NodeType] -> ShowS
$cshowList :: [NodeType] -> ShowS
show :: NodeType -> String
$cshow :: NodeType -> String
showsPrec :: Int -> NodeType -> ShowS
$cshowsPrec :: Int -> NodeType -> ShowS
Show)
nodeType :: StreamGraph -> NodeType
nodeType :: StreamGraph -> NodeType
nodeType StreamGraph
sg = if StreamOperator -> Bool
isSource (StreamOperator -> Bool) -> StreamOperator -> Bool
forall a b. (a -> b) -> a -> b
$ StreamVertex -> StreamOperator
operator ([StreamVertex] -> StreamVertex
forall a. [a] -> a
head (StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg))
then NodeType
NodeSource
else if (StreamVertex -> StreamOperator
operator(StreamVertex -> StreamOperator)
-> (StreamGraph -> StreamVertex) -> StreamGraph -> StreamOperator
forall b c a. (b -> c) -> (a -> b) -> a -> c
.[StreamVertex] -> StreamVertex
forall a. [a] -> a
head([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
.[StreamVertex] -> [StreamVertex]
forall a. [a] -> [a]
reverse([StreamVertex] -> [StreamVertex])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
.StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg StreamOperator -> StreamOperator -> Bool
forall a. Eq a => a -> a -> Bool
== StreamOperator
Sink
then NodeType
NodeSink
else NodeType
NodeLink
generateCodeFromStreamGraph :: GenerateOpts -> [(Integer, StreamGraph)] -> StreamGraph -> (Integer,StreamGraph) -> String
generateCodeFromStreamGraph :: GenerateOpts
-> [(Integer, StreamGraph)]
-> StreamGraph
-> (Integer, StreamGraph)
-> String
generateCodeFromStreamGraph GenerateOpts
opts [(Integer, StreamGraph)]
parts StreamGraph
cuts (Integer
partId,StreamGraph
sg) = String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"\n" ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$
String
nodeId String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
[String]
imports' [String] -> [String] -> [String]
forall a. [a] -> [a] -> [a]
++
[(Integer, StreamGraph)] -> StreamGraph -> String
forall {t :: * -> *} {a}.
Foldable t =>
t a -> StreamGraph -> String
possibleSrcFn [(Integer, StreamGraph)]
parts StreamGraph
sg String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
[(Integer, StreamGraph)] -> StreamGraph -> String
forall {t :: * -> *} {a}.
Foldable t =>
t a -> StreamGraph -> String
possibleSinkFn [(Integer, StreamGraph)]
parts StreamGraph
sg String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
String
sgTypeSignature String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
String
sgIntro String -> [String] -> [String]
forall a. a -> [a] -> [a]
:
[String]
sgBody [String] -> [String] -> [String]
forall a. [a] -> [a] -> [a]
++
[String
padding String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"in " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
lastIdentifier,String
"\n",
String
"main :: IO ()",
[(Integer, StreamGraph)]
-> StreamGraph -> Integer -> StreamGraph -> GenerateOpts -> String
nodeFn [(Integer, StreamGraph)]
parts StreamGraph
sg Integer
partId StreamGraph
cuts GenerateOpts
opts] where
nodeId :: String
nodeId = String
"-- node"String -> ShowS
forall a. [a] -> [a] -> [a]
++(Integer -> String
forall a. Show a => a -> String
show Integer
partId)
padding :: String
padding = String
" "
pad :: [String] -> [String]
pad = ShowS -> [String] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (String
paddingString -> ShowS
forall a. [a] -> [a] -> [a]
++)
sgTypeSignature :: String
sgTypeSignature = String
"streamGraphFn ::" String -> ShowS
forall a. [a] -> [a] -> [a]
++ case StreamGraph -> StreamOperator
startsWith StreamGraph
sg of
StreamOperator
Join -> String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
inType StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"
String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
inType StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"
String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sg
StreamOperator
Merge -> String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sg
StreamOperator
_ -> String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
inType StreamGraph
sgString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" ->"String -> ShowS
forall a. [a] -> [a] -> [a]
++String
" Stream "String -> ShowS
forall a. [a] -> [a] -> [a]
++StreamGraph -> String
outType StreamGraph
sg
sgIntro :: String
sgIntro = String
"streamGraphFn "String -> ShowS
forall a. [a] -> [a] -> [a]
++String
sgArgsString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" = let"
sgArgs :: String
sgArgs = if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
then String
"n1 n2"
else String
"n1"
sgBody :: [String]
sgBody = [String] -> [String]
pad ([String] -> [String]) -> [String] -> [String]
forall a b. (a -> b) -> a -> b
$ case [StreamVertex]
intVerts of
[] -> [String
"n2 = n1"]
[StreamVertex]
ns -> if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
then ((Int, StreamVertex) -> String)
-> [(Int, StreamVertex)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (Int, StreamVertex) -> String
generateCodeFromVertex ([Int] -> [StreamVertex] -> [(Int, StreamVertex)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
3..] [StreamVertex]
ns)
else ((Int, StreamVertex) -> String)
-> [(Int, StreamVertex)] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (Int, StreamVertex) -> String
generateCodeFromVertex ([Int] -> [StreamVertex] -> [(Int, StreamVertex)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
2..] [StreamVertex]
ns)
imports' :: [String]
imports' = (ShowS -> [String] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (String
"import "String -> ShowS
forall a. [a] -> [a] -> [a]
++) (GenerateOpts -> [String]
imports GenerateOpts
opts)) [String] -> [String] -> [String]
forall a. [a] -> [a] -> [a]
++ [String
"\n"]
lastIdentifier :: String
lastIdentifier = Char
'n'Char -> ShowS
forall a. a -> [a] -> [a]
:(Int -> String
forall a. Show a => a -> String
show (Int -> String) -> Int -> String
forall a b. (a -> b) -> a -> b
$ [StreamVertex] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [StreamVertex]
intVerts
Int -> Int -> Int
forall a. Num a => a -> a -> a
+ if StreamGraph -> Bool
startsWithJoin StreamGraph
sg then Int
2 else Int
1)
intVerts :: [StreamVertex]
intVerts= (StreamVertex -> Bool) -> [StreamVertex] -> [StreamVertex]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (StreamVertex -> Bool) -> StreamVertex -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> Bool
singleton) ([StreamVertex] -> [StreamVertex])
-> [StreamVertex] -> [StreamVertex]
forall a b. (a -> b) -> a -> b
$ StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg
nodeFn :: [(Integer, StreamGraph)]
-> StreamGraph -> Integer -> StreamGraph -> GenerateOpts -> String
nodeFn [(Integer, StreamGraph)]
parts StreamGraph
sg Integer
partId StreamGraph
cuts GenerateOpts
opts =
if [(Integer, StreamGraph)] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [(Integer, StreamGraph)]
parts Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
then String
"main = nodeSimple src1 streamGraphFn sink1"
else case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
NodeType
NodeSource -> Integer
-> [Integer] -> GenerateOpts -> [(Integer, StreamGraph)] -> String
generateNodeSrc Integer
partId (StreamGraph -> [(Integer, StreamGraph)] -> StreamGraph -> [Integer]
connectNodeId StreamGraph
sg [(Integer, StreamGraph)]
parts StreamGraph
cuts) GenerateOpts
opts [(Integer, StreamGraph)]
parts
NodeType
NodeLink -> Integer -> String
generateNodeLink (Integer
partId Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
1)
NodeType
NodeSink -> StreamGraph -> String
generateNodeSink StreamGraph
sg
possibleSrcFn :: t a -> StreamGraph -> String
possibleSrcFn t a
parts StreamGraph
sg =
if t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length t a
parts Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
then StreamGraph -> String
generateSrcFn StreamGraph
sg
else case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
NodeType
NodeSource -> StreamGraph -> String
generateSrcFn StreamGraph
sg
NodeType
_ -> String
""
possibleSinkFn :: t a -> StreamGraph -> String
possibleSinkFn t a
parts StreamGraph
sg =
if t a -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length t a
parts Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
then StreamGraph -> String
generateSinkFn StreamGraph
sg
else case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
NodeType
NodeSink -> StreamGraph -> String
generateSinkFn StreamGraph
sg
NodeType
_ -> String
""
possibleSrcSinkFn :: StreamGraph -> String
possibleSrcSinkFn StreamGraph
sg = case (StreamGraph -> NodeType
nodeType StreamGraph
sg) of
NodeType
NodeSource -> StreamGraph -> String
generateSrcFn StreamGraph
sg
NodeType
NodeLink -> String
""
NodeType
NodeSink -> StreamGraph -> String
generateSinkFn StreamGraph
sg
outType :: StreamGraph -> String
outType :: StreamGraph -> String
outType StreamGraph
sg = let node :: StreamVertex
node = ([StreamVertex] -> StreamVertex
forall a. [a] -> a
last ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg
in if StreamVertex -> StreamOperator
operator StreamVertex
node StreamOperator -> StreamOperator -> Bool
forall a. Eq a => a -> a -> Bool
== StreamOperator
Sink
then StreamVertex -> String
intype StreamVertex
node
else StreamVertex -> String
outtype StreamVertex
node
inType :: StreamGraph -> String
inType :: StreamGraph -> String
inType StreamGraph
sg = let node :: StreamVertex
node = ([StreamVertex] -> StreamVertex
forall a. [a] -> a
head ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg
in if StreamOperator -> Bool
isSource (StreamOperator -> Bool) -> StreamOperator -> Bool
forall a b. (a -> b) -> a -> b
$ StreamVertex -> StreamOperator
operator StreamVertex
node
then StreamVertex -> String
outtype StreamVertex
node
else StreamVertex -> String
intype StreamVertex
node
t :: StreamGraph
t = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 (Double -> StreamOperator
Source Double
1) [[| return 0 |]] String
"IO Int" String
"Int" Double
1
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Map [[| show |]] String
"Int" String
"String" Double
2
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 StreamOperator
Sink [[| mapM_ putStrLn |]] String
"String" String
"IO ()" Double
3
]
test_outType :: IO ()
test_outType = String -> String -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual String
"String" (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
(StreamGraph -> String
outType (StreamGraph -> String)
-> (PartitionedGraph -> StreamGraph) -> PartitionedGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamGraph] -> StreamGraph
forall a. [a] -> a
head ([StreamGraph] -> StreamGraph)
-> (PartitionedGraph -> [StreamGraph])
-> PartitionedGraph
-> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PartitionedGraph -> [StreamGraph]
forall a b. (a, b) -> a
fst) (StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
t [[Int
0,Int
1],[Int
2]])
test_outType_sink :: IO ()
test_outType_sink = String -> String -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual String
"String" (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
(StreamGraph -> String
outType (StreamGraph -> String)
-> (PartitionedGraph -> StreamGraph) -> PartitionedGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamGraph] -> StreamGraph
forall a. [a] -> a
head ([StreamGraph] -> StreamGraph)
-> (PartitionedGraph -> [StreamGraph])
-> PartitionedGraph
-> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PartitionedGraph -> [StreamGraph]
forall a b. (a, b) -> a
fst) (StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
t [[Int
0,Int
1,Int
2]])
test_inType :: IO ()
test_inType = String -> String -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual String
"Int" (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$
(StreamGraph -> String
inType (StreamGraph -> String)
-> (PartitionedGraph -> StreamGraph) -> PartitionedGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamGraph] -> StreamGraph
forall a. [a] -> a
head ([StreamGraph] -> StreamGraph)
-> (PartitionedGraph -> [StreamGraph])
-> PartitionedGraph
-> StreamGraph
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PartitionedGraph -> [StreamGraph]
forall a b. (a, b) -> a
fst) (StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
t [[Int
0,Int
1],[Int
2]])
connectNodeId :: StreamGraph -> [(Integer, StreamGraph)] -> StreamGraph -> [Integer]
connectNodeId :: StreamGraph -> [(Integer, StreamGraph)] -> StreamGraph -> [Integer]
connectNodeId StreamGraph
sg [(Integer, StreamGraph)]
parts StreamGraph
cuts = let
edges :: [(StreamVertex, StreamVertex)]
edges = StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList StreamGraph
cuts
outs :: [StreamVertex]
outs = StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg
outEs :: [(StreamVertex, StreamVertex)]
outEs = ((StreamVertex, StreamVertex) -> Bool)
-> [(StreamVertex, StreamVertex)] -> [(StreamVertex, StreamVertex)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(StreamVertex
f,StreamVertex
t) -> StreamVertex
f StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [StreamVertex]
outs) [(StreamVertex, StreamVertex)]
edges
destVs :: [StreamVertex]
destVs= ((StreamVertex, StreamVertex) -> StreamVertex)
-> [(StreamVertex, StreamVertex)] -> [StreamVertex]
forall a b. (a -> b) -> [a] -> [b]
map (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> b
snd [(StreamVertex, StreamVertex)]
outEs
destGs :: [(Integer, StreamGraph)]
destGs= (StreamVertex -> [(Integer, StreamGraph)])
-> [StreamVertex] -> [(Integer, StreamGraph)]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (\StreamVertex
v -> ((Integer, StreamGraph) -> Bool)
-> [(Integer, StreamGraph)] -> [(Integer, StreamGraph)]
forall a. (a -> Bool) -> [a] -> [a]
filter (\(Integer
n,StreamGraph
sg) -> StreamVertex
v StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` (StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList StreamGraph
sg)) [(Integer, StreamGraph)]
parts) [StreamVertex]
destVs
in case ((Integer, StreamGraph) -> Integer)
-> [(Integer, StreamGraph)] -> [Integer]
forall a b. (a -> b) -> [a] -> [b]
map (Integer, StreamGraph) -> Integer
forall a b. (a, b) -> a
fst [(Integer, StreamGraph)]
destGs of
[] -> String -> [Integer]
forall a. HasCallStack => String -> a
error String
"connectNodeId returned an empty list, last vertex optimised away?"
[Integer]
x -> [Integer]
x
generateSrcFn :: StreamGraph -> String
generateSrcFn :: StreamGraph -> String
generateSrcFn StreamGraph
sg = String
"src1 = " String -> ShowS
forall a. [a] -> [a] -> [a]
++
(String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"\n" ([String] -> String)
-> (StreamGraph -> [String]) -> StreamGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ExpQ -> String) -> [ExpQ] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map ExpQ -> String
showParam ([ExpQ] -> [String])
-> (StreamGraph -> [ExpQ]) -> StreamGraph -> [String]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> [ExpQ]
parameters (StreamVertex -> [ExpQ])
-> (StreamGraph -> StreamVertex) -> StreamGraph -> [ExpQ]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamVertex
forall a. [a] -> a
head ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList (StreamGraph -> String) -> StreamGraph -> String
forall a b. (a -> b) -> a -> b
$ StreamGraph
sg) String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"\n"
generateSinkFn:: StreamGraph -> String
generateSinkFn :: StreamGraph -> String
generateSinkFn StreamGraph
sg = String
"sink1 :: Show a => Stream a -> IO ()\nsink1 = " String -> ShowS
forall a. [a] -> [a] -> [a]
++
(String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
"\n" ([String] -> String)
-> (StreamGraph -> [String]) -> StreamGraph -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ExpQ -> String) -> [ExpQ] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map ExpQ -> String
showParam ([ExpQ] -> [String])
-> (StreamGraph -> [ExpQ]) -> StreamGraph -> [String]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamVertex -> [ExpQ]
parameters (StreamVertex -> [ExpQ])
-> (StreamGraph -> StreamVertex) -> StreamGraph -> [ExpQ]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamVertex
forall a. [a] -> a
head ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> [StreamVertex]
forall a. [a] -> [a]
reverse ([StreamVertex] -> [StreamVertex])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList (StreamGraph -> String) -> StreamGraph -> String
forall a b. (a -> b) -> a -> b
$ StreamGraph
sg) String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"\n"
generateNodeLink :: Integer -> String
generateNodeLink :: Integer -> String
generateNodeLink Integer
n = String
"main = nodeLink (defaultLink \"9001\" \"node"String -> ShowS
forall a. [a] -> [a] -> [a]
++(Integer -> String
forall a. Show a => a -> String
show Integer
n)String -> ShowS
forall a. [a] -> [a] -> [a]
++String
"\" \"9001\") streamGraphFn"
generateNodeSrc :: Integer -> [Integer] -> GenerateOpts -> [(Integer, StreamGraph)] -> String
generateNodeSrc :: Integer
-> [Integer] -> GenerateOpts -> [(Integer, StreamGraph)] -> String
generateNodeSrc Integer
partId [Integer]
nodes GenerateOpts
opts [(Integer, StreamGraph)]
parts = let
node :: Integer
node = [Integer] -> Integer
forall a. [a] -> a
head [Integer]
nodes
host :: String
host = String
"node" String -> ShowS
forall a. [a] -> [a] -> [a]
++ (Integer -> String
forall a. Show a => a -> String
show Integer
node)
port :: Integer
port = case Integer -> [(Integer, StreamGraph)] -> Maybe StreamGraph
forall a b. Eq a => a -> [(a, b)] -> Maybe b
lookup Integer
node [(Integer, StreamGraph)]
parts of
Just StreamGraph
sg -> if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
then Integer
9001 Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
partId Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
-Integer
1
else Integer
9001
Maybe StreamGraph
Nothing -> Integer
9001
pref :: String
pref = case GenerateOpts -> Maybe String
preSource GenerateOpts
opts of
Maybe String
Nothing -> String
""
Just String
f -> String
f String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"\n "
in String
"main = do\n " String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
pref String -> ShowS
forall a. [a] -> [a] -> [a]
++ (ExpQ -> String
showParam [|
nodeSource (defaultSource $(litE (StringL host))
$(litE (StringL (show port))))
src1 streamGraphFn
|])
startsWithJoin :: StreamGraph -> Bool
startsWithJoin :: StreamGraph -> Bool
startsWithJoin = (StreamOperator
JoinStreamOperator -> StreamOperator -> Bool
forall a. Eq a => a -> a -> Bool
==) (StreamOperator -> Bool)
-> (StreamGraph -> StreamOperator) -> StreamGraph -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> StreamOperator
startsWith
startsWith :: StreamGraph -> StreamOperator
startsWith :: StreamGraph -> StreamOperator
startsWith StreamGraph
sg = let
inEdges :: [StreamVertex]
inEdges = ((StreamVertex, StreamVertex) -> StreamVertex)
-> [(StreamVertex, StreamVertex)] -> [StreamVertex]
forall a b. (a -> b) -> [a] -> [b]
map (StreamVertex, StreamVertex) -> StreamVertex
forall a b. (a, b) -> b
snd (StreamGraph -> [(StreamVertex, StreamVertex)]
forall a. Ord a => Graph a -> [(a, a)]
edgeList StreamGraph
sg)
in (StreamVertex -> StreamOperator
operator (StreamVertex -> StreamOperator)
-> (StreamGraph -> StreamVertex) -> StreamGraph -> StreamOperator
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamVertex
forall a. [a] -> a
head ([StreamVertex] -> StreamVertex)
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> StreamVertex
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (StreamVertex -> Bool) -> [StreamVertex] -> [StreamVertex]
forall a. (a -> Bool) -> [a] -> [a]
filter (Bool -> Bool
not (Bool -> Bool) -> (StreamVertex -> Bool) -> StreamVertex -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((StreamVertex -> [StreamVertex] -> Bool)
-> [StreamVertex] -> StreamVertex -> Bool
forall a b c. (a -> b -> c) -> b -> a -> c
flip StreamVertex -> [StreamVertex] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
elem) [StreamVertex]
inEdges) ([StreamVertex] -> [StreamVertex])
-> (StreamGraph -> [StreamVertex]) -> StreamGraph -> [StreamVertex]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> [StreamVertex]
forall a. Ord a => Graph a -> [a]
vertexList) StreamGraph
sg
test_startsWithJoin_1 :: IO ()
test_startsWithJoin_1 = HasCallStack => Bool -> IO ()
Bool -> IO ()
assertBool (Bool -> IO ())
-> ([StreamVertex] -> Bool) -> [StreamVertex] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> Bool
startsWithJoin (StreamGraph -> Bool)
-> ([StreamVertex] -> StreamGraph) -> [StreamVertex] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path ([StreamVertex] -> IO ()) -> [StreamVertex] -> IO ()
forall a b. (a -> b) -> a -> b
$
[Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Join [] String
"" String
"" Double
1, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 StreamOperator
Merge [] String
"" String
"" Double
2]
test_startsWithJoin_2 :: IO ()
test_startsWithJoin_2 = HasCallStack => Bool -> IO ()
Bool -> IO ()
assertBool (Bool -> IO ())
-> ([StreamVertex] -> Bool) -> [StreamVertex] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Bool -> Bool
not (Bool -> Bool)
-> ([StreamVertex] -> Bool) -> [StreamVertex] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. StreamGraph -> Bool
startsWithJoin (StreamGraph -> Bool)
-> ([StreamVertex] -> StreamGraph) -> [StreamVertex] -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path ([StreamVertex] -> IO ()) -> [StreamVertex] -> IO ()
forall a b. (a -> b) -> a -> b
$
[Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 StreamOperator
Merge [] String
"" String
"" Double
3, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 StreamOperator
Join [] String
"" String
"" Double
4]
generateNodeSink :: StreamGraph -> String
generateNodeSink :: StreamGraph -> String
generateNodeSink StreamGraph
sg = String
"main = " String -> ShowS
forall a. [a] -> [a] -> [a]
++ (ExpQ -> String
showParam (ExpQ -> String) -> ExpQ -> String
forall a b. (a -> b) -> a -> b
$
if StreamGraph -> Bool
startsWithJoin StreamGraph
sg
then [| nodeSink2 streamGraphFn sink1 "9001" "9002" |]
else [| nodeSink (defaultSink "9001") streamGraphFn sink1 |])
generateCodeFromVertex :: (Int, StreamVertex) -> String
generateCodeFromVertex :: (Int, StreamVertex) -> String
generateCodeFromVertex (Int
opid, StreamVertex
v) = let
op :: StreamOperator
op = StreamVertex -> StreamOperator
operator StreamVertex
v
n :: String
n = Char
'n' Char -> ShowS
forall a. a -> [a] -> [a]
: Int -> String
forall a. Show a => a -> String
show Int
opid
n_1 :: String
n_1 = Char
'n' Char -> ShowS
forall a. a -> [a] -> [a]
: Int -> String
forall a. Show a => a -> String
show (Int
opid Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
in case StreamOperator
op of
StreamOperator
Merge -> String
nString -> ShowS
forall a. [a] -> [a] -> [a]
++String
" = "String -> ShowS
forall a. [a] -> [a] -> [a]
++String
n_1
StreamOperator
Join -> let n_2 :: String
n_2 = Char
'n' Char -> ShowS
forall a. a -> [a] -> [a]
: Int -> String
forall a. Show a => a -> String
show (Int
opid Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
2)
in [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
n, String
" = streamJoin ", String
n_2, String
" ", String
n_1]
StreamOperator
_ -> let params :: String
params = String -> [String] -> String
forall a. [a] -> [[a]] -> [a]
intercalate String
" " ([String] -> String) -> [String] -> String
forall a b. (a -> b) -> a -> b
$ (ExpQ -> String) -> [ExpQ] -> [String]
forall a b. (a -> b) -> [a] -> [b]
map (ShowS
parenShowS -> (ExpQ -> String) -> ExpQ -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
.ExpQ -> String
showParam) (StreamVertex -> [ExpQ]
parameters StreamVertex
v)
in [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String
n, String
" = (\\s -> ", StreamOperator -> String
printOp StreamOperator
op, String
" ", String
params, String
" s) ", String
n_1]
paren :: String -> String
paren :: ShowS
paren String
s = String
"("String -> ShowS
forall a. [a] -> [a] -> [a]
++String
sString -> ShowS
forall a. [a] -> [a] -> [a]
++String
")"
printOp :: StreamOperator -> String
printOp :: StreamOperator -> String
printOp (Filter Double
_) = String
"streamFilter"
printOp (FilterAcc Double
_) = String
"streamFilterAcc"
printOp StreamOperator
op = String
"stream" String -> ShowS
forall a. [a] -> [a] -> [a]
++ (StreamOperator -> String
forall a. Show a => a -> String
show StreamOperator
op)
main :: IO ()
main = TestSuite -> IO ()
forall t. TestableHTF t => t -> IO ()
htfMain htf_thisModulesTests
s0 :: StreamGraph
s0 = StreamGraph -> StreamGraph -> StreamGraph
forall a. Graph a -> Graph a -> Graph a
connect (StreamVertex -> StreamGraph
forall a. a -> Graph a
Vertex (Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 ((Double -> StreamOperator
Source Double
1)) [] String
"String" String
"String" Double
1))
(StreamVertex -> StreamGraph
forall a. a -> Graph a
Vertex (Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (StreamOperator
Sink) [] String
"String" String
"String" Double
2))
s1 :: StreamGraph
s1 = [StreamVertex] -> StreamGraph
forall a. [a] -> Graph a
path [ Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
0 ((Double -> StreamOperator
Source Double
1)) [] String
"String" String
"String" Double
3
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
1 (Double -> StreamOperator
Filter Double
0.5) [] String
"String" String
"String" Double
4
, Int
-> StreamOperator
-> [ExpQ]
-> String
-> String
-> Double
-> StreamVertex
StreamVertex Int
2 (StreamOperator
Sink) [] String
"String" String
"String" Double
5
]
test_reform_s0 :: IO ()
test_reform_s0 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
s0 (PartitionedGraph -> StreamGraph
unPartition (PartitionedGraph -> StreamGraph)
-> PartitionedGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
s0 [[Int
0],[Int
1]])
test_reform_s1 :: IO ()
test_reform_s1 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
s1 (PartitionedGraph -> StreamGraph
unPartition (PartitionedGraph -> StreamGraph)
-> PartitionedGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
s1 [[Int
0,Int
1],[Int
2]])
test_reform_s1_2 :: IO ()
test_reform_s1_2 = StreamGraph -> StreamGraph -> IO ()
forall a. (Eq a, Show a, HasCallStack) => a -> a -> IO ()
assertEqual StreamGraph
s1 (PartitionedGraph -> StreamGraph
unPartition (PartitionedGraph -> StreamGraph)
-> PartitionedGraph -> StreamGraph
forall a b. (a -> b) -> a -> b
$ StreamGraph -> PartitionMap -> PartitionedGraph
createPartitions StreamGraph
s1 [[Int
0],[Int
1,Int
2]])
genDockerfile listen opts =
let pkgs = packages opts in concat
[ "FROM ghcr.io/striot/striot:main\n"
, "WORKDIR /opt/node\n"
, "COPY . /opt/node\n"
, if pkgs /= [] then "RUN cabal install " ++ (intercalate " " pkgs) else ""
, "\n"
, "RUN ghc node.hs\n"
, if listen then "EXPOSE 9001\n" else ""
, "CMD /opt/node/node\n"
]
mergeEx = path
[ StreamVertex 0 (Source 1) [] "Int" "Int" 1
, StreamVertex 1 Merge [] "[Int]" "Int" 2
, StreamVertex 2 Map [[| show |]] "Int" "String" 3
, StreamVertex 3 Sink [[| mapM_ print |]] "String" "String" 4
]
writePart :: GenerateOpts -> (Int, String) -> IO ()
writePart opts (x,y) = let
dockerfile = genDockerfile True opts
bn = "node" ++ (show x)
fn = bn </> "node.hs"
in do
createDirectoryIfMissing True bn
writeFile (bn </> "Dockerfile") dockerfile
writeFile fn y
partitionGraph :: StreamGraph -> PartitionMap -> GenerateOpts -> IO ()
partitionGraph graph partitions opts = do
mapM_ (writePart opts) $ zip [1..] $ generateCode opts graph partitions