算法 自动采集列表(输入为网页有向图的终止点问题和陷阱问题的操作)

优采云 发布时间: 2021-12-27 15:22

  算法 自动采集列表(输入为网页有向图的终止点问题和陷阱问题的操作)

  输入网页有向图的邻接表:

  ![在这里插入图片描述](https://img-blog.csdnimg.cn/fa08d337ffb344098a30f2ebbb6b1333.png

  通过统计输入文件的行数,可以得到网页总数为4

  每个网页的初始值为1/N,即0.25

  第一行输入经过map处理后,得到如下结果:

  B 0.0833

C 0.0833

D 0.0833

  同理,第二、三、四行经过map处理后,得到:

  A 0.125

D 0.125

C 0.25

B 0.125

C 0.125

  系统会自动对map的输出进行shuffle,即对key进行排序,将相同key的值合并成一个列表。

  即

  A 0.125

B 0.0833 0.125

C 0.0833 0.25 0.125

D 0.0833 0.125 0.125

  此时有个问题:

  为什么要做这一步而不是把同一个key的值相加?

  是为了 MapReduce 编程的可扩展性。在已知PageRank任务的前提下,我们知道需要将相同key的值相加。如果是找最大值的任务怎么办?

  那么对值列表的操作就交给reduce了,我们想怎么操作这些列表,只要写reduce就可以了。

  为了解决网页之间的终止点和陷阱问题,需要在reduce中进行如下处理(网页没有出链接或者只有自己出链接,pr值只会在迭代后增加)

  假设:网民通过外链访问其他网页的概率为a,通过地址栏随机访问页面的概率为(1-a)

  所以,在reduce过程中,某个网页的pr转化为:

  a *(接收其他网页发送的pr值) + (1-a) * 1/N

  reduce处理后网页的pr值为

  A = 0.8 * 0.125 + 0.2 * 0.25 = 0.15

  B = 0.8 * (0.0833 + 0.125) + 0.2 * 0.25 = 0.@ >216

  C = 0.8 * (0.0833 + 0.25) + 0.2 * 0.25 = 0.@ >416

  D = 0.8 * (0.0833 + 0.125 + 0.125) + 0.2 * 0.@ >25 = 0.216

  此时一轮迭代结束,输出reduce的结果。

  

  那么你什么时候停止迭代?

  要么达到最大迭代次数,要么pr值的变化已经收敛(pr值的图形趋于水平)

  如何判断pr值的收敛性:

  设置一个参数epi, if max | Pi j-P i j-1|

  完整的程序如下:(支持eclipse运行在Hadoop上,不支持yarn-jar运行,因为yarn-jar运行时,只能访问类中静态变量的初始值。如果程序运行时静态变量的值发生了改变,map/reduce中得到的变量值仍然是旧值)

  package test02;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.util.HashMap;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank_02 {

private static int N = 1;

private static float a = 0.8f;

private static int maxIteration = 40;

private static float epi = 0.000001f;

private static HashMap map;

private static HashMap old_map;

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();

if (otherArgs.length 0)

input = otherArgs[0];

String output = "";

if (otherArgs.length > 1)

output = otherArgs[1];

if (otherArgs.length > 2)

setMaxIteration(Integer.parseInt(otherArgs[2]));

if (otherArgs.length > 3)

setEpi(Float.parseFloat(otherArgs[3]));

// 统计input文件行数,即网页个数

FileSystem fs = FileSystem.get(conf);

FSDataInputStream in = fs.open(new Path(input));

BufferedReader d = new BufferedReader(new InputStreamReader(in));

int count = 0;

String line;

while ((line = d.readLine()) != null) {

count += 1;

}

System.err.println("Numbers of pages: " + count);

setN(count);

d.close();

in.close();

for (int i = 0; i 0) {

for (String key : map.keySet()) {

max_delta = Math.max(max_delta, Math.abs(map.get(key) - old_map.get(key)));

}

}

System.err.println("iteration: " + i + " , MaxIteration: " + getMaxIteration());

System.err.println("N: " + getN());

System.err.println("a: " + getA());

System.err.println("max_delta: " + max_delta);

System.err.println("epi: " + getEpi());

if (max_delta 0)

break;

old_map = map;

}

System.exit(0);

}

/* map过程 */

public static class PageRankMapper extends Mapper {

private String id;

private float pr;

private int count;

private float average_pr;

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

StringTokenizer str = new StringTokenizer(value.toString());// 对value进行解析

id = str.nextToken();// id为解析的第一个词,代表当前网页

if(old_map!=null && old_map.containsKey(id)) {

pr = old_map.get(id);

} else {

pr = 1.0f / N;

}

count = str.countTokens();// count为剩余词的个数,代表当前网页的出链网页个数

average_pr = pr / count;// 求出当前网页对出链网页的贡献值

while (str.hasMoreTokens()) {

String linkid = str.nextToken();

context.write(new Text(linkid), new Text(average_pr + ""));// 输出的是

}

}

}

/* reduce过程 */

public static class PageRankReduce extends Reducer {

public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

float pr = 0;

for (Text val : values) {

pr += Float.parseFloat(val.toString());

}

pr = getA() * pr + (1 - getA()) * (1.0f / getN());// 加入跳转因子

map.put(key.toString(), pr);

context.write(key, new Text(pr + ""));

}

}

public static float getEpi() {

return epi;

}

public static void setEpi(float epi) {

PageRank_02.epi = epi;

}

public static float getA() {

return a;

}

public static void setA(float a) {

PageRank_02.a = a;

}

public static int getMaxIteration() {

return maxIteration;

}

public static void setMaxIteration(int maxIteration) {

PageRank_02.maxIteration = maxIteration;

}

public static int getN() {

return N;

}

public static void setN(int n) {

PageRank_02.N = n;

}

}

  程序输入参数为:输入文件输出文件Max_iteration epi

  运行配置设置如下

  

  按照如图所示的配置运行程序

  在迭代:14,程序退出循环

  pr 变化的最大值:

  max_delta = 0.0000846

  参数epi:

  epi = 0.0001

  max_delta

  pr 值已经收敛

  

  参考文献:

  1.MapReduce的PageRank算法概述、设计思路及源码分析

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线