1 {-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
    2 module Databrary.Ingest.JSON
    3   ( ingestJSON
    4   ) where
    5 
    6 import Control.Arrow (left)
    7 import Control.Monad (join, when, unless, void, mfilter, forM_, (<=<))
    8 import Control.Monad.Except (ExceptT(..), runExceptT, mapExceptT, catchError, throwError)
    9 import Control.Monad.IO.Class (MonadIO(liftIO))
   10 import Control.Monad.Trans.Class (lift)
   11 import qualified Data.Aeson.BetterErrors as JE
   12 import qualified Data.Attoparsec.ByteString as P
   13 import qualified Data.ByteString as BS
   14 import Data.ByteString.Lazy.Internal (defaultChunkSize)
   15 import Data.Function (on)
   16 import qualified Data.JsonSchema.Draft4 as JS
   17 import Data.List (find)
   18 import Data.Maybe (isJust, fromMaybe, isNothing)
   19 import Data.Monoid ((<>))
   20 import Data.Time.Format (parseTimeM, defaultTimeLocale)
   21 import qualified Data.Text as T
   22 import qualified Data.Text.Encoding as TE
   23 import qualified Database.PostgreSQL.Typed.Range as Range
   24 import System.FilePath ((</>))
   25 import System.IO (withBinaryFile, IOMode(ReadMode))
   26 
   27 import Paths_databrary
   28 import Databrary.Ops
   29 import Databrary.Has (Has, view, focusIO)
   30 import qualified Databrary.JSON as J
   31 import Databrary.Files hiding ((</>))
   32 import Databrary.Store.Stage
   33 import Databrary.Store.Probe
   34 import Databrary.Store.Transcode
   35 import Databrary.Model.Time
   36 import Databrary.Model.Kind
   37 import Databrary.Model.Id.Types
   38 import Databrary.Model.Volume
   39 import Databrary.Model.Container
   40 import Databrary.Model.Segment
   41 import Databrary.Model.Slot.Types
   42 import Databrary.Model.Release
   43 import Databrary.Model.Record
   44 import Databrary.Model.Category
   45 import Databrary.Model.Metric
   46 import Databrary.Model.Measure
   47 import Databrary.Model.RecordSlot
   48 import Databrary.Model.Asset
   49 import Databrary.Model.AssetSlot
   50 import Databrary.Model.AssetRevision
   51 import Databrary.Model.Transcode
   52 import Databrary.Model.Ingest
   53 import Databrary.Action.Types
   54 
   55 type IngestM a = JE.ParseT T.Text Handler a
   56 
   57 loadSchema :: ExceptT [T.Text] IO (J.Value -> [JS.Failure])
   58 loadSchema = do
   59   schema <- lift $ getDataFileName "volume.json"
   60   r <- lift $ withBinaryFile schema ReadMode (\h ->
   61     P.parseWith (BS.hGetSome h defaultChunkSize) J.json' BS.empty)
   62   js <- ExceptT . return . left (return . T.pack) $ eitherJSON =<< P.eitherResult r
   63   ExceptT $ return $ left (map (T.pack . show)) $ JS.checkSchema (JS.SchemaCache js mempty) (JS.SchemaContext Nothing js)
   64   where
   65     eitherJSON = J.parseEither J.parseJSON
   66 
   67 throwPE :: T.Text -> IngestM a
   68 throwPE = JE.throwCustomError
   69 
   70 inObj :: forall a b . (Kinded a, Has (Id a) a, Show (IdType a)) => a -> IngestM b -> IngestM b
   71 inObj o = JE.mapError (<> (" for " <> kindOf o <> T.pack (' ' : show (view o :: Id a))))
   72 
   73 noKey :: T.Text -> IngestM ()
   74 noKey k = void $ JE.keyMay k $ throwPE "unhandled value"
   75 
   76 asKey :: IngestM IngestKey
   77 asKey = JE.asText
   78 
   79 asDate :: IngestM Date
   80 asDate = JE.withString (maybe (Left "expecting %F") Right . parseTimeM True defaultTimeLocale "%F")
   81 
   82 asRelease :: IngestM (Maybe Release)
   83 asRelease = JE.perhaps JE.fromAesonParser
   84 
   85 asCategory :: IngestM Category
   86 asCategory =
   87   JE.withIntegral (err . getCategory . Id) `catchError` \_ -> do
   88     JE.withText (\n -> err $ find ((n ==) . categoryName) allCategories)
   89   where err = maybe (Left "category not found") Right
   90 
   91 asSegment :: IngestM Segment
   92 asSegment = JE.fromAesonParser
   93 
   94 data StageFile = StageFile
   95   { stageFileRel :: !FilePath
   96   , stageFileAbs :: !FilePath
   97   }
   98 
   99 asStageFile :: FilePath -> IngestM StageFile
  100 asStageFile b = do
  101   r <- (b </>) <$> JE.asString
  102   a <- fromMaybeM (throwPE "stage file not found") <=< lift $ focusIO $ \a -> do
  103     rfp <- rawFilePath r
  104     stageFileRaw <- stageFile rfp a
  105     mapM unRawFilePath stageFileRaw
  106   return $ StageFile r a
  107 
  108 ingestJSON :: Volume -> J.Value -> Bool -> Bool -> Handler (Either [T.Text] [Container])
  109 ingestJSON vol jdata run overwrite = runExceptT $ do
  110   schema <- mapExceptT liftIO loadSchema
  111   let errs = schema jdata
  112   unless (null errs) $ throwError $ map (T.pack . show) errs
  113   if run
  114   then ExceptT $ left (JE.displayError id) <$> JE.parseValueM volume jdata
  115   else return []
  116     where
  117   check :: (Eq a, Show a) => a -> a -> IngestM (Maybe a)
  118   check cur new
  119     | cur == new = return Nothing
  120     | not overwrite = throwPE $ "conflicting value: " <> T.pack (show new) <> " <> " <> T.pack (show cur)
  121     | otherwise = return $ Just new
  122   volume :: IngestM [Container]
  123   volume = do
  124     dir <- JE.keyOrDefault "directory" "" $ stageFileRel <$> asStageFile ""
  125     _ <- JE.keyMay "name" $ do
  126       name <- check (volumeName $ volumeRow vol) =<< JE.asText
  127       forM_ name $ \n -> lift $ changeVolume vol{ volumeRow = (volumeRow vol){ volumeName = n } }
  128     top <- lift (lookupVolumeTopContainer vol)
  129     JE.key "containers" $ JE.eachInArray (container top dir)
  130   container :: Container -> String -> IngestM Container
  131   container topc dir = do
  132     cid <- JE.keyMay "id" $ Id <$> JE.asIntegral
  133     key <- JE.key "key" $ asKey
  134     c' <- lift (lookupIngestContainer vol key)
  135     c <- maybe
  136       (do
  137         c <- maybe
  138           (do
  139             top <- JE.keyOrDefault "top" False JE.asBool
  140             name <- JE.keyMay "name" JE.asText
  141             date <- JE.keyMay "date" asDate
  142             let c = blankContainer vol
  143             lift $ addContainer c
  144               { containerRow = (containerRow c)
  145                 { containerTop = top
  146                 , containerName = name
  147                 , containerDate = date
  148                 }
  149               })
  150           (\i -> fromMaybeM (throwPE $ "container " <> T.pack (show i) <> "/" <> key <> " not found")
  151             =<< lift (lookupVolumeContainer vol i))
  152           cid
  153         inObj c $ lift $ addIngestContainer c key
  154         return c)
  155       (\c -> inObj c $ do
  156         unless (all (containerId (containerRow c) ==) cid) $ do
  157           throwPE "id mismatch"
  158         top <- fmap join . JE.keyMay "top" $ check (containerTop $ containerRow c) =<< JE.asBool
  159         name <- fmap join . JE.keyMay "name" $ check (containerName $ containerRow c) =<< JE.perhaps JE.asText
  160         date <- fmap join . JE.keyMay "date" $ check (containerDate $ containerRow c) =<< JE.perhaps asDate
  161         when (isJust top || isJust name || isJust date) $ lift $ changeContainer c
  162           { containerRow = (containerRow c)
  163             { containerTop = fromMaybe (containerTop $ containerRow c) top
  164             , containerName = fromMaybe (containerName $ containerRow c) name
  165             , containerDate = fromMaybe (containerDate $ containerRow c) date
  166             }
  167           }
  168         return c)
  169       c'
  170     let s = containerSlot c
  171     inObj c $ do
  172       _ <- JE.keyMay "release" $ do
  173         release <- maybe (return . fmap Just) (check . containerRelease) c' =<< asRelease
  174         forM_ release $ \r -> do
  175           o <- lift $ changeRelease s r
  176           unless o $ throwPE "update failed"
  177       _ <- JE.key "records" $ JE.eachInArray $ do
  178         r <- record
  179         inObj r $ do
  180           rs' <- lift $ lookupRecordSlotRecords r s
  181           segm <- (if null rs' then return . Just else check (map (slotSegment . recordSlot) rs')) =<< JE.keyOrDefault "positions" [fullSegment] (JE.eachInArray asSegment)
  182           forM_ segm $ \segs -> do
  183             let rs = RecordSlot r . Slot c
  184             unless (null rs') $ do
  185               o <- lift $ moveRecordSlot (rs fullSegment) emptySegment
  186               unless o $ throwPE "record clear failed"
  187             o <- lift $ mapM (moveRecordSlot (rs emptySegment)) segs
  188             unless (and o) $ throwPE "record link failed"
  189       _ <- JE.key "assets" $ JE.eachInArray $ do
  190         (a, probe) <- asset dir
  191         inObj a $ do
  192           as' <- lift $ mfilter (((/=) `on` containerId . containerRow) topc . slotContainer) . assetSlot <$> lookupAssetAssetSlot a
  193           seg <- JE.keyOrDefault "position" (maybe fullSegment slotSegment as') $
  194             JE.withTextM (\t -> if t == "auto"
  195               then maybe (Right . Segment . Range.point <$> probeAutoPosition c probe) (return . Right . slotSegment) $ mfilter (((==) `on` containerId . containerRow) c . slotContainer) as'
  196               else return $ Left "invalid asset position")
  197               `catchError` \_ -> asSegment
  198           let seg'
  199                 | Just p <- Range.getPoint (segmentRange seg)
  200                 , Just d <- assetDuration (assetRow a) = Segment $ Range.bounded p (p + d)
  201                 | otherwise = seg
  202               ss = Slot c seg'
  203           u <- maybe (return True) (\s' -> isJust <$> on check slotId s' ss) as'
  204           when u $ do
  205             o <- lift $ changeAssetSlot $ AssetSlot a $ Just ss
  206             unless o $ throwPE "asset link failed"
  207       return c
  208   record :: IngestM Record
  209   record = do
  210     -- handle record shell
  211     (rid :: Maybe (Id Record)) <- JE.keyMay "id" $ Id <$> JE.asIntegral -- insert = nothing, update = just id
  212     (key :: IngestKey) <- JE.key "key" $ asKey
  213     (mIngestRecord :: Maybe Record) <- lift (lookupIngestRecord vol key)
  214     (r :: Record) <- maybe
  215       -- first run of any ingest for this record. could be updating or insert, but need an ingest entry
  216       (do
  217         (r :: Record) <- maybe
  218           (do -- if no existing record, then add a new record
  219             (category :: Category) <- JE.key "category" asCategory
  220             lift $ addRecord $ blankRecord category vol)
  221           (\i -> do  -- else find the existing record by vol + record id
  222             (mRecord :: Maybe Record) <- lift (lookupVolumeRecord vol i)
  223             fromMaybeM (throwPE $ "record " <> T.pack (show i) <> "/" <> key <> " not found") mRecord)
  224           rid
  225         inObj r $ lift $ addIngestRecord r key -- log that a record was ingested, assoc key with the record
  226         return r)
  227       -- there has been a prior ingest using the same key for this record
  228       (\priorIngestRecord -> inObj priorIngestRecord $ do
  229         unless (all (recordId (recordRow priorIngestRecord) ==) rid) $ do -- all here refers to either value in maybe or nothing
  230           throwPE "id mismatch"
  231         _ <- JE.key "category" $ do
  232           (category :: Category) <- asCategory
  233           (category' :: Maybe Category) <-
  234               (category <$) -- check whether category name is different from the category on the existing record
  235                   <$> on check categoryName (recordCategory $ recordRow priorIngestRecord) category
  236           -- update record category for a prior ingest, if category changed
  237           forM_ category'
  238             $ \c ->
  239                  lift
  240                    $ changeRecord priorIngestRecord
  241                        { recordRow = (recordRow priorIngestRecord) { recordCategory = c } }
  242         return priorIngestRecord)
  243       mIngestRecord
  244     -- handle structure (metrics) + field values (measures) for record
  245     _ <- inObj r $ JE.forEachInObject $ \mn ->
  246       unless (mn `elem` ["id", "key", "category", "positions"]) $ do -- for all non special keys, treat as data
  247         (metric :: Metric) <- do
  248             let mMetric = find (\m -> mn == metricName m && recordCategory (recordRow r) == metricCategory m) allMetrics
  249             fromMaybeM (throwPE $ "metric " <> mn <> " not found") mMetric
  250         (datum :: Maybe BS.ByteString) <- do
  251           (newMeasureVal :: T.Text) <- JE.asText
  252           let newMeasureValBS :: BS.ByteString
  253               newMeasureValBS = TE.encodeUtf8 newMeasureVal
  254           maybe
  255             (return (Just newMeasureValBS))  -- always update
  256             (\existingMeasure -> check (measureDatum existingMeasure) newMeasureValBS) -- only update if changed and allowed
  257             (getMeasure metric (recordMeasures r)) -- look for existing measure for this metric on the record
  258         forM_ datum
  259           $ \measureDatumVal -> (lift . changeRecordMeasure) (Measure r metric measureDatumVal) -- save measure data
  260     -- return record
  261     return r
  262   asset :: String -> IngestM (Asset, Maybe Probe)
  263   asset dir = do
  264     sa <- fromMaybeM
  265       (JE.key "file" $ do
  266         file <- asStageFile dir
  267         stageFileRelRaw <- lift $ liftIO $ rawFilePath $ stageFileRel file
  268         stageFileRelAbs <- lift $ liftIO $ rawFilePath $ stageFileAbs file
  269         (,) . Just . (,) file
  270           <$> (either throwPE return
  271             =<< lift (probeFile stageFileRelRaw stageFileRelAbs))
  272           <*> lift (lookupIngestAsset vol $ stageFileRel file))
  273       =<< (JE.keyMay "id" $ do
  274         maybe (throwPE "asset not found") (return . (,) Nothing . Just) =<< lift . lookupVolumeAsset vol . Id =<< JE.asIntegral)
  275     when (isNothing $ fst sa) $ noKey "file"
  276     orig <- JE.keyMay "replace" $
  277       let err = fmap $ maybe (Left "asset not found") Right in
  278       JE.withIntegralM (err . lookupVolumeAsset vol . Id) `catchError` \_ ->
  279         JE.withStringM (err . lookupIngestAsset vol)
  280     a <- case sa of
  281       (_, Just a) -> inObj a $ do
  282         unless (assetBacked a) $ throwPE "ingested asset incomplete"
  283         -- compareFiles file =<< getAssetFile -- assume correct
  284         release <- fmap join . JE.keyMay "release" $ check (assetRelease $ assetRow a) =<< asRelease
  285         name <- fmap join . JE.keyMay "name" $ check (assetName $ assetRow a) =<< JE.perhaps JE.asText
  286         a' <- if isJust release || isJust name
  287           then lift $ changeAsset a
  288             { assetRow = (assetRow a)
  289               { assetRelease = fromMaybe (assetRelease $ assetRow a) release
  290               , assetName = fromMaybe (assetName $ assetRow a) name
  291               }
  292             } Nothing
  293           else return a
  294         forM_ orig $ \o -> lift $ replaceSlotAsset o a'
  295         return a'
  296       (~(Just (file, probe)), Nothing) -> do
  297         release <- JE.key "release" asRelease
  298         name <- JE.keyMay "name" JE.asText
  299         stageFileAbsRaw <- lift $ liftIO $ rawFilePath $ stageFileAbs file
  300         let ba = blankAsset vol
  301         a <- lift $ addAsset ba
  302           { assetRow = (assetRow ba)
  303             { assetFormat = probeFormat probe
  304             , assetRelease = release
  305             , assetName = name
  306             }
  307           } (Just stageFileAbsRaw)
  308         lift $ addIngestAsset a (stageFileRel file)
  309         forM_ orig $ \o -> lift $ replaceAsset o a -- FIXME
  310         return a
  311     inObj a $ case sa of
  312       (Just (_, probe@ProbeAV{}), ae) -> do
  313         clip <- JE.keyOrDefault "clip" fullSegment asSegment
  314         opts <- JE.keyOrDefault "options" defaultTranscodeOptions $ JE.eachInArray JE.asString
  315         t <- lift $ fromMaybeM
  316           (do
  317             t <- addTranscode a clip opts probe
  318             _ <- startTranscode t
  319             return t)
  320           =<< flatMapM (\_ -> findTranscode a clip opts) ae
  321         return (transcodeAsset t, Just probe)
  322       _ -> do
  323         noKey "clip"
  324         noKey "options"
  325         return (a, Nothing)