Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6b39242

Browse files
committedSep 20, 2024·
code refactoring
1 parent c4586d1 commit 6b39242

File tree

4 files changed

+48
-26
lines changed

4 files changed

+48
-26
lines changed
 

‎app/exporter/exporter.go

+36-23
Original file line numberDiff line numberDiff line change
@@ -28,74 +28,87 @@ import (
2828
"github.com/sirupsen/logrus"
2929
)
3030

31-
func StartExporter(
32-
ctx context.Context,
33-
logger *logrus.Entry,
34-
key, secret, orgid string,
35-
tagsF *string,
36-
resolution, timeWindowMinutes int,
37-
destinationS3Bucket string,
38-
aggregationStat string,
39-
compress, enableAlignTimeWindow bool) error {
31+
type samplesExporter struct {
32+
iotClient *iot.Client
33+
logger *logrus.Entry
34+
tagsF *string
35+
compress bool
36+
enableAlignTimeWindow bool
37+
}
4038

41-
// Init client
39+
func New(key, secret, orgid string, tagsF *string, compress, enableAlignTimeWindow bool, logger *logrus.Entry) (*samplesExporter, error) {
4240
iotcl, err := iot.NewClient(key, secret, orgid)
4341
if err != nil {
44-
return err
42+
return nil, err
4543
}
4644

47-
if tagsF != nil {
48-
logger.Infoln("Filtering things linked to configured account using tags: ", *tagsF)
45+
return &samplesExporter{
46+
iotClient: iotcl,
47+
logger: logger,
48+
tagsF: tagsF,
49+
compress: compress,
50+
enableAlignTimeWindow: enableAlignTimeWindow,
51+
}, nil
52+
}
53+
54+
func (s *samplesExporter) StartExporter(
55+
ctx context.Context,
56+
resolution, timeWindowMinutes int,
57+
destinationS3Bucket string,
58+
aggregationStat string) error {
59+
60+
if s.tagsF != nil {
61+
s.logger.Infoln("Filtering things linked to configured account using tags: ", *s.tagsF)
4962
} else {
50-
logger.Infoln("Importing all things linked to configured account")
63+
s.logger.Infoln("Importing all things linked to configured account")
5164
}
5265

53-
things, err := iotcl.ThingList(ctx, nil, nil, true, utils.ParseTags(tagsF))
66+
things, err := s.iotClient.ThingList(ctx, nil, nil, true, utils.ParseTags(s.tagsF))
5467
if err != nil {
5568
return err
5669
}
5770
thingsMap := make(map[string]iotclient.ArduinoThing, len(things))
5871
for _, thing := range things {
59-
logger.Infoln(" Thing: ", thing.Id, thing.Name)
72+
s.logger.Infoln(" Thing: ", thing.Id, thing.Name)
6073
thingsMap[thing.Id] = thing
6174
}
6275

6376
// Extract data points from thing and push to S3
64-
tsextractorClient := tsextractor.New(iotcl, logger)
77+
tsextractorClient := tsextractor.New(s.iotClient, s.logger)
6578

6679
// Open s3 output writer
6780
s3cl, err := s3.NewS3Client(destinationS3Bucket)
6881
if err != nil {
6982
return err
7083
}
7184

72-
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, enableAlignTimeWindow); err != nil {
85+
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, s.enableAlignTimeWindow); err != nil {
7386
if writer != nil {
7487
writer.Close()
7588
defer writer.Delete()
7689
}
77-
logger.Error("Error aligning time series samples: ", err)
90+
s.logger.Error("Error aligning time series samples: ", err)
7891
return err
7992
} else {
8093
writer.Close()
8194
defer writer.Delete()
8295

8396
fileToUpload := writer.GetFilePath()
8497
destinationKeyFormat := "%s/%s.csv"
85-
if compress {
86-
logger.Infof("Compressing file: %s\n", fileToUpload)
98+
if s.compress {
99+
s.logger.Infof("Compressing file: %s\n", fileToUpload)
87100
compressedFile, err := utils.GzipFileCompression(fileToUpload)
88101
if err != nil {
89102
return err
90103
}
91104
fileToUpload = compressedFile
92-
logger.Infof("Generated compressed file: %s\n", fileToUpload)
105+
s.logger.Infof("Generated compressed file: %s\n", fileToUpload)
93106
destinationKeyFormat = "%s/%s.csv.gz"
94107
defer func(f string) { os.Remove(f) }(fileToUpload)
95108
}
96109

97110
destinationKey := fmt.Sprintf(destinationKeyFormat, from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
98-
logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
111+
s.logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
99112
if err := s3cl.WriteFile(ctx, destinationKey, fileToUpload); err != nil {
100113
return err
101114
}

‎business/tsextractor/tsextractor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func (a *TsExtractor) populateStringTSDataIntoS3(
360360
if !slices.Contains(populatedProperties, propertyID) {
361361
populatedProperties = append(populatedProperties, propertyID)
362362
}
363-
samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), ""))
363+
samples = append(samples, composeRow(ts, thing.Id, thing.Name, propertyID, propertyName, propertyType, a.interfaceToString(value), "SAMPLED"))
364364
}
365365
}
366366

‎lambda.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,11 @@ func HandleRequest(ctx context.Context, event *AWSS3ImportTrigger) (*string, err
188188
logger.Infoln("file compression enabled:", enabledCompression)
189189
logger.Infoln("align time window:", enableAlignTimeWindow)
190190

191-
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat, enabledCompression, enableAlignTimeWindow)
191+
tsExporter, err := exporter.New(*apikey, *apiSecret, organizationId, tags, enabledCompression, enableAlignTimeWindow, logger)
192+
if err != nil {
193+
return nil, err
194+
}
195+
err = tsExporter.StartExporter(ctx, *resolution, *extractionWindowMinutes, *destinationS3Bucket, *aggregationStat)
192196
if err != nil {
193197
message := "Error detected during data export"
194198
return &message, err

‎resources/test/localexecution.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,15 @@ func HandleRequest(ctx context.Context, dev bool) (*string, error) {
8989
logger.Infoln("tags:", *tags)
9090
}
9191

92-
err = exporter.StartExporter(ctx, logger, *apikey, *apiSecret, organizationId, tags, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX", true, true)
92+
tsExporter, err := exporter.New(*apikey, *apiSecret, organizationId, tags, true, true, logger)
9393
if err != nil {
9494
return nil, err
9595
}
96+
err = tsExporter.StartExporter(ctx, *resolution, TimeExtractionWindowMinutes, *destinationS3Bucket, "MAX")
97+
if err != nil {
98+
message := "Error detected during data export"
99+
return &message, err
100+
}
96101

97102
message := "Data exported successfully"
98103
return &message, nil

0 commit comments

Comments
 (0)
Please sign in to comment.