如何在 Java 8 中创建一个阻塞的背景加载器?

共6个回答, 标签: java multithreading java-8

赏金3 天后到期。这个问题的答案可以享受 200 声望奖励。罗兰正在寻找一个更详细的答案对于这个问题: 请提供一个工作代码示例。

问题

如何在 Java 8 中创建一个合适的背景加载器?条件:

  • 数据应该在后台加载
  • 加载后应该显示数据
  • 加载数据时,不应接受进一步的请求
  • 如果在加载数据时有请求,则应在某个超时 (例如 5 秒) 后安排另一次加载

目的是让重新加载请求被接受,但不是数据库被请求淹没。

MCVE

这是一个 MCVE。它由一个后台任务组成,该任务通过简单地调用 Thread.sleep 2 秒来模拟加载。任务被安排在每秒钟,这自然会导致背景加载任务的重叠,这应该避免。

公共类 LoadInBackgroundExample {

/* *
* 一个简单的后台任务,应该执行数据加载操作。在这个最小的例子中,它只是调用 Thread.sleep
*/
公共静态类 BackgroundTask 实现可运行 {

私人国际编号;

公共背景任务 (int id) {
这个?id = id;
}

/* *
* 睡眠给定的时间来模拟加载。
*/
@ 覆盖
调用父类的 () {

尝试 {

System.out.println (“开局号" id ":" 主题.currentThread ());

长睡眠时间 = 2000;
线程。睡眠 (睡眠时间);

} Catch (中断感知 e) {
E.printStackTrace ();
} 最后 {
System.out.println (“完成 #" id ":" 主题.currentThread ());
}

}
}

/* *
* CompletableFuture 模拟加载和显示数据。
* @Param taskId 当前任务的标识符
*/
公共静态无效 loadInBackground (int taskId) {

//创建加载任务
背景任务 = 新的背景任务 (taskId);

//“加载” 数据异步
Compleablefuture compleablefuture = CompletableFuture.supplyAsync (新供应商 () {

@ 覆盖
公众取得指定 () {

Compleablefuture = compleablefuture.runAsync (背景任务);

尝试 {

未来.get ();

} Catch (InterruptedException | ExecutionException e) {
E.printStackTrace ();
}

返回 "任务" backgroundTask。编号;
}
});

//加载数据后显示数据
未来 CompletableFuture = completableFuture。thenAccept (x-> {

System.out.println (“后台任务已完成:” x);

});

}


公共静态无效的主要 (串 [] args) {

//Runnable 每秒调用后台加载器
触发优先级 = 新的运行 () {

Int taskId = 0;

公共无效运行 () {

LoadInBackground (taskId);

}
};

创建调度程序
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool (1);
Scheduedfuture beeperHandle = scheduler.scheduleAtFixedRate (触发器,0,1,timunit.SECONDS);

//10 秒后取消 scheudler 和应用程序
Scheduler.schedule ()-> beeperHandle.cancel (true),10,timunit.SECONDS);

Try {
BeeperHandle.get ();
} Catch (Throwable th) {
}

System.out.println (“取消”);
系统。退出 (0);
}

}

输出:

开始 #0: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
开始 #1: 线程 [ForkJoinPool.CommonPool-worker-4,5,main]
开始 #2: 线程 [ForkJoinPool.CommonPool-worker-6,5,main]
完成 #0: 线程 [ForkJoinPool.commonPool-wor
第1个答案

您已经有一个线程池来执行任务。 在另一个异步执行程序中运行任务并不一定复杂 (ForkJoinPool when you use CompletableFuture)

让它变得简单:

公共静态 void loadInBackground (int taskId) {
//创建加载任务
背景任务 = 新的背景任务 (taskId);
//不需要在异步运行,因为它已经在执行器
BackgroundTask.run ();
}

该 ScheduledExecutorService 将确保只有一个任务是运行在同一时间,当你调用它的ScheduleAtFixedRate

创建并执行一个定期操作,该操作首先在给定的初始延迟后启用,然后在给定的时间段内启用; 也就是说,执行将在 initialDelay 之后开始,然后是 initialDelay 周期,然后是 initialDelay 2 * 周期, 等等。如果任务的任何执行遇到异常,后续执行将被禁止。否则,任务只能通过取消或终止执行者来终止。如果此任务的任何执行时间超过其周期,则后续执行可能开始较晚,但不会同时执行.

第2个答案

以以下为要求:

  • 数据应该在后台加载
  • 加载后应该显示数据
  • 加载数据时,不应接受进一步的请求
  • 如果在加载数据时有请求,则应在某个超时 (例如 5 秒) 后安排另一次加载

解决方案 ca 将基于Executors.newSingleThreadExecutor(), CompletableFuture and LinkedBlockingQueue:

公共类 singlethreaddoloader {

私有静态类 BackgroundTask 扩展了 CompletableFuture{

私有最终字符串查询;

私有背景任务 (字符串查询) {
This.query = 查询;
}

公共字符串 getQuery () {
返回查询;
}
}

私人最终阻塞队列任务 = 新的 LinkedBlockingQueue <> ();
//当数据加载时,不应该接受进一步的请求
私人最终遗嘱执行人 = 遗嘱执行人。newSingleThreadExecutor ();

私人决赛 int delaySeconds;

私人 AtomicReferenceLastExecution = 新的 AtomicReference <> (即时.EPOCH);

公共 SingleThreadedLoader (int delaySeconds) {
This.delaySeconds = delaySeconds;
SetupLoading ();
}

公共背景任务 loadInBackground (字符串查询) {
日志 (“入队查询” 查询);
BackgroundTask = 新 BackgroundTask (查询);
任务。添加 (任务);
返回任务;
}

私人无效 setupLoading () {
//数据应该在后台加载
Executor.exe 可爱 (()-> {
While (true) {
尝试 {
//如果在加载数据时有请求
//另一个加载应该在某个超时 (例如 5 秒) 后安排
即时 prev = 最后执行。get ();
长延迟 = 持续时间。介于 (prev,Instant.now ())。秒 ();
If (延迟 <延迟秒) {
日志 (“下次数据加载前等待 5 秒钟”);
时间单位。秒。睡眠 (延迟秒-延迟);
}
BackgroundTask 任务 = tasks.take ();
尝试 {
字符串查询 = task.getQuery ();
字符串数据 = loadData (查询);
任务。完成 (数据);
} Catch (例外 e) {
任务。异常完成 (e);
}
LastExecution.set (Instant.now ());
} Catch (中断感知 e) {
日志 (e.getMessage ());
返回;
}
}
});
}

私有字符串 loadData (字符串查询) {
尝试 {
日志 (“加载数据” 查询);
时间单位。秒。睡眠 (2);
日志 (“加载数据” 查询);
返回 “结果” 查询;
} Catch (中断感知 e) {
抛出新的 RuntimeException (e);
}
}

私有静态无效日志 (字符串 str) {
字符串时间 = LocalTime.now ()。truncatedTo (计时.秒)。格式 (DateTimeFormatter.ISO_TIME);
字符串线程 = thread.currentThread ().getName ();
System.out.println (时间 “” 线程 “:” str);
}

公共静态 void main (String [] args) 抛出异常 {
SingleThreadedLoader loader = 新的 SingleThreadedLoader (5);
//加载后应该显示数据
Loader.loadInBackground ("1").thenAccept (singlethreadadloader:: log);
Loader.loadInBackground ("2").thenAccept (singlethreadadloader:: log);
Loader.loadInBackground ("3").thenAccept (singlethreadadloader:: log);

日志 (“在主线程中做另一项工作”);

时间单位。秒。睡眠 (30);
}
}

执行后,标准输出将有以下输出:

10:29:26 主要: 入队查询 1
10:29:26 池-1-线程-1: 加载 1 的数据
10:29:26 主要: 入队查询 2
10:29:26 主要: 入队查询 3
10:29:26 main: 在主线程中做另一项工作
10:29:28 池-1-线程-1: 为 1 加载数据
10:29:28 池-1-线程-1: 结果 1
10:29:28 池-1-线程-1: 在下一个数据 lo 之前等待 5 秒钟
第3个答案

我已经添加了一个 AtomicInteger,它将作为一个计数器来运行简单的 lock () 和 unlock () 方法的任务,这个小的变化到你的原始代码中,我得到了输出:

开始 #0: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
后台任务取消 1
后台任务取消 2
完成 #0: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
后台任务完成: 任务 0
开始 #3: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
后台任务取消 4
完成 #3: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
后台任务取消 5
后台任务完成: 任务 3
开始 #6: 线程 [ForkJoinPool.CommonPool-worker-3,5,main]
后台任务取消 7
完成 #6: 线程 [ForkJoinPool.CommonPool-worker-3,5,main]
后台任务取消 8
后台任务完成: 任务 6
开始 #9: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
后台任务取消 10
已取消

这是我对你的任务的解决方案:

公共类 LoadInBackgroundExample {
//添加了新异常
公共静态类 AlreadyIsRunningException 扩展了 RuntimeException {
长 taskId;

Public AlreadyIsRunningException (字符串消息,long taskId) {
超级 (消息);
这个。taskId = taskId;
}

Public long getTaskId () {
返回 taskId;
}

公共无效 setTaskId (long taskId) {
这个。taskId = taskId;
}
}


/* *
* 一个简单的后台任务,应该执行数据加载操作。在这个最小的例子中,它只是调用 Thread.sleep
*/
公共静态类 BackgroundTask 实现可运行 {


这个 atomicInteger 充当 BackgroundTask 对象的全局锁计数器
专用静态 AtomicInteger 计数器 = 新 AtomicInteger (0);


私有 int id;

公共背景任务 (int id) {
This.id = id;
}

私人无效解锁 () {
Counter.decrementAndGet ();
}

私人无效锁 () {
//我们需要检查这种方式来避免线程之间的一些不吉利的计时
Int lockValue = counter.incrementAndGet ();
//如果我们的计数器不同于 1,这意味着其他一些任务已经在运行 (它已经获得了锁)
如果 (lockValue!= 1) {
//回滚我们的检查
Counter.decrementAndGet ();
//抛出异常
抛出新的 AlreadyIsRunningException (“一些其他任务已经在运行”,id);
}
}

/* *
* 睡眠给定的时间来模拟加载。
*/
@ 覆盖
公共无效运行 () {
//检查我们是否能获得锁

锁 ();
我们有一把锁
尝试 {
System.out.println ("Start #" id ":" Thread.currentThread ());
长睡眠时间 = 2000;
线程。睡眠 (睡眠时间);
} Catch (中断感知 e) {
E.printStackTrace ();
} 最后 {
System.out.println ("Finish #" id ":" Thread.currentThread ());
解锁 ();
}
}
}

/* *
* CompletableFuture 模拟加载和显示数据。
*
* @Param taskId 当前任务的标识符
*/


公共静态 void loadInBackground (int taskId) {
//创建加载任务
背景任务 = 新的背景任务 (taskId);
//“加载” 数据异步
完全未来CompletableFuture = CompletableFuture.supplyAsync (新供应商() {
@ 覆盖
公共字符串 get () {
完全未来未来 = CompletableFuture.runAsync (背景任务);

尝试 {
Future.get ();
} Catch (ExecutionException e) {
如果 (e.getCause ()
第4个答案

如果你只是想要一个单一的访问线程,一个简单的同步就可以完成这项工作.

输出:

开始 #2: 线程 [ForkJoinPool.CommonPool-worker-6,5,main]
完成 #0: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
后台任务完成: 任务 0 完成获取数据.
开始 #3: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
完成 #2: 线程 [ForkJoinPool.CommonPool-worker-6,5,main]
开始 #4: 线程 [ForkJoinPool.CommonPool-worker-6,5,main]
后台任务完成: 任务 2 完成获取数据.
完成 #1: 线程 [ForkJoinPool.CommonPool-worker-4,5,main]
后台任务完成: 任务 1 完成获取数据.
开始 #6: 线程 [ForkJoinPool.CommonPool-worker-3,5,main]
完成 #4: 线程 [ForkJoinPool.CommonPool-worker-6,5,main]
开始 #5: 线程 [ForkJoinPool.CommonPool-worker-6,5,main]
后台任务完成: 任务 4 完成获取数据.
完成 #3: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
开始 #7: 线程 [ForkJoinPool.CommonPool-worker-2,5,main]
后台任务完成: 任务 3 完成获取数据.
已取消

代码:

包队列;

导入 java.util.concurrent.CompletableFuture;
导入 java.util.concurrent.ExecutionException;
导入 java.util.concurrent.Executor;
导入 java.util.concurrent.ScheduledExecutorService;
导入 java.util.concurrent.Scheduedfuture;
导入 java.util.concurrent.TimeUnit;
导入 java.util.function.Supplier;

公共类 LoadInBackgroundExample {

公共静态类 SyncronizedBackend {
公共同步字符串 getData () {

长睡眠时间 = 2000;
尝试 {
线程。睡眠 (睡眠时间);
} Catch (中断感知 e) {
//TODO 自动生成的 catch 块
E.printStackTrace ();
}

返回新字符串 (“完成获取数据…”);
}
}

/* *
* 一个简单的后台任务,应该执行数据加载操作。在
* 这个最小的例子只是调用 Thread.sleep
*/
公共静态类 BackgroundTask 实现可运行 {

私有 int id;
专用 SyncronizedBackend syncronizedBackend;
私有字符串结果;

公共背景任务 (SyncronizedBackend syncronizedBackend,int id) {
This.syncronizedBackend = syncronizedBackend;
This.id = id;
}

/* *
* 睡眠给定的时间来模拟加载。
*/
@ 覆盖
公共无效运行 () {

System.out.println ("Start #" id ":" Thread.currentThread ());

Result = this.syncronizedBackend.getData ();

System.out.println ("Finish #" id ":" Thread.currentThread ());
}

公共字符串 getResult () {
返回结果;
}
}

/* *
* CompletableFuture 模拟加载和显示数据。
* @ Param syncronizedBackend
*
* @Param taskId 当前任务的标识符
*/
公共静态 void loadInBackground (SyncronizedBackend syncronizedBackend,int taskId) {

//创建加载任务
背景任务 = 新的背景任务 (syncronizedBackend,taskId);

//“加载” 数据异步
完全未来CompletableFuture = CompletableFuture.supplyAsync (新供应商() {

@ 覆盖
公共字符串 get () {

完全未来未来 = CompletableFuture.runAsync (背景任务);

尝试 {

Future.get ();

} Catch (InterruptedException | ExecutionException e) {
E.printStackTrace ();
}

返回 “任务” backgroundTask.id “” backgroundTask.getResult ();
}
});

//加载数据后显示数据
完全未来未来 = 完全未来。然后接受 (x-> {

System.out.println (“背景
第5个答案

我尝试了一个使用线程双开关的解决方案,见类BackgroundTaskDualSwitch, it simulates loading using the CompletableFuture. The idea is to let a second task wait until the currently running task is finished, see change in BackgroundTask。这确保了最大一个任务线程正在运行,最大一个任务线程正在等待。进一步的请求将被跳过,直到正在运行的任务完成,并变得 “自由” 来处理下一个请求。

公共静态类 BackgroundTask 扩展线程 {

私有 int id;
私有线程 pendingTask;

公共背景任务 (int id) {
This.id = id;
}

公共背景任务 (int id,线程 pendingTask) {
此 (id);
This.pendingTask = pendingTask;
}

/* *
* 睡眠给定的时间来模拟加载。
*/
@ 覆盖
公共无效运行 () {

尝试 {

如果 (pendingTask!= null & & pendingTask.isAlive ()) {
PendingTask.join ();
}

System.out.println ("Start #" id ":" Thread.currentThread ());
...
}
}

公共静态类 BackgroundTaskDualSwitch {

专用静态背景任务 task1;
专用静态背景任务 task2;

公共静态同步布尔 runTask (int taskId) {
如果 (!isBusy (task1)) {
如果 (isBusy (task2)) {
Task1 = 新的背景任务 (taskId,task2);
} 其他 {
Task1 = 新的背景任务 (taskId);
}
RunAsync (task1);
返回 true;
} 否则,如果 (!isBusy (task2)) {
如果 (isBusy (task1)) {
Task2 = 新的背景任务 (taskId,task1);
} 其他 {
Task2 = 新的背景任务 (taskId);
}
RunAsync (task2);
返回 true;
} 其他 {
返回假;//跳过
}
}

私有静态 void runAsync (背景任务) {
完全未来CompletableFuture = CompletableFuture.supplyAsync (新供应商() {

@ 覆盖
公共字符串 get () {

尝试 {
Task.start ();
Task.join ();
}
Catch (InterruptedException e) {
E.printStackTrace ();
}
返回 “task” task.id;
}
});

//加载数据后显示数据
完全未来未来 = 完全未来。然后接受 (x-> {

System.out.println (“后台任务已完成:” x);

});
}

私有静态布尔 isBusy (背景任务) {
返回任务!= null & & task.isAlive ();
}
}

/* *
* 模拟加载和显示数据。
* @Param taskId 当前任务的标识符
*/
公共静态 void loadInBackground (int taskId) {

//创建加载任务
如果 (!BackgroundTaskDualSwitch.runTask (taskId)) {
System.out.println (“后台任务忽略: 任务” taskId);//跳过
}
}
...

输出是:

开始 #0: 线程 [线程-0,5,main]
忽略后台任务: 任务 2
完成 #0: 线程 [线程-0,5,main]
开始 #1: 线程 [线程-1,5,main]
后台任务完成: 任务 0
忽略后台任务: 任务 4
完成 #1: 线程 [线程-1,5,main]
开始 #3: 线程 [线程-2,5,main]
后台任务完成: 任务 1
忽略后台任务: 任务 6
完成 #3: 线程 [线程-2,5,main]
开始 #5: 线程 [线程-3,5,主]
后台任务完成: 任务 3
忽略后台任务: 任务 8
完成 #5: 线程 [线程-3,5,主]
开始 #7: 线程 [线程-4,5,主]
后台任务完成: 任务 5
忽略后台任务: 任务 10
完成 #7: 线程 [线程-4,5,主]
开始 #9: 线程 [线程-5,5,main]
后台任务完成: 任务 7
已取消
第6个答案

如果明白你有几个任务同时进入后台。由于这些任务正在做完全相同的工作,你不想并行执行它们,你需要一个任务来完成这项工作,并将其结果分享给其他人。所以如果你得到 10CompletableFuture simultaneously you want one of them call 'reload' into db and share execution results to others in a way that all the CompletableFuture将正常完成,结果。我假设这是从

目标是跳过 #1 和 #2,因为 #0 仍然是 跑步。

加载后应该显示数据

如果我的猜测是正确的,你可以试试我的解决方案。

我在任务之间有某种亲子关系。父任务是真正在做自己的工作,并将得到的结果分享给孩子的任务。子任务是在父任务仍在执行时添加的任务,子任务等待父任务完成其执行。由于家长任务的结果仍然是 “新鲜” 的,他们被复制到每个孩子身上,所有的孩子都完成了他们的未来。

公共班级背景服务 {


公共静态类 BackgroundJob 实现可调用{

专用静态 BackgroundJob ROOT_JOB = null;

专用同步静态 void addBackgroundJob (BackgroundJob backgroundJob) {
If (ROOT_JOB!= null) {
ROOT_JOB.addChild (backgroundJob);
} 其他 {
System.out.println ();
System.out.println (Thread.currentThread ().getName () “正在运行根任务-” backgroundJob.jobId);
ROOT_JOB = backgroundJob;
}
}

私人同步静态无效解锁 () {
ROOT_JOB = null;
}

私人最终 int jobId;
私人名单Children = new ArrayList <> ();
私人背景工作家长;
私有字符串 provideresultfromparent = null;


公共背景工作 (int jobId) {
这个.jobId = jobId;
}

私人无效 addChild (BackgroundJob backgroundJob) {
BackgroundJob.parent = this;
这个.children.add (backgroundJob);
}

@ 覆盖
公共字符串调用 () 引发异常 {
AddBackgroundJob (此);
If (parent = = null) {
字符串结果 = 逻辑 ();

同步 (ROOT_JOB) {
对于 (最终背景工作: 儿童) {
BackgroundJob.Provideresultfromparent = result;
同步 (backgroundJob) {
背景工作。通知 ();
}
}
解锁 ();
}

返回 "\ t \ tROOT 任务" jobId "的" 结果;
} 其他 {
同步 (此) {
System.out.println (Thread.currentThread ().getName () "\ t \ tskipping task-" jobId "并等待正在运行的 task-" parent.jobId "to finish");
这个。等待 ();
}
返回 “\ t \ ttask-“ jobId ”的“ provideresultfromparent ”;
}
}

私有字符串逻辑 () 抛出 InterruptedException {
线程睡眠 (2000);
返回 (int) (Math.random () * 1000) "";
}
}

公共静态 void main (String [] args) 抛出 InterruptedException,ExecutionException {
AtomicInteger atomicInteger = new AtomicInteger ();
ExecutorService 池 = Executors.newCachedThreadPool ();
供应商作业 = ()-> {
Int taskId = atomicInteger.incrementAndGet ();
BackgroundJob backgroundJob = 新的 BackgroundJob (taskId);
尝试 {
返回 backgroundJob.call ();
} Catch (例外 e) {
E.printStackTrace ();

相关问题

Java 是 "逐项传递" 还是 "按值传递"? 如何比较 Java 中的字符串? 什么是 Null Pointerexception, 我如何修复它? 如何在异步和线程中执行大量 sql 查询 如何在 Java 8 中创建一个阻塞的背景加载器? 在 cppreference 中,宽松排序的解释是错误的吗? 为什么使用不同的 ArrayList 构造函数会导致内部数组的增长率不同? 如何从 Set/Map 中删除多个元素,并知道删除了哪些元素? 使用末尾的一个元素进行排序