多线程那点事—Parallel.for

先看段代码:

1 for (int i = 0; i < 10; i++)
2 {
3     Task.Factory.StartNew(()=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));
4 }

从代码上可以看出我们预期是打印1~10,但实际的打印结果是:

1 7 ~ 10
 2 4 ~ 10
 3 10 ~ 10
 4 9 ~ 10
 5 4 ~ 10
 6 3 ~ 10
 7 5 ~ 10
 8 9 ~ 10
 9 6 ~ 10
10 8 ~ 10

与预期的不一致,我们预期是打印数字1到10,但实际打印出来的是10次10。因为这几个lambda表达式中使用了同一个变量,并且这些匿名函数共享变量值。

再来看下面这段代码:

1 Action<int> displayNumber = n => Console.WriteLine(n);
2 int i = 5;
3 Task taskOne = Task.Factory.StartNew(() => displayNumber(i));
4 i = 7;
5 Task taskTwo = Task.Factory.StartNew(() => displayNumber(i));
6 Task.WaitAll(taskOne,taskTwo);

输出结果:

7
7

当闭包通过lambda表达式捕获可变变量时,lambda捕获变量的引用,而不是捕获该变量的当前值。因此,如果任务在变量的引用值更改后运行,则该值将是内存中最新的值,而不是捕获变量时的值。

为解决该问题,我们引入Parallel类来解决问题:

1 Parallel.For(0,10,i=>Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}"));

打印结果:

1 1 ~ 0
 2 1 ~ 2
 3 3 ~ 1
 4 3 ~ 4
 5 3 ~ 7
 6 3 ~ 8
 7 3 ~ 9
 8 1 ~ 3
 9 5 ~ 5
10 4 ~ 6

Parallel 类 提供对并行循环和区域的支持, 现在我们看下Parallel.for的代码:

1 // this needs to be in try-block because it can throw in  BuggyScheduler.MaxConcurrencyLevel
  2                 rootTask = new ParallelForReplicatingTask(
  3                     parallelOptions,
  4                     delegate
  5                     {
  6                         //
  7                         // first thing we do upon enterying the task is to register  as a new "RangeWorker" with the
  8                         // shared RangeManager instance.

  9                         //
 10                         // If this call returns a RangeWorker struct which wraps the  state needed by this task
 11                         //
 12                         // We need to call FindNewWork32() on it to see whether  there's a chunk available.

 13                         //
 14                         // Cache some information about the current task
 15                         Task currentWorkerTask = Task.InternalCurrent;
 16                         bool bIsRootTask = (currentWorkerTask == rootTask);
 17                         RangeWorker currentWorker = new RangeWorker();
 18                         Object savedStateFromPreviousReplica =  currentWorkerTask.SavedStateFromPreviousReplica;
 19                         if (savedStateFromPreviousReplica is RangeWorker)
 20                             currentWorker =  (RangeWorker)savedStateFromPreviousReplica;
 21                         else
 22                             currentWorker = rangeManager.RegisterNewWorker();
 23                         // These are the local index values to be used in the  sequential loop.

 24                         // Their values filled in by FindNewWork32
 25                         int nFromInclusiveLocal;
 26                         int nToExclusiveLocal;
 27                         if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out  nToExclusiveLocal) == false ||
 28                             sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) ==  true)
 29                         {
 30                             return; // no need to run
 31                         }
 32                         // ETW event for ParallelFor Worker Fork
 33                         if (TplEtwProvider.Log.IsEnabled())
 34                         {
 35                             TplEtwProvider.Log.ParallelFork((currentWorkerTask != null  ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id),  (currentWorkerTask != null ? currentWorkerTask.Id : 0),
 36                                                              forkJoinContextID);
 37                         }
 38                         TLocal localValue = default(TLocal);
 39                         bool bLocalValueInitialized = false; // Tracks whether  localInit ran without exceptions, so that we can skip localFinally if it wasn't
 40                         try
 41                         {
 42                             // Create a new state object that references the shared  "stopped" and "exceptional" flags
 43                             // If needed, it will contain a new instance of  thread-local state by invoking the selector.

 44                             ParallelLoopState32 state = null;
 45                             if (bodyWithState != null)
 46                             {
 47                                 Contract.Assert(sharedPStateFlags != null);
 48                                 state = new ParallelLoopState32(sharedPStateFlags);
 49                             }
 50                             else if (bodyWithLocal != null)
 51                             {
 52                                 Contract.Assert(sharedPStateFlags != null);
 53                                 state = new ParallelLoopState32(sharedPStateFlags);
 54                                 if (localInit != null)
 55                                 {
 56                                     localValue = localInit();
 57                                     bLocalValueInitialized = true;
 58                                 }
 59                             }
 60                             // initialize a loop timer which will help us decide  whether we should exit early
 61                             LoopTimer loopTimer = new  LoopTimer(rootTask.ActiveChildCount);
 62                             // Now perform the loop itself.

 63                             do
 64                             {
 65                                 if (body != null)
 66                                 {
 67                                     for (int j = nFromInclusiveLocal;
 68                                          j < nToExclusiveLocal &&  (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE  // fast path  check as SEL() doesn't inline
 69                                                                    ||  !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have  no state
 70                                          j += 1)
 71                                     {
 72                                         body(j);
 73                                     }
 74                                 }
 75                                 else if (bodyWithState != null)
 76                                 {
 77                                     for (int j = nFromInclusiveLocal;
 78                                         j < nToExclusiveLocal &&  (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE  // fast path  check as SEL() doesn't inline
 79                                                                    ||  !sharedPStateFlags.ShouldExitLoop(j));
 80                                         j += 1)
 81                                     {
 82                                         state.CurrentIteration = j;
 83                                         bodyWithState(j, state);
 84                                     }
 85                                 }
 86                                 else
 87                                 {
 88                                     for (int j = nFromInclusiveLocal;
 89                                         j < nToExclusiveLocal &&  (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.PLS_NONE  // fast path  check as SEL() doesn't inline
 90                                                                    ||  !sharedPStateFlags.ShouldExitLoop(j));
 91                                         j += 1)
 92                                     {
 93                                         state.CurrentIteration = j;
 94                                         localValue = bodyWithLocal(j, state,  localValue);
 95                                     }
 96                                 }
 97                                 // Cooperative multitasking hack for AppDomain  fairness.

 98                                 // Check if allowed loop time is exceeded, if so save  current state and return. The self replicating task logic
 99                                 // will detect this, and queue up a replacement task.  Note that we don't do this on the root task.

100                                 if (!bIsRootTask && loopTimer.LimitExceeded())
101                                 {
102                                     currentWorkerTask.SavedStateForNextReplica =  (object)currentWorker;
103                                     break;
104                                 }
105                             }
106                             // Exit if we can't find new work, or if the loop was  stoppped.

107                             while (currentWorker.FindNewWork32(out  nFromInclusiveLocal, out nToExclusiveLocal) &&
108                                     ((sharedPStateFlags.LoopStateFlags ==  ParallelLoopStateFlags.PLS_NONE) ||
109                                        !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
110                         }
111                         catch
112                         {
113                             // if we catch an exception in a worker, we signal the  other workers to exit the loop, and we rethrow
114                             sharedPStateFlags.SetExceptional();
115                             throw;
116                         }
117                         finally
118                         {
119                             // If a cleanup function was specified, call it.  Otherwise, if the type is
120                             // IDisposable, we will invoke Dispose on behalf of the  user.

121                             if (localFinally != null && bLocalValueInitialized)
122                             {
123                                 localFinally(localValue);
124                             }
125                             // ETW event for ParallelFor Worker Join
126                             if (TplEtwProvider.Log.IsEnabled())
127                             {
128                                 TplEtwProvider.Log.ParallelJoin((currentWorkerTask !=  null ? currentWorkerTask.m_taskScheduler.Id : TaskScheduler.Current.Id),  (currentWorkerTask != null ? currentWorkerTask.Id : 0),
129                                                                  forkJoinContextID);
130                             }
131                         }
132                     },
133                     creationOptions, internalOptions);
134                 rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);  // might throw TSE
135                 rootTask.Wait();
136                 // If we made a cancellation registration, we need to clean it up  now before observing the OCE
137                 // Otherwise we could be caught in the middle of a callback, and  observe PLS_STOPPED, but oce = null
138                 if (parallelOptions.CancellationToken.CanBeCanceled)
139                 {
140                     ctr.Dispose();
141                 }
142                 // If we got through that with no exceptions, and we were canceled,  then
143                 // throw our cancellation exception
144                 if (oce != null) throw oce;

body对于迭代范围 (的每个值调用一次委托 fromInclusive , toExclusive) 。提供两个参数:

1、一个 Int32 值,该值表示迭代次数。

2、ParallelLoopState可用于提前中断循环的实例。ParallelLoopState对象是由编译器创建的; 它不能在用户代码中实例化。

继续来看:

Parallel.For(0, 10, (i,state) =>
            {
                if (i > 5)
                    state.Break();
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} ~ {i}");
            } );

输出:

1 1 ~ 0
2 1 ~ 1
3 1 ~ 2
4 1 ~ 3
5 1 ~ 4
6 1 ~ 5
7 1 ~ 6

在上面的方法中我们使用了 break方法。

调用 Break 方法会通知 for 操作,在当前的迭代之后,无需执行迭代。不过,如果所有迭代尚未执行,则仍必须执行当前的所有迭代。

因此,调用 Break 类似于 for c# 等语言中的传统循环内的中断操作,但它并不是完美的替代方法:例如,无法保证当前的迭代不会执行。

今天就先写道这里~

多线程那点事—Parallel.for

Original: https://www.cnblogs.com/xtt321/p/14223636.html
Author: 温暖如太阳
Title: 多线程那点事—Parallel.for

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/593285/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • SpringBoot自动配置

    @SpringBootApplication SpringBoot应用标注在某个类上说明这个类是SpringBoot的主配置类,SpringBoot就应该运行这个类的main方法来…

    Java 2023年6月8日
    082
  • MySQL系统变量和字符集

    书名《MySQL是怎样运行的:从根儿上理解MySQL》可自行百度 以下是知识点总结 重新认识Mysql MySQL是一个C/S架构的软件。 在Windows安装后首先注册成服务,然…

    Java 2023年6月16日
    065
  • 引用拷贝,浅拷贝和深拷贝

    1.引用拷贝 引用拷贝会生成一个新的对象引用地址,但是两个最终指向依然是同一个对象。 class Son { String name; int age; public Son(St…

    Java 2023年6月5日
    063
  • Java源码赏析(六)Java String 三顾

    在大致了解了String之后,可能有的读者发现了,我们并没有谈到CharSequence接口。 原因是在这一节,CharSequence要和StringBuilder(Java1….

    Java 2023年6月8日
    057
  • roket MQ 延迟消费

    发送端,执行Message对象的setDelayTimeLevel(); 比如需要延迟10s message.setDelayTimeLevel(3); &#x9ED8;&…

    Java 2023年6月5日
    075
  • 多线程JUC并发篇常见面试详解

    @ 1、JUC 简介 2、线程和进程 3、并非与并行 4、线程的状态 5、wait/sleep的区别 6、Lock 锁(重点) 1、Lock锁 2、公平非公平: 3、Reentra…

    Java 2023年6月5日
    0116
  • lua 源码分析之线程对象lua_State

    lua_State 中放的是 lua 虚拟机中的环境表、注册表、运行堆栈、虚拟机的上下文等数据。 从一个主线程(特指 lua 虚拟机中的线程,即 coroutine)中创建出来的新…

    Java 2023年5月30日
    086
  • MyBatis(二四):缓存——一级缓存

    MyBatis的缓存分为一级缓存和二级缓存。 先看一下MyBatis官方文档给出的说明: MyBatis 内置了一个强大的事务性查询缓存机制,它可以非常方便地配置和定制。 为了使它…

    Java 2023年6月15日
    072
  • 已数组作为参考过滤数组数据

    this.purchaseDetailList = this.purchaseDetailList.filter((item) => !this.addDataList.so…

    Java 2023年6月5日
    079
  • [javaweb]过滤器处理乱码

    过滤器 有一些信息不应该被处理,要被过滤的。 1.导包 javax.servlet.jsp.jstl jstl-api 1.2 taglibs standard 1.1.2 jav…

    Java 2023年6月6日
    090
  • Java开发学习(十一)—-基于注解开发bean作用范围与生命周期管理

    一、注解开发bean作用范围与生命周期管理 前面使用注解已经完成了bean的管理,接下来将通过配置实现的内容都换成对应的注解实现,包含两部分内容: bean&#x4F5C;…

    Java 2023年5月29日
    078
  • 学习资料

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Java 2023年6月6日
    067
  • 吃透SpringBoo的这些t知识,你就已经超过90%的Java面试者了

    做 Java 开发,没有人敢小觑 Spring Boot 的重要性,现在出去面试,无论多小的公司 or 项目,都要跟你扯一扯 Spring Boot,扯一扯微服务,如果啃不下来,很…

    Java 2023年6月7日
    073
  • idea 插件推荐 Translation(翻译插件)

    idea 安装Translation: 我用的 windows idea 2019.1.3不同版本可能会不同 打开idea settings => Plugins 搜索tra…

    Java 2023年6月5日
    096
  • 一个服务端端口能建立多个TCP连接吗

    可以的! 先看一般的socket建立连接的双方的过程: 客户端: socket()—->创建出 active_socket_fd (client_socket_f…

    Java 2023年5月30日
    062
  • 实用向—总结一些唯一ID生成方式

    Redis Incr 命令会将 key 中储存的数字值增一。如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。 这里以jedis为例提供两种…

    Java 2023年6月9日
    079
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球