P
P
Pavel Kaptur2017-04-01 19:38:43
linux
Pavel Kaptur, 2017-04-01 19:38:43

How to eliminate the race condition when using a named pipe?

Hello!
I want to link two processes with bi-directional communication based on named pipe, i.e. both parties can send messages to each other using the same pipe. To do this, I created the Pipe class and a small program that uses it.

Class Code
#ifndef __NamedPipeClass_H__
#define __NamedPipeClass_H__

#include <thread>


/**
 * Small wrapper around a POSIX FIFO (named pipe).
 *
 * The object stores the FIFO path and one file descriptor. After Connect()
 * a background thread reads from the descriptor and forwards every chunk to
 * the optional onRead callback.
 *
 * The destructor closes the descriptor and unlinks the FIFO path, so the
 * class is deliberately non-copyable: a copy would close/unlink the same
 * resources a second time.
 */
class NamedPipeClass
{
public:
  /// Builds a pipe object for the given path without a read callback.
  NamedPipeClass(char* _pipeName);
  /// Builds a pipe object for the given path.
  /// _onRead receives (data, byte count, this pipe) for every chunk read;
  /// it may be NULL, in which case incoming data is dropped.
  NamedPipeClass(char* _pipeName, void (*_onRead)(unsigned char*, unsigned int, NamedPipeClass*));

  // Owning a descriptor + a filesystem entry: copying would double-release.
  NamedPipeClass(const NamedPipeClass&) = delete;
  NamedPipeClass& operator=(const NamedPipeClass&) = delete;

  /// Opens the FIFO and starts the background reading thread.
  /// Returns 0 on success, otherwise an errno value.
  int Connect();
  /// Creates the FIFO in the filesystem. Returns 0 on success, otherwise an errno value.
  int Create();
  /// Closes the descriptor. Always returns 0.
  int Disconnect();
  /// Removes the FIFO path from the filesystem. Always returns 0.
  int Delete();

  /// Writes bufferLen bytes from buffer to the pipe; returns the result of write().
  int Write(unsigned char * buffer, unsigned int bufferLen);

  ~NamedPipeClass();
private:
  char pipeName[256];   // NUL-terminated FIFO path
  int pipeDescriptor;   // descriptor obtained by Connect()

  void (*onRead)(unsigned char*, unsigned int, NamedPipeClass*);  // may be NULL
  int ReadingThread();  // body of the background reader started by Connect()

  bool bExit;           // reading loop runs while this is false


};

#endif

with the implementation
#include "NamedPipeClass.h"

#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#include <string.h>
#include <thread>

#define MAX_BUF 4096


/**
 * Stores the FIFO path and the optional read callback.
 *
 * @param _pipeName NUL-terminated FIFO path; names longer than 255 bytes are
 *                  truncated so they can never overflow the internal buffer.
 *                  A NULL pointer yields an empty path.
 * @param _onRead   callback invoked for every chunk read; may be NULL.
 */
NamedPipeClass::NamedPipeClass(char* _pipeName, void (*_onRead)(unsigned char*, unsigned int, NamedPipeClass*))
  : pipeName(),          // zero-filled, so the copy below is always NUL-terminated
    pipeDescriptor(-1),  // -1 = "not open"; 0 would be stdin and must never be closed by us
    onRead(_onRead),
    bExit(false)
{
  if (_pipeName != NULL)
    strncpy(pipeName, _pipeName, sizeof(pipeName) - 1);
}

/**
 * Convenience constructor: same as the two-argument form with no read callback.
 *
 * Forwards the caller's _pipeName (not the still-uninitialised pipeName member).
 */
NamedPipeClass::NamedPipeClass(char* _pipeName)
 : NamedPipeClass(_pipeName, NULL)
{
}


/**
 * Releases everything the object refers to: the descriptor is closed first,
 * then the FIFO path is removed from the filesystem. Failures of either call
 * are ignored.
 */
NamedPipeClass::~NamedPipeClass()
{
  ::close(pipeDescriptor);
  ::unlink(pipeName);
}

/**
 * Creates the FIFO (named pipe) at pipeName.
 *
 * @return 0 on success, otherwise the errno value reported by mkfifo().
 */
int NamedPipeClass::Create()
{
  if (mkfifo(pipeName, S_IFIFO|S_IRWXU|S_IRWXG|S_IRWXO) != 0)
  {
    // Capture errno immediately: printf() is allowed to overwrite it.
    const int err = errno;
    printf("create named pipe error = %d\n", err); /* an error occurred */
    return err;
  }

  return 0;
}
/*
void* call_ReadingThread(void* Pipe) {
    HLTNamedPipeClass* network = static_cast<HLTNamedPipeClass*>(Pipe);

    void* result = network->SocketHandler(somearg);

    // do something w/ result

    return nullptr;
}
*/

/**
 * Opens the FIFO for reading and writing and starts the background reader.
 *
 * The reader runs ReadingThread() on a detached std::thread, so this object
 * must stay alive for as long as that thread may still be running.
 *
 * NOTE(review): because the descriptor is opened O_RDWR and every connected
 * object starts a reader, a process that writes to the FIFO also competes to
 * read its own data back. Use one FIFO per direction to avoid that.
 *
 * @return 0 on success, otherwise the errno value reported by open().
 */
int NamedPipeClass::Connect()
{
  pipeDescriptor = open(pipeName, O_RDWR);
  if (pipeDescriptor == -1)
  {
    // Capture errno immediately: printf() is allowed to overwrite it.
    const int err = errno;
    printf("client open FIFO error = %d\n", err); /* an error occurred */
    return err;
  }

  std::thread Reading(&NamedPipeClass::ReadingThread, this);
  Reading.detach();

  return 0;
}
/**
 * Body of the background reader: blocks in read() and hands every chunk to
 * the onRead callback (if one was supplied).
 *
 * The chunk passed to the callback is followed by a NUL byte (not counted in
 * the length), so callbacks that treat it as a C string stay within bounds.
 *
 * The loop ends when bExit becomes true, when read() reports end-of-stream,
 * or when read() fails with anything other than EINTR (for example because
 * the descriptor was closed). Previously such a failure made the loop spin
 * forever at full CPU.
 *
 * @return always 0.
 */
int NamedPipeClass::ReadingThread()
{
  unsigned char buf[MAX_BUF + 1];  // +1 leaves room for the terminating NUL
  while (!bExit)
  {
    const ssize_t bytesRead = read(pipeDescriptor, buf, MAX_BUF);
    if (bytesRead > 0)
    {
      buf[bytesRead] = 0;
      if (onRead != NULL)
        onRead(buf, static_cast<unsigned int>(bytesRead), this);
      continue;
    }

    // Interrupted by a signal before any data arrived: just retry.
    if (bytesRead == -1 && errno == EINTR)
      continue;

    // End-of-stream or a real error: nothing more can be read.
    break;
  }
  return 0;
}

/**
 * Closes the pipe descriptor, if one is open.
 *
 * The descriptor is reset to -1 afterwards so that a second Disconnect() or
 * the destructor cannot close the same number again (which by then may
 * belong to an unrelated file).
 *
 * @return always 0.
 */
int NamedPipeClass::Disconnect()
{
  if (pipeDescriptor != -1)
  {
    close(pipeDescriptor);
    pipeDescriptor = -1;
  }
  return 0;
}

/**
 * Removes the FIFO path from the filesystem.
 *
 * The outcome of unlink() is not inspected.
 *
 * @return always 0.
 */
int NamedPipeClass::Delete()
{
  ::unlink(pipeName);
  return 0;
}

/**
 * Sends bufferLen bytes starting at buffer through the pipe descriptor.
 *
 * @return whatever write() returned (byte count, or -1 on failure),
 *         converted to int.
 */
int NamedPipeClass::Write(unsigned char * buffer, unsigned int bufferLen)
{
  const ssize_t written = ::write(pipeDescriptor, buffer, bufferLen);
  return written;
}


and main program
main
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#include <string.h>
#include <atomic>
#include <iostream>
#include <thread>

#include "NamedPipeClass.h"

// Set by OnRead2 (running on the pipe's reading thread) and polled by main();
// atomic because it is shared between those two threads.
std::atomic<bool> g_ReaderStop{false};

/**
 * Read callback of the writer process: echoes the received bytes.
 *
 * @param buf    received data; not assumed to be NUL-terminated.
 * @param bufLen number of valid bytes in buf.
 * @param pipe   pipe the data arrived on (unused).
 */
void OnRead1(unsigned char * buf, unsigned int bufLen, NamedPipeClass* pipe)
{
  (void)pipe;
  // "%.*s" prints exactly bufLen bytes instead of scanning for a NUL.
  printf("OnRead1 %.*s\n", static_cast<int>(bufLen), reinterpret_cast<const char*>(buf));
}

/**
 * Read callback of the reader process: echoes the received bytes and asks
 * main() to stop once a chunk starting with 'q' arrives.
 *
 * @param buf    received data; not assumed to be NUL-terminated.
 * @param bufLen number of valid bytes in buf.
 * @param pipe   pipe the data arrived on (unused).
 */
void OnRead2(unsigned char * buf, unsigned int bufLen, NamedPipeClass* pipe)
{
  (void)pipe;
  // "%.*s" prints exactly bufLen bytes instead of scanning for a NUL.
  printf("OnRead2 %.*s\n", static_cast<int>(bufLen), reinterpret_cast<const char*>(buf));
  if(bufLen > 0 && buf[0]=='q')
  {
    g_ReaderStop=true;
  }
}

/**
 * Forwards characters typed on standard input to the pipe, one byte per
 * Write() call, until 'q' has been sent or standard input ends/fails.
 *
 * @param hltPipe connected pipe to write to.
 * @return always 0.
 */
int Writer(NamedPipeClass* hltPipe)
{
  unsigned char c = 0;
  // Testing the stream state stops the loop on EOF or a read error; without
  // it the loop would never end and would keep sending a stale byte.
  while(std::cin>>c)
  {
    hltPipe->Write(&c, 1);
    if(c=='q')
      break;
  }
  return 0;
}

int main() {
    char * myfifo = "//tmp//myfifo\0";
    pid_t pid_1 = 0;

    switch(pid_1=fork()) {
  case -1:
        printf("fork error %d\n", pid_1); /* произошла ошибка */
    break;
  case 0:
    {
      NamedPipeClass* hltPipe = new NamedPipeClass(myfifo, OnRead2);

      hltPipe->Create();
      hltPipe->Connect();

      while(!g_ReaderStop)
      {
        sleep(1);
      }
      hltPipe->Disconnect();
      hltPipe->Delete();
    }
  default:
    {
      NamedPipeClass* hltPipe = new NamedPipeClass(myfifo, OnRead1);
      //hltPipe->Create();
      hltPipe->Connect();

      std::thread writer(Writer, hltPipe);
      writer.join();

      hltPipe->Disconnect();
      hltPipe->Delete();
    }
    return 0;
    }
}

and as a result got a race
w
OnRead1 w
e
OnRead1 e
r
OnRead1 r
t
OnRead2 t
y
OnRead1 y
u

How can I fix this? Do I need two pipes? I will gladly accept any comments, including on the structure. Maybe someone has a ready-made example of such an implementation?

Answer the question

In order to leave comments, you need to log in

1 answer(s)
S
Sumor, 2017-04-01
@drem1lin

You can organize with one pipe, but there is a lot of trouble. You must "seize" the channel for transmission, for example by sending a special "start transmission" command. Upon receipt, the second party switches to read mode and undertakes not to send messages until the "end of transmission" signal. The first side does not read anything from the pipe at the time of transmission.
Thus, whenever one side needs to send something, it seizes the channel and transmits while the other side only listens.
You can organize two pipes: one for transmitting and the other for receiving. If one side only answers questions, this is a good option. A problem can arise if the second side can not only answer but also send something on its own initiative; then you need to carefully separate the two cases.
And you can organize 4 pipes. Two for sending a command by the first side and a response by the second. And two more for sending commands from the second side and the answer from the first.

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question