Skip to content

Commit 53b2e58

Browse files
authored
Populate property last value if not present in data extraction (#12)
1 parent 2d3675b commit 53b2e58

File tree

10 files changed

+513
-101
lines changed

10 files changed

+513
-101
lines changed

app/exporter/exporter.go

+36-23
Original file line numberDiff line numberDiff line change
@@ -28,74 +28,87 @@ import (
2828
"github.com/sirupsen/logrus"
2929
)
3030

31-
func StartExporter(
32-
ctx context.Context,
33-
logger *logrus.Entry,
34-
key, secret, orgid string,
35-
tagsF *string,
36-
resolution, timeWindowMinutes int,
37-
destinationS3Bucket string,
38-
aggregationStat string,
39-
compress, enableAlignTimeWindow bool) error {
31+
type samplesExporter struct {
32+
iotClient *iot.Client
33+
logger *logrus.Entry
34+
tagsF *string
35+
compress bool
36+
enableAlignTimeWindow bool
37+
}
4038

41-
// Init client
39+
func New(key, secret, orgid string, tagsF *string, compress, enableAlignTimeWindow bool, logger *logrus.Entry) (*samplesExporter, error) {
4240
iotcl, err := iot.NewClient(key, secret, orgid)
4341
if err != nil {
44-
return err
42+
return nil, err
4543
}
4644

47-
if tagsF != nil {
48-
logger.Infoln("Filtering things linked to configured account using tags: ", *tagsF)
45+
return &samplesExporter{
46+
iotClient: iotcl,
47+
logger: logger,
48+
tagsF: tagsF,
49+
compress: compress,
50+
enableAlignTimeWindow: enableAlignTimeWindow,
51+
}, nil
52+
}
53+
54+
func (s *samplesExporter) StartExporter(
55+
ctx context.Context,
56+
resolution, timeWindowMinutes int,
57+
destinationS3Bucket string,
58+
aggregationStat string) error {
59+
60+
if s.tagsF != nil {
61+
s.logger.Infoln("Filtering things linked to configured account using tags: ", *s.tagsF)
4962
} else {
50-
logger.Infoln("Importing all things linked to configured account")
63+
s.logger.Infoln("Importing all things linked to configured account")
5164
}
5265

53-
things, err := iotcl.ThingList(ctx, nil, nil, true, utils.ParseTags(tagsF))
66+
things, err := s.iotClient.ThingList(ctx, nil, nil, true, utils.ParseTags(s.tagsF))
5467
if err != nil {
5568
return err
5669
}
5770
thingsMap := make(map[string]iotclient.ArduinoThing, len(things))
5871
for _, thing := range things {
59-
logger.Infoln(" Thing: ", thing.Id, thing.Name)
72+
s.logger.Infoln(" Thing: ", thing.Id, thing.Name)
6073
thingsMap[thing.Id] = thing
6174
}
6275

6376
// Extract data points from thing and push to S3
64-
tsextractorClient := tsextractor.New(iotcl, logger)
77+
tsextractorClient := tsextractor.New(s.iotClient, s.logger)
6578

6679
// Open s3 output writer
6780
s3cl, err := s3.NewS3Client(destinationS3Bucket)
6881
if err != nil {
6982
return err
7083
}
7184

72-
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, enableAlignTimeWindow); err != nil {
85+
if writer, from, err := tsextractorClient.ExportTSToFile(ctx, timeWindowMinutes, thingsMap, resolution, aggregationStat, s.enableAlignTimeWindow); err != nil {
7386
if writer != nil {
7487
writer.Close()
7588
defer writer.Delete()
7689
}
77-
logger.Error("Error aligning time series samples: ", err)
90+
s.logger.Error("Error aligning time series samples: ", err)
7891
return err
7992
} else {
8093
writer.Close()
8194
defer writer.Delete()
8295

8396
fileToUpload := writer.GetFilePath()
8497
destinationKeyFormat := "%s/%s.csv"
85-
if compress {
86-
logger.Infof("Compressing file: %s\n", fileToUpload)
98+
if s.compress {
99+
s.logger.Infof("Compressing file: %s\n", fileToUpload)
87100
compressedFile, err := utils.GzipFileCompression(fileToUpload)
88101
if err != nil {
89102
return err
90103
}
91104
fileToUpload = compressedFile
92-
logger.Infof("Generated compressed file: %s\n", fileToUpload)
105+
s.logger.Infof("Generated compressed file: %s\n", fileToUpload)
93106
destinationKeyFormat = "%s/%s.csv.gz"
94107
defer func(f string) { os.Remove(f) }(fileToUpload)
95108
}
96109

97110
destinationKey := fmt.Sprintf(destinationKeyFormat, from.Format("2006-01-02"), from.Format("2006-01-02-15-04"))
98-
logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
111+
s.logger.Infof("Uploading file %s to bucket %s/%s\n", fileToUpload, s3cl.DestinationBucket(), destinationKey)
99112
if err := s3cl.WriteFile(ctx, destinationKey, fileToUpload); err != nil {
100113
return err
101114
}

0 commit comments

Comments
 (0)