1 {-# LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
    2 module Databrary.Ingest.JSON
    3   ( ingestJSON
    4   ) where
    5 
    6 import Control.Arrow (left)
    7 import Control.Monad (join, when, unless, void, mfilter, forM_, (<=<))
    8 import Control.Monad.Except (ExceptT(..), runExceptT, mapExceptT, catchError, throwError)
    9 import Control.Monad.IO.Class (MonadIO(liftIO))
   10 import Control.Monad.Trans.Class (lift)
   11 import qualified Data.Aeson.BetterErrors as JE
   12 import qualified Data.Attoparsec.ByteString as P
   13 import qualified Data.ByteString as BS
   14 import Data.ByteString.Lazy.Internal (defaultChunkSize)
   15 import Data.Function (on)
   16 import qualified Data.JsonSchema.Draft4 as JS
   17 import Data.List (find)
   18 import Data.Maybe (isJust, fromMaybe, isNothing)
   19 import Data.Monoid ((<>))
   20 import Data.Time.Format (parseTimeM, defaultTimeLocale)
   21 import qualified Data.Text as T
   22 import qualified Data.Text.Encoding as TE
   23 import qualified Database.PostgreSQL.Typed.Range as Range
   24 import System.FilePath ((</>))
   25 import System.IO (withBinaryFile, IOMode(ReadMode))
   26 
   27 import Paths_databrary
   28 import Databrary.Ops
   29 import Databrary.Has (Has, view, focusIO, MonadHas)
   30 import qualified Databrary.JSON as J
   31 import Databrary.Files hiding ((</>))
   32 import Databrary.Store.AV (AV)
   33 import Databrary.Store.Stage
   34 import Databrary.Store.Probe
   35 import Databrary.Store.Transcode
   36 import Databrary.Store.Types (MonadStorage)
   37 import Databrary.Model.Audit (MonadAudit)
   38 import Databrary.Model.Time
   39 import Databrary.Model.Kind
   40 import Databrary.Model.Id.Types
   41 import Databrary.Model.Volume
   42 import Databrary.Model.Container
   43 import Databrary.Model.Segment
   44 import Databrary.Model.Slot.Types
   45 import Databrary.Model.Release
   46 import Databrary.Model.Record
   47 import Databrary.Model.Category
   48 import Databrary.Model.Metric
   49 import Databrary.Model.Measure
   50 import Databrary.Model.RecordSlot
   51 import Databrary.Model.Asset
   52 import Databrary.Model.AssetSlot
   53 import Databrary.Model.AssetRevision
   54 import Databrary.Model.Transcode
   55 import Databrary.Model.Ingest
   56 import Databrary.Model.Party (SiteAuth)
   57 -- import Databrary.Action.Types
   58 import Databrary.Service.Log (MonadLog)
   59 import Databrary.Service.Types (Secret)
   60 
   61 -- type IngestM a = JE.ParseT T.Text Handler a -- TODO: bring back as constraint alias
   62 
   63 loadSchema :: ExceptT [T.Text] IO (J.Value -> [JS.Failure])
   64 loadSchema = do
   65   schema <- lift $ getDataFileName "volume.json"
   66   r <- lift $ withBinaryFile schema ReadMode (\h ->
   67     P.parseWith (BS.hGetSome h defaultChunkSize) J.json' BS.empty)
   68   js <- ExceptT . return . left (return . T.pack) $ eitherJSON =<< P.eitherResult r
   69   ExceptT $ return $ left (map (T.pack . show)) $ JS.checkSchema (JS.SchemaCache js mempty) (JS.SchemaContext Nothing js)
   70   where
   71     eitherJSON = J.parseEither J.parseJSON                                             
   72 
   73 throwPE :: (Monad m) => T.Text -> JE.ParseT T.Text m a
   74 throwPE = JE.throwCustomError                          
   75 
   76 inObj :: forall a b m. (Kinded a, Has (Id a) a, Show (IdType a), Monad m) => a -> JE.ParseT T.Text m b -> JE.ParseT T.Text m b
   77 inObj o = JE.mapError (<> (" for " <> kindOf o <> T.pack (' ' : show (view o :: Id a))))
   78 
   79 noKey :: (Monad m) => T.Text -> JE.ParseT T.Text m ()
   80 noKey k = void $ JE.keyMay k $ throwPE "unhandled value"                                          
   81 
   82 asKey :: (Monad m) => JE.ParseT T.Text m IngestKey
   83 asKey = JE.asText                       
   84 
   85 asDate :: (Monad m) => JE.ParseT T.Text m Date
   86 asDate = JE.withString (maybe (Left "expecting %F") Right . parseTimeM True defaultTimeLocale "%F")
   87 
   88 asRelease :: (Monad m) => JE.ParseT T.Text m (Maybe Release)
   89 asRelease = JE.perhaps JE.fromAesonParser           
   90 
   91 asCategory :: (Monad m) => JE.ParseT T.Text m Category
   92 asCategory =                 
   93   JE.withIntegral (err . getCategory . Id) `catchError` \_ -> do
   94     JE.withText (\n -> err $ find ((n ==) . categoryName) allCategories)
   95   where err = maybe (Left "category not found") Right
   96 
   97 asSegment :: (Monad m) => JE.ParseT T.Text m Segment
   98 asSegment = JE.fromAesonParser
   99 
  100 data StageFile = StageFile
  101   { stageFileRel :: !FilePath
  102   , stageFileAbs :: !FilePath
  103   }
  104 
  105 asStageFile :: (MonadStorage c m) => FilePath -> JE.ParseT T.Text m StageFile
  106 asStageFile b = do     
  107   r <- (b </>) <$> JE.asString
  108   a <- fromMaybeM (throwPE "stage file not found") <=< lift $ focusIO $ \a -> do
  109     rfp <- rawFilePath r
  110     stageFileRaw <- stageFile rfp a
  111     mapM unRawFilePath stageFileRaw
  112   return $ StageFile r a
  113 
  114 ingestJSON :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
  115   => Volume -> J.Value -> Bool -> Bool -> m (Either [T.Text] [Container])
  116 ingestJSON vol jdata run overwrite = runExceptT $ do
  117   schema <- mapExceptT liftIO loadSchema
  118   let errs = schema jdata
  119   unless (null errs) $ throwError $ map (T.pack . show) errs
  120   if run
  121   then ExceptT $ left (JE.displayError id) <$> JE.parseValueM volume jdata
  122   else return []
  123     where
  124   check :: (Eq a, Show a, Monad m) => a -> a -> JE.ParseT T.Text m (Maybe a)
  125   check cur new
  126     | cur == new = return Nothing
  127     | not overwrite = throwPE $ "conflicting value: " <> T.pack (show new) <> " <> " <> T.pack (show cur)
  128     | otherwise = return $ Just new
  129   volume :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
  130     => JE.ParseT T.Text m [Container]
  131   volume = do
  132     dir <- JE.keyOrDefault "directory" "" $ stageFileRel <$> asStageFile ""
  133     _ <- JE.keyMay "name" $ do
  134       name <- check (volumeName $ volumeRow vol) =<< JE.asText
  135       forM_ name $ \n -> lift $ changeVolume vol{ volumeRow = (volumeRow vol){ volumeName = n } }
  136     top <- lift (lookupVolumeTopContainer vol)
  137     JE.key "containers" $ JE.eachInArray (container top dir)
  138   container :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
  139     => Container -> String -> JE.ParseT T.Text m Container
  140   container topc dir = do
  141     cid <- JE.keyMay "id" $ Id <$> JE.asIntegral
  142     key <- JE.key "key" $ asKey
  143     c' <- lift (lookupIngestContainer vol key)
  144     c <- maybe
  145       (do
  146         c <- maybe
  147           (do
  148             top <- JE.keyOrDefault "top" False JE.asBool
  149             name <- JE.keyMay "name" JE.asText
  150             date <- JE.keyMay "date" asDate
  151             let c = blankContainer vol
  152             lift $ addContainer c
  153               { containerRow = (containerRow c)
  154                 { containerTop = top
  155                 , containerName = name
  156                 , containerDate = date
  157                 }
  158               })
  159           (\i -> fromMaybeM (throwPE $ "container " <> T.pack (show i) <> "/" <> key <> " not found")
  160             =<< lift (lookupVolumeContainer vol i))
  161           cid
  162         inObj c $ lift $ addIngestContainer c key
  163         return c)
  164       (\c -> inObj c $ do
  165         unless (all (containerId (containerRow c) ==) cid) $ do
  166           throwPE "id mismatch"
  167         top <- fmap join . JE.keyMay "top" $ check (containerTop $ containerRow c) =<< JE.asBool
  168         name <- fmap join . JE.keyMay "name" $ check (containerName $ containerRow c) =<< JE.perhaps JE.asText
  169         date <- fmap join . JE.keyMay "date" $ check (containerDate $ containerRow c) =<< JE.perhaps asDate
  170         when (isJust top || isJust name || isJust date) $ lift $ changeContainer c
  171           { containerRow = (containerRow c)
  172             { containerTop = fromMaybe (containerTop $ containerRow c) top
  173             , containerName = fromMaybe (containerName $ containerRow c) name
  174             , containerDate = fromMaybe (containerDate $ containerRow c) date
  175             }
  176           }
  177         return c)
  178       c'
  179     let s = containerSlot c
  180     inObj c $ do
  181       _ <- JE.keyMay "release" $ do
  182         release <- maybe (return . fmap Just) (check . containerRelease) c' =<< asRelease
  183         forM_ release $ \r -> do
  184           o <- lift $ changeRelease s r
  185           unless o $ throwPE "update failed"
  186       _ <- JE.key "records" $ JE.eachInArray $ do
  187         r <- record
  188         inObj r $ do
  189           rs' <- lift $ lookupRecordSlotRecords r s
  190           segm <- (if null rs' then return . Just else check (map (slotSegment . recordSlot) rs')) =<< JE.keyOrDefault "positions" [fullSegment] (JE.eachInArray asSegment)
  191           forM_ segm $ \segs -> do
  192             let rs = RecordSlot r . Slot c
  193             unless (null rs') $ do
  194               o <- lift $ moveRecordSlot (rs fullSegment) emptySegment
  195               unless o $ throwPE "record clear failed"
  196             o <- lift $ mapM (moveRecordSlot (rs emptySegment)) segs
  197             unless (and o) $ throwPE "record link failed"
  198       _ <- JE.key "assets" $ JE.eachInArray $ do
  199         (a, probe) <- asset dir
  200         inObj a $ do
  201           as' <- lift $ mfilter (((/=) `on` containerId . containerRow) topc . slotContainer) . assetSlot <$> lookupAssetAssetSlot a
  202           seg <- JE.keyOrDefault "position" (maybe fullSegment slotSegment as') $
  203             JE.withTextM (\t -> if t == "auto"
  204               then maybe (Right . Segment . Range.point <$> probeAutoPosition c probe) (return . Right . slotSegment) $ mfilter (((==) `on` containerId . containerRow) c . slotContainer) as'
  205               else return $ Left "invalid asset position")
  206               `catchError` \_ -> asSegment
  207           let seg'
  208                 | Just p <- Range.getPoint (segmentRange seg)
  209                 , Just d <- assetDuration (assetRow a) = Segment $ Range.bounded p (p + d)
  210                 | otherwise = seg
  211               ss = Slot c seg'
  212           u <- maybe (return True) (\s' -> isJust <$> on check slotId s' ss) as'
  213           when u $ do
  214             o <- lift $ changeAssetSlot $ AssetSlot a $ Just ss
  215             unless o $ throwPE "asset link failed"
  216       return c
  217   record :: (MonadAudit c m) => JE.ParseT T.Text m Record
  218   record = do
  219     -- handle record shell
  220     (rid :: Maybe (Id Record)) <- JE.keyMay "id" $ Id <$> JE.asIntegral -- insert = nothing, update = just id
  221     (key :: IngestKey) <- JE.key "key" $ asKey
  222     (mIngestRecord :: Maybe Record) <- lift (lookupIngestRecord vol key)
  223     (r :: Record) <- maybe
  224       -- first run of any ingest for this record. could be updating or insert, but need an ingest entry
  225       (do
  226         (r :: Record) <- maybe
  227           (do -- if no existing record, then add a new record
  228             (category :: Category) <- JE.key "category" asCategory
  229             lift $ addRecord $ blankRecord category vol)
  230           (\i -> do  -- else find the existing record by vol + record id
  231             (mRecord :: Maybe Record) <- lift (lookupVolumeRecord vol i)
  232             fromMaybeM (throwPE $ "record " <> T.pack (show i) <> "/" <> key <> " not found") mRecord)
  233           rid
  234         inObj r $ lift $ addIngestRecord r key -- log that a record was ingested, assoc key with the record
  235         return r)
  236       -- there has been a prior ingest using the same key for this record
  237       (\priorIngestRecord -> inObj priorIngestRecord $ do
  238         unless (all (recordId (recordRow priorIngestRecord) ==) rid) $ do -- all here refers to either value in maybe or nothing
  239           throwPE "id mismatch"
  240         _ <- JE.key "category" $ do
  241           (category :: Category) <- asCategory
  242           (category' :: Maybe Category) <-
  243               (category <$) -- check whether category name is different from the category on the existing record
  244                   <$> on check categoryName (recordCategory $ recordRow priorIngestRecord) category
  245           -- update record category for a prior ingest, if category changed
  246           forM_ category'
  247             $ \c ->
  248                  lift
  249                    $ changeRecord priorIngestRecord
  250                        { recordRow = (recordRow priorIngestRecord) { recordCategory = c } }
  251         return priorIngestRecord)
  252       mIngestRecord
  253     -- handle structure (metrics) + field values (measures) for record
  254     _ <- inObj r $ JE.forEachInObject $ \mn ->
  255       unless (mn `elem` ["id", "key", "category", "positions"]) $ do -- for all non special keys, treat as data
  256         (metric :: Metric) <- do
  257             let mMetric = find (\m -> mn == metricName m && recordCategory (recordRow r) == metricCategory m) allMetrics
  258             fromMaybeM (throwPE $ "metric " <> mn <> " not found") mMetric
  259         (datum :: Maybe BS.ByteString) <- do
  260           (newMeasureVal :: T.Text) <- JE.asText
  261           let newMeasureValBS :: BS.ByteString
  262               newMeasureValBS = TE.encodeUtf8 newMeasureVal
  263           maybe
  264             (return (Just newMeasureValBS))  -- always update
  265             (\existingMeasure -> check (measureDatum existingMeasure) newMeasureValBS) -- only update if changed and allowed
  266             (getMeasure metric (recordMeasures r)) -- look for existing measure for this metric on the record
  267         forM_ datum
  268           $ \measureDatumVal -> (lift . changeRecordMeasure) (Measure r metric measureDatumVal) -- save measure data
  269     -- return record
  270     return r
  271   asset :: (MonadStorage c m, MonadAudit c m, MonadHas AV c m, MonadHas Secret c m, MonadHas Timestamp c m, MonadLog c m, MonadHas SiteAuth c m)
  272     => String -> JE.ParseT T.Text m (Asset, Maybe Probe)
  273   asset dir = do
  274     sa <- fromMaybeM
  275       (JE.key "file" $ do
  276         file <- asStageFile dir
  277         stageFileRelRaw <- lift $ liftIO $ rawFilePath $ stageFileRel file
  278         stageFileRelAbs <- lift $ liftIO $ rawFilePath $ stageFileAbs file
  279         (,) . Just . (,) file
  280           <$> (either throwPE return
  281             =<< lift (probeFile stageFileRelRaw stageFileRelAbs))
  282           <*> lift (lookupIngestAsset vol $ stageFileRel file))
  283       =<< (JE.keyMay "id" $ do
  284         maybe (throwPE "asset not found") (return . (,) Nothing . Just) =<< lift . lookupVolumeAsset vol . Id =<< JE.asIntegral)
  285     when (isNothing $ fst sa) $ noKey "file"
  286     orig <- JE.keyMay "replace" $
  287       let err = fmap $ maybe (Left "asset not found") Right in
  288       JE.withIntegralM (err . lookupVolumeAsset vol . Id) `catchError` \_ ->
  289         JE.withStringM (err . lookupIngestAsset vol)
  290     a <- case sa of
  291       (_, Just a) -> inObj a $ do
  292         unless (assetBacked a) $ throwPE "ingested asset incomplete"
  293         -- compareFiles file =<< getAssetFile -- assume correct
  294         release <- fmap join . JE.keyMay "release" $ check (assetRelease $ assetRow a) =<< asRelease
  295         name <- fmap join . JE.keyMay "name" $ check (assetName $ assetRow a) =<< JE.perhaps JE.asText
  296         a' <- if isJust release || isJust name
  297           then lift $ changeAsset a
  298             { assetRow = (assetRow a)
  299               { assetRelease = fromMaybe (assetRelease $ assetRow a) release
  300               , assetName = fromMaybe (assetName $ assetRow a) name
  301               }
  302             } Nothing
  303           else return a
  304         forM_ orig $ \o -> lift $ replaceSlotAsset o a'
  305         return a'
  306       (~(Just (file, probe)), Nothing) -> do
  307         release <- JE.key "release" asRelease
  308         name <- JE.keyMay "name" JE.asText
  309         stageFileAbsRaw <- lift $ liftIO $ rawFilePath $ stageFileAbs file
  310         let ba = blankAsset vol
  311         a <- lift $ addAsset ba
  312           { assetRow = (assetRow ba)
  313             { assetFormat = probeFormat probe
  314             , assetRelease = release
  315             , assetName = name
  316             }
  317           } (Just stageFileAbsRaw)
  318         lift $ addIngestAsset a (stageFileRel file)
  319         forM_ orig $ \o -> lift $ replaceAsset o a -- FIXME
  320         return a
  321     inObj a $ case sa of
  322       (Just (_, probe@ProbeAV{}), ae) -> do
  323         clip <- JE.keyOrDefault "clip" fullSegment asSegment
  324         opts <- JE.keyOrDefault "options" defaultTranscodeOptions $ JE.eachInArray JE.asString
  325         t <- lift $ fromMaybeM
  326           (do
  327             t <- addTranscode a clip opts probe
  328             _ <- startTranscode t
  329             return t)
  330           =<< flatMapM (\_ -> findTranscode a clip opts) ae
  331         return (transcodeAsset t, Just probe)
  332       _ -> do
  333         noKey "clip"
  334         noKey "options"
  335         return (a, Nothing)