Skip to content

Commit c728804

Browse files
committed
configurable time extraction window
1 parent 96a2034 commit c728804

File tree

4 files changed

+45
-13
lines changed

4 files changed

+45
-13
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ These parameters are filled by CFT at stack creation time and can be adjusted la
7373
| /arduino/s3-importer/iot/filter/tags | (optional) tags filtering. Syntax: tag=value,tag2=value2 |
7474
| /arduino/s3-importer/iot/samples-resolution | (optional) samples aggregation resolution (1/5/15 minutes, 1 hour, raw) |
7575
| /arduino/s3-importer/destination-bucket | S3 destination bucket |
76+
| /arduino/s3-importer/iot/scheduling | Execution scheduling |
7677

7778
### Tag filtering
7879

business/tsextractor/tsextractor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ func (a *TsExtractor) ExportTSToS3(
5252
resolution int,
5353
destinationS3Bucket string) error {
5454

55-
to := time.Now().Truncate(time.Hour).UTC()
55+
// Truncate time to given resolution
56+
to := time.Now().Truncate(time.Duration(resolution) * time.Second).UTC()
5657
from := to.Add(-time.Duration(timeWindowInMinutes) * time.Minute)
5758

5859
// Open s3 output writer

deployment/cloud-formation-template/deployment.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,15 @@ Resources:
171171
Ref: DestinationS3Bucket
172172
Tier: Standard
173173

174+
ExecutionSchedulingParameter:
175+
Type: AWS::SSM::Parameter
176+
Properties:
177+
Name: /arduino/s3-importer/iot/scheduling
178+
Type: String
179+
Value:
180+
Ref: ExecutionScheduling
181+
Tier: Standard
182+
174183
# EventBridge Rule to trigger Lambda every hour
175184
EventBridgeRule:
176185
Type: AWS::Events::Rule

lambda.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,17 @@ type AWSS3ImportTrigger struct {
3131
}
3232

3333
const (
34-
ArduinoPrefix = "/arduino/s3-importer"
35-
IoTApiKey = ArduinoPrefix + "/iot/api-key"
36-
IoTApiSecret = ArduinoPrefix + "/iot/api-secret"
37-
IoTApiOrgId = ArduinoPrefix + "/iot/org-id"
38-
IoTApiTags = ArduinoPrefix + "/iot/filter/tags"
39-
SamplesResoSec = ArduinoPrefix + "/iot/samples-resolution-seconds"
40-
SamplesReso = ArduinoPrefix + "/iot/samples-resolution"
41-
DestinationS3Bucket = ArduinoPrefix + "/destination-bucket"
42-
SamplesResolutionSeconds = 300
43-
TimeExtractionWindowMinutes = 60
34+
ArduinoPrefix = "/arduino/s3-importer"
35+
IoTApiKey = ArduinoPrefix + "/iot/api-key"
36+
IoTApiSecret = ArduinoPrefix + "/iot/api-secret"
37+
IoTApiOrgId = ArduinoPrefix + "/iot/org-id"
38+
IoTApiTags = ArduinoPrefix + "/iot/filter/tags"
39+
SamplesResoSec = ArduinoPrefix + "/iot/samples-resolution-seconds"
40+
SamplesReso = ArduinoPrefix + "/iot/samples-resolution"
41+
Scheduling = ArduinoPrefix + "/iot/scheduling"
42+
DestinationS3Bucket = ArduinoPrefix + "/destination-bucket"
43+
SamplesResolutionSeconds = 300
44+
DefaultTimeExtractionWindowMinutes = 60
4445
)
4546

4647
func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, error) {
@@ -78,6 +79,8 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
7879
if tagsParam != nil {
7980
tags = tagsParam
8081
}
82+
83+
// Resolve resolution
8184
resolution, err := paramReader.ReadIntConfig(SamplesResoSec)
8285
if err != nil {
8386
// Possibly this parameter is not set. Try SamplesReso
@@ -101,12 +104,29 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
101104
}
102105
resolution = &val
103106
}
104-
105107
if *resolution > 3600 {
106108
logger.Errorf("Resolution %d is invalid", *resolution)
107109
return nil, errors.New("resolution must be between -1 and 3600")
108110
}
109111

112+
// Resolve scheduling
113+
schedule, err := paramReader.ReadConfig(Scheduling)
114+
if err != nil {
115+
logger.Error("Error reading parameter "+Scheduling, err)
116+
return nil, err
117+
}
118+
extractionWindowMinutes := DefaultTimeExtractionWindowMinutes
119+
switch *schedule {
120+
case "5 minutes":
121+
extractionWindowMinutes = 5
122+
case "15 minutes":
123+
extractionWindowMinutes = 15
124+
case "1 hour":
125+
extractionWindowMinutes = 60
126+
case "1 day":
127+
extractionWindowMinutes = 24 * 60
128+
}
129+
110130
logger.Infoln("------ Running import...")
111131
if event.Dev || os.Getenv("DEV") == "true" {
112132
logger.Infoln("Running in dev mode")
@@ -127,8 +147,9 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
127147
} else {
128148
logger.Infoln("resolution:", *resolution, "seconds")
129149
}
150+
logger.Infoln("data extraction time windows:", extractionWindowMinutes, "minutes")
130151

131-
err = importer.StartImport(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket)
152+
err = importer.StartImport(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, extractionWindowMinutes, *destinationS3Bucket)
132153
if err != nil {
133154
return nil, err
134155
}

0 commit comments

Comments
 (0)