Skip to content

fixed code #3162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,81 @@
* THE SOFTWARE.
*/
package com.iluwatar.commander;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;








Expand Down





Expand Up

@@ -59,6 +62,7 @@ public interface HandleErrorIssue<T> {

/**
* Retry pattern.
*
* @param <T> is the type of object passed into HandleErrorIssue as a parameter.
*/

public class Retry<T> {

/**
* Operation Interface will define method to be implemented.
*/

public interface Operation {
void operation(List<Exception> list) throws Exception;
}

/**
* HandleErrorIssue defines how to handle errors.
*
* @param <T> is the type of object to be passed into the method as parameter.
*/

public interface HandleErrorIssue<T> {
void handleIssue(T obj, Exception e);
}

private static final SecureRandom RANDOM = new SecureRandom();

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Operation op;
private final HandleErrorIssue<T> handleError;
private final int maxAttempts;







Expand Down





Expand Up

@@ -86,26 +90,25 @@ public interface HandleErrorIssue<T> {

private final long maxDelay;
private final AtomicInteger attempts;
private final Predicate<Exception> test;
private final List<Exception> errors;

Retry(Operation op, HandleErrorIssue<T> handleError, int maxAttempts,
long maxDelay, Predicate<Exception>... ignoreTests) {
this.op = op;
Expand All @@ -77,7 +108,6 @@ public interface HandleErrorIssue<T> {
this.test = Arrays.stream(ignoreTests).reduce(Predicate::or).orElse(e -> false);
this.errors = new ArrayList<>();
}

/**
* Performing the operation with retries.
*
Expand All @@ -86,26 +116,25 @@ public interface HandleErrorIssue<T> {
*/

public void perform(List<Exception> list, T obj) {
do {
scheduler.schedule(() -> {
try {
op.operation(list);
return;
} catch (Exception e) {
}catch (Exception e){
this.errors.add(e);
if (this.attempts.incrementAndGet() >= this.maxAttempts || !this.test.test(e)) {
this.handleError.handleIssue(obj, e);
scheduler.shutdown();
return; //return here... don't go further
}
try {
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
long delay = Math.min(testDelay, this.maxDelay);
Thread.sleep(delay);
} catch (InterruptedException f) {
//ignore
}
perform(list, obj);
}
} while (true);
}, calculateDelay(), TimeUnit.MILLISECONDS);
}

private long calculateDelay(){
long testDelay =
(long) Math.pow(2, this.attempts.intValue()) * 1000 + RANDOM.nextInt(1000);
return Math.min(testDelay, this.maxDelay);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,22 @@
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;







Expand All

@@ -45,7 +45,7 @@ public class LogAggregator {

/**
* Responsible for collecting and buffering logs from different services.
* Once the logs reach a certain threshold or after a certain time interval,
Expand All @@ -40,15 +50,31 @@
*/
@Slf4j
public class LogAggregator {

private static final int BUFFER_THRESHOLD = 3;
private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger logCount = new AtomicInteger(0);

/**







Expand Down





Expand Up

@@ -90,8 +90,8 @@ public void collectLog(LogEntry logEntry) {

* constructor of LogAggregator.
*
* @param centralLogStore central log store implement
Expand All @@ -59,7 +85,6 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.minLogLevel = minLogLevel;
startBufferFlusher();
}

/**
* Collects a given log entry, and filters it by the defined log level.
*
Expand All @@ -70,33 +95,39 @@ public void collectLog(LogEntry logEntry) {
LOGGER.warn("Log level or threshold level is null. Skipping.");
return;
}

if (logEntry.getLevel().compareTo(minLogLevel) < 0) {
LOGGER.debug("Log level below threshold. Skipping.");
return;
}

buffer.offer(logEntry);

if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
}
}

/**
* Stops the log aggregator service and flushes any remaining logs to
* the central log store.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
}
flushBuffer();
}







Expand All

@@ -106,15 +106,7 @@ private void flushBuffer() {

}
private void flushBuffer() {
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
Expand All @@ -106,15 +137,7 @@ private void flushBuffer() {
}

private void startBufferFlusher() {
executorService.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
//flush every 5 seconds
scheduler.scheduleWithFixedDelay(this::flushBuffer, 0, 5000, TimeUnit.MILLISECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,26 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

/**







Expand Down





Expand Up

@@ -104,12 +103,7 @@ public static void main(String[] args) {

* Many solutions in the cloud involve running tasks that invoke services. In this environment, if a
* service is subjected to intermittent heavy loads, it can cause performance or reliability
* issues.
Expand Down Expand Up @@ -60,58 +76,54 @@
*/
@Slf4j
public class App {

//Executor shut down time limit.
private static final int SHUTDOWN_TIME = 15;

/**
* Program entry point.
*
* @param args command line args
*/
public static void main(String[] args) {

// An Executor that provides methods to manage termination and methods that can
// produce a Future for tracking progress of one or more asynchronous tasks.
ExecutorService executor = null;

try {
// Create a MessageQueue object.
var msgQueue = new MessageQueue();

LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads.");

// Create three TaskGenerator threads. Each of them will submit different number of jobs.
final var taskRunnable1 = new TaskGenerator(msgQueue, 5);
final var taskRunnable2 = new TaskGenerator(msgQueue, 1);
final var taskRunnable3 = new TaskGenerator(msgQueue, 2);

// Create e service which should process the submitted jobs.
final var srvRunnable = new ServiceExecutor(msgQueue);

// Create a ThreadPool of 2 threads and
// submit all Runnable task for execution to executor
executor = Executors.newFixedThreadPool(2);
executor.submit(taskRunnable1);
executor.submit(taskRunnable2);
executor.submit(taskRunnable3);

// submitting serviceExecutor thread to the Executor service.
executor.submit(srvRunnable);

// Initiates an orderly shutdown.
LOGGER.info("Initiating shutdown."
+ " Executor will shutdown only after all the Threads are completed.");
executor.shutdown();

// Wait for SHUTDOWN_TIME seconds for all the threads to complete
// their tasks and then shut down the executor and then exit.
if (!executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
LOGGER.info("Executor was shut down and Exiting.");
executor.shutdownNow();
}
srvRunnable.shutdown(SHUTDOWN_TIME);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}







Expand Down



}
}
Loading
Loading