Skip to content

Commit 98f019a

Browse files
committed
Merge branch '1.4.x'
2 parents 6469519 + 4ec62f8 commit 98f019a

File tree

20 files changed

+52
-36
lines changed

20 files changed

+52
-36
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
# 1.4.3
2+
3+
## Fixes
4+
5+
- Subject names are now URL encoded when used in Schema Registry URLs.
6+
- Fixed a memory leak that occured when passing a `CancellationToken` to `Producer.ProduceAsync`.
7+
8+
19
# 1.4.2
210

311
## Fixes

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ confluent-kafka-dotnet is distributed via NuGet. We provide five packages:
4444
To install Confluent.Kafka from within Visual Studio, search for Confluent.Kafka in the NuGet Package Manager UI, or run the following command in the Package Manager Console:
4545

4646
```
47-
Install-Package Confluent.Kafka -Version 1.4.2
47+
Install-Package Confluent.Kafka -Version 1.4.3
4848
```
4949

5050
To add a reference to a dotnet core project, execute the following at the command line:
5151

5252
```
53-
dotnet add package -v 1.4.2 Confluent.Kafka
53+
dotnet add package -v 1.4.3 Confluent.Kafka
5454
```
5555

5656
Note: `Confluent.Kafka` depends on the `librdkafka.redist` package which provides a number of different builds of `librdkafka` that are compatible with [common platforms](https://github.com/edenhill/librdkafka/wiki/librdkafka.redist-NuGet-package-runtime-libraries). If you are on one of these platforms this will all work seamlessly (and you don't need to explicitly reference `librdkafka.redist`). If you are on a different platform, you may need to [build librdkafka](https://github.com/edenhill/librdkafka#building) manually (or acquire it via other means) and load it using the [Library.Load](https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Library.html#Confluent_Kafka_Library_Load_System_String_) method.

examples/AdminClient/AdminClient.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.2" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.3" /> -->
1313
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1414
</ItemGroup>
1515

examples/AvroBlogExamples/AvroBlogExamples.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.2" /> -->
11+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.3" /> -->
1212
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
1313
</ItemGroup>
1414

examples/AvroGeneric/AvroGeneric.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.2" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.3" /> -->
1313
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
1414
</ItemGroup>
1515

examples/AvroSpecific/AvroSpecific.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.2" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.3" /> -->
1313
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
1414
</ItemGroup>
1515

examples/ConfluentCloud/ConfluentCloud.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.2" /> -->
10+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.3" /> -->
1111
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1212
</ItemGroup>
1313

examples/Consumer/Consumer.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.2" /> -->
11+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.3" /> -->
1212
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1313
</ItemGroup>
1414

examples/JsonSerialization/JsonSerialization.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="1.4.2" /> -->
11+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Json" Version="1.4.3" /> -->
1212
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj" />
1313
</ItemGroup>
1414

examples/MultiProducer/MultiProducer.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.2" /> -->
11+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.3" /> -->
1212
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1313
</ItemGroup>
1414

examples/Producer/Producer.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.2" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.3" /> -->
1313
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1414
</ItemGroup>
1515

examples/Protobuf/Protobuf.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.4.2" /> -->
11+
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.4.3" /> -->
1212
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj" />
1313
</ItemGroup>
1414

examples/Transactions/Transactions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
</PropertyGroup>
1010

1111
<ItemGroup>
12-
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.2" /> -->
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.4.3" /> -->
1313
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
1414
<PackageReference Include="RocksDbSharp" Version="6.2.2" />
1515
<PackageReference Include="RocksDbNative" Version="6.2.2" />

src/Confluent.Kafka/Confluent.Kafka.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
<PackageId>Confluent.Kafka</PackageId>
1313
<Title>Confluent.Kafka</Title>
1414
<AssemblyName>Confluent.Kafka</AssemblyName>
15-
<VersionPrefix>1.4.2</VersionPrefix>
15+
<VersionPrefix>1.4.3</VersionPrefix>
1616
<TargetFrameworks>net45;net46;netcoreapp2.1;netstandard1.3;netstandard2.0</TargetFrameworks>
1717
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
1818
<GenerateDocumentationFile>true</GenerateDocumentationFile>

src/Confluent.Kafka/Producer.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -716,18 +716,19 @@ public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
716716
enableDeliveryReportKey ? message.Key : default(TKey),
717717
enableDeliveryReportValue ? message.Value : default(TValue));
718718

719+
if (cancellationToken != null && cancellationToken.CanBeCanceled)
720+
{
721+
handler.CancellationTokenRegistration
722+
= cancellationToken.Register(() => handler.TrySetCanceled());
723+
}
724+
719725
ProduceImpl(
720726
topicPartition.Topic,
721727
valBytes, 0, valBytes == null ? 0 : valBytes.Length,
722728
keyBytes, 0, keyBytes == null ? 0 : keyBytes.Length,
723729
message.Timestamp, topicPartition.Partition, headers,
724730
handler);
725731

726-
if (cancellationToken != null && cancellationToken.CanBeCanceled)
727-
{
728-
cancellationToken.Register(() => handler.TrySetCanceled());
729-
}
730-
731732
return await handler.Task.ConfigureAwait(false);
732733
}
733734
else
@@ -867,6 +868,8 @@ public TypedTaskDeliveryHandlerShim(string topic, TKey key, TValue val)
867868
Value = val;
868869
}
869870

871+
public CancellationTokenRegistration CancellationTokenRegistration;
872+
870873
public string Topic;
871874

872875
public TKey Key;
@@ -875,6 +878,11 @@ public TypedTaskDeliveryHandlerShim(string topic, TKey key, TValue val)
875878

876879
public void HandleDeliveryReport(DeliveryReport<Null, Null> deliveryReport)
877880
{
881+
if (CancellationTokenRegistration != null)
882+
{
883+
CancellationTokenRegistration.Dispose();
884+
}
885+
878886
if (deliveryReport == null)
879887
{
880888
#if NET45

src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<PackageId>Confluent.SchemaRegistry.Serdes.Avro</PackageId>
1414
<Title>Confluent.SchemaRegistry.Serdes.Avro</Title>
1515
<AssemblyName>Confluent.SchemaRegistry.Serdes.Avro</AssemblyName>
16-
<VersionPrefix>1.4.2</VersionPrefix>
16+
<VersionPrefix>1.4.3</VersionPrefix>
1717
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
1818
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
1919
<GenerateDocumentationFile>true</GenerateDocumentationFile>

src/Confluent.SchemaRegistry.Serdes.Json/Confluent.SchemaRegistry.Serdes.Json.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<PackageId>Confluent.SchemaRegistry.Serdes.Json</PackageId>
1414
<Title>Confluent.SchemaRegistry.Serdes.Json</Title>
1515
<AssemblyName>Confluent.SchemaRegistry.Serdes.Json</AssemblyName>
16-
<VersionPrefix>1.4.2</VersionPrefix>
16+
<VersionPrefix>1.4.3</VersionPrefix>
1717
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
1818
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
1919
<GenerateDocumentationFile>true</GenerateDocumentationFile>

src/Confluent.SchemaRegistry.Serdes.Protobuf/Confluent.SchemaRegistry.Serdes.Protobuf.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<PackageId>Confluent.SchemaRegistry.Serdes.Protobuf</PackageId>
1414
<Title>Confluent.SchemaRegistry.Serdes.Protobuf</Title>
1515
<AssemblyName>Confluent.SchemaRegistry.Serdes.Protobuf</AssemblyName>
16-
<VersionPrefix>1.4.2</VersionPrefix>
16+
<VersionPrefix>1.4.3</VersionPrefix>
1717
<TargetFrameworks>netstandard2.0;</TargetFrameworks>
1818
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
1919
<GenerateDocumentationFile>true</GenerateDocumentationFile>

src/Confluent.SchemaRegistry/Confluent.SchemaRegistry.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<PackageId>Confluent.SchemaRegistry</PackageId>
1414
<Title>Confluent.SchemaRegistry</Title>
1515
<AssemblyName>Confluent.SchemaRegistry</AssemblyName>
16-
<VersionPrefix>1.4.2</VersionPrefix>
16+
<VersionPrefix>1.4.3</VersionPrefix>
1717
<TargetFrameworks>netstandard1.4;netstandard2.0</TargetFrameworks>
1818
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
1919
<GenerateDocumentationFile>true</GenerateDocumentationFile>

src/Confluent.SchemaRegistry/Rest/RestService.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -279,32 +279,32 @@ public async Task<List<string>> GetSubjectsAsync()
279279
.ConfigureAwait(continueOnCapturedContext: false);
280280

281281
public async Task<List<int>> GetSubjectVersionsAsync(string subject)
282-
=> await RequestListOfAsync<int>($"subjects/{subject}/versions", HttpMethod.Get)
282+
=> await RequestListOfAsync<int>($"subjects/{WebUtility.UrlEncode(subject)}/versions", HttpMethod.Get)
283283
.ConfigureAwait(continueOnCapturedContext: false);
284284

285285
public async Task<RegisteredSchema> GetSchemaAsync(string subject, int version)
286-
=> SanitizeRegisteredSchema(await RequestAsync<RegisteredSchema>($"subjects/{subject}/versions/{version}", HttpMethod.Get)
286+
=> SanitizeRegisteredSchema(await RequestAsync<RegisteredSchema>($"subjects/{WebUtility.UrlEncode(subject)}/versions/{version}", HttpMethod.Get)
287287
.ConfigureAwait(continueOnCapturedContext: false));
288288

289289
public async Task<RegisteredSchema> GetLatestSchemaAsync(string subject)
290-
=> SanitizeRegisteredSchema(await RequestAsync<RegisteredSchema>($"subjects/{subject}/versions/latest", HttpMethod.Get)
290+
=> SanitizeRegisteredSchema(await RequestAsync<RegisteredSchema>($"subjects/{WebUtility.UrlEncode(subject)}/versions/latest", HttpMethod.Get)
291291
.ConfigureAwait(continueOnCapturedContext: false));
292292

293293
public async Task<int> RegisterSchemaAsync(string subject, Schema schema)
294294
=> schema.SchemaType == SchemaType.Avro
295295
// In the avro case, just send the schema string to maintain backards compatibility.
296-
? (await RequestAsync<SchemaId>($"subjects/{subject}/versions", HttpMethod.Post, new SchemaString(schema.SchemaString))
296+
? (await RequestAsync<SchemaId>($"subjects/{WebUtility.UrlEncode(subject)}/versions", HttpMethod.Post, new SchemaString(schema.SchemaString))
297297
.ConfigureAwait(continueOnCapturedContext: false)).Id
298-
: (await RequestAsync<SchemaId>($"subjects/{subject}/versions", HttpMethod.Post, schema)
298+
: (await RequestAsync<SchemaId>($"subjects/{WebUtility.UrlEncode(subject)}/versions", HttpMethod.Post, schema)
299299
.ConfigureAwait(continueOnCapturedContext: false)).Id;
300300

301301
// Checks whether a schema has been registered under a given subject.
302302
public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema schema, bool ignoreDeletedSchemas)
303303
=> SanitizeRegisteredSchema(schema.SchemaType == SchemaType.Avro
304304
// In the avro case, just send the schema string to maintain backards compatibility.
305-
? await RequestAsync<RegisteredSchema>($"subjects/{subject}?deleted={!ignoreDeletedSchemas}", HttpMethod.Post, new SchemaString(schema.SchemaString))
305+
? await RequestAsync<RegisteredSchema>($"subjects/{WebUtility.UrlEncode(subject)}?deleted={!ignoreDeletedSchemas}", HttpMethod.Post, new SchemaString(schema.SchemaString))
306306
.ConfigureAwait(continueOnCapturedContext: false)
307-
: await RequestAsync<RegisteredSchema>($"subjects/{subject}?deleted={!ignoreDeletedSchemas}", HttpMethod.Post, schema)
307+
: await RequestAsync<RegisteredSchema>($"subjects/{WebUtility.UrlEncode(subject)}?deleted={!ignoreDeletedSchemas}", HttpMethod.Post, schema)
308308
.ConfigureAwait(continueOnCapturedContext: false));
309309

310310
#endregion Subjects
@@ -314,18 +314,18 @@ public async Task<RegisteredSchema> LookupSchemaAsync(string subject, Schema sch
314314
public async Task<bool> TestCompatibilityAsync(string subject, int versionId, Schema schema)
315315
=> schema.SchemaType == SchemaType.Avro
316316
// In the avro case, just send the schema string to maintain backards compatibility.
317-
? (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{subject}/versions/{versionId}", HttpMethod.Post, new SchemaString(schema.SchemaString))
317+
? (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/{versionId}", HttpMethod.Post, new SchemaString(schema.SchemaString))
318318
.ConfigureAwait(continueOnCapturedContext: false)).IsCompatible
319-
: (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{subject}/versions/{versionId}", HttpMethod.Post, schema)
319+
: (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/{versionId}", HttpMethod.Post, schema)
320320
.ConfigureAwait(continueOnCapturedContext: false)).IsCompatible;
321321

322322

323323
public async Task<bool> TestLatestCompatibilityAsync(string subject, Schema schema)
324324
=> schema.SchemaType == SchemaType.Avro
325325
// In the avro case, just send the schema string to maintain backards compatibility.
326-
? (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{subject}/versions/latest", HttpMethod.Post, new SchemaString(schema.SchemaString))
326+
? (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/latest", HttpMethod.Post, new SchemaString(schema.SchemaString))
327327
.ConfigureAwait(continueOnCapturedContext: false)).IsCompatible
328-
: (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{subject}/versions/latest", HttpMethod.Post, schema)
328+
: (await RequestAsync<CompatibilityCheck>($"compatibility/subjects/{WebUtility.UrlEncode(subject)}/versions/latest", HttpMethod.Post, schema)
329329
.ConfigureAwait(continueOnCapturedContext: false)).IsCompatible;
330330

331331
#endregion Compatibility
@@ -337,15 +337,15 @@ public async Task<Compatibility> GetGlobalCompatibilityAsync()
337337
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;
338338

339339
public async Task<Compatibility> GetCompatibilityAsync(string subject)
340-
=> (await RequestAsync<Config>($"config/{subject}", HttpMethod.Get)
340+
=> (await RequestAsync<Config>($"config/{WebUtility.UrlEncode(subject)}", HttpMethod.Get)
341341
.ConfigureAwait(continueOnCapturedContext: false)).CompatibilityLevel;
342342

343343
public async Task<Config> SetGlobalCompatibilityAsync(Compatibility compatibility)
344344
=> await RequestAsync<Config>("config", HttpMethod.Put, new Config(compatibility))
345345
.ConfigureAwait(continueOnCapturedContext: false);
346346

347347
public async Task<Config> SetCompatibilityAsync(string subject, Compatibility compatibility)
348-
=> await RequestAsync<Config>($"config/{subject}", HttpMethod.Put, new Config(compatibility))
348+
=> await RequestAsync<Config>($"config/{WebUtility.UrlEncode(subject)}", HttpMethod.Put, new Config(compatibility))
349349
.ConfigureAwait(continueOnCapturedContext: false);
350350

351351
#endregion Config

0 commit comments

Comments
 (0)