|
14 | 14 |
|
15 | 15 | # first party
|
16 | 16 | import delphi.operations.secrets as secrets
|
17 |
| -from delphi.epidata.common.logger import get_structured_logger |
| 17 | +from delphi_logger import get_structured_logger |
18 | 18 | from delphi.epidata.common.covidcast_row import CovidcastRow
|
19 | 19 |
|
20 | 20 |
|
@@ -117,28 +117,28 @@ def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20,
|
117 | 117 | get_structured_logger("insert_or_update_batch").fatal(err_msg)
|
118 | 118 | raise DBLoadStateException(err_msg)
|
119 | 119 |
|
120 |
| - # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and |
| 120 | + # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and |
121 | 121 | # `is_latest_issue` is hardcoded to 1 (which is temporary and addressed later in this method)
|
122 | 122 | insert_into_loader_sql = f'''
|
123 | 123 | INSERT INTO `{self.load_table}`
|
124 | 124 | (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`,
|
125 |
| - `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, |
| 125 | + `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, |
126 | 126 | `is_latest_issue`, `missing_value`, `missing_stderr`, `missing_sample_size`)
|
127 | 127 | VALUES
|
128 |
| - (%s, %s, %s, %s, %s, %s, |
129 |
| - UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, |
| 128 | + (%s, %s, %s, %s, %s, %s, |
| 129 | + UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, |
130 | 130 | 1, %s, %s, %s)
|
131 | 131 | '''
|
132 | 132 |
|
133 | 133 | # all load table entries are already marked "is_latest_issue".
|
134 | 134 | # if an entry in the load table is NOT in the latest table, it is clearly now the latest value for that key (so we do nothing (thanks to INNER join)).
|
135 | 135 | # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load.
|
136 | 136 | fix_is_latest_issue_sql = f'''
|
137 |
| - UPDATE |
138 |
| - `{self.load_table}` JOIN `{self.latest_view}` |
139 |
| - USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) |
140 |
| - SET `{self.load_table}`.`is_latest_issue`=0 |
141 |
| - WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` |
| 137 | + UPDATE |
| 138 | + `{self.load_table}` JOIN `{self.latest_view}` |
| 139 | + USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) |
| 140 | + SET `{self.load_table}`.`is_latest_issue`=0 |
| 141 | + WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` |
142 | 142 | '''
|
143 | 143 |
|
144 | 144 | # TODO: consider handling cc_rows as a generator instead of a list
|
|
0 commit comments