Skip to content

Scheduled Triggers with UTC offset #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: scheduled-triggers-changes
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions server/src-lib/Hasura/Eventing/ScheduledTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ The delivery mechanism is similar to Event Triggers; see "Hasura.Eventing.EventT
module Hasura.Eventing.ScheduledTrigger
( processScheduledQueue
, runScheduledEventsGenerator

, ScheduledEventSeed(..)
, generateScheduleTimes
, insertScheduledEvents
Expand All @@ -29,6 +28,7 @@ import Data.Has
import Data.Int (Int64)
import Data.List (unfoldr)
import Data.Time.Clock
import Data.Time.LocalTime (TimeZone(..),minutesToTimeZone)
import Hasura.Eventing.HTTP
import Hasura.Prelude
import Hasura.RQL.DDL.Headers
Expand Down Expand Up @@ -206,14 +206,28 @@ generateScheduledEventsFrom startTime ScheduledTriggerInfo{..} =
let events =
case stiSchedule of
AdHoc _ -> empty -- ad-hoc scheduled events are created through 'create_scheduled_event' API
Cron cron -> generateScheduleTimes startTime 100 cron -- by default, generate next 100 events
in map (ScheduledEventSeed stiName) events

Cron cron tz -> generateScheduleTimes startTime tz 100 cron

in map (ScheduledEventSeed stiName) events

addOffsetToUTCTime :: UTCTime -> TimeZone -> UTCTime
addOffsetToUTCTime ut (TimeZone mins _ _) =
addUTCTime (realToFrac $ (mins * 60)) ut

-- | Generates next @n events starting @from according to 'CronSchedule'
generateScheduleTimes :: UTCTime -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from n cron = take n $ go from
-- When Timezone is not Nothing, the offset will be added to the `from` value
-- then the cron schedules are generated and then the offset will be subtracted
-- from the generated timestamps.
generateScheduleTimes :: UTCTime -> Maybe TimeZone -> Int -> CronSchedule -> [UTCTime]
generateScheduleTimes from timezone n cron =
case timezone of
Nothing -> take n $ go from
Just tz@(TimeZone mins _ _) ->
map (\t -> addOffsetToUTCTime t (inverseTimeZone mins)) $ take n $ go $ addOffsetToUTCTime from tz
where
go = unfoldr (fmap dup . nextMatch cron)
inverseTimeZone mins = minutesToTimeZone (-1 * mins)

processScheduledQueue
:: HasVersion
Expand Down
3 changes: 3 additions & 0 deletions server/src-lib/Hasura/Incremental/Internal/Dependency.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import Data.Vector (Vector)
import GHC.Generics ((:*:) (..), (:+:) (..), Generic (..), K1 (..),
M1 (..), U1 (..), V1)
import System.Cron.Types
import Data.Time.LocalTime (TimeZone(..))

import Hasura.Incremental.Select

Expand Down Expand Up @@ -166,6 +167,8 @@ instance Cacheable N.URIAuth where unchanged _ = (==)
instance Cacheable DiffTime where unchanged _ = (==)
instance Cacheable NominalDiffTime where unchanged _ = (==)
instance Cacheable UTCTime where unchanged _ = (==)
instance Cacheable TimeZone where unchanged _ = (==)


-- instances for CronSchedule from package `cron`
instance Cacheable StepField
Expand Down
4 changes: 2 additions & 2 deletions server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ addScheduledTriggerToCatalog CreateScheduledTrigger {..} = liftTx $ do
(name, scheduled_time)
VALUES ($1, $2)
|] (stName, timestamp) False
Cron cron -> do
Cron cron tz -> do
currentTime <- liftIO C.getCurrentTime
let scheduleTimes = generateScheduleTimes currentTime 100 cron -- generate next 100 events
let scheduleTimes = generateScheduleTimes currentTime tz 100 cron -- generate next 100 events
events = map (ScheduledEventSeed stName) scheduleTimes
insertScheduledEvents events
_ -> pure ()
Expand Down
3 changes: 3 additions & 0 deletions server/src-lib/Hasura/RQL/Types/Catalog.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import qualified Data.HashMap.Strict as M
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Time.LocalTime (TimeZone(..))

import Hasura.Incremental (Cacheable)
import Hasura.RQL.DDL.ComputedField
Expand All @@ -38,6 +39,8 @@ import Hasura.RQL.Types.SchemaCache
import Hasura.RQL.Types.ScheduledTrigger
import Hasura.SQL.Types

import qualified Database.PG.Query as Q

newtype CatalogForeignKey
= CatalogForeignKey
{ unCatalogForeignKey :: ForeignKey
Expand Down
42 changes: 39 additions & 3 deletions server/src-lib/Hasura/RQL/Types/ScheduledTrigger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ import Data.Time.Format
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Char
import Hasura.Prelude
import System.Cron.Types
import Hasura.Incremental
import Data.Time.LocalTime (TimeZone(..),minutesToTimeZone)

import qualified Data.Text as T
import qualified Data.Aeson as J
import qualified Hasura.RQL.Types.EventTrigger as ET
import qualified Database.PG.Query as Q

data RetryConfST
= RetryConfST
Expand All @@ -46,7 +49,7 @@ defaultRetryConfST =
, rcstTolerance = 21600 -- 6 hours
}

data ScheduleType = Cron CronSchedule | AdHoc (Maybe UTCTime)
data ScheduleType = Cron CronSchedule (Maybe TimeZone) | AdHoc (Maybe UTCTime)
deriving (Show, Eq, Generic)

instance NFData ScheduleType
Expand All @@ -57,15 +60,47 @@ instance FromJSON ScheduleType where
withObject "ScheduleType" $ \o -> do
type' <- o .: "type"
case type' of
String "cron" -> Cron <$> o .: "value"
String "cron" -> Cron <$> o .: "value" <*> o .:? "utc-offset"
String "adhoc" -> AdHoc <$> o .:? "value"
_ -> fail "expected type to be cron or adhoc"

instance ToJSON ScheduleType where
toJSON (Cron cs) = object ["type" .= String "cron", "value" .= toJSON cs]
toJSON (Cron cs (Just offset)) = object ["type" .= String "cron", "value" .= toJSON cs, "utc-offset" .= (show offset)]
toJSON (Cron cs Nothing) = object ["type" .= String "cron", "value" .= toJSON cs]
toJSON (AdHoc (Just ts)) = object ["type" .= String "adhoc", "value" .= toJSON ts]
toJSON (AdHoc Nothing) = object ["type" .= String "adhoc"]

-- convertUTCOffsetToTimeZone can take an offset in any one of
-- the following formats:
-- HHMM,HH:MM,(+/-)HHMM
-- If the length of the offset is 4, then it's assumed that it's a
-- positive offset.
convertUTCOffsetToTimeZone :: String -> Either String TimeZone
convertUTCOffsetToTimeZone offset
| length offset == 4 = convertUTCOffsetToTimeZone ('+':offset)
convertUTCOffsetToTimeZone (h1:h2:':':m1:m2:"") =
convertUTCOffsetToTimeZone('+':h1:h2:m1:m2:"")
convertUTCOffsetToTimeZone (p:h1:h2:':':m1:m2:"") =
convertUTCOffsetToTimeZone(p:h1:h2:m1:m2:"")
convertUTCOffsetToTimeZone ('+':h1:h2:m1:m2:"")
| and [(isDigit h1),(isDigit h2),(isDigit m1),(isDigit m2)] =
let mins = (10 * (digitToInt h1) + (digitToInt h2)) * 60
+ (10 * (digitToInt m1) + (digitToInt m2))
in Right $ TimeZone mins False ('+':h1:h2:m1:m2:"")
| otherwise = Left "Invalid TimeZone Format"
convertUTCOffsetToTimeZone ('-':h1:h2:m1:m2:"") =
case convertUTCOffsetToTimeZone ('+':h1:h2:m1:m2:"") of
Left msg -> Left msg
Right (TimeZone mins isSummerOnly ('+':offset)) -> Right (TimeZone (-1 * mins) isSummerOnly ('-':offset))
convertUTCOffsetToTimeZone _ = Left "Invalid TimeZone Format"

instance FromJSON TimeZone where
parseJSON = withText "TimeZone" $ \o ->
either fail pure $ convertUTCOffsetToTimeZone $ T.unpack o

instance ToJSON TimeZone where
toJSON (TimeZone _ _ offset) = String . T.pack $ offset

data CreateScheduledTrigger
= CreateScheduledTrigger
{ stName :: !ET.TriggerName
Expand All @@ -88,6 +123,7 @@ instance FromJSON CreateScheduledTrigger where
stSchedule <- o .: "schedule"
stRetryConf <- o .:? "retry_conf" .!= defaultRetryConfST
stHeaders <- o .:? "headers" .!= []

pure CreateScheduledTrigger {..}

$(deriveToJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''CreateScheduledTrigger)
Expand Down
80 changes: 73 additions & 7 deletions server/tests-py/test_scheduled_triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from croniter import croniter
from validate import validate_event_webhook,validate_event_headers
from queue import Empty
from pytz import timezone
import time

def stringify_datetime(dt):
Expand All @@ -25,9 +26,73 @@ def get_events_of_scheduled_trigger(hge_ctx,trigger_name):
class TestScheduledTriggerCron(object):

cron_trigger_name = "a_scheduled_trigger"
cron_trigger_with_offset = cron_trigger_name + "_offset"
webhook_payload = {"foo":"baz"}
webhook_path = "/hello"
url = '/v1/query'
timezone_region = "Asia/Kolkata"
offset_at_timezone_region = "+05:30"

def test_create_cron_schedule_triggers_with_offset(self,hge_ctx):
# setting the test to be after 30 mins, to make sure that
# any of the events are not triggered.
local_now = datetime.now().astimezone(timezone(self.timezone_region))
min_after_30_mins = (local_now + timedelta(minutes=30)).minute
TestScheduledTriggerCron.cron_schedule = "{} * * * *".format(min_after_30_mins)

cron_st_api_query = {
"type":"create_scheduled_trigger",
"args":{
"name":self.cron_trigger_with_offset,
"webhook":"http://127.0.0.1:5594" + "/foo",
"schedule":{
"type":"cron",
"value":self.cron_schedule,
"utc-offset":self.offset_at_timezone_region
},
"headers":[
{
"name":"foo",
"value":"baz"
}
],
"payload":self.webhook_payload
}
}
headers = {}
if hge_ctx.hge_key is not None:
headers['X-Hasura-Admin-Secret'] = hge_ctx.hge_key
cron_st_code,cron_st_resp,_ = hge_ctx.anyq(self.url,cron_st_api_query,headers)
TestScheduledTriggerCron.init_time_with_offset = datetime.now().astimezone(timezone(self.timezone_region)) # the cron events will be generated based on the current time, they will not be exactly the same though(the server now and now here)
assert cron_st_code == 200
assert cron_st_resp['message'] == 'success'

def test_check_generated_cron_scheduled_events_with_offset(self,hge_ctx):
expected_schedule_timestamps = []
iter = croniter(self.cron_schedule,self.init_time_with_offset)
for i in range(100):
dt = iter.next(datetime)
expected_schedule_timestamps.append(datetime.timestamp(dt))
sql = '''
select timezone('{}',scheduled_time) as scheduled_time
from hdb_catalog.hdb_scheduled_events where
name = '{}' order by scheduled_time asc;
'''
q = {
"type":"run_sql",
"args":{
"sql":sql.format(self.timezone_region, self.cron_trigger_with_offset)
}
}
st,resp = hge_ctx.v1q(q)
assert st == 200
ts_resp = resp['result'][1:]
assert len(ts_resp) == 100 # 100 events are generated in a cron ST
db_timestamps = []
for ts in ts_resp:
datetime_ts = datetime.strptime(ts[0],"%Y-%m-%d %H:%M:%S")
db_timestamps.append(datetime.timestamp(datetime_ts))
assert db_timestamps == expected_schedule_timestamps

def test_create_cron_schedule_triggers(self,hge_ctx):
# setting the test to be after 30 mins, to make sure that
Expand Down Expand Up @@ -90,14 +155,15 @@ def test_check_generated_cron_scheduled_events(self,hge_ctx):
assert future_schedule_timestamps == scheduled_events_ts

def test_delete_cron_scheduled_trigger(self,hge_ctx):
q = {
"type":"delete_scheduled_trigger",
"args":{
"name":self.cron_trigger_name
for trigger_name in [self.cron_trigger_name,self.cron_trigger_with_offset]:
q = {
"type":"delete_scheduled_trigger",
"args":{
"name":trigger_name
}
}
}
st,resp = hge_ctx.v1q(q)
assert st == 200,resp
st,resp = hge_ctx.v1q(q)
assert st == 200,resp

class ScheduledEventNotFound(Exception):
pass
Expand Down