{-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
module Databrary.Ingest.JSON
  ( ingestJSON
  ) where

import Control.Arrow (left)
import Control.Monad (join, when, unless, void, mfilter, forM_, (<=<))
import Control.Monad.Except (ExceptT(..), runExceptT, mapExceptT, catchError, throwError)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Trans.Class (lift)
import qualified Data.Aeson.BetterErrors as JE
import qualified Data.Attoparsec.ByteString as P
import qualified Data.ByteString as BS
import Data.ByteString.Lazy.Internal (defaultChunkSize)
import Data.Function (on)
import qualified Data.JsonSchema.Draft4 as JS
import Data.List (find)
import Data.Maybe (isJust, fromMaybe, isNothing)
import Data.Monoid ((<>))
import Data.Time.Format (parseTimeM, defaultTimeLocale)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Database.PostgreSQL.Typed.Range as Range
import System.FilePath ((</>))
import System.IO (withBinaryFile, IOMode(ReadMode))

import Paths_databrary
import Databrary.Ops
import Databrary.Has (Has, view, focusIO)
import qualified Databrary.JSON as J
import Databrary.Files hiding ((</>))
import Databrary.Store.Stage
import Databrary.Store.Probe
import Databrary.Store.Transcode
import Databrary.Model.Time
import Databrary.Model.Kind
import Databrary.Model.Id.Types
import Databrary.Model.Volume
import Databrary.Model.Container
import Databrary.Model.Segment
import Databrary.Model.Slot.Types
import Databrary.Model.Release
import Databrary.Model.Record
import Databrary.Model.Category
import Databrary.Model.Metric
import Databrary.Model.Measure
import Databrary.Model.RecordSlot
import Databrary.Model.Asset
import Databrary.Model.AssetSlot
import Databrary.Model.AssetRevision
import Databrary.Model.Transcode
import Databrary.Model.Ingest
import Databrary.Action.Types

-- | JSON parser over 'Handler' whose custom errors are plain 'T.Text' messages.
type IngestM a = JE.ParseT T.Text Handler a

-- | Read the @volume.json@ data file, parse it as JSON and turn it into a
-- validation function via 'JS.checkSchema'.
-- A JSON parse/decode failure or an invalid schema is reported as a list of
-- text messages in the 'ExceptT' error channel.
loadSchema :: ExceptT [T.Text] IO (J.Value -> [JS.Failure])
loadSchema = do
  schema <- lift $ getDataFileName "volume.json"
  r <- lift $ withBinaryFile schema ReadMode (\h ->
    P.parseWith (BS.hGetSome h defaultChunkSize) J.json' BS.empty)
  js <- ExceptT . return . left (return . T.pack) $ eitherJSON =<< P.eitherResult r
  ExceptT $ return $ left (map (T.pack . show)) $ JS.checkSchema (JS.SchemaCache js mempty) (JS.SchemaContext Nothing js)
  where
  eitherJSON = J.parseEither J.parseJSON

-- | Abort the current parse with a custom text error.
throwPE :: T.Text -> IngestM a
throwPE = JE.throwCustomError

-- | Run a parser in the context of object @o@: any custom error message gets
-- @" for <kind> <id>"@ appended, identifying the object being ingested.
inObj :: forall a b . (Kinded a, Has (Id a) a, Show (IdType a)) => a -> IngestM b -> IngestM b
inObj o = JE.mapError (<> (" for " <> kindOf o <> T.pack (' ' : show (view o :: Id a))))

-- | Reject key @k@: the inner parser (which always throws "unhandled value")
-- is only run by 'JE.keyMay' when the key is present.
noKey :: T.Text -> IngestM ()
noKey k = void $ JE.keyMay k $ throwPE "unhandled value"

-- | Parse an ingest key (a JSON string).
asKey :: IngestM IngestKey
asKey = JE.asText

-- | Parse a date from a JSON string in @%F@ format.
asDate :: IngestM Date
asDate = JE.withString (maybe (Left "expecting %F") Right . parseTimeM True defaultTimeLocale "%F")

-- | Parse an optional release level using its aeson parser.
asRelease :: IngestM (Maybe Release)
asRelease = JE.perhaps JE.fromAesonParser

-- | Parse a category, first as an integral id (looked up with 'getCategory'),
-- and if that fails as a name matched against 'allCategories'.
asCategory :: IngestM Category
asCategory =
  JE.withIntegral (err . getCategory . Id) `catchError` \_ -> do
    JE.withText (\n -> err $ find ((n ==) . categoryName) allCategories)
  where err = maybe (Left "category not found") Right

-- | Parse a segment using its aeson parser.
asSegment :: IngestM Segment
asSegment = JE.fromAesonParser

-- | A file located in the staging area.
data StageFile = StageFile
  { stageFileRel :: !FilePath -- ^ path from the JSON, joined onto the base directory
  , stageFileAbs :: !FilePath -- ^ path returned by 'stageFile' for 'stageFileRel'
  }

-- | Parse a JSON string as a path relative to base directory @b@ and resolve
-- it with 'stageFile'; fails with "stage file not found" when 'stageFile'
-- yields nothing.
asStageFile :: FilePath -> IngestM StageFile
asStageFile b = do
  r <- (b </>) <$> JE.asString
  a <- fromMaybeM (throwPE "stage file not found") <=< lift $ focusIO $ \a -> do
    rfp <- rawFilePath r
    stageFileRaw <- stageFile rfp a
    mapM unRawFilePath stageFileRaw
  return $ StageFile r a

-- | Validate @jdata@ against the volume schema and, when @run@ is set, apply
-- it to volume @vol@.
--
-- * Schema failures are returned as @Left@ without touching the volume.
-- * With @run == False@ only validation happens and @Right []@ is returned.
-- * With @run == True@ the document is parsed and applied; the result is the
--   list of containers described by the @"containers"@ array, or the rendered
--   parse errors.
-- * @overwrite@ decides what happens when an incoming value differs from the
--   stored one: replace it ('True') or fail with "conflicting value" ('False').
ingestJSON :: Volume -> J.Value -> Bool -> Bool -> Handler (Either [T.Text] [Container])
ingestJSON vol jdata run overwrite = runExceptT $ do
  schema <- mapExceptT liftIO loadSchema
  let errs = schema jdata
  unless (null errs) $ throwError $ map (T.pack . show) errs
  if run
    then ExceptT $ left (JE.displayError id) <$> JE.parseValueM volume jdata
    else return []
  where
  -- Compare a stored value with an incoming one.
  -- Nothing: equal, nothing to do.  Just new: differs and overwriting is
  -- allowed.  Otherwise the parse fails with a "conflicting value" error.
  check :: (Eq a, Show a) => a -> a -> IngestM (Maybe a)
  check cur new
    | cur == new = return Nothing
    | not overwrite = throwPE $ "conflicting value: " <> T.pack (show new) <> " <> " <> T.pack (show cur)
    | otherwise = return $ Just new
  -- Top-level object: optional "directory" (base for stage files, default ""),
  -- optional "name" (checked against / written to the volume), and the
  -- mandatory "containers" array.
  volume :: IngestM [Container]
  volume = do
    dir <- JE.keyOrDefault "directory" "" $ stageFileRel <$> asStageFile ""
    _ <- JE.keyMay "name" $ do
      name <- check (volumeName $ volumeRow vol) =<< JE.asText
      forM_ name $ \n -> lift $ changeVolume vol{ volumeRow = (volumeRow vol){ volumeName = n } }
    top <- lift (lookupVolumeTopContainer vol)
    JE.key "containers" $ JE.eachInArray (container top dir)
  -- One element of "containers".  @topc@ is the volume's top container and
  -- @dir@ the base directory for stage files.
  container :: Container -> String -> IngestM Container
  container topc dir = do
    cid <- JE.keyMay "id" $ Id <$> JE.asIntegral
    key <- JE.key "key" $ asKey
    c' <- lift (lookupIngestContainer vol key)
    c <- maybe
      -- key not seen in a previous ingest: create the container, or look it
      -- up by "id" when one is given, then register the key for it
      (do
        c <- maybe
          (do
            top <- JE.keyOrDefault "top" False JE.asBool
            name <- JE.keyMay "name" JE.asText
            date <- JE.keyMay "date" asDate
            let c = blankContainer vol
            lift $ addContainer c
              { containerRow = (containerRow c)
                { containerTop = top
                , containerName = name
                , containerDate = date
                }
              })
          (\i -> fromMaybeM (throwPE $ "container " <> T.pack (show i) <> "/" <> key <> " not found")
            =<< lift (lookupVolumeContainer vol i))
          cid
        inObj c $ lift $ addIngestContainer c key
        return c)
      -- key already ingested: a given "id" must match, and any of
      -- "top"/"name"/"date" present are checked against the stored values
      (\c -> inObj c $ do
        unless (all (containerId (containerRow c) ==) cid) $ do
          throwPE "id mismatch"
        top <- fmap join . JE.keyMay "top" $ check (containerTop $ containerRow c) =<< JE.asBool
        name <- fmap join . JE.keyMay "name" $ check (containerName $ containerRow c) =<< JE.perhaps JE.asText
        date <- fmap join . JE.keyMay "date" $ check (containerDate $ containerRow c) =<< JE.perhaps asDate
        when (isJust top || isJust name || isJust date) $ lift $ changeContainer c
          { containerRow = (containerRow c)
            { containerTop = fromMaybe (containerTop $ containerRow c) top
            , containerName = fromMaybe (containerName $ containerRow c) name
            , containerDate = fromMaybe (containerDate $ containerRow c) date
            }
          }
        return c)
      c'
    let s = containerSlot c
    inObj c $ do
      _ <- JE.keyMay "release" $ do
        -- a container created/registered in this run takes the release
        -- unconditionally; a previously ingested one goes through 'check'
        release <- maybe (return . fmap Just) (check . containerRelease) c' =<< asRelease
        forM_ release $ \r -> do
          o <- lift $ changeRelease s r
          unless o $ throwPE "update failed"
      _ <- JE.key "records" $ JE.eachInArray $ do
        r <- record
        inObj r $ do
          rs' <- lift $ lookupRecordSlotRecords r s
          -- "positions" defaults to the full segment; when the record is
          -- already linked to this container the segments are 'check'ed
          segm <- (if null rs' then return . Just else check (map (slotSegment . recordSlot) rs')) =<< JE.keyOrDefault "positions" [fullSegment] (JE.eachInArray asSegment)
          forM_ segm $ \segs -> do
            let rs = RecordSlot r . Slot c
            -- drop the existing links before adding the new positions
            unless (null rs') $ do
              o <- lift $ moveRecordSlot (rs fullSegment) emptySegment
              unless o $ throwPE "record clear failed"
            o <- lift $ mapM (moveRecordSlot (rs emptySegment)) segs
            unless (and o) $ throwPE "record link failed"
      _ <- JE.key "assets" $ JE.eachInArray $ do
        (a, probe) <- asset dir
        inObj a $ do
          -- current slot of the asset, ignored when it is in the top container
          as' <- lift $ mfilter (((/=) `on` containerId . containerRow) topc . slotContainer) . assetSlot <$> lookupAssetAssetSlot a
          -- "position" is either the text "auto" or a segment; "auto" keeps
          -- the existing segment when the asset is already in this container
          -- and otherwise asks 'probeAutoPosition' for a point
          seg <- JE.keyOrDefault "position" (maybe fullSegment slotSegment as') $
            JE.withTextM (\t -> if t == "auto"
              then maybe (Right . Segment . Range.point <$> probeAutoPosition c probe) (return . Right . slotSegment) $ mfilter (((==) `on` containerId . containerRow) c . slotContainer) as'
              else return $ Left "invalid asset position")
            `catchError` \_ -> asSegment
          -- a single point plus a known asset duration becomes the range
          -- bounded by p and p + d
          let seg'
                | Just p <- Range.getPoint (segmentRange seg)
                , Just d <- assetDuration (assetRow a) = Segment $ Range.bounded p (p + d)
                | otherwise = seg
              ss = Slot c seg'
          u <- maybe (return True) (\s' -> isJust <$> on check slotId s' ss) as'
          when u $ do
            o <- lift $ changeAssetSlot $ AssetSlot a $ Just ss
            unless o $ throwPE "asset link failed"
      return c
  -- One element of a container's "records" array.
  record :: IngestM Record
  record = do
    -- handle record shell
    (rid :: Maybe (Id Record)) <- JE.keyMay "id" $ Id <$> JE.asIntegral -- insert = nothing, update = just id
    (key :: IngestKey) <- JE.key "key" $ asKey
    (mIngestRecord :: Maybe Record) <- lift (lookupIngestRecord vol key)
    (r :: Record) <- maybe
      -- first run of any ingest for this record. could be updating or insert, but need an ingest entry
      (do
        (r :: Record) <- maybe
          (do -- if no existing record, then add a new record
            (category :: Category) <- JE.key "category" asCategory
            lift $ addRecord $ blankRecord category vol)
          (\i -> do -- else find the existing record by vol + record id
            (mRecord :: Maybe Record) <- lift (lookupVolumeRecord vol i)
            fromMaybeM (throwPE $ "record " <> T.pack (show i) <> "/" <> key <> " not found") mRecord)
          rid
        inObj r $ lift $ addIngestRecord r key -- log that a record was ingested, assoc key with the record
        return r)
      -- there has been a prior ingest using the same key for this record
      (\priorIngestRecord -> inObj priorIngestRecord $ do
        unless (all (recordId (recordRow priorIngestRecord) ==) rid) $ do -- all here refers to either value in maybe or nothing
          throwPE "id mismatch"
        let withCategory :: Category -> Record
            withCategory c = priorIngestRecord
              { recordRow = (recordRow priorIngestRecord) { recordCategory = c } }
        (category' :: Maybe Category) <- JE.key "category" $ do
          (category :: Category) <- asCategory
          (changed :: Maybe Category) <-
            (category <$) -- check whether category name is different from the category on the existing record
              <$> on check categoryName (recordCategory $ recordRow priorIngestRecord) category
          -- update record category for a prior ingest, if category changed
          forM_ changed $ \c -> lift $ changeRecord (withCategory c)
          return changed
        -- hand back the record with the category that was just stored, so the
        -- metric lookup below resolves against the new category, not the old one
        return $ maybe priorIngestRecord withCategory category')
      mIngestRecord
    -- handle structure (metrics) + field values (measures) for record
    _ <- inObj r $ JE.forEachInObject $ \mn ->
      unless (mn `elem` ["id", "key", "category", "positions"]) $ do -- for all non special keys, treat as data
        (metric :: Metric) <- do
          let mMetric = find (\m -> mn == metricName m && recordCategory (recordRow r) == metricCategory m) allMetrics
          fromMaybeM (throwPE $ "metric " <> mn <> " not found") mMetric
        (datum :: Maybe BS.ByteString) <- do
          (newMeasureVal :: T.Text) <- JE.asText
          let newMeasureValBS :: BS.ByteString
              newMeasureValBS = TE.encodeUtf8 newMeasureVal
          maybe
            (return (Just newMeasureValBS)) -- always update
            (\existingMeasure -> check (measureDatum existingMeasure) newMeasureValBS) -- only update if changed and allowed
            (getMeasure metric (recordMeasures r)) -- look for existing measure for this metric on the record
        forM_ datum
          $ \measureDatumVal -> (lift . changeRecordMeasure) (Measure r metric measureDatumVal) -- save measure data
    -- return record
    return r
  -- One element of a container's "assets" array.  Returns the asset to link
  -- (the transcode's asset for audio/video) and the probe when it is 'ProbeAV'.
  asset :: String -> IngestM (Asset, Maybe Probe)
  asset dir = do
    -- sa is (Nothing, Just asset) when addressed by "id", otherwise
    -- (Just (file, probe), previously ingested asset for that file, if any)
    sa <- fromMaybeM
      (JE.key "file" $ do
        file <- asStageFile dir
        stageFileRelRaw <- lift $ liftIO $ rawFilePath $ stageFileRel file
        stageFileAbsRaw <- lift $ liftIO $ rawFilePath $ stageFileAbs file
        (,) . Just . (,) file
          <$> (either throwPE return
            =<< lift (probeFile stageFileRelRaw stageFileAbsRaw))
          <*> lift (lookupIngestAsset vol $ stageFileRel file))
      =<< (JE.keyMay "id" $ do
        maybe (throwPE "asset not found") (return . (,) Nothing . Just) =<< lift . lookupVolumeAsset vol . Id =<< JE.asIntegral)
    -- "file" is not allowed together with "id"
    when (isNothing $ fst sa) $ noKey "file"
    -- "replace" names an existing asset by id or by ingest path
    orig <- JE.keyMay "replace" $
      let err = fmap $ maybe (Left "asset not found") Right in
      JE.withIntegralM (err . lookupVolumeAsset vol . Id) `catchError` \_ ->
        JE.withStringM (err . lookupIngestAsset vol)
    a <- case sa of
      -- existing asset: check/update "release" and "name"
      (_, Just a) -> inObj a $ do
        unless (assetBacked a) $ throwPE "ingested asset incomplete"
        -- compareFiles file =<< getAssetFile -- assume correct
        release <- fmap join . JE.keyMay "release" $ check (assetRelease $ assetRow a) =<< asRelease
        name <- fmap join . JE.keyMay "name" $ check (assetName $ assetRow a) =<< JE.perhaps JE.asText
        a' <- if isJust release || isJust name
          then lift $ changeAsset a
            { assetRow = (assetRow a)
              { assetRelease = fromMaybe (assetRelease $ assetRow a) release
              , assetName = fromMaybe (assetName $ assetRow a) name
              }
            } Nothing
          else return a
        forM_ orig $ \o -> lift $ replaceSlotAsset o a'
        return a'
      -- new asset from a stage file.  The lazy pattern relies on sa's
      -- construction above: the first component is Nothing only when the
      -- second is Just, which the previous alternative already handled.
      (~(Just (file, probe)), Nothing) -> do
        release <- JE.key "release" asRelease
        name <- JE.keyMay "name" JE.asText
        stageFileAbsRaw <- lift $ liftIO $ rawFilePath $ stageFileAbs file
        let ba = blankAsset vol
        a <- lift $ addAsset ba
          { assetRow = (assetRow ba)
            { assetFormat = probeFormat probe
            , assetRelease = release
            , assetName = name
            }
          } (Just stageFileAbsRaw)
        lift $ addIngestAsset a (stageFileRel file)
        forM_ orig $ \o -> lift $ replaceAsset o a -- FIXME
        return a
    inObj a $ case sa of
      -- audio/video file: reuse a matching transcode of a previously ingested
      -- asset, or create and start a new one
      (Just (_, probe@ProbeAV{}), ae) -> do
        clip <- JE.keyOrDefault "clip" fullSegment asSegment
        opts <- JE.keyOrDefault "options" defaultTranscodeOptions $ JE.eachInArray JE.asString
        t <- lift $ fromMaybeM
          (do
            t <- addTranscode a clip opts probe
            _ <- startTranscode t
            return t)
          =<< flatMapM (\_ -> findTranscode a clip opts) ae
        return (transcodeAsset t, Just probe)
      -- anything else must not carry transcode settings
      _ -> do
        noKey "clip"
        noKey "options"
        return (a, Nothing)