Skip to content

Feature/observe backup and restore #1115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jan 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/Nest/Domain/Responses/Observer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;

namespace Nest
{
public class Observer<T> : IObserver<T>
{
private readonly Action<T> _onNext;
private readonly Action<Exception> _onError;
private readonly Action _completed;

public Observer(Action<T> onNext = null, Action<Exception> onError = null, Action completed = null)
{
_onNext = onNext;
_onError = onError;
_completed = completed;
}

public void OnNext(T value)
{
if (this._onNext != null) this._onNext(value);
}

public void OnError(Exception error)
{
if (this._onError != null) this._onError(error);
}

public void OnCompleted()
{
if (this._completed != null) this._completed();
}
}
}
237 changes: 237 additions & 0 deletions src/Nest/Domain/Responses/RestoreObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading;

namespace Nest
{
public class RestoreObservable : IDisposable, IObservable<IRecoveryStatusResponse>
{
private readonly IElasticClient _elasticClient;
private readonly IRestoreRequest _restoreRequest;
private readonly TimeSpan _interval = TimeSpan.FromSeconds(2);
private Timer _timer;
private bool _disposed;
private readonly RestoreStatusHumbleObject _restoreStatusHumbleObject;
private EventHandler<RestoreNextEventArgs> _nextEventHandlers;
private EventHandler<RestoreCompletedEventArgs> _completedEentHandlers;
private EventHandler<RestoreErrorEventArgs> _errorEventHandlers;

public RestoreObservable(IElasticClient elasticClient, IRestoreRequest restoreRequest)
{
elasticClient.ThrowIfNull("elasticClient");
restoreRequest.ThrowIfNull("restoreRequest");

_elasticClient = elasticClient;
_restoreRequest = restoreRequest;

_restoreStatusHumbleObject = new RestoreStatusHumbleObject(elasticClient, restoreRequest);
_restoreStatusHumbleObject.Completed += StopTimer;
_restoreStatusHumbleObject.Error += StopTimer;
}

public RestoreObservable(IElasticClient elasticClient, IRestoreRequest restoreRequest, TimeSpan interval)
: this(elasticClient, restoreRequest)
{
interval.ThrowIfNull("interval");
if (interval.Ticks < 0) throw new ArgumentOutOfRangeException("interval");

_interval = interval;
}

public IDisposable Subscribe(IObserver<IRecoveryStatusResponse> observer)
{
observer.ThrowIfNull("observer");

try
{
_restoreRequest.RequestParameters.WaitForCompletion(false);
var restoreResponse = this._elasticClient.Restore(_restoreRequest);

if (!restoreResponse.IsValid)
throw new RestoreException(restoreResponse.ConnectionStatus);

EventHandler<RestoreNextEventArgs> onNext = (sender, args) => observer.OnNext(args.RecoveryStatusResponse);
EventHandler<RestoreCompletedEventArgs> onCompleted = (sender, args) => observer.OnCompleted();
EventHandler<RestoreErrorEventArgs> onError = (sender, args) => observer.OnError(args.Exception);

_nextEventHandlers = onNext;
_completedEentHandlers = onCompleted;
_errorEventHandlers = onError;

_restoreStatusHumbleObject.Next += onNext;
_restoreStatusHumbleObject.Completed += onCompleted;
_restoreStatusHumbleObject.Error += onError;

_timer = new Timer(Restore, observer, (long)_interval.TotalMilliseconds, Timeout.Infinite);
}
catch (Exception exception)
{
observer.OnError(exception);
}

return this;
}

private void Restore(object state)
{
var observer = state as IObserver<IRecoveryStatusResponse>;

if (observer == null) throw new ArgumentException("state");

try
{
var watch = new Stopwatch();
watch.Start();

_restoreStatusHumbleObject.CheckStatus();

_timer.Change(Math.Max(0, (long)_interval.TotalMilliseconds - watch.ElapsedMilliseconds), Timeout.Infinite);
}
catch (Exception exception)
{
observer.OnError(exception);
}
}

private void StopTimer(object sender, EventArgs restoreCompletedEventArgs)
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
}

public void Dispose()
{
Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
if (_disposed) return;

_timer.Dispose();
_restoreStatusHumbleObject.Next -= _nextEventHandlers;
_restoreStatusHumbleObject.Completed -= _completedEentHandlers;
_restoreStatusHumbleObject.Error -= _errorEventHandlers;

_restoreStatusHumbleObject.Completed -= StopTimer;
_restoreStatusHumbleObject.Error -= StopTimer;

_disposed = true;
}

~RestoreObservable()
{
Dispose(false);
}
}

public class RestoreNextEventArgs : EventArgs
{
public IRecoveryStatusResponse RecoveryStatusResponse { get; private set; }

public RestoreNextEventArgs(IRecoveryStatusResponse recoveryStatusResponse)
{
RecoveryStatusResponse = recoveryStatusResponse;
}
}

public class RestoreCompletedEventArgs : EventArgs
{
public IRecoveryStatusResponse RecoveryStatusResponse { get; private set; }

public RestoreCompletedEventArgs(IRecoveryStatusResponse recoveryStatusResponse)
{
RecoveryStatusResponse = recoveryStatusResponse;
}
}

public class RestoreErrorEventArgs : EventArgs
{
public Exception Exception { get; private set; }

public RestoreErrorEventArgs(Exception exception)
{
Exception = exception;
}
}

public class RestoreStatusHumbleObject
{
private readonly IElasticClient _elasticClient;
private readonly IRestoreRequest _restoreRequest;
private string _renamePattern;
private string _renameReplacement;

public event EventHandler<RestoreCompletedEventArgs> Completed;
public event EventHandler<RestoreErrorEventArgs> Error;
public event EventHandler<RestoreNextEventArgs> Next;

public RestoreStatusHumbleObject(IElasticClient elasticClient, IRestoreRequest restoreRequest)
{
elasticClient.ThrowIfNull("elasticClient");
restoreRequest.ThrowIfNull("restoreRequest");

_elasticClient = elasticClient;
_restoreRequest = restoreRequest;

_renamePattern = string.IsNullOrEmpty(_restoreRequest.RenamePattern) ? string.Empty : _restoreRequest.RenamePattern;
_renameReplacement = string.IsNullOrEmpty(_restoreRequest.RenameReplacement) ? string.Empty : _restoreRequest.RenameReplacement;
}

public void CheckStatus()
{
try
{
var indices =
_restoreRequest.Indices.Select(
x => new IndexNameMarker
{
Name = Regex.Replace(x.Name, _renamePattern, _renameReplacement),
Type = x.Type
})
.ToArray();

var recoveryStatus = _elasticClient.RecoveryStatus(new RecoveryStatusRequest
{
Detailed = true,
Indices = indices
});

if (!recoveryStatus.IsValid)
throw new RestoreException(recoveryStatus.ConnectionStatus);

if (recoveryStatus.Indices.All(x => x.Value.Shards.All(s => s.Index.Files.Recovered == s.Index.Files.Total)))
{
OnCompleted(new RestoreCompletedEventArgs(recoveryStatus));
return;
}

OnNext(new RestoreNextEventArgs(recoveryStatus));
}
catch (Exception exception)
{
OnError(new RestoreErrorEventArgs(exception));
}
}

protected virtual void OnNext(RestoreNextEventArgs nextEventArgs)
{
var handler = Next;
if (handler != null) handler(this, nextEventArgs);
}

protected virtual void OnCompleted(RestoreCompletedEventArgs completedEventArgs)
{
var handler = Completed;
if (handler != null) handler(this, completedEventArgs);
}

protected virtual void OnError(RestoreErrorEventArgs errorEventArgs)
{
var handler = Error;
if (handler != null) handler(this, errorEventArgs);
}
}
}
Loading