{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Ingest of volume metadata (containers, records, assets) described by a
-- JSON document.  The only export is 'ingestJSON'.
module Ingest.JSON
  ( ingestJSON
  ) where

import Control.Arrow (left)
import Control.Monad (join, when, unless, void, mfilter, forM_, (<=<))
import Control.Monad.Except (ExceptT(..), runExceptT, mapExceptT, catchError, throwError)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Trans.Class (lift)
import qualified Data.Aeson.BetterErrors as JE
import qualified Data.Attoparsec.ByteString as P
import qualified Data.ByteString as BS
import Data.ByteString.Lazy.Internal (defaultChunkSize)
import Data.Function (on)
import qualified Data.JsonSchema.Draft4 as JS
import Data.List (find)
import Data.Maybe (isJust, fromMaybe, isNothing)
import Data.Monoid ((<>))
import Data.Time.Format (parseTimeM, defaultTimeLocale)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Database.PostgreSQL.Typed.Range as Range
import System.FilePath ((</>))
import System.IO (withBinaryFile, IOMode(ReadMode))

import Paths_databrary
import Ops
import Has (Has, view, focusIO, MonadHas)
import qualified JSON as J
import Files hiding ((</>))
import Store.AV (AV)
import Store.Stage
import Store.Probe
import Store.Transcode
import Store.Types (MonadStorage)
import Model.Audit (MonadAudit)
import Model.Time
import Model.Kind
import Model.Id.Types
import Model.Volume
import Model.Container
import Model.Segment
import Model.Slot.Types
import Model.Release
import Model.Record
import Model.Category
import Model.Metric
import Model.Measure
import Model.RecordSlot
import Model.Asset
import Model.AssetSlot
import Model.AssetRevision
import Model.Transcode
import Model.Ingest
import Model.Party (SiteAuth)
-- import Action.Types
import Service.Log (MonadLog)
import Service.Types (Secret)

-- type IngestM a = JE.ParseT T.Text Handler a -- TODO: bring back as constraint alias

-- | Load the JSON schema shipped as the data file @volume.json@ and turn it
-- into a validator.
--
-- The file is located with 'getDataFileName', read in chunks of
-- 'defaultChunkSize' bytes and parsed as JSON.  A JSON parse/decode error or
-- an invalid schema is reported on the 'ExceptT' error channel as a list of
-- messages.  On success the result is a function returning the schema
-- failures for a given value.
loadSchema :: ExceptT [T.Text] IO (J.Value -> [JS.Failure])
loadSchema = do
  schema <- lift $ getDataFileName "volume.json"
  r <- lift $ withBinaryFile schema ReadMode (\h ->
    P.parseWith (BS.hGetSome h defaultChunkSize) J.json' BS.empty)
  js <- ExceptT . return . left (return . T.pack) $ eitherJSON =<< P.eitherResult r
  ExceptT $ return $ left (map (T.pack . show)) $ JS.checkSchema (JS.SchemaCache js mempty) (JS.SchemaContext Nothing js)
  where
    eitherJSON = J.parseEither J.parseJSON

-- | Abort the current parse with a custom textual error.
throwPE :: (Monad m) => T.Text -> JE.ParseT T.Text m a
throwPE = JE.throwCustomError

-- | Run a parser in the context of object @o@: any custom error it raises
-- gets @" for \<kind\> \<id\>"@ appended, built from 'kindOf' and the shown
-- 'Id' of @o@.
inObj :: forall a b m. (Kinded a, Has (Id a) a, Show (IdType a), Monad m) => a -> JE.ParseT T.Text m b -> JE.ParseT T.Text m b
inObj o = JE.mapError (<> (" for " <> kindOf o <> T.pack (' ' : show (view o :: Id a))))

-- | Assert that key @k@ is absent: if it is present the parse fails with
-- @"unhandled value"@.
noKey :: (Monad m) => T.Text -> JE.ParseT T.Text m ()
noKey k = void $ JE.keyMay k $ throwPE "unhandled value"

-- | Parse an ingest key (a JSON string).
asKey :: (Monad m) => JE.ParseT T.Text m IngestKey
asKey = JE.asText

-- | Parse a date from a JSON string in @%F@ format, using the default time
-- locale.
asDate :: (Monad m) => JE.ParseT T.Text m Date
asDate = JE.withString (maybe (Left "expecting %F") Right . parseTimeM True defaultTimeLocale "%F")

-- | Parse an optional release level via its aeson parser, wrapped in
-- 'JE.perhaps'.
asRelease :: (Monad m) => JE.ParseT T.Text m (Maybe Release)
asRelease = JE.perhaps JE.fromAesonParser

-- | Parse a category, first as an integral id looked up with 'getCategory',
-- and, if that fails for any reason, as a name matched against
-- 'allCategories'.
asCategory :: (Monad m) => JE.ParseT T.Text m Category
asCategory =
  JE.withIntegral (err . getCategory . Id) `catchError` \_ ->
    JE.withText (\n -> err $ find ((n ==) . categoryName) allCategories)
  where
    err = maybe (Left "category not found") Right

-- | Parse a segment via its aeson parser.
asSegment :: (Monad m) => JE.ParseT T.Text m Segment
asSegment = JE.fromAesonParser

-- | A file located in the staging area.
data StageFile = StageFile
  { stageFileRel :: !FilePath -- ^ path as given in the JSON, prefixed with the base directory
  , stageFileAbs :: !FilePath -- ^ path returned by the 'stageFile' lookup
  }

-- | Parse a JSON string as a path below base directory @b@ and look it up
-- with 'stageFile'; the parse fails with @"stage file not found"@ when the
-- lookup yields nothing.
asStageFile :: (MonadStorage c m) => FilePath -> JE.ParseT T.Text m StageFile
asStageFile b = do
  r <- (b </>) <$> JE.asString
  -- 'ctx' is whatever 'focusIO' supplies; it is passed through to 'stageFile'.
  a <- fromMaybeM (throwPE "stage file not found") <=< lift $ focusIO $ \ctx -> do
    rfp <- rawFilePath r
    stageFileRaw <- stageFile rfp ctx
    mapM unRawFilePath stageFileRaw
  return $ StageFile r a

-- | Validate and (optionally) apply a JSON ingest description to a volume.
--
-- The document @jdata@ is first checked against the schema from
-- 'loadSchema'; any schema failures are returned as 'Left'.  When @run@ is
-- 'False' nothing further happens and an empty list is returned.  When @run@
-- is 'True' the document is parsed and applied, and the containers it
-- describes are returned; a parse error is returned as 'Left' after
-- rendering with 'JE.displayError'.
--
-- @overwrite@ controls what happens when the document disagrees with a value
-- already stored: if 'False' the ingest fails with a @"conflicting value"@
-- error, if 'True' the new value replaces the old one.
ingestJSON :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
  => Volume -> J.Value -> Bool -> Bool -> m (Either [T.Text] [Container])
ingestJSON vol jdata run overwrite = runExceptT $ do
  schema <- mapExceptT liftIO loadSchema
  let errs = schema jdata
  unless (null errs) $ throwError $ map (T.pack . show) errs
  if run
    then ExceptT $ left (JE.displayError id) <$> JE.parseValueM volume jdata
    else return []
  where
    -- Compare the current value with the one from the document.
    --   * equal                      -> Nothing (nothing to do)
    --   * different, no overwrite    -> parse error
    --   * different, overwrite       -> Just the new value
    check :: (Eq a, Show a, Monad m) => a -> a -> JE.ParseT T.Text m (Maybe a)
    check cur new
      | cur == new = return Nothing
      | not overwrite = throwPE $ "conflicting value: " <> T.pack (show new) <> " <> " <> T.pack (show cur)
      | otherwise = return $ Just new

    -- Top-level parser: optional "directory" (base for staged files),
    -- optional "name" (renames the volume when it differs), and the
    -- mandatory "containers" array.
    volume :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
      => JE.ParseT T.Text m [Container]
    volume = do
      dir <- JE.keyOrDefault "directory" "" $ stageFileRel <$> asStageFile ""
      _ <- JE.keyMay "name" $ do
        name <- check (volumeName $ volumeRow vol) =<< JE.asText
        forM_ name $ \n -> lift $ changeVolume vol{ volumeRow = (volumeRow vol){ volumeName = n } }
      top <- lift (lookupVolumeTopContainer vol)
      JE.key "containers" $ JE.eachInArray (container top dir)

    -- Parse one container object: find or create the container, then apply
    -- its "release", "records" and "assets".  'topc' is the volume's top
    -- container and 'dir' the base directory for staged files.
    container :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
      => Container -> String -> JE.ParseT T.Text m Container
    container topc dir = do
      cid <- JE.keyMay "id" $ Id <$> JE.asIntegral
      key <- JE.key "key" asKey
      c' <- lift (lookupIngestContainer vol key)
      c <- maybe
        -- No container is registered under this ingest key yet: either
        -- create one (no "id") or adopt the existing one named by "id",
        -- then register the key for it.
        (do
          c <- maybe
            (do
              top <- JE.keyOrDefault "top" False JE.asBool
              name <- JE.keyMay "name" JE.asText
              date <- JE.keyMay "date" asDate
              let c = blankContainer vol
              lift $ addContainer c
                { containerRow = (containerRow c)
                  { containerTop = top
                  , containerName = name
                  , containerDate = date
                  }
                })
            (\i -> fromMaybeM (throwPE $ "container " <> T.pack (show i) <> "/" <> key <> " not found")
              =<< lift (lookupVolumeContainer vol i))
            cid
          inObj c $ lift $ addIngestContainer c key
          return c)
        -- The key is already registered: a given "id" must match, and any
        -- of "top"/"name"/"date" that differ are updated (subject to
        -- 'check').
        (\c -> inObj c $ do
          unless (all (containerId (containerRow c) ==) cid) $
            throwPE "id mismatch"
          top <- fmap join . JE.keyMay "top" $ check (containerTop $ containerRow c) =<< JE.asBool
          name <- fmap join . JE.keyMay "name" $ check (containerName $ containerRow c) =<< JE.perhaps JE.asText
          date <- fmap join . JE.keyMay "date" $ check (containerDate $ containerRow c) =<< JE.perhaps asDate
          when (isJust top || isJust name || isJust date) $ lift $ changeContainer c
            { containerRow = (containerRow c)
              { containerTop = fromMaybe (containerTop $ containerRow c) top
              , containerName = fromMaybe (containerName $ containerRow c) name
              , containerDate = fromMaybe (containerDate $ containerRow c) date
              }
            }
          return c)
        c'
      let s = containerSlot c
      inObj c $ do
        _ <- JE.keyMay "release" $ do
          -- For a container that was not previously ingested the release is
          -- applied whenever one is given; otherwise only when it differs.
          release <- maybe (return . fmap Just) (check . containerRelease) c' =<< asRelease
          forM_ release $ \r -> do
            o <- lift $ changeRelease s r
            unless o $ throwPE "update failed"
        _ <- JE.key "records" $ JE.eachInArray $ do
          r <- record
          inObj r $ do
            rs' <- lift $ lookupRecordSlotRecords r s
            -- "positions" defaults to the full segment; existing links are
            -- only touched when the set of segments differs.
            segm <- (if null rs' then return . Just else check (map (slotSegment . recordSlot) rs')) =<< JE.keyOrDefault "positions" [fullSegment] (JE.eachInArray asSegment)
            forM_ segm $ \segs -> do
              let rs = RecordSlot r . Slot c
              -- Clear the existing links first, then create the new ones.
              unless (null rs') $ do
                o <- lift $ moveRecordSlot (rs fullSegment) emptySegment
                unless o $ throwPE "record clear failed"
              o <- lift $ mapM (moveRecordSlot (rs emptySegment)) segs
              unless (and o) $ throwPE "record link failed"
        _ <- JE.key "assets" $ JE.eachInArray $ do
          (a, probe) <- asset dir
          inObj a $ do
            -- Current slot of the asset, ignored when it sits in the top
            -- container.
            as' <- lift $ mfilter (((/=) `on` containerId . containerRow) topc . slotContainer) . assetSlot <$> lookupAssetAssetSlot a
            -- "position" is either the text "auto" or a segment; it
            -- defaults to the current slot's segment, else the full segment.
            seg <- JE.keyOrDefault "position" (maybe fullSegment slotSegment as') $
              JE.withTextM (\t -> if t == "auto"
                then maybe (Right . Segment . Range.point <$> probeAutoPosition c probe) (return . Right . slotSegment) $ mfilter (((==) `on` containerId . containerRow) c . slotContainer) as'
                else return $ Left "invalid asset position")
              `catchError` \_ -> asSegment
            let seg'
                  -- A point position on an asset with a known duration is
                  -- widened to the interval [p, p + d].
                  | Just p <- Range.getPoint (segmentRange seg)
                  , Just d <- assetDuration (assetRow a) = Segment $ Range.bounded p (p + d)
                  | otherwise = seg
                ss = Slot c seg'
            -- Link when the asset has no (non-top) slot yet, or when the
            -- slot differs and 'check' permits the change.
            u <- maybe (return True) (\s' -> isJust <$> on check slotId s' ss) as'
            when u $ do
              o <- lift $ changeAssetSlot $ AssetSlot a $ Just ss
              unless o $ throwPE "asset link failed"
        return c

    -- Parse one record object: find or create the record, update its
    -- category if needed, then store every remaining key as a measure.
    record :: (MonadAudit c m) => JE.ParseT T.Text m Record
    record = do
      -- handle record shell
      (rid :: Maybe (Id Record)) <- JE.keyMay "id" $ Id <$> JE.asIntegral -- insert = nothing, update = just id
      (key :: IngestKey) <- JE.key "key" asKey
      (mIngestRecord :: Maybe Record) <- lift (lookupIngestRecord vol key)
      (r :: Record) <- maybe
        -- first run of any ingest for this record. could be updating or insert, but need an ingest entry
        (do
          (r :: Record) <- maybe
            (do -- if no existing record, then add a new record
              (category :: Category) <- JE.key "category" asCategory
              lift $ addRecord $ blankRecord category vol)
            (\i -> do -- else find the existing record by vol + record id
              (mRecord :: Maybe Record) <- lift (lookupVolumeRecord vol i)
              fromMaybeM (throwPE $ "record " <> T.pack (show i) <> "/" <> key <> " not found") mRecord)
            rid
          inObj r $ lift $ addIngestRecord r key -- log that a record was ingested, assoc key with the record
          return r)
        -- there has been a prior ingest using the same key for this record
        (\priorIngestRecord -> inObj priorIngestRecord $ do
          unless (all (recordId (recordRow priorIngestRecord) ==) rid) $ -- all here refers to either value in maybe or nothing
            throwPE "id mismatch"
          _ <- JE.key "category" $ do
            (category :: Category) <- asCategory
            (category' :: Maybe Category) <-
              (category <$) -- check whether category name is different from the category on the existing record
                <$> on check categoryName (recordCategory $ recordRow priorIngestRecord) category
            -- update record category for a prior ingest, if category changed
            forM_ category'
              $ \c ->
                lift
                  $ changeRecord priorIngestRecord
                    { recordRow = (recordRow priorIngestRecord) { recordCategory = c } }
          return priorIngestRecord)
        mIngestRecord
      -- handle structure (metrics) + field values (measures) for record
      _ <- inObj r $ JE.forEachInObject $ \mn ->
        unless (mn `elem` ["id", "key", "category", "positions"]) $ do -- for all non special keys, treat as data
          (metric :: Metric) <- do
            let mMetric = find (\m -> mn == metricName m && recordCategory (recordRow r) == metricCategory m) allMetrics
            fromMaybeM (throwPE $ "metric " <> mn <> " not found") mMetric
          (datum :: Maybe BS.ByteString) <- do
            (newMeasureVal :: T.Text) <- JE.asText
            let newMeasureValBS :: BS.ByteString
                newMeasureValBS = TE.encodeUtf8 newMeasureVal
            maybe
              (return (Just newMeasureValBS)) -- always update
              (\existingMeasure -> check (measureDatum existingMeasure) newMeasureValBS) -- only update if changed and allowed
              (getMeasure metric (recordMeasures r)) -- look for existing measure for this metric on the record
          forM_ datum
            $ \measureDatumVal -> (lift . changeRecordMeasure) (Measure r metric measureDatumVal) -- save measure data
      -- return record
      return r

    -- Parse one asset object.  The asset is identified either by "id" (an
    -- existing asset of the volume) or by "file" (a staged file, which is
    -- probed and looked up among previously ingested assets).  Returns the
    -- resulting asset together with the probe when the file is audio/video
    -- ('ProbeAV'); in that case the returned asset is the transcode's asset.
    asset :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
      => String -> JE.ParseT T.Text m (Asset, Maybe Probe)
    asset dir = do
      -- sa :: (Maybe (StageFile, Probe), Maybe Asset)
      --   "id" given -> (Nothing, Just existing)
      --   otherwise  -> (Just (file, probe), previously ingested asset if any)
      sa <- fromMaybeM
        (JE.key "file" $ do
          file <- asStageFile dir
          stageFileRelRaw <- lift $ liftIO $ rawFilePath $ stageFileRel file
          stageFileAbsRaw <- lift $ liftIO $ rawFilePath $ stageFileAbs file
          (,) . Just . (,) file
            <$> (either throwPE return
                  =<< lift (probeFile stageFileRelRaw stageFileAbsRaw))
            <*> lift (lookupIngestAsset vol $ stageFileRel file))
        =<< JE.keyMay "id" (
          maybe (throwPE "asset not found") (return . (,) Nothing . Just) =<< lift . lookupVolumeAsset vol . Id =<< JE.asIntegral)
      -- "file" is not allowed together with "id".
      when (isNothing $ fst sa) $ noKey "file"
      -- "replace" names the asset being superseded, by id or by ingest path.
      orig <- JE.keyMay "replace" $
        let err = fmap $ maybe (Left "asset not found") Right
        in JE.withIntegralM (err . lookupVolumeAsset vol . Id) `catchError` \_ ->
             JE.withStringM (err . lookupIngestAsset vol)
      a <- case sa of
        -- Asset already exists: update release/name where they differ.
        (_, Just a) -> inObj a $ do
          unless (assetBacked a) $ throwPE "ingested asset incomplete"
          -- compareFiles file =<< getAssetFile -- assume correct
          release <- fmap join . JE.keyMay "release" $ check (assetRelease $ assetRow a) =<< asRelease
          name <- fmap join . JE.keyMay "name" $ check (assetName $ assetRow a) =<< JE.perhaps JE.asText
          a' <- if isJust release || isJust name
            then lift $ changeAsset a
              { assetRow = (assetRow a)
                { assetRelease = fromMaybe (assetRelease $ assetRow a) release
                , assetName = fromMaybe (assetName $ assetRow a) name
                }
              } Nothing
            else return a
          forM_ orig $ \o -> lift $ replaceSlotAsset o a'
          return a'
        -- New asset from a staged file.  The lazy pattern is safe because
        -- the first component is only Nothing in the "id" case, where the
        -- second component is always Just and the branch above matches.
        (~(Just (file, probe)), Nothing) -> do
          release <- JE.key "release" asRelease
          name <- JE.keyMay "name" JE.asText
          stageFileAbsRaw <- lift $ liftIO $ rawFilePath $ stageFileAbs file
          let ba = blankAsset vol
          a <- lift $ addAsset ba
            { assetRow = (assetRow ba)
              { assetFormat = probeFormat probe
              , assetRelease = release
              , assetName = name
              }
            } (Just stageFileAbsRaw)
          lift $ addIngestAsset a (stageFileRel file)
          -- NOTE(review): this branch calls replaceAsset while the branch
          -- above calls replaceSlotAsset -- confirm the difference is intended.
          forM_ orig $ \o -> lift $ replaceAsset o a -- FIXME
          return a
      inObj a $ case sa of
        -- Audio/video file: reuse a matching transcode when the asset was
        -- previously ingested, otherwise create and start a new one.
        (Just (_, probe@ProbeAV{}), ae) -> do
          clip <- JE.keyOrDefault "clip" fullSegment asSegment
          opts <- JE.keyOrDefault "options" defaultTranscodeOptions $ JE.eachInArray JE.asString
          t <- lift $ fromMaybeM
            (do
              t <- addTranscode a clip opts probe
              _ <- startTranscode t
              return t)
            =<< flatMapM (\_ -> findTranscode a clip opts) ae
          return (transcodeAsset t, Just probe)
        -- Anything else: transcode-related keys are rejected.
        _ -> do
          noKey "clip"
          noKey "options"
          return (a, Nothing)