打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Parallel Programming-实现并行操作的流水线(生产者、消费者)
 
 

本文介绍如何使用C#实现并行执行的流水线(生产者消费者):

1.流水线示意图

2.实现并行流水线

一、流水线示意图

 
 

上图演示了流水线,action1接收input,然后产生结果保存在buffer1中,action2读取buffer1中由action1产生的数据,以此类推指导action4完成产生Output。

以上也是典型的生产者消费者模式。

上面的模式如果使用普通常规的串行执行是很简单的,按部就班按照流程图一步一步执行即可。如果为了提高效率,想使用并行执行,也就是说生产者和消费者同时并行执行,该怎么办么?

二、实现并行流水线

2.1 代码


class PiplelineDemo    {        private int seed;        public PiplelineDemo()        {            seed = 10;        }        public void Action1(BlockingCollection<string> output)        {            try            {                for (var i = 0; i < seed; i++)                {                    output.Add(i.ToString());//initialize data to buffer1                }            }            finally            {                output.CompleteAdding();            }        }        public void Action2(BlockingCollection<string> input, BlockingCollection<string> output)        {            try            {                foreach (var item in input.GetConsumingEnumerable())                {                    var itemToInt = int.Parse(item);                    output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2                }            }            finally            {                output.CompleteAdding();            }        }        public void Action3(BlockingCollection<string> input, BlockingCollection<string> output)        {            try            {                foreach (var item in input.GetConsumingEnumerable())                {                    output.Add(item);//set data into buffer3                }            }            finally            {                output.CompleteAdding();            }        }        public void Pipeline()        {            var buffer1 = new BlockingCollection<string>(seed);            var buffer2 = new BlockingCollection<string>(seed);            var buffer3 = new BlockingCollection<string>(seed);            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);            var stage1 = taskFactory.StartNew(() => Action1(buffer1));            var stage2 = taskFactory.StartNew(() => Action2(buffer1, buffer2));            var stage3 = taskFactory.StartNew(() => Action3(buffer2, buffer3));            Task.WaitAll(stage1, stage2, stage3);            foreach(var item in buffer3.GetConsumingEnumerable())//print data in buffer3            {                Console.WriteLine(item);            }        }    }    class Program    {        static void Main(string[] args)        {            new PiplelineDemo().Pipeline();            Console.Read();        }    }

2.2 运行结果

 
 

预期打印出了0-9自我相乘的结果。

2.3 代码解释

代码本身的逻辑和本文开始的流程图是一一对应的。

BlockingCollection<T>是.Net里面的一个线程安全集合。实现了IProducerConsumerCollection<T>.

  1. Add方法:将元素加入集合
  2. CompleteAdding方法:告诉消费者,在当调用该方法之前的元素处理完之后就不要再等待处理了,可以结束处理了。这个非常重要,一定要执行,所以放在finally中(就算exception也要执行)
  3. GetConsumingEnumberable,给消费者返回一个可以便利的集合
 
 

以上动图由“图斗罗”提供

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Python 爬虫进阶|某著名人均百万问答社区 header 参数加密逻辑分析
并行编程和任务(二)
C# download big file
rails 研究遇到的问题
并行化程序设计的四步走
自己写的一个JSP上传文件和下载文件的JavaBean
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服