UDP实现可靠传输的简单实例
核心思想:模拟TCP的传输过程
import java.net.*;
import java.io.*;
/**
* 发送消息的结构定义
* @author Administrator
*
*/
public class NetJavaMsg {
private int totalLen; //数据的长度
private int id; //唯一的ID
private byte[] data; //消息内容
//本地参数,为简化起见,不发送
private SocketAddress recvRespAdd; //发送者接收应答的地址
private SocketAddress destAdd; //接收者地址
private int sendCount=0; //发送次数
private long lastSendTime; //最后一次发送的时间
/**
*
* @param id //唯一的序号
* @param data //数据内容
*/
public NetJavaMsg(int id,byte[] data){
this.id=id;
this.data=data;
totalLen=4+4+data.length;
}
/**
*
* @param udpData //将受到的udp数据解析为NetJavaMsg对象
*/
public NetJavaMsg(byte[] udpData){
try{
ByteArrayInputStream bins=new ByteArrayInputStream(udpData);
DataInputStream dins=new DataInputStream(bins);
this.totalLen=dins.readInt();
this.id=dins.readInt();
this.data=new byte[totalLen-4-4];
dins.readFully(data);
}catch(Exception e){
e.printStackTrace();
}
}
public byte[] toByte(){
try{
ByteArrayOutputStream bous=new ByteArrayOutputStream();
DataOutputStream dous=new DataOutputStream(bous);
dous.writeInt(totalLen);
dous.writeInt(id);
dous.write(data);
dous.flush();
return bous.toByteArray();
}catch(Exception e){
e.printStackTrace();
}
return null;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "id:"+id+" content"+new String(data)+" totalLen"+totalLen+"
sengerAdd:"+recvRespAdd+" destAdd:"+destAdd;
}
public int getTotalLen() {
return totalLen;
}
public void setTotalLen(int totalLen) {
this.totalLen = totalLen;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public SocketAddress getRecvRespAdd() {
return recvRespAdd;
}
public void setRecvRespAdd(SocketAddress recvRespAdd) {
this.recvRespAdd = recvRespAdd;
}
public SocketAddress getDestAdd() {
return destAdd;
}
public void setDestAdd(SocketAddress destAdd) {
this.destAdd = destAdd;
}
public int getSendCount() {
return sendCount;
}
public void setSendCount(int sendCount) {
this.sendCount = sendCount;
}
public long getLastSendTime() {
return lastSendTime;
}
public void setLastSendTime(long lastSendTime) {
this.lastSendTime = lastSendTime;
}
}
import java.io.*;
public class NetJavaRespMsg {
private int totalLen;
private int repId; //回复对应接收到的消息ID
private byte state=0; //状态 0:正确接收 其它:错误
private long resTime; //应答方的发送时间
public NetJavaRespMsg(int repId,byte state,long resTime){
this.repId=repId;
this.state=state;
this.resTime=resTime;
totalLen=4+4+1+8;
}
public NetJavaRespMsg(byte[] udpData){
try{
ByteArrayInputStream bins=new ByteArrayInputStream(udpData);
DataInputStream dins=new DataInputStream(bins);
this.totalLen=dins.readInt();
this.repId=dins.readInt();
this.state=dins.readByte();
this.resTime=dins.readLong();
}catch(Exception e){
e.printStackTrace();
}
}
public byte[] toByte(){
try{
ByteArrayOutputStream bous=new ByteArrayOutputStream();
DataOutputStream dous=new DataOutputStream(bous);
dous.writeInt(this.totalLen);
dous.writeInt(this.repId);
dous.writeByte(this.state);
dous.writeLong(this.resTime);
dous.flush();
return bous.toByteArray();
}catch(Exception e){
e.printStackTrace();
}
return null;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "totalLen:"+this.totalLen+" respID"+this.repId+" state"+this.state+" resTime"+resTime;
}
public int getTotalLen() {
return totalLen;
}
public void setTotalLen(int totalLen) {
this.totalLen = totalLen;
}
public int getRepId() {
return repId;
}
public void setRepId(int repId) {
this.repId = repId;
}
public byte getState() {
return state;
}
public void setState(byte state) {
this.state = state;
}
public long getResTime() {
return resTime;
}
public void setResTime(long resTime) {
this.resTime = resTime;
}
}
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.*;
import java.util.concurrent.*;
/**
* 数据报发送
* 1.发送消息线程负责发送,发送后将消息放入容器中等待应答
* 2.接收线程接收应答,从容器中匹配后删除
* 3.重发线程负责重发,未收到应答的消息,发送3次后移除
* @author Administrator
*
*/
public class DatagramSend {
private SocketAddress localAddr; //本地要发送的地址对象
private DatagramSocket dSender; //发送的Socket对象
private SocketAddress destAddr; //目标地址
//本地缓存已发送的消息Map key为消息ID value为消息对象本身
Map<Integer,NetJavaMsg> msgQueue=new ConcurrentHashMap();
public DatagramSend() throws Exception{
localAddr=new InetSocketAddress("172.16.1.151",13000);
dSender=new DatagramSocket(localAddr);
destAddr=new InetSocketAddress("172.16.1.151",14000);
//启动三个线程
startSendThread();
startRecvResponseThread();
startReSendThread();
}
//启动发送线程
public void startSendThread(){
new Thread(new Runnable(){
@Override
public void run() {
try {
send();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}}).start();
}
//模拟发送消息
public void send() throws Exception{
System.out.println("发送端-发送数据线程启动...");
int id=0;
while(true){
id++;
byte[] msgData=(id+"-hello").getBytes();
//创建要发送的消息对象
NetJavaMsg sendMsg=new NetJavaMsg(id,msgData);
//要发送的数据:将要发送的数据转为字节数组
byte[] buffer=sendMsg.toByte();
//创建数据包,指定内容,指定目标地址
DatagramPacket dp=new DatagramPacket(buffer,buffer.length,destAddr);
dSender.send(dp); //发送
sendMsg.setSendCount(1);
sendMsg.setLastSendTime(System.currentTimeMillis());
sendMsg.setRecvRespAdd(localAddr);
sendMsg.setDestAdd(destAddr);
msgQueue.put(id, sendMsg);
System.out.println("客户端-数据已发送"+sendMsg);
Thread.sleep(1000);
}
}
//启动接收应答线程
public void startRecvResponseThread(){
new Thread(new Runnable(){
@Override
public void run() {
try{
recvResponse();
}catch(Exception e){
e.printStackTrace();
}
}}).start();
}
//接收应答消息
public void recvResponse() throws Exception{
System.out.println("接收端-接收应答线程启动...");
while(true){
byte[] recvData=new byte[100];
//创建接收数据包对象
DatagramPacket recvPacket=new DatagramPacket(recvData,recvData.length);
dSender.receive(recvPacket);
NetJavaRespMsg resp=new NetJavaRespMsg(recvPacket.getData());
int respID=resp.getRepId();
NetJavaMsg msg=msgQueue.get(new Integer(respID));
if(msg!=null){
System.out.println("接收端-已收到:"+msg);
msgQueue.remove(respID);
}
}
}
//启动重发线程
public void startReSendThread(){
new Thread(new Runnable(){
@Override
public void run() {
try{
while(true){
resendMsg();
Thread.sleep(1000);
}
}catch(Exception e){
e.printStackTrace();
}
}}).start();
}
//判断Map中的消息,如果超过3秒未收到应答,则重发
public void resendMsg(){
Set<Integer> keyset=msgQueue.keySet();
Iterator<Integer> it=keyset.iterator();
while(it.hasNext()){
Integer key=it.next();
NetJavaMsg msg=msgQueue.get(key);
if(msg.getSendCount()>3){
it.remove();
System.out.println("***发送端---检测到丢失的消息"+msg);
}
long cTime=System.currentTimeMillis();
if((cTime-msg.getLastSendTime())>3000&&msg.getSendCount()<3){
byte[] buffer=msg.toByte();
try{
DatagramPacket dp=new DatagramPacket(buffer,buffer.length,msg.getDestAdd());
dSender.send(dp);
msg.setSendCount(msg.getSendCount()+1);
System.out.println("客户端--重发消息:"+msg);
}catch(Exception e){
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception{
new DatagramSend();
}
}
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class DatagramRecive {
private SocketAddress localAddr;
private DatagramSocket dSender;
public DatagramRecive() throws Exception{
localAddr=new InetSocketAddress("172.16.1.151",14000);
dSender=new DatagramSocket(localAddr);
//启动接收线程
startRecvThread();
}
public void startRecvThread(){
new Thread(new Runnable(){
@Override
public void run() {
try{
recvMsg();
}catch(Exception e){
e.printStackTrace();
}
}}).start();
}
public void recvMsg() throws Exception{
System.out.println("接收线程启动");
while(true){
byte[] recvData=new byte[100];
DatagramPacket recvPacket=new DatagramPacket(recvData,recvData.length);
dSender.receive(recvPacket);
NetJavaMsg recvMsg=new NetJavaMsg(recvPacket.getData());
NetJavaRespMsg resp=new NetJavaRespMsg(recvMsg.getId(),(byte)0,System.currentTimeMillis());
byte[] data=resp.toByte();
DatagramPacket dp=new DatagramPacket(data,data.length,recvPacket.getSocketAddress());
dSender.send(dp);
System.out.println("接收端-已发送应答"+resp);
}
}
public static void main(String[] args) throws Exception{
new DatagramRecive();
}
}
联系客服