Skip to content

Commit 5961b27

Browse files
committed
Decouple SimpleQuery from ExtendedQuery
1 parent 4d01e30 commit 5961b27

File tree

8 files changed

+982
-251
lines changed

8 files changed

+982
-251
lines changed

Sources/PostgresNIO/Connection/PostgresConnection.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ extension PostgresConnection {
439439
}
440440

441441
/// Run a simple text-only query on the Postgres server the connection is connected to.
442-
/// WARNING: This functions is not yet API and is incomplete.
442+
/// WARNING: This function is not yet API and is incomplete.
443443
/// The return type will change to another stream.
444444
///
445445
/// - Parameters:
@@ -460,13 +460,13 @@ extension PostgresConnection {
460460
logger[postgresMetadataKey: .connectionID] = "\(self.id)"
461461

462462
let promise = self.channel.eventLoop.makePromise(of: PSQLRowStream.self)
463-
let context = ExtendedQueryContext(
464-
simpleQuery: query,
463+
let context = SimpleQueryContext(
464+
query: query,
465465
logger: logger,
466466
promise: promise
467467
)
468468

469-
self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
469+
self.channel.write(HandlerTask.simpleQuery(context), promise: nil)
470470

471471
do {
472472
return try await promise.futureResult.map({ $0.asyncSequence() }).get()

Sources/PostgresNIO/New/Connection State Machine/ConnectionStateMachine.swift

+207-35
Large diffs are not rendered by default.

Sources/PostgresNIO/New/Connection State Machine/ExtendedQueryStateMachine.swift

+25-99
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ struct ExtendedQueryStateMachine {
2929
case sendParseDescribeBindExecuteSync(PostgresQuery)
3030
case sendParseDescribeSync(name: String, query: String, bindingDataTypes: [PostgresDataType])
3131
case sendBindExecuteSync(PSQLExecuteStatement)
32-
case sendQuery(String)
33-
32+
3433
// --- general actions
3534
case failQuery(EventLoopPromise<PSQLRowStream>, with: PSQLError)
3635
case succeedQuery(EventLoopPromise<PSQLRowStream>, with: QueryResult)
@@ -86,12 +85,6 @@ struct ExtendedQueryStateMachine {
8685
state = .messagesSent(queryContext)
8786
return .sendParseDescribeSync(name: name, query: query, bindingDataTypes: bindingDataTypes)
8887
}
89-
90-
case .simpleQuery(let query, _):
91-
return self.avoidingStateMachineCoW { state -> Action in
92-
state = .messagesSent(queryContext)
93-
return .sendQuery(query)
94-
}
9588
}
9689
}
9790

@@ -112,7 +105,7 @@ struct ExtendedQueryStateMachine {
112105

113106
self.isCancelled = true
114107
switch queryContext.query {
115-
case .unnamed(_, let eventLoopPromise), .executeStatement(_, let eventLoopPromise), .simpleQuery(_, let eventLoopPromise):
108+
case .unnamed(_, let eventLoopPromise), .executeStatement(_, let eventLoopPromise):
116109
return .failQuery(eventLoopPromise, with: .queryCancelled)
117110

118111
case .prepareStatement(_, _, _, let eventLoopPromise):
@@ -178,19 +171,11 @@ struct ExtendedQueryStateMachine {
178171
state = .noDataMessageReceived(queryContext)
179172
return .succeedPreparedStatementCreation(promise, with: nil)
180173
}
181-
182-
case .simpleQuery:
183-
return self.setAndFireError(.unexpectedBackendMessage(.noData))
184174
}
185175
}
186176

187177
mutating func rowDescriptionReceived(_ rowDescription: RowDescription) -> Action {
188-
let queryContext: ExtendedQueryContext
189-
switch self.state {
190-
case .messagesSent(let extendedQueryContext),
191-
.parameterDescriptionReceived(let extendedQueryContext):
192-
queryContext = extendedQueryContext
193-
default:
178+
guard case .parameterDescriptionReceived(let queryContext) = self.state else {
194179
return self.setAndFireError(.unexpectedBackendMessage(.rowDescription(rowDescription)))
195180
}
196181

@@ -213,7 +198,7 @@ struct ExtendedQueryStateMachine {
213198
}
214199

215200
switch queryContext.query {
216-
case .unnamed, .executeStatement, .simpleQuery:
201+
case .unnamed, .executeStatement:
217202
return .wait
218203

219204
case .prepareStatement(_, _, _, let eventLoopPromise):
@@ -234,9 +219,6 @@ struct ExtendedQueryStateMachine {
234219

235220
case .prepareStatement:
236221
return .evaluateErrorAtConnectionLevel(.unexpectedBackendMessage(.bindComplete))
237-
238-
case .simpleQuery:
239-
return self.setAndFireError(.unexpectedBackendMessage(.bindComplete))
240222
}
241223

242224
case .noDataMessageReceived(let queryContext):
@@ -276,40 +258,20 @@ struct ExtendedQueryStateMachine {
276258
return .wait
277259
}
278260

279-
case .rowDescriptionReceived(let queryContext, let columns):
280-
switch queryContext.query {
281-
case .simpleQuery(_, let eventLoopPromise):
282-
// When receiving a data row, we must ensure that the data row column count
283-
// matches the previously received row description column count.
284-
guard dataRow.columnCount == columns.count else {
285-
return self.setAndFireError(.unexpectedBackendMessage(.dataRow(dataRow)))
286-
}
287-
288-
return self.avoidingStateMachineCoW { state -> Action in
289-
var demandStateMachine = RowStreamStateMachine()
290-
demandStateMachine.receivedRow(dataRow)
291-
state = .streaming(columns, demandStateMachine)
292-
let result = QueryResult(value: .rowDescription(columns), logger: queryContext.logger)
293-
return .succeedQuery(eventLoopPromise, with: result)
294-
}
295-
296-
case .unnamed, .executeStatement, .prepareStatement:
297-
return self.setAndFireError(.unexpectedBackendMessage(.dataRow(dataRow)))
298-
}
299-
300261
case .drain(let columns):
301262
guard dataRow.columnCount == columns.count else {
302263
return self.setAndFireError(.unexpectedBackendMessage(.dataRow(dataRow)))
303264
}
304265
// we ignore all rows and wait for readyForQuery
305266
return .wait
306-
267+
307268
case .initialized,
308269
.messagesSent,
309270
.parseCompleteReceived,
310271
.parameterDescriptionReceived,
311272
.noDataMessageReceived,
312273
.emptyQueryResponseReceived,
274+
.rowDescriptionReceived,
313275
.bindCompleteReceived,
314276
.commandComplete,
315277
.error:
@@ -330,36 +292,10 @@ struct ExtendedQueryStateMachine {
330292
return .succeedQuery(eventLoopPromise, with: result)
331293
}
332294

333-
case .prepareStatement, .simpleQuery:
295+
case .prepareStatement:
334296
preconditionFailure("Invalid state: \(self.state)")
335297
}
336-
337-
case .messagesSent(let context):
338-
switch context.query {
339-
case .simpleQuery(_, let eventLoopGroup):
340-
return self.avoidingStateMachineCoW { state -> Action in
341-
state = .commandComplete(commandTag: commandTag)
342-
let result = QueryResult(value: .noRows(.tag(commandTag)), logger: context.logger)
343-
return .succeedQuery(eventLoopGroup, with: result)
344-
}
345-
346-
case .unnamed, .executeStatement, .prepareStatement:
347-
return self.setAndFireError(.unexpectedBackendMessage(.commandComplete(commandTag)))
348-
}
349-
350-
case .rowDescriptionReceived(let context, _):
351-
switch context.query {
352-
case .simpleQuery(_, let eventLoopPromise):
353-
return self.avoidingStateMachineCoW { state -> Action in
354-
state = .commandComplete(commandTag: commandTag)
355-
let result = QueryResult(value: .noRows(.tag(commandTag)), logger: context.logger)
356-
return .succeedQuery(eventLoopPromise, with: result)
357-
}
358-
359-
case .unnamed, .executeStatement, .prepareStatement:
360-
return self.setAndFireError(.unexpectedBackendMessage(.commandComplete(commandTag)))
361-
}
362-
298+
363299
case .streaming(_, var demandStateMachine):
364300
return self.avoidingStateMachineCoW { state -> Action in
365301
state = .commandComplete(commandTag: commandTag)
@@ -370,12 +306,14 @@ struct ExtendedQueryStateMachine {
370306
precondition(self.isCancelled)
371307
self.state = .commandComplete(commandTag: commandTag)
372308
return .wait
373-
309+
374310
case .initialized,
311+
.messagesSent,
375312
.parseCompleteReceived,
376313
.parameterDescriptionReceived,
377314
.noDataMessageReceived,
378315
.emptyQueryResponseReceived,
316+
.rowDescriptionReceived,
379317
.commandComplete,
380318
.error:
381319
return self.setAndFireError(.unexpectedBackendMessage(.commandComplete(commandTag)))
@@ -385,32 +323,20 @@ struct ExtendedQueryStateMachine {
385323
}
386324

387325
mutating func emptyQueryResponseReceived() -> Action {
388-
switch self.state {
389-
case .bindCompleteReceived(let queryContext):
390-
switch queryContext.query {
391-
case .unnamed(_, let eventLoopPromise),
392-
.executeStatement(_, let eventLoopPromise):
393-
return self.avoidingStateMachineCoW { state -> Action in
394-
state = .emptyQueryResponseReceived
395-
let result = QueryResult(value: .noRows(.emptyResponse), logger: queryContext.logger)
396-
return .succeedQuery(eventLoopPromise, with: result)
397-
}
326+
guard case .bindCompleteReceived(let queryContext) = self.state else {
327+
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
328+
}
398329

399-
case .prepareStatement, .simpleQuery:
400-
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
401-
}
402-
case .messagesSent(let queryContext):
403-
switch queryContext.query {
404-
case .simpleQuery(_, let eventLoopPromise):
405-
return self.avoidingStateMachineCoW { state -> Action in
406-
state = .emptyQueryResponseReceived
407-
let result = QueryResult(value: .noRows(.emptyResponse), logger: queryContext.logger)
408-
return .succeedQuery(eventLoopPromise, with: result)
409-
}
410-
case .unnamed, .executeStatement, .prepareStatement:
411-
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
330+
switch queryContext.query {
331+
case .unnamed(_, let eventLoopPromise),
332+
.executeStatement(_, let eventLoopPromise):
333+
return self.avoidingStateMachineCoW { state -> Action in
334+
state = .emptyQueryResponseReceived
335+
let result = QueryResult(value: .noRows(.emptyResponse), logger: queryContext.logger)
336+
return .succeedQuery(eventLoopPromise, with: result)
412337
}
413-
default:
338+
339+
case .prepareStatement(_, _, _, _):
414340
return self.setAndFireError(.unexpectedBackendMessage(.emptyQueryResponse))
415341
}
416342
}
@@ -571,7 +497,7 @@ struct ExtendedQueryStateMachine {
571497
return .evaluateErrorAtConnectionLevel(error)
572498
} else {
573499
switch context.query {
574-
case .unnamed(_, let eventLoopPromise), .executeStatement(_, let eventLoopPromise), .simpleQuery(_, let eventLoopPromise):
500+
case .unnamed(_, let eventLoopPromise), .executeStatement(_, let eventLoopPromise):
575501
return .failQuery(eventLoopPromise, with: error)
576502
case .prepareStatement(_, _, _, let eventLoopPromise):
577503
return .failPreparedStatementCreation(eventLoopPromise, with: error)
@@ -610,7 +536,7 @@ struct ExtendedQueryStateMachine {
610536
switch context.query {
611537
case .prepareStatement:
612538
return true
613-
case .unnamed, .executeStatement, .simpleQuery:
539+
case .unnamed, .executeStatement:
614540
return false
615541
}
616542

0 commit comments

Comments
 (0)